RabbitMQ 入门汇总
目录
一、RabbitMQ 是什么?
RabbitMQ是一个用Erlang语言开发的高级消息队列协议(AMQP: Advanved Message Queue)的开源实现,消息队列是“消费者-生产者模型”的典型代表,“生产者”可以往消息队列中不断写入消息(Message),而“生产者”可以从消息队列中读取或者订阅消息。
SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。二 RabbitMQ 是要解决什么问题?
RabbitMQ 有以下几个应用场景:
1. 异步处理
在项目中,将一些无需即时返回且耗时的操作提取出来,进行异步操作,而这种异步处理的方式大大的节省了服务器的请求时间,从而提高了系统的吞吐量。而且不影响服务器做其他响应,不独占服务器资源,充分利用了服务器的处理能力。
2. 应用解耦
例如电商系统中的订单处理。
传统处理模式是:下订单的时候,订单系统可能会调用库存系统的接口,可以看出这样订单系统和库存系统高耦合,如果库存系统出现问题,那么整个订单处理流程都会受到影响,甚至导致订单失败,造成直接的收益损失。
当引入消息队列,用户下完订单,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。然后库存系统订阅消息队列,依自身能力读取订单信息,完成订单处理操作。这样当库存系统出现问题,也可以在库存系统修复好后,继续从消息队列里订阅订单信息,不会导致消息丢失。
3. 流量削峰
例如各种秒杀活动以及抢票活动。
引入消息队列后,可以控制活动人数,超过一定数值的订单直接丢弃掉。另外,可以将这些订单信息写入消息队列中,应用可以按自身的最大能力处理订单,缓解了高峰流量对应用处理能力的挑战。
三、RabbitMQ 的基本功能和原理是什么?
简介
RabbitMQ是一个消息代理:它接受和转发消息。你可以将其视为邮局:当你将把发布的邮件放在邮箱中时,你可以确定邮件先生或邮件女士最终会将邮件发送给您的收件人。在这个类比中,RabbitMQ是一个邮箱,邮局和邮递员。
RabbitMQ和邮局之间的主要区别在于它不处理纸张,而是处理(接受,存储和转发)二进制数据也就是消息(Message)。
下面用一些术语来描述以下RabbitMQ 和 消息 都是些什么:
队列
是RabbitMQ
中的邮箱。尽管消息流经了RabbitMQ
和我们的应用,但是消息只能存储在队列
中。队列
仅仅受到主机内存和磁盘容量的限制,它本质上是一个很大的消息缓冲器。多个生产者可以发送消息到一个队列里,多个消费者可以尝试从一个队列里接收数据。我们可以这样表示队列消费者
和接收者有着类似的意义。消费者通常是一个程序,主要等待接收消息。
值得注意的是,消费者和生产者不一定要在同一主机上,实际上大多数的应用中都不在同一主机。另外统一个应用程序可以同时作为生产者和消费者。
简单的小例子
我们用一个简单的例子来表示下他们之间的关系:一个生产者发送消息,一个消费者接收并打印消息,这个例子中的消息是"hello",中间就是队列(也就是消息缓冲器)
生产者发送 "hello" 消息到队列中,消费者从队列中接收 "hello" 消息
工作队列(Work Quenes)
Work Quenes 是被设计用来分发耗时的任务给多个Worker,避免即时进行资源密集型的任务,图示如下:
循环调度(Round-robin dispatching)
使用任务队列or工作队列的一个优势是并行工作能力。如果我们有大量的积压工作,我们可以添加更多 Worker,容易拓展。
默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者,平均每个消费者将获得相同数量的消息,这种分发消息的方式称为循环法。
消息确认(Message acknowledgment)
执行任务可能需要一些时间。你可能想知道,如果其中一个消费者开始了一项漫长的任务,但是只部分完成了任务的情况下会发生什么。在没有使用消息确认时,RabbitMQ 将消息传递给消费者,它立即将其标记为删除。在这种情况下,如果你杀死一个Worker,我们不仅会丢失在处理的消息,我们还将丢失所有已发送到此Worker但尚未处理的消息。
但我们不想丢失任何信息,如果一个Worker die 掉了,我们希望把任务交给另一个 Worker。为了确保消息永远不会丢失,RabbitMQ 支持消息确认(Message acknowledgment)。消费者会发送一个 ack ((acknowledgment)), 告诉 RabbitMQ 已收到、处理了特定消息, 并且 RabbitMQ 可以随意删除它。
如果使用者在不发送 ack 的情况下死亡 (其通道已关闭、连接关闭或 TCP 连接丢失), RabbitMQ 将了解消息未被完全处理, 并将重新排队。如果同时有其他消费者在等待, 那么它将迅速将其重新传递给另一个消费者。这样, 即使Worker偶尔会die掉, 你也可以确保不会丢失任何信息。
使用消息确认时,没有任何消息会超时,当消费者死亡时,RabbitMQ 将重新传递消息。即使处理消息需要非常非常长的时间, 也可以。
消息持久性(Message durability)
通过消息确认,我们可以确保即使消费者死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止, 我们的任务仍将丢失。有种方案可以保证消息不会丢失:保证队列和里面的消息都是持久。
首先我们要确保我们的队列不会丢失,所以我们要声明这个队列是持久的,在 Python 中我们可以这么做:
channel.queue_declare(queue='hello', durable=True)
紧接着我们要使消息是持久的,在 Python 中我们可以将delivery_mode
设为2
即可:
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
消息持久化注意事项:
消息持久化并不能确切保证消息不会被丢失,尽管它告诉 RabbitMQ 要把它存到磁盘,但在 RabbitMQ 收到消息之前仍然有一小段时间,这样会导致没有进行存储操作。另外,消息可能是被写到缓存里,而非磁盘里。但是,总归是比没有持久化操作好,对简单应用来说是足够了。如果需要更强的保证,可以参阅发布确认
进行持久化后,甚至是 RabbitMQ 重启,任务也不会丢失。
公平调度(Fair dispatch)
原来的调度方法仍然不能完全按照我们的要求工作。例如,在有两个Worker的情况下,当所有的奇数序号的信息都很繁重,偶数序号的信息都很轻松的时候,一个Worker就会不断地忙碌,另一个Worker几乎不会做任何工作。然后RabbitMQ 对此一无所知,仍然会均匀地调度消息。
发生这种情况的原因是,RabbitMQ 只是在消息进入队列时调度这条消息。它不查看消费者未确认的消息的数量。它只是盲目地将每一条信息发送给第 n 个消费者。
为了避免这种情况,在 Python 中我们可以使用 basic.qos
方法,并设置prefetch_count=1
,例如:
channel.basic_qos(prefetch_count=1)
这告诉 RabbitMQ 不要一次向Worker发出多条消息。或者,换句话说,在Worker处理并确认了前一条消息之前, 不要向其发送新消息。相反, 它将把它调度到下一个还不忙的Worker。
四、有什么同类技术,之间的优缺点如何?
现在的市面上有很多MQ可以选择,比如ActiveMQ、ZeroMQ、Appche Qpid,下面对选择 RabbitMQ 进行说明:
- 除了Qpid,RabbitMQ是唯一一个实现了AMQP标准的消息服务器;
- 可靠性,RabbitMQ的持久化支持,保证了消息的稳定性;
- 高并发,RabbitMQ使用了Erlang开发语言,Erlang是为电话交换机开发的语言,性能好;
- 集群部署简单,Erlang使得RabbitMQ集群部署变得简单;
- 社区活跃度高,根据网上资料来看,RabbitMQ也是首选;
五、有哪些好的tutorial
Reference
