RabbitMQ学习总结
一、RabbitMQ介绍
MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message
Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开
发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com/
开发中消息队列通常有如下应用场景:
1、任务异步处理。
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
2、应用程序解耦合
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
市场上还有哪些消息队列?
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。
为什么使用RabbitMQ呢?
1、使得简单,功能强大。
2、基于AMQP协议。
3、社区活跃,文档完善。
4、高并发性能好,这主要得益于Erlang语言。
5、Spring Boot默认已集成RabbitMQ
二、 RabbitMQ 中的概念模型
消息模型:
所有 MQ 产品从模型抽象上来说都是一样的过程:消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,
SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。最后将消息发送到监听的消费者。
RabbitMQ的基本概念
下图是RabbitMQ的基本结构:
组成部分说明如下:
- Broker :消息队列服务进程,此进程包括两个部分:Exchange和Queue。
- Exchange :消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
- Queue :消息队列,存储消息的队列,消息到达队列并转发给指定的消费方,它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
- Producer :消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
- Consumer :消息消费者,即消费方客户端,接收MQ转发的消息。
- Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
- Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
- Connection:网络连接,比如一个TCP连接。
- Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
- Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
Virtual Host理解如下图:
相关名词:
包括:ConnectionFactory(连接管理器)、Channel(信道)、Exchange(交换器)、Queue(队列)、RoutingKey(路由键)、BindingKey(绑定键)。
ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用;
Channel(信道):消息推送使用的通道;
Exchange(交换器):用于接受、分配消息;
Queue(队列):用于存储生产者的消息;
RoutingKey(路由键):用于把生成者的数据分配到交换器上;
BindingKey(绑定键):用于把交换器的消息绑定到队列上;
看到上面的解释,最难理解的路由键和绑定键了,那么他们具体怎么发挥作用的,请看下图:
消息发布接收流程:
-----发送消息-----
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
----接收消息-----
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
三、 下载安装
RabbitMQ由Erlang语言开发,Erlang语言用于并发及分布式系统的开发,在电信领域应用广泛,OTP(Open
Telecom Platform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件及工具库,安装RabbitMQ需
要安装Erlang/OTP,并保持版本匹配,如下图:
RabbitMQ的下载地址:http://www.rabbitmq.com/download.html
1)下载erlang
地址如下:
http://erlang.org/download/otp_win64_20.3.exe,以管理员方式运行此文件,安装。
erlang安装完成需要配置erlang环境变量: ERLANG_HOME=D:\Program Files\erl9.3 在path中添
加%ERLANG_HOME%\bin;
2)安装RabbitMQhttps://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.3,以管理员方式运行此文件,安装
3)启动
- 安装成功后会自动创建RabbitMQ服务并且启动。
从开始菜单启动RabbitMQ,完成在开始菜单找到RabbitMQ的菜单:
RabbitMQ Service-install :安装服务
RabbitMQ Service-remove 删除服务
RabbitMQ Service-start 启动
RabbitMQ Service-stop 启动
2.如果没有开始菜单则进入安装目录下sbin目录手动启动:
1)安装并运行服务
rabbitmq-service.bat install 安装服务 rabbitmq-service.bat stop 停止服务 rabbitmq-service.bat start 启动服务
2)安装管理插件
安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ
管理员身份运行 rabbitmq-plugins.bat enable rabbitmq_management
启动成功 登录RabbitMQ
进入浏览器,输入:http://localhost:15672
初始账号和密码:guest/guest
3) 注意事项:
1、安装erlang和rabbitMQ以管理员身份运行。
2、当卸载重新安装时会出现RabbitMQ服务注册失败,此时需要进入注册表清理erlang
搜索RabbitMQ、ErlSrv,将对应的项全部删除。
四、java操作队列
1、消息队列RabbitMQ的五种形式队列
1).点对点(简单)的队列
2).工作(公平性)队列模式
3.发布订阅模式
4.路由模式Routing
5.通配符模式Topics
2、简单队列
1)功能:一个生产者P发送消息到队列Q,一个消费者C接收
P表示为生产者 、C表示为消费者 红色表示队列。
点对点模式分析:
Maven依赖:
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> </dependencies>
封装Connection:
/**
* 封装Connection
*/
public class MQConnectionUtils {
public static Connection getConnection(){ //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务器地址 factory.setHost("localhost"); //设置端口号 factory.setPort(5672); //设置用户名 factory.setUsername("guest"); //设置密码 factory.setPassword("guest"); //设置vhost factory.setVirtualHost("/admin_yehui"); try { //创建连接 return factory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return null; } }
生产者:
public class Producer {
private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 Channel channel = connection.createChannel(); //声明队列 如果Rabbit中没有此队列将自动创建 /** * 参数1:队列的名称 * 参数2:是否持久化 * 参数3:是否独占此链接 * 参数4:队列不在使用时是否自动删除 * 参数5:队列参数 * */ channel.queueDeclare(QUEUE_NAME, false,false, false, null); String msg = "test_yehui_rabbitmq"; /** * 发送消息 * 参数1: Exchange的名称,如果没有指定,则使用Default Exchange * 参数2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列 * 参数3:消息包含的属性 * 参数4:消息体 * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显 * 示绑定或解除绑定认的交换机,routingKey等于队列名称 */ channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); System.out.println("消息发送体:"+msg); channel.close(); connection.close(); } }
消费者:
public class Consumer01 {
private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException { //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建一个通道 Channel channel = connection.createChannel(); //声明队列 /** * 参数1:队列的名称 * 参数2:是否持久化 * 参数3:是否独占此链接 * 参数4:队列不在使用时是否删除次队列 * 参数5:队列参数 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); //定义消费方法 DefaultConsumer consumer = new DefaultConsumer (channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //得到交换机 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); //消息id long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message = new String(body, "utf-8"); System.out.println("消费者消费:"+message); } }; //监听队列 /** * 参数1:队列名称 * 参数2: 设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 * 为false则需要手动回复 * 参数3:消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_NAME,true,consumer); } }
3、消息队列RabbitMQ应答模式
为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。 如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。 没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。 消息应答是默认打开的。
我们通过显示的设置autoAsk=true关闭这种机制。现即自动应答开,一旦我们完成任务,消费者会自动发送应答。通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念),则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。
4、工作队列
work queues与简单队列相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
P表示为生产者 、C表示为消费者 红色表示队列。
工作队列分析
均摊消费
发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
到消息
测试:
1、使用简单队列,启动多个消费者。
2、生产者发送多个消息。
结果:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。
RabbitMQ的公平转发
目前消息转发机制是平均分配,这样就会出现俩个消费者,奇数的任务很耗时,偶数的任何工作量很小,造成的原因就是近当消息到达队列进行转发消息。并不在乎有多少任务消费者并未传递一个应答给RabbitMQ。仅仅盲目转发所有的奇数给一个消费者,偶数给另一个消费者。 为了解决这样的问题,我们可以使用basicQos方法,传递参数为prefetchCount= 1。这样告诉RabbitMQ不要在同一时间给一个消费者超过一条消息。 换句话说,只有在消费者空闲的时候会发送下一条信息。调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息,也就是等待消费者处理完毕并自己对刚刚处理的消息进行确认之后,才发送下一条消息,防止消费者太过于忙碌,也防止它太过去清闲。 通过 设置channel.basicQos(1);
生产者
public class Producer { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { //得到连接
Connection connection = MQConnectionUtils.getConnection(); //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
Channel channel = connection.createChannel(); //声明队列 如果Rabbit中没有此队列将自动创建
/** * 参数1:队列的名称 * 参数2:是否持久化 * 参数3:是否独占此链接 * 参数4:队列不在使用时是否自动删除 * 参数5:队列参数 * */ channel.queueDeclare(QUEUE_NAME, false,false, false, null); /** * 发送消息 * 参数1: Exchange的名称,如果没有指定,则使用Default Exchange * 参数2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列 * 参数3:消息包含的属性 * 参数4:消息体 * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显 * 示绑定或解除绑定认的交换机,routingKey等于队列名称 */ channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
for(int i=0;i<10;i++){ String msg = "test_yehui_rabbitmq"+i; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); } System.out.println("消息发送完毕"); channel.close(); connection.close(); } }
消费者1:
public class Consumer01 { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException { //得到连接
Connection connection = MQConnectionUtils.getConnection(); //创建一个通道
Channel channel = connection.createChannel(); //声明队列
/** * 参数1:队列的名称 * 参数2:是否持久化 * 参数3:是否独占此链接 * 参数4:队列不在使用时是否删除次队列 * 参数5:队列参数 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 //定义消费方法
DefaultConsumer consumer = new DefaultConsumer (channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //得到交换机
String exchange = envelope.getExchange(); //路由key
String routingKey = envelope.getRoutingKey(); //消息id
long deliveryTag = envelope.getDeliveryTag(); //消息内容
String message = new String(body, "utf-8"); System.out.println("消费者消费:"+message); try { //睡眠1s
Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 手动回执消息
channel.basicAck(envelope.getDeliveryTag(), false); } } }; //监听队列
/** * 参数1:队列名称 * 参数2: 设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 * 为false则需要手动回复 * 参数3:消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_NAME,false,consumer); } }
消费者2
public class Consumer02 { //队列名称
private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException { //得到连接
Connection connection = MQConnectionUtils.getConnection(); //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
Channel channel = connection.createChannel(); //声明队列 如果Rabbit中没有此队列将自动创建
/** * 参数1:队列的名称 * 参数2:是否持久化 * 参数3:是否独占此链接 * 参数4:队列不在使用时是否自动删除 * 参数5:队列参数 * */ channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
channel.queueDeclare(QUEUE_NAME, false,false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //得到交换机
String exchange = envelope.getExchange(); //路由key
String routingKey = envelope.getRoutingKey(); //消息id
long deliveryTag = envelope.getDeliveryTag(); //消息内容
String message = new String(body, "utf-8"); System.out.println("消费者消费:"+message); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 手动回执消息
channel.basicAck(envelope.getDeliveryTag(), false); } } }; //监听队列
/** * 参数1:队列名称 * 参数2: 设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 * 为false则需要手动回复 * 参数3:消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_NAME,false,consumer); } }
结果;
消费者1比消费者2消费得少
