RabbitMQ 相关概念和方法详解
名词解释
ConnectionFactory: 与RabbitMQ服务器连接的管理器。Connection: 与RabbitMQ服务器的连接。Channel: 与Exchange的连接。Exchange: 接收生产者的消息,并根据消息的RoutingKey和Exchange绑定的BindingKey分配消息。Queue: 存储消费者的消息。RoutingKey: 指定当前消息被谁接收。BindingKey: 指定当前Exchange下,什么样的RoutingKey会被下派到当前绑定的Queue中。
exchange_declare 方法详解
def exchange_declare(self, callback=None, exchange=None, exchange_type='direct', passive=False, durable=False, auto_delete=False, internal=False, nowait=False, arguments=None):
callback如果nowait=True且Exchange.DeclareOk时,调用这个回调方法。exchange交换器名称。exchange_type交换器类型,常见的如fanoutdirecttopic。passive如果为true, 则执行声明或者检查交换器是否存在。durable设置是否持久化。durable设置为true表示持久化,反之非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失消息。auto_delete设置是否自动删除。auto_delete设置为true则表示自动删除。自动删除的前提是至少有一个队列或者交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。注意不能错误地把这个参数理解为:"当与此交换器连接的客户端都断开时,RabbitMQ 会自动删除本交换器"。internal设置是否是内置的。如果设置为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。nowait如果设置为false, 则不期望RabbitMQ服务器有一个Exchange.DeclareOk这样响应。arguments其他一些结构化参数。比如:alternate-exchange。
exchange_bind 方法详解
def exchange_bind(self, callback=None, destination=None, source=None, routing_key='', nowait=False, arguments=None):
callback如果nowait=True且Exchange.DeclareOk时,调用这个回调方法。destinaction一种交换器。source消息从source交换器转发到destination交换器,从某种程度上看destination交换器可以是一种队列。routing_key用来绑定队列和交换器的路由键。nowait如果设置为false, 则不期望RabbitMQ服务器有一个Exchange.DeclareOk这样响应。arguments其他一些结构化参数。
queue_declare 方法详解
def queue_declare(self, queue='', passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None):
queue队列名称。passive如果为true, 则执行声明或者检查队列是否存在。durable设置是否持久化。为true则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关的信息。exclusive设置是否排他。为true则设置队列为排他的。如果一个队列被声明为排他队列,该队列对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:- 排他队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)是可以同时访问同一个连接创建的排他队列。
- "首次" 是指如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列,这个与普通队列不同。
- 即使该队列是持久化的,一旦连接关闭或者客户端退出,该队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
auto_delete设置是否自动删除。为true则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。不能把这个参数错误的理解为:"当连接到此队列的所有客户端断开时,这个队列自动删除",因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。arguments设置队列的其他一些参数,如 x-message-ttl、x-expires、x-max-length。
queue_bind 方法详解
def queue_bind(self, queue, exchange, routing_key=None, arguments=None):
queue队列名称。exchange交换器的名称。routing_key用来绑定队列和交换器的路由键。arguments定义绑定的一些参数。
basic_publish 方法详解
def basic_publish(self, exchange, routing_key, body, properties=None, mandatory=False, immediate=False):
exchange交换器的名称,指明消息需要发送到哪个交换器。如果设置为空字符串,则消息会被发生到RabbitMQ默认的交换器中。routing_key路由键,交换器根据路由键将消息存储到相应的队列之中。properties消息的基本属性集,其包含 14 个属性成员,分别有contentType、deliveryMode、proiotity等。body消息体payload,真正需要发送的消息。mandatory当参数设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令消息返回给生产者。当参数设置为false时,出现上述情况,则消息直接丢失。immediate当参数设置为true时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过Basic.Return返回至生产者。
basic_consume 方法详解
def basic_consume(self, consumer_callback, queue='', no_ack=False, exclusive=False, consumer_tag=None, arguments=None):
consumer_callback设置消费者的回调函数。用来处理RabbitMQ推送过来的消息,比如:DefaultConsumer,使用时需要客户端重写其中的方法。queue队列的名称。no_ack设置是否自动确认。建议设置成false,即不自动确认。exclusive设置是否排他。consumer_tag消费者标签,用来区分多个消费者。arguments设置消费者的其他参数。
exchange_type 模式
fanout 模式
- 任何发送到
fanout exchange的消息都会被转发到与exchange绑定的所有的queue上。 - 不需要指定
routing_key, 即使指定了也是无效的。 - 需要提前将
exchange与queue进行绑定, 一个exchange可以绑定到多个queue, 一个queue也可以同多个exchange进行绑定。 - 接收到消息的
exchange没有与任何queue绑定, 则消息就会被抛弃。
direct 模式
- 发送到
direct exchange的消息都会被转发到routing_key中指定的queue。 - 不需要将
exchange进行任何绑定操作。当然也可以进行绑定操作,可以将不同的routing key与不同的queue进行绑定,不同的queue与不同exchange进行绑定。 - 消息需要传递一个
routing_key。 - 如果消息中不存在
routing_key中绑定的队列名,则该消息就会被抛弃。 - 如果一个
exchange声明为direct,并且绑定中指定了routing_key,那么发生消息时。需要同时指明该exchange和routing_key。
topic 模式
- 这种模式比较复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个
routing_key,exchange会将消息转发到所有关注主题能与routing_key模糊匹配的队列。 - 需要
routing_key要提前绑定exchange与queue。 - 在进行绑定的时候,要提供一个该队列关心的主题,如:"#.log.#" 表示该队列关心所涉及 log 的消息(一个 routing_key 为 "MQ.log.error" 的消息会被转发到该队列)。
- "#" 表示 0 个或若干个关键字,"" 表示一个关键字。如 "log." 能与 "log.warn.timeout" 匹配,但是 "log.#" 能与上述匹配。
- 如果
exchange没有发现能够与routing_key匹配的Queue,则会抛弃此消息。
更多精彩

