RabbitQM(消息duilie)
前言:RabbitMQ 是实现 AMQP(Advanced Message Queuing Protocol 高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。一:相关概念
通常我们谈到消息队列服务时, 会有三个对象: 即消息发送者(producer生产者)、消息接受的队列(queue)、消息接受者(customer消费者)。三者的关系如下图
而RabbitMQ 在这些基本概念之上, 多做了一层抽象, 即在发消息者和 队列之间, 加入了交换器 (Exchange),关系如下图
由图可知RabbitQM的消息队列模型为:生产者把消息推送到交换器,然后交换器把消息分发到相应的消息队列(注意与图1的不同),最后消息队列中的消息才会被消费者消费。由此引出了交换机和交换机与消息队列的关系(绑定)。
交换机:什么是交换机
rabbitmq的message model实际上消息不直接发送到queue中,中间有一个exchange(即交换机)是做消息分发,producer(生产者)甚至不知道消息发送到那个队列中去。因此,当exchange收到message时,必须准确知道该如何分发。是append到一定规则的queue,还是append到多个queue中,还是被丢弃?这些规则都是通过exchagne的4种type去定义的。
交换机如何分发消息到消息队列:
在RabbitQM模型中,它定义了4种类型的交换机,而且给消息添加了一个Rounting_Key属性,最后交换机绑定消息队列和Rounting_Key属性。这样交换机接受的消息,就可通过交换机的类型以及消息的Rounting_Key属性,来分发到符合要求的指定队列中。
备注:一个交换机可以绑定多个消息队列和Rounting_Key(根据交换机的类型)。
绑定:exchange和queue通过routing-key关联,这两者之间的关系是就是binding。
Spring Boot定义消息队列和交换机,并绑定示例
@Configuration public class XxtzRabbitConfig { final static String XXTZ = "xxtz"; @Bean public Queue queueXxtz() { //定义一个消息队列,并指定队列的名称 return new Queue(XxtzRabbitConfig.XXTZ); } @Bean DirectExchange xxtzExchange() { //定义一个交换机(注意是DirectExchange一种交换机的类型) return new DirectExchange("xxtzExchange"); } @Bean Binding bindingExchangeMessageXxtz(Queue queueXxtz, DirectExchange xxtzExchange) { //把消息队列queueXxtz 绑定到交换机xxtzExchange中,并指定一个 Routing_Key属性 return BindingBuilder.bind(queueXxtz).to(xxtzExchange).with("xxtz"); } }
Spring Boot 推送消息到交换机示例
@Component public class XxtzSend { @Autowired private AmqpTemplate rabbitTemplate; public void sendXxtz(int i) { String context = "那个谁: id = " + i; System.out.println("Sender : " + context); //推送消息context到交换机xxtzExchange 并指定消息的Rounting_Key属性xxtz this.rabbitTemplate.convertAndSend("xxtzExchange", "xxtz", context); } }
二:四种交换机类型
2.1 fanout exchange(Publish/Subscribe)
基于广播和订阅模式:该类型交换机只是将收到的所有消息广播到它知道的所有队列中,即它不会关心消息的Rounting_Key属性。
Spring Boot代码片段
@Bean public Queue CMessage() { return new Queue("fanout.A3"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } //注意:绑定的时候并没Rounting_key属性 @Bean Binding bindingExchangeA(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); }
2.2 Direct exchange(Rounting_Key精确匹配)
该类型的交换机,在分配消息到消息队列时,该消息的Rounting_key属性必须精确配置绑定时绑定到queue的Rounting_Key属性。该类型的支持的绑定如小图
代码示例在相关概念中有
2.3 Topic exchange(可支持Rounting_Key模糊匹配)
该类型的交换机在与queue绑定时Rounting_Key模糊匹配,即Rounting_key字符串中可含有星号'*'和井号‘#’并且该符号有特定意义。
- * (星号)可以替代一个单词。
- #(井号)可以替换零个或多个单词。
图解:
Q1 可分配到Rounting_Key为 A.orange.B C.orange.D ....中间单词是orange的所有信息
Q2 可分配到Rounting_Key为 A.B.rabbit lazy.A .....类比Q1
Spring Boot示例
@Configuration public class TopicRabbitConfig { final static String message = "topic.message"; final static String messages = "topic.messages"; @Bean public Queue queueMessage() { //创建一个名为topic.message的消息队列 return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages() { //创建一个名为topic.messages的消息队列 return new Queue(TopicRabbitConfig.messages); } @Bean TopicExchange exchange() { //创建一个TopicExchange类型的交换机 return new TopicExchange("topicExchange"); } @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { //绑定消息模糊Rounting_Key 即开头为topic的Rounting_key的消息都会推送到队列queueMessages中 根据规则发送到queueMessage中的信息也会发送到这里 return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
Spring Boot 发送示例
@Component public class TopicSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hi, i am message all"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("topicExchange", "topic.1", context); } public void send1() { String context = "hi, i am message 1"; System.out.println("Sender : " + context); // this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", context); } public void send2() { String context = "hi, i am messages 2"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context); } }
备注:Topic exchange是一个强大的交换器它可以表现的像fanout Topic 一样工作。
2.4 Header Exchange
忽略
三:持久化策略
由RabbitQM的模型分析,假设RabbitQM服务器因某种原因突然关闭(showdown)了,那我们之前建立的交换机对象和消息队列对象以及队列中的没有处理的消息数据是否还会纯在?因此我们需要考虑的持久化对象问题就是交换机和消息队列以及队列中的消息。
交换机持久化 :
消息队列持久化:
消息持久化 :
幸运的是spring boot已经默认给我们实现了该持久化策略,即以上三种持久化策略
四:分配策略
即队列中的消息如何分配给不同消费者,假设这样的场景,一个消息队列中有多条消息,rabbitQM服务器发现了一个消费者,它是一次性的把所有消息分配给该消费者呢?还是该消费者消费了一个消息(假设消费一个消息需要10秒钟)后在分配另一个消息呢?如果先发现一个消费者后又发现一个消费者怎么分配?如果一次发现多个消费者消息又如何分配呢?。。。。。
分配策略一:
分配策略二:
测试Spring Boot默认的分配策略?
场景一:一次性发现两(多)个消费者。
结论:消息队列中的消息平均分配给两个消费者,即一个消费者只消费偶数消息,另一个消费者只消费奇数消息
场景二: 一次性发现两(多)个消费者。在消费时其中一个消费者showdown了,怎么消费?
结论:
场景三:先发现一个消费者后又发现一个消费者,怎么消费?
结论:如果第一个消费者不死,第二个消费者不会消费到消息
小结: 真对Spring Boot的默认分配策略分析,rabbitqm服务器在分配队列中的消息时,会先查看监控该队列的消费者,然后制定分配策略。分配策略一旦制定如果消费者不出意外,就按这个策略执行。但如果消费者发生意外导致不能正确相应服务器(Message acknowledgment 策略)。先前分配给该消费者的消息会重新被rabbitqm服务器分配(即一种递归策略)
参考资料
https://www.rabbitmq.com/tutorials/tutorial-six-java.html
https://www.cnblogs.com/ityouknow/p/6120544.html
https://www.zouyesheng.com/rabbitmq.html
https://blog.csdn.net/hellozpc/article/details/81436980
