RabbitMQ路由器

目录

  • 1、direct(明确的路由规则:消费端绑定的队列名称必须和消息发布时指定的路由名称一致)
  • 2、fanout (消息广播,将消息分发到exchange上绑定的所有队列上)
  • 3、topic (模式匹配的路由规则:支持通配符)

1.direct(明确的路由规则:消费端绑定的队列名称必须和消息发布时指定的路由名称一致)

  当生产者发送消息的时候,使用direct Exchange:队列可以和key绑定绑定在一起,产生的这个路由键是一个唯一的键,没一个key都对应唯一的队列, 通过这个key和队列可以把消息发送到固定的队列

  

          conModel.QueueDeclare(queue: "DirectAllQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
          conModel.QueueDeclare(queue: "DirectErrorQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
          conModel.ExchangeDeclare(exchange: "Direct", type: ExchangeType.Direct, durable: false, arguments: null);
          string[] logtypes = new string[] { "debug", "info", "warn", "error" };
          foreach (var item in logtypes)
          {
              conModel.QueueBind(queue: "DirectAllQueue",
                  exchange: "Direct",
                  routingKey: item,
                  arguments: null);
          }
          conModel.QueueBind(queue: "DirectErrorQueue",
                  exchange: "Direct",
                  routingKey: "error",
                  arguments: null);

          List<Info> infos = new List<Info>();
          for (int i = 1; i < 100; i++)
          {
              if (i % 4 == 0)
              {
                  infos.Add(new Info { LogType = "info", Msg = Encoding.UTF8.GetBytes($"info第{i}条信息") });
              }
              if (i % 4 == 1)
              {
                  infos.Add(new Info { LogType = "debug", Msg = Encoding.UTF8.GetBytes($"debug{i}条信息") });
              }
              if (i % 4 == 2)
              {
                  infos.Add(new Info { LogType = "warn", Msg = Encoding.UTF8.GetBytes($"warn{i}条信息") });
              }
              if (i % 4 == 3)
              {
                  infos.Add(new Info { LogType = "error", Msg = Encoding.UTF8.GetBytes($"error{i}条信息") });
              }
          }
          foreach (var item in infos)
          {
              conModel.BasicPublish(exchange: "Direct",
                  routingKey: item.LogType,
                  basicProperties: null,
                  body: item.Msg);
              Console.WriteLine($"{Encoding.UTF8.GetString(item.Msg)}已经发送");
          }
          Console.ReadKey();
    public class Info
    {
        public string LogType { get; set; }
        public byte[] Msg { get; set; }
    }

  以上代码,声明了两个队列一个Direct路由器,然后存在4中类型的消息,模拟创建100条每种有25条消息。在发送的之前all队列绑定绑定了所有的消息类型,error队列仅仅绑定了错误类型。然后将100笔数据在发送时绑定上数据自身的类型。当数据发送完毕后我
们查看结果。

   可以发现Direct Exchange和all队列绑定了所有类型,他存在有100比数据。Direct Exchange和Error队列绑定了error类型只有25比数据。

  我们在来查看消费者代码

  

       conModel.ExchangeDeclare(exchange: "Direct", type: ExchangeType.Direct, durable: false, arguments: null);
       conModel.QueueBind(queue: "DirectAllQueue",
       exchange: "Direct",
       routingKey: "info"); 
       var consumer = new EventingBasicConsumer(conModel);
       consumer.Received += (model, ea) =>
       {
           var body = ea.Body;
           var msg = System.Text.Encoding.UTF8.GetString(body.ToArray());
           Console.WriteLine($"{msg},接受消息");
       };
       conModel.BasicConsume(queue: "DirectAllQueue",
               autoAck: true,
               consumer: consumer); 

  消费者在这个里使用的info的key运行结果是All队列中所有的消息都被消费完毕

  

2. fanout (消息广播,将消息分发到exchange上绑定的所有队列上)

  

  发布者可以将一笔信息发送到所有绑定fanout路由器的队列中,然后所有的消费者都可以接受到信息,这个可以应用于给用户推送打折活动这种应用场景下。

  3、topic (模式匹配的路由规则:支持通配符)

        opic是direct的升级版,是一种模式匹配的路由机制。它支持使用两种通配符来进行模式匹配:符号#和符号*。其中*匹配一个单词, #则表示匹配0个或多个单词,单词之间用.分割。如下图所示。

    

原文地址:https://www.cnblogs.com/-alvin/p/13411210.html