RabbitMQ交换机类型分析

先附加下官网文档。RabbitMQ的交换机类型共有四种,是根据其路由过程的不同而划分成的:

  • Direct Exchange(直连交换机)

  • Fanout Exchange(扇型交换机)

  • Topic Exchange(主题交换机)

  • Headers Exchange(头交换机)

一、扇形交换机

1、原理

 

Fanout Exchange(扇型交换机):当一个Msg发送到扇形交换机X上时,则扇形交换机X会将消息分别发送给所有绑定到X上的消息队列。扇形交换机将消息路由给绑定到自身的所有消息队列,也就是说路由键在扇形交换机里没有作用,故消息队列绑定扇形交换机时,路由键可为空。这个模式类似于广播。 

2、示例

生产者:

using RabbitMQ.Client;
using System;
using System.Text;

namespace Sender1
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "111.111.111.111", UserName = "admin", Password = "admin" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("ex1", ExchangeType.Fanout);                
                channel.BasicPublish(exchange: "ex1",
                                     routingKey: "",
                                     basicProperties: null,
                                     body: Encoding.UTF8.GetBytes("1"));
                Console.WriteLine(" [x] Sent {0}", "1");
            }

            Console.ReadLine();
        }
    }
}

消费者(可以有多个消费者):

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Receiver1
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "111.111.111.111", UserName = "admin", Password = "admin" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("ex1", ExchangeType.Fanout);
                var queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queueName, "ex1", "");              
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    Console.WriteLine(" [x] Received {0}", message);
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
                Console.ReadLine();
            }
        }
    }
}

二、直连交换机

1、原理

直连型交换机背后的路由算法很简单——消息会传送给绑定键与消息的路由键完全匹配的那个队列。 我们用直连交换机取代了只会无脑广播的扇形交换机,并且具备了选择性接收消息的能力。

这种配置下,我们可以看到有两个队列Q1、Q2绑定到了直连交换机X上。第一个队列用的是橘色(orange)绑定键,第二个有两个绑定键,其中一个绑定键是黑色(black),另一个绑定键是绿色(green)。在此设置中,发布到交换机的带有橘色(orange)路由键的消息会被路由给队列Q1。带有黑色(black)或绿色(green)路由键的消息会被路由给Q2。其他的消息则会被丢弃。

2、示例

生产者:

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;

namespace Sender1
{
    internal class Program
    {
        static void Main(string[] args)
        {
            //消息路由键列表
            List<string> colors = new List<string> { "black", "orange", "red" };
            var factory = new ConnectionFactory() { HostName = "111.111.111.111", UserName = "admin", Password = "admin" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("ex1", ExchangeType.Direct);
                foreach (var color in colors)
                {
                    channel.BasicPublish(exchange: "ex1",
                                     routingKey: color,
                                     basicProperties: null,
                                     body: Encoding.UTF8.GetBytes(color));
                    Console.WriteLine(" [x] Sent {0}", color);
                }
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

消费者(可以有多个消费者): 

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace Receiver1
{
    internal class Program
    {
        static void Main(string[] args)
        {var factory = new ConnectionFactory() { HostName = "111.111.111.111", UserName = "admin", Password = "admin" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("ex1", ExchangeType.Direct);
                var queueName = channel.QueueDeclare().QueueName;
                //队列绑定建 黑色
                channel.QueueBind(queueName, "ex1", "black");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    Console.WriteLine(" [x] Received {0}", message);
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);


                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

可以设置多个消费者,更改下队列绑定建即可。效果如下:

生产者发送三个消息到交换器ex1,消息的路由键分别为:black、orange、red

我这里定义了两个消费者,绑定到交换器ex1,绑定建分别为black和orange。效果如下:

 

三、主题交换机

1、原理

(1)路由键和绑定键命名

  • 消息路由键—发送到主题交换机的消息所携带的路由键(routing_key)不能随意命名——它必须是一个用点号分隔的词列表。当中的词可以是任何单词,不过一般都会指定一些跟消息有关的特征作为这些单词。列举几个有效的路由键的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。只要不超过255个字节,词的长度由你来定。

  • 绑定键(binding key)也得使用相同的格式。主题交换机背后的逻辑跟直连交换机比较相似——一条携带特定路由键(routing key)的消息会被投送给所有绑定键(binding key)与之相匹配的队列。尽管如此,仍然有两条与绑定键相关的特殊情况:

    •   `*` (星号) 能够替代一个单词。

    •   `#` (井号) 能够替代零个或多个单词。

(2)示例解析,如上图:

我们将会发送用来描述动物的多条消息。发送的消息包含带有三个单词(两个点号)的路由键(routing key)。路由键中第一个单词描述速度,第二个单词是颜色,第三个是品种: "<速度>.<颜色>.<品种>"。我们创建三个绑定:Q1通过"*.orange.*"绑定键进行绑定,Q2使用"*.*.rabbit" 和 "lazy.#"。

队列绑定键解释:

  • Q1针对所有的橘色orange动物。
  • Q2针对每一个有关兔子rabbits和慵懒lazy的动物的消息。

消息路由键解释:

  • 一个带有"quick.orange.rabbit"路由键的消息会给两个队列都进行投送。消息"lazy.orange.elephant"也会投送给这两个队列。
  • 另外一方面,"quick.orange.fox" 只会给第一个队列。"lazy.pink.rabbit"虽然与两个绑定键都匹配,但只会给第二个队列投送一遍。"quick.brown.fox" 没有匹配到任何绑定,因此会被丢弃掉。

(3)异常情况

如果我们破坏规则,发送的消息只带有一个或者四个单词,例如 "orange" 或者 "quick.orange.male.rabbit"会发生什么呢?结果是这些消息不会匹配到任何绑定,将会被丢弃。另一方面,“lazy.orange.male.rabbit”即使有四个单词,也会与最后一个绑定匹配,并 被投送到第二个队列。

(4)注意事项

主题交换机非常强大,并且可以表现的跟其他交换机相似。

  • 当一个队列使用"#"(井号)绑定键进行绑定。它会表现的像扇形交换机一样,不理会路由键,接收所有消息。

  • 当绑定当中不包含任何一个 "*" (星号) 和 "#" (井号)特殊字符的时候,主题交换机会表现的跟直连交换机一毛一样。 

2、示例

生产者:

using RabbitMQ.Client;
using System;
using System.Text;

namespace Sender1
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "111.111.111.111", UserName = "admin", Password = "admin" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("ex1", ExchangeType.Topic);
                //第一个消息
                channel.BasicPublish(exchange: "ex1",
                                    routingKey: "quick.orange.rabbit",
                                    basicProperties: null,
                                    body: Encoding.UTF8.GetBytes("quick.orange.rabbit"));
                Console.WriteLine(" [x] Sent {0}", "第一个消息:quick.orange.rabbit");

                //第二个消息
                channel.BasicPublish(exchange: "ex1",
                                    routingKey: "quick.orange.fox",
                                    basicProperties: null,
                                    body: Encoding.UTF8.GetBytes("quick.orange.fox"));
                Console.WriteLine(" [x] Sent {0}", "第二个消息:quick.orange.fox");

                //第三个消息
                channel.BasicPublish(exchange: "ex1",
                                    routingKey: "lazy.pink.rabbit",
                                    basicProperties: null,
                                    body: Encoding.UTF8.GetBytes("lazy.pink.rabbit"));
                Console.WriteLine(" [x] Sent {0}", "第三个消息:lazy.pink.rabbit");
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

消费者1:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace Receiver1
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "111.111.111", UserName = "admin", Password = "admin" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("ex1", ExchangeType.Topic);
                var queueName = channel.QueueDeclare().QueueName;
                //队列绑定建
                channel.QueueBind(queueName, "ex1", "*.orange.*");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    Console.WriteLine(" [x] Received {0}", message);
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);


                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

消费者2:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace Receiver2
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "111.111.111", UserName = "admin", Password = "admin" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("ex1", ExchangeType.Topic);
                var queueName = channel.QueueDeclare().QueueName;
                //队列绑定键
                channel.QueueBind(queueName, "ex1", "*.*.rabbit");
                channel.QueueBind(queueName, "ex1", "lazy.#");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    Console.WriteLine(" [x] Received {0}", message);
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

生产者效果:

 消费者1效果:

 消费者2效果:

四、头交换机 

头交换机类似与主题交换机,但是却和主题交换机有着很大的不同。主题交换机使用路由键来进行消息的路由,而头交换机使用消息属性来进行消息的分发,通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。在头交换机里有一个特别的参数”x-match”,当”x-match”的值为“any”时,只需要消息头的任意一个值匹配成功即可,当”x-match”值为“all”时,要求消息头的所有值都需相等才可匹配成功。

原文地址:https://www.cnblogs.com/qtiger/p/15740631.html