C#/.Net集成RabbitMQ,实现点对点模式和发布/订阅模式

RabbitMQ简介

  消息 (Message) 是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串、 JSON 等,也可以很复杂,比如内嵌对象。

  消息队列中间件 (Message Queue Middleware,简称为 MQ) 是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

  消息队列中间件,也可以称为消息队列或者消息中间件。它一般有两种传递模式:点对点 (P2P, Point-to-Point) 模式和发布/订阅 (Pub/Sub) 模式。点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输成为可能。 发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题 (topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用 。

  RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

  关于RabbitMQ的原理及详细介绍百度上很多,这里就不做复制粘贴了。

生产者和消费者

  在使用RabitMQ之前,先对几个概念做一下说明:

生产者(Producer)

  生产者就是投递消息的一方。生产者创建消息,然后发布到 RabbitMQ 中。消息一般可以包含2个部分:消息体和标签 (Label)。消息体也可以称之为 payload,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个JSON字符串。当然可以进一步对这个消息体进行序列化操作。消息的标签用来表述这条消息 ,比如一个交换器的名称和一个路由键。生产者把消息交由RabbitMQ,RabbitMQ之后会根据标签把消息发送给感兴趣的消费者 (Consumer)。

消息中间件的服务节点(Broker)

  对于RabbitMQ来说,一个RabbitMQ Broker可以简单地看作一个RabbitMQ服务节点 ,或者RabbitMQ服务实例 。大多数情况下也可以将一个RabbitMQ Broker看作一台RabbitMQ服务器。

  首先生产者将业务方数据进行可能的包装, 之后封装成消息, 发送 (AMQP 协议里这个动 作对应的命令为 Basic . Publish) 到 Broker 中 。 消费者订阅并接收消息 (AMQP 协议里这个动作对应的命令为 Basic.Consurne 或者 Basic. Get),经过可能的解包处理得到原始的数据, 之后再进行业务处理逻辑。这个业务处理逻辑并不一定需要和接收消息的逻辑使用同一个线程。 消费者进程可以使用一个线程去接收消息,存入到内存中。业务处理逻辑使用另一个线程从内存中读取数据,这样可以将应用进一步解稿,提高整个应用的处理效率。

队列(Queue)

  队列,是RabbitMQ的内部对象,用于存储消息。RabbitMQ中消息都只能存储在队列中,RabbitMQ的生产者生产消息井最终技递到队列中,消费者可以从队列中获取消息并消费。多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询) 给多个消费者进行处理,而不是每个消费者都收到所有的消息井处理。

消费者(Consumer)

  消费者,就是接收消息的一方。消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体 (payload)。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只 有消息体,消费者也只会消费到消体,也就不知道消息的生产者是谁,当然消费者也不需要知道。

项目中简单集成MQ点对点模式

首先先安装好RabbitMQ,可以参考我之前写的教程 https://www.cnblogs.com/yindi0712/p/13447814.html

1.这里我创建了一个Mvc项目作为生产者端,一个控制台作为消费者端

2.两个项目分别引用RabbitMQ.Client.dll,可以在官网下载:

下载地址:http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/

也可以直接从Nuget中安装,二者效果是一样的

我这边选择用Nuget安装。

3.简单集成,实现消息生产和消费

生产者端

我这边创建了个简单的mvc项目,引用RabbitMQ.Client.dll,在HomeController创建一个名为Send的Action

public ActionResult Send(SendModel model)
        {
            //实例化一个连接工厂和其配置为使用所需的主机,虚拟主机和证书(证书)
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = model.Url;//RabbitMQ主机服务地址
            factory.UserName = model.UserName;//用户名
            factory.Password = model.Password;//密码
            //创建一个AMQP 0-9-1连接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建一个AMQP 0-9-1频道,该对象提供了大部分 的操作(方法)协议。
                using (IModel channel = connection.CreateModel())
                {
                        channel.QueueDeclare(model.Queue, false, false, false, null);//创建一个名称消息队列
                        string message = model.Content; //传递的消息内容
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish("", model.Queue, null, body); //开始传递
                }
            }
            return RedirectToAction("index");
        }

消费者端

我这里创建了一个控制台应用,同样引用RabbitMQ.Client.dll,消费的逻辑直接写在Main方法里了

static void Main(string[] args)
        {
            //实例化一个连接工厂和其配置为使用所需的主机,虚拟主机和证书(证书)
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "127.0.0.1";//RabbitMQ主机服务地址
            factory.UserName = "test";//用户名
            factory.Password = "testpwd";//密码
            //创建一个AMQP 0-9-1连接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建一个AMQP 0-9-1频道,该对象提供了大部分 的操作(方法)协议。
                using (IModel channel = connection.CreateModel())
                {
                    channel.QueueDeclare("getMessage", false, false, false, null);//声明一个名称消息队列
                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);//创建一个消费者实现c#事件处理程序实例
                    channel.BasicConsume("getMessage", true, consumer);//开始消费
                    //消息到达消费者时触发的事件
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine("已接收: {0}", message);
                    };
                    Console.ReadLine();
                }
            }
        }

总结:

1.首先不管是消费者还是生产者都要先创建一个连接工厂ConnectionFactory配置连接的基本信息;

2.通过ConnectionFactory.CreateConnection创建一个IConnection对象表示创建一个AMQP 0-9-1连接;

3.IConnection.CreateModel创建一个AMQP 0-9-1频道,该对象提供了大部分 的操作(方法)协议;

4.使用IModel.QueueDeclare声明一个消息队列,生产者和消费者的队列名称要一致;

5.生产者端:使用IModel.BasicPublish发送消息到消息队列;

6.消费者端:创建一个消费者事件对象EventingBasicConsumer;使用IModel.BasicConsume将该消费者启动,在消息接收触发事件EventingBasicConsumer.Received做后续操作。

划重点:Connection 可以用来创建多个 Channel 实例,但是 Channel 实例不能在线程问共享, 应用程序应该为每一个线程开辟一个 Channel。某些情况下 Channel 的操作可以并发运行,但 是在其他情况下会导致在网络上出现错误的通信帧交错,同时也会影响友送方确认( publisher confrrm)机制的运行,所以多线程问共享 Channel 实例是非线程安全的

至此一个简单的消费者和生产者端就搭建好了,下面我们运行起来看一下效果:

上面是我本地的配置信息,下面我创建一个getMessage队列并发送一条123的消息。

我们直接进管理界面看吧,就不敲命令了,如果本地装了管理插件直接访问http://127.0.0.1:15672/,我们先进到Queues,我们可以看到刚刚创建的getMessage队列

然后进入到队列内部,去点一下GetMessages按钮:

我们可以看到刚才发送的123已经在队列中了,现在我们运行消费者端

我们可以看到这个消息已经输出出来了,这时候再去点击GetMessages,这时候就会提示Queue is empty,表示该消息已经消费了。可以把两个项目同时运行,多发几条消息基本上是实时的。

发布订阅模式

交换器(Exchange)

  我们暂时可以理解成生产者将消息投递到队列中。真实情况是,生产者将消息发送到Exchange(交换器,通常也可以用大写的"X"来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。这里可以将RabbitMQ中的交换器看作一个简单的实体。

  交换器有四种类型fanout、direct、topic、headers。我这边使用了fanout,它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。

 路由键(RoutingKey):

  生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。在交换器类型和绑定键(BindingKey)固定的情况下,生产者可以在发送消息给交换器时,通过指定RoutingKey来决定消息流向哪里。

绑定(Binding)

RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列了。

  生产者将消息发送给交换器时,需要一个RoutingKey,当BindingKey和RoutingKey相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的BindingKey。 BindingKey并不是在所有的情况下都生效,它依赖于交换器类型,比如fanout类型的交换器就会无视BindingKey,而是将消息路由到所有绑定到该交换器的队列中。

上代码,这次我创建了一个生产者两个消费者。

生产者:

private void PublishSend(SendModel model)
        {
            //实例化一个连接工厂和其配置为使用所需的主机,虚拟主机和证书(证书)
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = model.Url;//RabbitMQ主机服务地址
            factory.UserName = model.UserName;//用户名
            factory.Password = model.Password;//密码
            //创建一个AMQP 0-9-1连接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建一个AMQP 0-9-1频道,该对象提供了大部分 的操作(方法)协议。
                using (IModel channel = connection.CreateModel())
                {
                    //创建两个队列
                    channel.QueueDeclare("PublishSubscrib01", true, false, false, null);
                    channel.QueueDeclare("PublishSubscrib02", true, false, false, null);
                    //声明一个交换器
                    channel.ExchangeDeclare("PublishSubscribExchange", ExchangeType.Fanout, true, false, null);
                    //将交换器与队列绑定
                    channel.QueueBind("PublishSubscrib01", "PublishSubscribExchange", "RoutingKey", null);
                    channel.QueueBind("PublishSubscrib02", "PublishSubscribExchange", "RoutingKey", null);
                    string message = model.Content;
                    byte[] body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish("PublishSubscribExchange", "RoutingKey", null,body);
                }
            }
        }

消费者1

private static void Subcribe()
        {
            //实例化一个连接工厂和其配置为使用所需的主机,虚拟主机和证书(证书)
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "127.0.0.1";//RabbitMQ主机服务地址
            factory.UserName = "test";//用户名
            factory.Password = "testpwd";//密码
            //创建一个AMQP 0-9-1连接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建一个AMQP 0-9-1频道,该对象提供了大部分 的操作(方法)协议。
                using (IModel channel = connection.CreateModel())
                {
                    #region 定义队列、交换器并绑定
                    channel.QueueDeclare("PublishSubscrib01", true, false, false, null);
                    channel.ExchangeDeclare("PublishSubscribExchange", ExchangeType.Fanout, true, false, null);
                    channel.QueueBind("PublishSubscrib01", "PublishSubscribExchange", "RoutingKey", null);
                    #endregion
                    Console.WriteLine("订阅者01 已经准备就绪~~");
                    //消费者
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine($"订阅者01收到消息:{message} ~");
                    };
                    //自动ack消费
                    channel.BasicConsume("PublishSubscrib01", true,consumer);
                    Console.ReadLine();
                }
            }
        }

消费者2就不放了,跟1是一样的,复制出来把1换成2就好了(实际生产环境业务逻辑可能不同)。


下面我们同时启动三个项目发送一条消息看看

 我们可以看到两个消费者端同时都接收到了消息,下面我们去RabbitMQ管理界面去看一下。

首先,我们看到Exchange菜单下出现了我们刚刚新增的交换器

 点击进去可以看到交换器和我们声明的两个队列的绑定

 说明我们这次RabbitMQ发布/订阅的部署时成功的。

————————————————————————————————————————————————————

参考书籍:RabbitMQ实战指南。

原文地址:https://www.cnblogs.com/yindi0712/p/13447836.html