C# 如何使用 RabbitMQ 实现消息收发

本文是基于http://www.cnblogs.com/cheng-lei/articles/7274513.html的项目结构进行搭建的,了解之前请先阅读http://www.cnblogs.com/cheng-lei/category/1047427.html中的前四篇文章。

依次打开:工具 → NuGet 包管理器 → 程序包管理器控制台,然后执行以下命令安装依赖:

PM> Install-Package RabbitMQ.Client -Version 5.1.0

PM> Install-Package EasyNetQ -Version 3.2.0

一、项目搭建

1. Weiz.MQ 项目,消息队列的通用处理类库,用于真正的订阅和发布消息。

  1、在BusBuilder.cs中添加了对CreateAdvancedBus函数的实现。

 /// <summary>
 /// Builds an EasyNetQ bus from the connection string below and returns its
 /// <c>Advanced</c> API, which the exchange/queue based helpers use.
 /// </summary>
 /// <returns>The advanced bus of a newly created EasyNetQ bus.</returns>
 /// <exception cref="InvalidOperationException">
 /// The connection string is null, empty or whitespace.
 /// </exception>
 public static IAdvancedBus CreateAdvancedBus()
 {
     // Message server connection string.
     // NOTE(review): host and credentials are hard-coded in source; they should
     // come from configuration or a secret store instead.
     string connString = "host=dev.corp.wingoht.com:5672;virtualHost=cd;username=ishowfun;password=123456";
     if (string.IsNullOrWhiteSpace(connString))
     {
         // InvalidOperationException derives from Exception, so existing
         // catch (Exception) handlers keep working.
         throw new InvalidOperationException("messageserver connection string is missing or empty");
     }

     return RabbitHutch.CreateBus(connString).Advanced;
 }
View Code

  2、在MQHelper.cs中添加了对Send、Receive函数的实现。

 /// <summary>
 /// Sends <paramref name="msg"/> through a short-lived bus, passing
 /// <c>msg.MessageRouter</c> as the destination argument of <c>bus.Send</c>.
 /// </summary>
 /// <param name="msg">The message to send; its MessageRouter selects the destination.</param>
 public static void Send(MyMessage msg)
 {
     // Like a database connection, the bus must be released after use. The using
     // block guarantees that even when an exception other than
     // EasyNetQException escapes from Send.
     using (IBus bus = BusBuilder.CreateMessageBus())
     {
         try
         {
             bus.Send(msg.MessageRouter, msg);
         }
         catch (EasyNetQException)
         {
             // Failure talking to the message server.
             Console.WriteLine("Send Error!!!");
         }
     }
 }
18 
/// <summary>
/// Registers <paramref name="ipro"/> as the handler for messages received via
/// <c>bus.Receive</c>, using <c>msg.MessageRouter</c> as the source argument.
/// </summary>
/// <param name="msg">Only its MessageRouter is read.</param>
/// <param name="ipro">Processor invoked with every received message.</param>
public static void Receive(MyMessage msg, IProcessMessage ipro)
{
    // The bus is intentionally not disposed on success: it has to stay open
    // for the registered handler to keep receiving.
    IBus bus = BusBuilder.CreateMessageBus();

    try
    {
        bus.Receive<MyMessage>(msg.MessageRouter, message => ipro.ProcessMsg(message));
    }
    catch (EasyNetQException)
    {
        // Failure talking to the message server.
        Console.WriteLine("Receive Error!!!");

        // Registration failed, so nothing will ever use this bus; release it
        // instead of leaking the connection.
        bus.Dispose();
    }
}
View Code

  3、在MQHelper.cs中添加了对采用Fanout、Direct、Topic交换机类型进行消息收发功能的实现。

 /// <summary>
 /// Declares a fanout exchange and publishes <paramref name="msg"/> to it with
 /// an empty routing key.
 /// </summary>
 /// <param name="msg">The message to publish.</param>
 /// <param name="exchangeName">Name of the fanout exchange to declare and publish to.</param>
 public static void ProducerFanoutMessage(MyMessage msg, string exchangeName = "chending.fanout")
 {
     // Each call creates its own bus, so it must be disposed here; otherwise
     // every published message leaks a connection.
     using (var advancedBus = BusBuilder.CreateAdvancedBus())
     {
         if (!advancedBus.IsConnected)
         {
             Console.WriteLine("Can't connect");
             return;
         }

         var exchange = advancedBus.ExchangeDeclare(exchangeName, ExchangeType.Fanout);

         advancedBus.Publish(exchange, "", false, new Message<MyMessage>(msg));
     }
 }
17 
/// <summary>
/// Declares a fanout exchange and a queue, binds them, and starts a consumer
/// that prints the body of every <c>MyMessage</c> it receives.
/// </summary>
/// <param name="exchageName">Name of the fanout exchange to declare.</param>
/// <param name="queueName">Name of the queue to declare; also passed as the binding's routing key.</param>
public static void ConsumeFanoutMessage(string exchageName = "chending.fanout", string queueName = "chending.fanout.queue")
{
    var bus = BusBuilder.CreateAdvancedBus();

    // Topology: exchange first, then the queue, then the binding between them.
    var fanoutExchange = bus.ExchangeDeclare(exchageName, ExchangeType.Fanout);
    var boundQueue = bus.QueueDeclare(queueName);
    bus.Bind(fanoutExchange, boundQueue, queueName);

    bus.Consume(boundQueue, handlers =>
    {
        handlers.Add<MyMessage>((received, receivedInfo) =>
        {
            Console.WriteLine("Fanout Content: {0}", received.Body.MessageBody);
        });
    });
}
30 
/// <summary>
/// Declares the queue and publishes <paramref name="msg"/> to the default
/// exchange, using the queue's name as the routing key.
/// </summary>
/// <param name="msg">The message to publish.</param>
/// <param name="queueName">Name of the queue to declare and route to.</param>
public static void ProducerDirectMessage(MyMessage msg, string queueName = "chending.direct.queue")
{
    // Each call creates its own bus, so it must be disposed here; otherwise
    // every published message leaks a connection.
    using (var advancedBus = BusBuilder.CreateAdvancedBus())
    {
        if (!advancedBus.IsConnected)
        {
            Console.WriteLine("Can't connect");
            return;
        }

        var queue = advancedBus.QueueDeclare(queueName);

        advancedBus.Publish(Exchange.GetDefault(), queue.Name, false, new Message<MyMessage>(msg));
    }
}
47 
/// <summary>
/// Declares a direct exchange and a queue, binds them with the queue name as
/// routing key, and starts a consumer that prints the body of every
/// <c>MyMessage</c> it receives.
/// </summary>
/// <param name="exchageName">Name of the direct exchange to declare.</param>
/// <param name="queueName">Name of the queue to declare; also used as the binding's routing key.</param>
public static void ConsumeDirectMessage(string exchageName = "chending.direct", string queueName = "chending.direct.queue")
{
    var bus = BusBuilder.CreateAdvancedBus();

    // Topology: exchange first, then the queue, then the binding between them.
    var directExchange = bus.ExchangeDeclare(exchageName, ExchangeType.Direct);
    var boundQueue = bus.QueueDeclare(queueName);
    bus.Bind(directExchange, boundQueue, queueName);

    bus.Consume(boundQueue, handlers =>
    {
        handlers.Add<MyMessage>((received, receivedInfo) =>
        {
            Console.WriteLine("Direct Content: {0}", received.Body.MessageBody);
        });
    });
}
60 
/// <summary>
/// Publishes <paramref name="msg"/> through a short-lived bus, using
/// <c>msg.MessageRouter</c> as the topic.
/// </summary>
/// <param name="msg">The message to publish; its MessageRouter is the topic.</param>
public static void ProducerTopicMessage(MyMessage msg)
{
    // Like a database connection, the bus must be released after use. The using
    // block guarantees that even when an exception other than
    // EasyNetQException escapes from Publish.
    using (IBus bus = BusBuilder.CreateMessageBus())
    {
        try
        {
            bus.Publish(msg, x => x.WithTopic(msg.MessageRouter));
        }
        catch (EasyNetQException)
        {
            // Failure talking to the message server. Report it rather than
            // swallowing it silently, matching Send and Receive.
            Console.WriteLine("Publish Error!!!");
        }
    }
}
77 
/// <summary>
/// Subscribes to <c>MyMessage</c> using <c>msg.MessageRouter</c> both as the
/// subscription id and as the topic, printing the body of every message received.
/// </summary>
/// <param name="msg">Only its MessageRouter is read.</param>
public static void ConsumeTopicMessage(MyMessage msg)
{
    // The bus is intentionally not disposed on success: it has to stay open
    // for the subscription to keep receiving.
    IBus bus = BusBuilder.CreateMessageBus();

    try
    {
        bus.Subscribe<MyMessage>(
            msg.MessageRouter,
            message => Console.WriteLine("Topic Content: {0}", message.MessageBody),
            x => x.WithTopic(msg.MessageRouter));
    }
    catch (EasyNetQException)
    {
        // Failure talking to the message server. Report it rather than
        // swallowing it silently, matching Send and Receive.
        Console.WriteLine("Subscribe Error!!!");

        // Subscribing failed, so nothing will ever use this bus; release it
        // instead of leaking the connection.
        bus.Dispose();
    }
}
View Code

  4、在ProduceThread.cs中添加了消息发布线程对前面实现的功能进行测试(也可以不作为线程直接调用)。

 /// <summary>
 /// Publishes test messages that exercise the fanout, direct and topic helpers
 /// of <c>MQHelper</c>. Can run on its own thread or be called directly.
 /// </summary>
 public class ProduceThread
 {
     /// <summary>
     /// Sends one message per exchange type, then ten messages through
     /// <c>MQHelper.Publish</c>, alternating between two routers and pausing
     /// 200 ms after each one.
     /// </summary>
     public static void ProduceMessage()
     {
         MyMessage msg1 = CreateMessage("0-1", "chending.fanout");
         MyMessage msg2 = CreateMessage("0-2", "chending.direct");
         MyMessage msg3 = CreateMessage("0-3", "chending.topic.a.b");

         // Point-to-point alternative: MQHelper.Send(msg1);
         MQHelper.ProducerFanoutMessage(msg1);
         MQHelper.ProducerDirectMessage(msg2);
         MQHelper.ProducerTopicMessage(msg3);

         for (int i = 0; i < 10; i++)
         {
             // Even iterations use the four-segment router, odd ones the
             // three-segment router.
             string router = i % 2 == 0 ? "cd.test.demo.a.b" : "cd.test.demo.a";
             MyMessage msg = CreateMessage((i + 1).ToString(), router);

             MQHelper.Publish(msg);
             Thread.Sleep(200);
         }
     }

     /// <summary>
     /// Creates a message whose id and title are both <paramref name="id"/> and
     /// whose body is the current local time.
     /// </summary>
     private static MyMessage CreateMessage(string id, string router)
     {
         return new MyMessage
         {
             MessageID = id,
             MessageBody = DateTime.Now.ToString(),
             MessageRouter = router,
             MessageTitle = id
         };
     }
 }
View Code

2. Weiz.Producer(生产者)已弃用(改用ProduceThread.cs)

3. Weiz.Consumer 就是Consumer(消费者)

  1、修改OrderProcessMessage.cs,实现不同的消息处理方式。

 /// <summary>
 /// Message processor that prints the id, title, router and body of each
 /// message to the console.
 /// </summary>
 public class OrderProcessMessage : MQ.IProcessMessage
 {
     /// <summary>Writes one line describing <paramref name="msg"/>.</summary>
     public void ProcessMsg(MQ.MyMessage msg)
     {
         string line = string.Format(
             "ID: {0}, Title: {1}, Router: {2}, Content: {3}",
             msg.MessageID,
             msg.MessageTitle,
             msg.MessageRouter,
             msg.MessageBody);

         Console.WriteLine(line);
     }
 }
 /// <summary>
 /// Message processor that prints each message to the console with a
 /// "Process1" prefix.
 /// </summary>
 public class OrderProcessMessage1 : MQ.IProcessMessage
 {
     /// <summary>Writes one line describing <paramref name="msg"/>.</summary>
     public void ProcessMsg(MQ.MyMessage msg)
     {
         string line = string.Format(
             "Process1 ID: {0}, Title: {1}, Router: {2}, Content: {3}",
             msg.MessageID,
             msg.MessageTitle,
             msg.MessageRouter,
             msg.MessageBody);

         Console.WriteLine(line);
     }
 }
15 
/// <summary>
/// Message processor that prints each message to the console with a
/// "Process2" prefix.
/// </summary>
public class OrderProcessMessage2 : MQ.IProcessMessage
{
    /// <summary>Writes one line describing <paramref name="msg"/>.</summary>
    public void ProcessMsg(MQ.MyMessage msg)
    {
        string line = string.Format(
            "Process2 ID: {0}, Title: {1}, Router: {2}, Content: {3}",
            msg.MessageID,
            msg.MessageTitle,
            msg.MessageRouter,
            msg.MessageBody);

        Console.WriteLine(line);
    }
}
View Code

  2、对Program.cs中的Main调用进行了修改。

 /// <summary>
 /// Consumer entry point: registers every consumer and subscription, then
 /// publishes the test messages from the same process.
 /// </summary>
 class Program
 {
     static void Main(string[] args)
     {
         OrderProcessMessage1 order1 = new OrderProcessMessage1();
         OrderProcessMessage2 order2 = new OrderProcessMessage2();

         // Only MessageRouter is set; these objects just carry the routing
         // pattern each subscription listens on.
         MyMessage msg1 = new MyMessage();
         MyMessage msg2 = new MyMessage();
         MyMessage msg3 = new MyMessage();

         msg1.MessageRouter = "cd.test.demo.*";
         msg2.MessageRouter = "cd.test.demo.#";
         msg3.MessageRouter = "chending.topic.#";

         // Point-to-point alternative: MQHelper.Receive(msg, order) with
         // msg.MessageRouter = "cd.test.demo" and an OrderProcessMessage.
         MQHelper.ConsumeFanoutMessage();
         MQHelper.ConsumeDirectMessage();
         MQHelper.ConsumeTopicMessage(msg3);
         MQHelper.Subscribe(msg1, order1);
         MQHelper.Subscribe(msg2, order2);

         Console.WriteLine("Listening for messages.");

         // Runs on the main thread. To publish in the background instead:
         // new Thread(ProduceThread.ProduceMessage).Start();
         ProduceThread.ProduceMessage();

         // Without this, Main returns as soon as the last message is published
         // and the process may exit before the consumers have handled it.
         // NOTE(review): assumes the handlers run on background threads - confirm.
         Console.ReadLine();
     }
 }
View Code

二、项目运行

启动 Weiz.Consumer(消费者)后,程序会自动在 RabbitMQ 服务器上创建相关的 exchange 和 queue;同时,Main 中调用的 ProduceThread.ProduceMessage 函数会发送消息,接收到的消息会显示在 Console 命令行中。

原文地址:https://www.cnblogs.com/lucifer1997/p/lucifer1997.html