消息队列 MQ 入门理解
功能特性:






- 消费端集群化部署,每条消息只需要被处理一次。
- 由于消费进度在服务端维护,可靠性更高。
- 集群消费模式下,每一条消息都只会被分发到一台机器上处理,如果需要被集群下的每一台机器都处理,请使用广播模式。
- 集群消费模式下,不保证消息的每一次失败重投等逻辑都能路由到同一台机器上,因此处理消息时不应该做任何确定性假设。

- 顺序消息暂不支持广播消费模式。
- 每条消息都需要被相同逻辑的多台机器处理。
- 消费进度在客户端维护,出现重复的概率稍大于集群模式。
- 广播模式下,MQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
- 广播模式下,第一次启动时默认从最新消息消费,客户端的消费进度是被持久化在客户端本地的隐藏文件中,因此不建议删除该隐藏文件,否则会丢失部分消息。
- 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
- 目前仅 Java 客户端支持广播模式。
- 广播模式下服务端不维护消费进度,所以 MQ 控制台不支持消息堆积查询和堆积报警功能。

- 每条消息都需要被多台机器处理,每台机器的逻辑可以相同也可以不一样。
- 消费进度在服务端维护,可靠性高于广播模式。
- 一个云账户所能创建的 Consumer ID 数量是有限制的,具体可以咨询售后技术支持。
- 对于一个 Consumer ID 来说,可以部署一个消费端实例,也可以部署多个消费端实例。当部署多个消费端实例时,实例之间又组成了集群模式(共同分担消费消息)。假设 Consumer ID1 部署了三个消费者实例 C1,C2,C3,那么这三个实例将共同分担服务器发送给 Consumer ID1 的消息。同时,实例之间订阅关系必须保持一致。

Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());
消费方式-1
消费者如需订阅某 Topic 下所有类型的消息,Tag 用符号 * 表示:
consumer.subscribe("MQ_TOPIC", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
消费方式-2
消费者如需订阅某 Topic 下某一种类型的消息,请明确标明 Tag:
consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
消费方式-3
消费者如需订阅某 Topic 下多种类型的消息,请在多个 Tag 之间用 || 分隔:
consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
消费方式-4(错误示例)
同一个消费者多次订阅某 Topic 下的不同 Tag,后者会覆盖前者:
//如下错误代码中,consumer 只能接收到 MQ_TOPIC 下 TagB 的消息,而不能接收 TagA 的消息。
consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } }); consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; }
订阅关系一致:
MQ 里的一个 Consumer ID 代表一个 Consumer 实例群组。对于大多数分布式应用来说,一个 Consumer ID 下通常会挂载多个 Consumer 实例。订阅关系一致指的是同一个 Consumer ID 下所有 Consumer 实例的处理逻辑必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。
由于 MQ 的订阅关系主要由 Topic+Tag 共同组成,因此,保持订阅关系一致意味着同一个 Consumer ID 下所有的实例需在以下两方面均保持一致:
- 订阅的 Topic 必须一致;
- 订阅的 Topic 中的 Tag 必须一致。


Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId, "CID_jodie_test_1"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("jodie_test_A", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
Consumer 实例1-2:
SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId, " CID_jodie_test_1"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("jodie_test_B ", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
【例二】以下例子中,同一个 Consumer ID 下订阅 Topic 的 Tag 不一致。Consumer 实例2-1 订阅了 TagA,而 Consumer 实例2-2 未指定 Tag。 Consumer 实例2-1:
Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId, "CID_jodie_test_2"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("jodie_test_A", "TagA", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
Consumer 实例2-2:
Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId, " CID_jodie_test_2"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("jodie_test_A ", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; }
【例三】此例中,错误的原因有俩个:
- 同一个 Consumer ID 下订阅 Topic 个数不一致。
- 同一个 Consumer ID 下订阅 Topic 的 Tag 不一致。
Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId, "CID_jodie_test_3"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("jodie_test_A", "TagA", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } }); consumer.subscribe("jodie_test_B", "TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
Consumer 实例3-2:
Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId, " CID_jodie_test_3"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("jodie_test_A ", "TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });

更多精彩