RabbitMQ-Direct模式
简介
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息中间件,基于Erlang语言编写。
SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。
P:(producing)生产者,生产仅意味着发送消息。
Q: (queue_name)队列,队列是位于rabbitmq中的post box的名称
C: (Consuming)消费者,消费者主要是等待接收消息的程序
开发准备
- netCoreTest.core:该工程主要封装了RabbitMQ的公用方法
- RabbitMQClient :该工程为生产者
- RabbitMQServer :该工程为消费者
1.创建netCoreTest.core类库项目
1.1 安装项目依赖
2.定义接口
using netCoreTest.core.Model;
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core.Iserver
{
    /// <summary>
    /// Connection-lifecycle operations of a message-queue client:
    /// connect, create the queue, and close the channel and connection.
    /// </summary>
    public interface IConnectionServer
    {
        /// <summary>
        /// Connects to the service.
        /// </summary>
        void Connection();

        /// <summary>
        /// Creates the message queue.
        /// </summary>
        void CreateQueueDir();

        /// <summary>
        /// Closes the connection.
        /// </summary>
        void CloseConnection();

        /// <summary>
        /// Closes the channel.
        /// </summary>
        void CloseChannel();
    }
}
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core.Iserver
{
    /// <summary>
    /// Message operations of a message-queue client: send and receive text.
    /// </summary>
    public interface IMessageService
    {
        /// <summary>
        /// Sends a message.
        /// </summary>
        /// <param name="msg">The message content.</param>
        void SendMsg(string msg);

        /// <summary>
        /// Gets a message.
        /// </summary>
        /// <returns>The message text.</returns>
        string GetMsg();
    }
}
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core.Iserver
{
    /// <summary>
    /// Aggregate contract combining <see cref="IMessageService"/> and
    /// <see cref="IConnectionServer"/>; it declares no members of its own.
    /// </summary>
    public interface IRabbitMqService : IMessageService, IConnectionServer
    {
    }
}
3.编写RabbitMQ辅助类
using netCoreTest.core.Iserver;
using netCoreTest.core.Model;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core
{
    /// <summary>
    /// RabbitMQ helper that declares a direct exchange, binds a single queue to it
    /// with a routing key, and publishes or consumes UTF-8 text messages.
    /// </summary>
    public class RabbitMQModel : IRabbitMqService
    {
        private readonly ConnectionFactory _factory;
        private IModel _channel;
        private IConnection _connection;

        private readonly string _exchangeName; // exchange name
        private readonly string _routeKey;     // routing key
        private readonly string _queueName;    // queue name

        /// <summary>
        /// Builds the connection factory and captures the exchange, routing key and
        /// queue names from <paramref name="model"/>. No network activity happens here.
        /// </summary>
        /// <param name="model">Broker credentials, host, port and exchange settings.</param>
        public RabbitMQModel(HostModel model)
        {
            _factory = new ConnectionFactory
            {
                UserName = model.UserName,
                Password = model.PassWord,
                // Honour the configured host; "localhost" remains the fallback so that
                // callers which never set Host keep the previous behaviour.
                HostName = string.IsNullOrWhiteSpace(model.Host) ? "localhost" : model.Host,
                Port = model.Port,
            };

            _exchangeName = model.ExChangeModel.ExChangeName;
            _routeKey = model.ExChangeModel.RouteKey;
            _queueName = model.ExChangeModel.QueueName;
        }

        /// <summary>
        /// Opens a connection and creates a channel on it.
        /// </summary>
        /// <remarks>
        /// Failures are written to the console and not rethrown, as before. After a
        /// failure the instance stays unconnected, and the queue/message operations
        /// throw <see cref="InvalidOperationException"/>.
        /// </remarks>
        public void Connection()
        {
            IConnection opened = null;
            try
            {
                opened = _factory.CreateConnection();
                _channel = opened.CreateModel();
                _connection = opened;
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());

                // A connection without a channel is unusable; release it rather than leak it.
                if (opened != null)
                {
                    opened.Dispose();
                }
            }
        }

        /// <summary>
        /// Declares the direct exchange and the queue, then binds the queue to the
        /// exchange with the configured routing key.
        /// </summary>
        /// <exception cref="InvalidOperationException">No channel is available.</exception>
        public void CreateQueueDir()
        {
            IModel channel = RequireChannel();

            // Declare a direct-type exchange.
            channel.ExchangeDeclare(_exchangeName, ExchangeType.Direct, false, false, null);
            // Declare the queue.
            channel.QueueDeclare(_queueName, false, false, false, null);
            // Bind the queue to the exchange.
            channel.QueueBind(_queueName, _exchangeName, _routeKey, null);
        }

        /// <summary>
        /// Publishes <paramref name="msg"/>, UTF-8 encoded, to the exchange with the
        /// configured routing key.
        /// </summary>
        /// <param name="msg">The message text.</param>
        /// <exception cref="ArgumentNullException"><paramref name="msg"/> is null.</exception>
        /// <exception cref="InvalidOperationException">No channel is available.</exception>
        public void SendMsg(string msg)
        {
            if (msg == null)
            {
                throw new ArgumentNullException(nameof(msg));
            }

            byte[] payload = Encoding.UTF8.GetBytes(msg);
            RequireChannel().BasicPublish(_exchangeName, _routeKey, null, payload);
        }

        /// <summary>
        /// Closes the channel; does nothing when no channel was ever created.
        /// </summary>
        public void CloseChannel()
        {
            _channel?.Close();
        }

        /// <summary>
        /// Closes the connection; does nothing when no connection was ever opened.
        /// </summary>
        public void CloseConnection()
        {
            _connection?.Close();
        }

        /// <summary>
        /// Starts a consumer with manual acknowledgement, prints each received message,
        /// and blocks until a key is pressed. Then closes the channel and connection.
        /// </summary>
        /// <returns>
        /// The text of the last message received before the key press, or null when
        /// none arrived.
        /// </returns>
        /// <exception cref="InvalidOperationException">No channel is available.</exception>
        public string GetMsg()
        {
            IModel channel = RequireChannel();

            // Event-based consumer.
            var consumer = new EventingBasicConsumer(channel);
            string msg = null;

            // Message-received handler.
            consumer.Received += (ch, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                msg = message;
                Console.WriteLine($"收到消息: {message}");
                // Acknowledge that this message has been consumed.
                channel.BasicAck(ea.DeliveryTag, false);
            };

            // Start consuming with manual acknowledgement (autoAck = false).
            channel.BasicConsume(_queueName, false, consumer);
            Console.WriteLine("消费者已启动");
            Console.ReadKey();

            // Close the channel before the connection that owns it.
            CloseChannel();
            CloseConnection();
            return msg;
        }

        /// <summary>
        /// Returns the current channel, or throws a descriptive exception in place of
        /// a NullReferenceException when Connection() was not called or failed.
        /// </summary>
        private IModel RequireChannel()
        {
            if (_channel == null)
            {
                throw new InvalidOperationException(
                    "No RabbitMQ channel is available. Call Connection() first and check that it succeeded.");
            }

            return _channel;
        }
    }
}
4.创建direct模式发送类
using netCoreTest.core.Model;
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core.ExchangeTypeModel
{
    /// <summary>
    /// Direct-mode sender/receiver: a thin wrapper around <see cref="RabbitMQModel"/>
    /// configured with fixed host, credential and exchange settings.
    /// </summary>
    public class DirectPost
    {
        private readonly RabbitMQModel _broker;

        /// <summary>
        /// Builds the fixed host settings, creates the underlying
        /// <see cref="RabbitMQModel"/> and calls its Connection() immediately.
        /// </summary>
        public DirectPost()
        {
            var settings = new HostModel
            {
                UserName = "admin",
                PassWord = "admin",
                Host = "127.0.0.1",
                Port = 5672,
                ExChangeModel = new ExChangeModel
                {
                    ExChangeName = "ClentName",
                    QueueName = "Clent",
                    RouteKey = "ClentRoute"
                }
            };

            _broker = new RabbitMQModel(settings);
            _broker.Connection();
        }

        /// <summary>
        /// Forwards to <see cref="RabbitMQModel.CreateQueueDir"/>.
        /// </summary>
        public void CreateQueue() => _broker.CreateQueueDir();

        /// <summary>
        /// Forwards <paramref name="msg"/> to <see cref="RabbitMQModel.SendMsg"/>.
        /// </summary>
        /// <param name="msg">The message text.</param>
        public void SendMsg(string msg) => _broker.SendMsg(msg);

        /// <summary>
        /// Forwards to <see cref="RabbitMQModel.GetMsg"/>; the returned message text
        /// is discarded.
        /// </summary>
        public void GetMsg() => _broker.GetMsg();
    }
}
5.创建RabbitMQClient控制台应用程序
using netCoreTest.core;
using netCoreTest.core.ExchangeTypeModel;
using netCoreTest.core.Model;
using RabbitMQ.Client;
using System;

namespace RabbitMQClient
{
    /// <summary>
    /// Console producer: publishes each line typed by the user until "exit" is entered.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Creates the queue through <see cref="DirectPost"/>, then forwards console
        /// input line by line as messages.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            Console.WriteLine("消息生产者开始生产数据!");
            Console.WriteLine("输入exit退出!");

            DirectPost directPost = new DirectPost();
            directPost.CreateQueue();

            while (true)
            {
                string input = Console.ReadLine();

                // ReadLine returns null at end of input (closed or redirected stdin);
                // treat that as a request to stop instead of failing on null.
                if (input == null)
                {
                    break;
                }

                // Check the sentinel before publishing so "exit" is not sent as a
                // message. The ordinal comparison is independent of the current culture.
                if (string.Equals(input.Trim(), "exit", StringComparison.OrdinalIgnoreCase))
                {
                    break;
                }

                directPost.SendMsg(input);
            }
        }
    }
}
6.创建RabbitMQServer控制台应用程序
using netCoreTest.core;
using netCoreTest.core.ExchangeTypeModel;
using netCoreTest.core.Model;
using System;
using System.Text;

namespace RabbitMQServer
{
    /// <summary>
    /// Console consumer: prints a greeting, then receives messages through
    /// <see cref="DirectPost"/>.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Entry point; creates a <see cref="DirectPost"/> and calls its GetMsg().
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");

            var receiver = new DirectPost();
            receiver.GetMsg();
        }
    }
}
7.执行RabbitMQClient和RabbitMQServer

更多精彩