RabbitMQ(六)——路由模式

RabbitMQ系列

RabbitMQ(一)——简介

RabbitMQ(二)——模式类型

RabbitMQ(三)——简单模式

RabbitMQ(四)——工作队列模式

RabbitMQ(五)——发布订阅模式

RabbitMQ(六)——路由模式

RabbitMQ(七)——主题模式

RabbitMQ(八)——消息确认

RabbitMQ(九)——消息持久化

RabbitMQ(十)——消息优先级

前言

  本章讲解路由模式,路由模式跟发布订阅模式类似,然后在发布订阅模式的基础上改变了类型(fanout => direct),订阅模式是分发到所有绑定到交换机的队列,路由模式只分发到绑定在交换机上面指定路由的队列,我们可以看一下下面这张图:

以上的图是info,error,warning为路由,表示日志的等级通过不同的路由发送到不同的队列中,下面开始路由模式start~~~~

发布订阅模式 VS 路由模式

  发布:

    发布订阅模式的类型为fanout,而路由模式类型为direct;

//发布订阅模式
channel.ExchangeDeclare(ExchangeName, "fanout");
//路由模式
channel.ExchangeDeclare(ExchangeName, "direct");

  

    订阅发布模式发布时roukey参数为空字符串,路由模式指定了路由;

//发布订阅模式
channel.BasicPublish(ExchangeName, "", null, body);
//路由模式
channel.BasicPublish(ExchangeName, RouteName, null, body);

  

  接收:

      定义交换器时,与发布时一致,发布订阅模式为fanout,路由模式为direct;

//发布订阅模式
channel.ExchangeDeclare(ExchangeName, "fanout");
//路由模式
channel.ExchangeDeclare(ExchangeName, "direct");

  

      绑定路由时,发布订阅模式的routekey参数为空字符串,表示接受所有消息,而路由模式的routekey参数必须指定某一路由。

//发布订阅模式
channel.QueueBind(queueName, ExchangeName, "");
//路由模式
channel.QueueBind(queueName, ExchangeName, routkey);

  

通过以上对比可以知道,路由模式与发布订阅模式基本一致,唯一差距就是两个参数,exchange类型和 routingKey

 

代码

  生产者:

static void Main(string[] args)
        {
            Console.WriteLine("DirectServer发布服务器启动...");

            //1.创建连接工厂
            var factory = new ConnectionFactory()
            {
                HostName = "127.0.0.1",
                UserName = "guest",
                Password = "guest"
            };
            //2.创建连接
            using (var conn = factory.CreateConnection())
            {
                //3.创建通道
                using (var channel = conn.CreateModel())
                {
                    //4.声明交换器
                    channel.ExchangeDeclare("directExchange", "direct");

                    string msg = "";
                    for (int i = 0; i < 20; i++)
                    {
                        msg = $"发布消息{i}";
                        string ROUTE_KEY = "";
                        var body = Encoding.UTF8.GetBytes(msg);
                        //模拟向不同路由发送消息
                        if (i % 2 == 0)
                        {
                            ROUTE_KEY = "route1";
                        }
                        else
                        {
                            ROUTE_KEY = "route2";
                        }
                        //5.发布消息
                        channel.BasicPublish("directExchange", ROUTE_KEY, null, body);
                        Console.WriteLine($"向{ROUTE_KEY}发布消息成功:{msg}");

                        Thread.Sleep(1000);
                    }
                    Console.ReadKey();
                }
            }
        }
View Code

  消费者1:

static void Main(string[] args)
        {
            Console.WriteLine("DirectClient接收客户端启动...");
            //1.创建连接工厂
            var factory = new ConnectionFactory()
            {
                HostName = "127.0.0.1",
                UserName = "guest",
                Password = "guest"
            };
            //2.创建连接
            using (var conn = factory.CreateConnection())
            {
                //3.创建通道
                using (var channel = conn.CreateModel())
                {
                    //3.声明队列
                    var queue = channel.QueueDeclare().QueueName;
                    //4.绑定交换器
                    channel.QueueBind(queue, "directExchange", "route1");
                    //5.声明消费者 消费消息
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                      {
                          //接收消息
                          var body = Encoding.UTF8.GetString(ea.Body.ToArray());
                          Console.WriteLine($"接收route1消息:{body.ToString()}");
                      };
                    channel.BasicConsume(queue, true, consumer);

                    Console.ReadKey();
                }
            }
        }
View Code

  

  消费者2:

static void Main(string[] args)
        {
            Console.WriteLine("DirectClient接收客户端启动...");
            //1.创建连接工厂
            var factory = new ConnectionFactory()
            {
                HostName = "127.0.0.1",
                UserName = "guest",
                Password = "guest"
            };
            //2.创建连接
            using (var conn = factory.CreateConnection())
            {
                //3.创建通道
                using (var channel = conn.CreateModel())
                {
                    //3.声明队列
                    var queue = channel.QueueDeclare().QueueName;
                    //4.绑定交换器
                    channel.QueueBind(queue, "directExchange", "route2");
                    //5.声明消费者 消费消息
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, e) =>
                    {
                        byte[] message = e.Body.ToArray();
                        Console.WriteLine("接收消息:" + Encoding.UTF8.GetString(message));
                            //返回消息确认
                            channel.BasicAck(e.DeliveryTag, false);
                    };
                    //消费者开始监听
                    channel.BasicConsume(queue, true, consumer);

                    Console.ReadKey();
                }
            }
        }
View Code

  可以看到生产者发布消息时指定了路由rout1/rout2,随后消息会转发到route1/route2路由上。接收时通过将一个队列与交换器绑定,指定路由route1/route2,这样就能接收到route1/route2路由上的消息。

效果

  消费者定义的随机队列

                                                

  向route1、route2发布消息,两个消费者分别接收route1和route2的消息

效果

  路由模式中生产者发布消息时指定路由,向指定路由发送,消费者绑定交换器与路由即可接收到生产者向此路由发布的消息;

  只有将消费者发送消息的交换器、路由 与生产者指定的交换器、路由一致,消费者才能接收到生产者向指定路由的消费者发送的消息。

  注意:声明路由时类型必须为direct

附上Demo地址:https://github.com/1164887865/RabbitMQDemo

原文地址:https://www.cnblogs.com/zousc/p/12739504.html