1.  添加依赖

  pom.xml如下:

RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第1张
 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-srvutil</artifactId> <version>4.3.1</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>org.javassist</groupId> <artifactId>javassist</artifactId> <version>3.23.1-GA</version> </dependency> <dependency> <groupId>io.openmessaging</groupId> <artifactId>openmessaging-api</artifactId> <version>0.3.0-alpha</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-openmessaging</artifactId> <version>4.3.1</version> </dependency>
RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第2张

2. Producer 的开发步骤

  1. 实例化Producer Group,如下:

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。
  DefaultMQProducer producer = new DefaultMQProducer("my-producer-group");

  2. 设置namesrvAddr,集群环境多个nameserver用;分割,如下:

producer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");

  3. 调用start()方法启动:

 producer.start();

  4. 发送消息

RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第3张
 for (int i = 0; i < 10; i++) { //构建实例,第一个参数为topic,第二个参数为tabs,第三个参数为消息体 Message message = new Message("MyQuickStartTopic","tabA",("Hello World:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.println(result); }
RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第4张

  5. 关闭生产者(根据自己需求确定是够需要关闭)

 producer.shutdown();

  完整示例如下:

RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第5张
package com.wangx.rocketmq.quickstart; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; /** * 创建一个消费者 */ public class Producer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException { //1. 实例化一个producer group DefaultMQProducer producer = new DefaultMQProducer("my-producer-group"); //2. 设置namesrvAddr,集群环境多个nameserver用;分割  producer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876"); //3. 启动  producer.start(); // 4. 发送消息 for (int i = 0; i < 10; i++) { //构建实例,第一个参数为topic,第二个参数为tabs,第三个参数为消息体 Message message = new Message("MyQuickStartTopic","tabA",("Hello World:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.println(result); } //关闭生产者  producer.shutdown(); } }
RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第6张

  使用方式可以说非常简单了。

3. Consumer开发步骤

  1. 实例化Consumer Group,如下:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group");

  2. 设置namesrvAddr,集群环境多个nameserver用;分割,如下:

producer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");

  3. 设置从什么位置开始都

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

  4. 订阅topic.

consumer.subscribe("MyQuickStartTopic", "*");

  5. 注册消息监听器

 consumer.registerMessageListener();

  6. 重写MessageListenerConcurrently接口的consumeMessage()方法

  完整代码如下:

RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第7张
package com.wangx.rocketmq.quickstart; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * 创建一个消费者 */ public class Consumer { public static void main(String[] args) throws MQClientException { //实例化一个consumer组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group"); //设置setNamesrvAddr,同生产者 consumer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876"); //设置消息读取方式,这里设置的是队尾开始读取  consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //设置订阅主题,第二个参数为过滤tabs的条件,可以写为tabA|tabB过滤Tab,*表示接受所有 consumer.subscribe("MyQuickStartTopic", "*"); //注册消息监听 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { //的到MessageExt MessageExt messageExt = list.get(0); String topic = messageExt.getTopic(); String message = new String(messageExt.getBody(),"UTF-8"); int queueId = messageExt.getQueueId(); System.out.println("收到来自topic:" + topic + ", queueId:" + queueId + "的消息:" + message); } catch (Exception e) { //失败,请求稍后重发 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } //成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第8张

  consumeMessage返回一个枚举的两种状态,成功表示接受成功,否则返回稍后重发的状态。这里注意,启动的时候需要consumer先启动,因为它需要在生产者之前先订阅,否则将会收不到生产在consumer生产的消息,造成消息丢失。

  启动consumer,在启动producer

  producer控制台RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第9张

  consumer控制台:

RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第10张

  rocketmq-console信息:

RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第11张

可以看到,我们前面部署的集群环境也是能够实现消息的负载均衡的,会使两个broker上都创建topic,且都能够接收生产者生产的消息。

  进入topic,可以看到新增了两个我们自定义的topic

RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第12张

可能会出现的问题:

  RemotingTooMuchRequestException: sendDefaultImpl call timeout

  在客户端运行Producer时,可能会出现如上异常,这是因为从 Windows 上开发连接 虚拟机中的 nameServer 时要经过 Linux 系统的防火墙,而防火墙一般都会有超时的机制,在网络连接长时间不传输数据时,会关闭这个 TCP 的会话,关闭后再读写,就有可能导致这个异常。

  解决办法就是关闭防火墙,ubuntu下命令如下:

  RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第13张

  contOS下命令如下:

  systemctl stop firewalld.service #停止firewall
  systemctl disable firewalld.service #禁止firewall开机启动
  firewall-cmd --state #查看默认防火墙状态(关闭后显示notrunning,开启后显示running)

原文 RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer

,

1.  添加依赖

  pom.xml如下:

RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第14张
 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-srvutil</artifactId> <version>4.3.1</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>org.javassist</groupId> <artifactId>javassist</artifactId> <version>3.23.1-GA</version> </dependency> <dependency> <groupId>io.openmessaging</groupId> <artifactId>openmessaging-api</artifactId> <version>0.3.0-alpha</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-openmessaging</artifactId> <version>4.3.1</version> </dependency>
RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第15张

2. Producer 的开发步骤

  1. 实例化Producer Group,如下:

  DefaultMQProducer producer = new DefaultMQProducer("my-producer-group");

  2. 设置namesrvAddr,集群环境多个nameserver用;分割,如下:

producer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");

  3. 调用start()方法启动:

 producer.start();

  4. 发送消息

RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第16张
 for (int i = 0; i < 10; i++) { //构建实例,第一个参数为topic,第二个参数为tabs,第三个参数为消息体 Message message = new Message("MyQuickStartTopic","tabA",("Hello World:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.println(result); }
RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第17张

  5. 关闭生产者(根据自己需求确定是够需要关闭)

 producer.shutdown();

  完整示例如下:

RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第18张
package com.wangx.rocketmq.quickstart; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; /** * 创建一个消费者 */ public class Producer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException { //1. 实例化一个producer group DefaultMQProducer producer = new DefaultMQProducer("my-producer-group"); //2. 设置namesrvAddr,集群环境多个nameserver用;分割  producer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876"); //3. 启动  producer.start(); // 4. 发送消息 for (int i = 0; i < 10; i++) { //构建实例,第一个参数为topic,第二个参数为tabs,第三个参数为消息体 Message message = new Message("MyQuickStartTopic","tabA",("Hello World:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.println(result); } //关闭生产者  producer.shutdown(); } }
RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第19张

  使用方式可以说非常简单了。

3. Consumer开发步骤

  1. 实例化Consumer Group,如下:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group");

  2. 设置namesrvAddr,集群环境多个nameserver用;分割,如下:

producer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");

  3. 设置从什么位置开始都

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

  4. 订阅topic.

consumer.subscribe("MyQuickStartTopic", "*");

  5. 注册消息监听器

 consumer.registerMessageListener();

  6. 重写MessageListenerConcurrently接口的consumeMessage()方法

  完整代码如下:

RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第20张
package com.wangx.rocketmq.quickstart; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * 创建一个消费者 */ public class Consumer { public static void main(String[] args) throws MQClientException { //实例化一个consumer组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group"); //设置setNamesrvAddr,同生产者 consumer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876"); //设置消息读取方式,这里设置的是队尾开始读取  consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //设置订阅主题,第二个参数为过滤tabs的条件,可以写为tabA|tabB过滤Tab,*表示接受所有 consumer.subscribe("MyQuickStartTopic", "*"); //注册消息监听 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { //的到MessageExt MessageExt messageExt = list.get(0); String topic = messageExt.getTopic(); String message = new String(messageExt.getBody(),"UTF-8"); int queueId = messageExt.getQueueId(); System.out.println("收到来自topic:" + topic + ", queueId:" + queueId + "的消息:" + message); } catch (Exception e) { //失败,请求稍后重发 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } //成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第21张

  consumeMessage返回一个枚举的两种状态,成功表示接受成功,否则返回稍后重发的状态。这里注意,启动的时候需要consumer先启动,因为它需要在生产者之前先订阅,否则将会收不到生产在consumer生产的消息,造成消息丢失。

  启动consumer,在启动producer

  producer控制台RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第22张

  consumer控制台:

RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第23张

  rocketmq-console信息:

RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第24张

可以看到,我们前面部署的集群环境也是能够实现消息的负载均衡的,会使两个broker上都创建topic,且都能够接收生产者生产的消息。

  进入topic,可以看到新增了两个我们自定义的topic

RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第25张

可能会出现的问题:

  RemotingTooMuchRequestException: sendDefaultImpl call timeout

  在客户端运行Producer时,可能会出现如上异常,这是因为从 Windows 上开发连接 虚拟机中的 nameServer 时要经过 Linux 系统的防火墙,而防火墙一般都会有超时的机制,在网络连接长时间不传输数据时,会关闭这个 TCP 的会话,关闭后再读写,就有可能导致这个异常。

  解决办法就是关闭防火墙,ubuntu下命令如下:

  RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer,RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer 随笔 第26张

  contOS下命令如下:

  systemctl stop firewalld.service #停止firewall
  systemctl disable firewalld.service #禁止firewall开机启动
  firewall-cmd --state #查看默认防火墙状态(关闭后显示notrunning,开启后显示running)

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄