rabbitmq系列——(4 Exchange Type -- Topic)

  // Exchange绑定队列需要指定key,key可以有自己的规则;可以有占位符;

  // *或者#,*匹配一个单词、#匹配零个或多个单词,相当于在Direct基础上加上模糊匹配;
  // 应用:分组,打标签

1. 生产者

using RabbitMQMsgProducer.MessageProducer;
using Microsoft.Extensions.Configuration;
using System;
using System.IO;
using RabbitMQMsgProducer.ExchangeDemo;

namespace RabbitMQMsgProducer
{
    /// <summary>
    /// Console entry point of the producer demo.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Runs the topic-exchange producer and then waits for a line of console input
        /// before exiting. Any exception is caught and only its message is printed.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            try
            {
                // ExchangeType: Topic
                // Binding a queue to the exchange requires a key; the key may follow its own
                // rules and may contain placeholders: '*' matches one word, '#' matches
                // multiple words. Effectively Direct plus wildcard matching.
                // Typical use: grouping, tagging.
                ProducerTopicExchage.Send();

                Console.ReadLine();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }
}
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQMsgProducer.ExchangeDemo
{
    /// <summary>
    /// Producer side of the topic-exchange demo.
    /// </summary>
    public class ProducerTopicExchage
    {
        /// <summary>
        /// Connects to the broker on localhost, declares the durable topic exchange
        /// "TopicExchange" and the durable queues "TopicQueue001" / "TopicQueue002",
        /// binds them with the patterns "all.#" and "*.error" respectively, and then
        /// publishes six sample messages with different routing keys.
        /// The channel and connection are disposed before the method returns.
        /// </summary>
        public static void Send()
        {
            var factory = new ConnectionFactory
            {
                HostName = "localhost", // broker address
                UserName = "guest",     // user name
                Password = "guest",     // password
            };

            const string exchangeName = "TopicExchange";
            const string queueName001 = "TopicQueue001";
            const string queueName002 = "TopicQueue002";
            const string allRoutingKey = "all.#";
            const string errorRoutingKey = "*.error";

            using (var connection = factory.CreateConnection())
            using (IModel channel = connection.CreateModel())
            {
                // Declare the exchange.
                channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);

                // Declare both queues.
                channel.QueueDeclare(queue: queueName001, durable: true, exclusive: false, autoDelete: false, arguments: null);
                channel.QueueDeclare(queue: queueName002, durable: true, exclusive: false, autoDelete: false, arguments: null);

                // Bind each queue to the exchange with its own pattern.
                channel.QueueBind(queue: queueName001, exchange: exchangeName, routingKey: allRoutingKey, arguments: null);
                channel.QueueBind(queue: queueName002, exchange: exchangeName, routingKey: errorRoutingKey, arguments: null);

                // Publish the sample messages, in the same order as before.
                Publish(channel, exchangeName, "all.info", "this is info message.");
                Publish(channel, exchangeName, "all.error", "this is error message.");
                Publish(channel, exchangeName, "all.warn", "this is warn message.");
                Publish(channel, exchangeName, "2.error", "this is error2 message.");
                Publish(channel, exchangeName, "all.debug", "this is debug message.");
                Publish(channel, exchangeName, "trace.error", "this is trace message.");
            }
        }

        /// <summary>
        /// UTF-8 encodes <paramref name="msg"/>, publishes it to the exchange with the
        /// given routing key (no message properties) and logs the send to the console.
        /// </summary>
        private static void Publish(IModel channel, string exchangeName, string routingKey, string msg)
        {
            var body = Encoding.UTF8.GetBytes(msg);
            // Basic publish.
            channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: null, body: body);
            Console.WriteLine($"the message _ {msg} _ is send .");
        }
    }
}

2. 消费者

2.1 消费者001

using RabbitMQMsgConsumer001.ExchangeDemo;
using RabbitMQMsgConsumer001.MessageConsumer;
using System;
using System.Threading.Tasks;

namespace RabbitMQMsgConsumer001
{
    /// <summary>
    /// Console entry point of consumer 001.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Starts the topic-exchange consumer and then waits for a line of console input
        /// before exiting. Any exception is caught and only its message is printed.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            try
            {
                // ExchangeType: Topic
                // Binding a queue to the exchange requires a key; the key may follow its own
                // rules and may contain placeholders: '*' matches one word, '#' matches
                // multiple words. Effectively Direct plus wildcard matching.
                // Typical use: grouping, tagging.
                ConsumerTopicExchage.Receive();

                Console.ReadLine();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQMsgConsumer001.ExchangeDemo
{
    /// <summary>
    /// Consumer 001 of the topic-exchange demo: reads from "TopicQueue001", which is
    /// bound to "TopicExchange" with the pattern "all.#".
    /// </summary>
    public class ConsumerTopicExchage
    {
        /// <summary>
        /// Connects to the broker on localhost, declares the exchange/queue/binding
        /// (same arguments as the producer), and starts an auto-acknowledging consumer
        /// that prints every received message body as UTF-8 text.
        /// </summary>
        /// <remarks>
        /// The method blocks on <see cref="Console.ReadLine"/> while the consumer is
        /// active. Previously it returned right after <c>BasicConsume</c>, which left the
        /// <c>using</c> scopes and disposed the channel and connection immediately, so
        /// deliveries stopped almost as soon as they started. The channel and
        /// connection are now closed only after a line has been read from the console.
        /// </remarks>
        public static void Receive()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost"; // broker address
            factory.UserName = "guest";     // user name
            factory.Password = "guest";     // password
            string exchangeName = "TopicExchange";
            string queueName001 = "TopicQueue001";
            string allRoutingKey = "all.#";
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    // Declare the exchange.
                    channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);
                    // Declare the queue.
                    channel.QueueDeclare(queue: queueName001, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    // Bind the queue to the exchange.
                    channel.QueueBind(queue: queueName001, exchange: exchangeName, routingKey: allRoutingKey, arguments: null);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var msg = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine($"the consumer 001 is receive : {msg} .");
                    };
                    // Start consuming; messages are acknowledged automatically.
                    channel.BasicConsume(queue: queueName001, autoAck: true, consumer: consumer);
                    Console.WriteLine($"the message is all.# .");

                    // Keep the channel and connection alive while messages are delivered;
                    // leaving this scope disposes both and cancels the consumer.
                    Console.ReadLine();
                }
            }
        }
    }
}

2.2 消费者002

using RabbitMQMsgConsumer002.ExchangeDemo;
using System;

namespace RabbitMQMsgConsumer002
{
    /// <summary>
    /// Console entry point of consumer 002.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Starts the topic-exchange consumer and then waits for a line of console input
        /// before exiting. Any exception is caught and only its message is printed.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            try
            {
                // ExchangeType: Topic
                // Binding a queue to the exchange requires a key; the key may follow its own
                // rules and may contain placeholders: '*' matches one word, '#' matches
                // multiple words. Effectively Direct plus wildcard matching.
                // Typical use: grouping, tagging.
                ConsumerTopicExchage.Receive();

                Console.ReadLine();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQMsgConsumer002.ExchangeDemo
{
    /// <summary>
    /// Consumer 002 of the topic-exchange demo: reads from "TopicQueue002", which is
    /// bound to "TopicExchange" with the pattern "*.error".
    /// </summary>
    public class ConsumerTopicExchage
    {
        /// <summary>
        /// Connects to the broker on localhost, declares the exchange/queue/binding
        /// (same arguments as the producer), and starts an auto-acknowledging consumer
        /// that prints every received message body as UTF-8 text.
        /// </summary>
        /// <remarks>
        /// The method blocks on <see cref="Console.ReadLine"/> while the consumer is
        /// active. Previously it returned right after <c>BasicConsume</c>, which left the
        /// <c>using</c> scopes and disposed the channel and connection immediately, so
        /// deliveries stopped almost as soon as they started. The channel and
        /// connection are now closed only after a line has been read from the console.
        /// </remarks>
        public static void Receive()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost"; // broker address
            factory.UserName = "guest";     // user name
            factory.Password = "guest";     // password
            string exchangeName = "TopicExchange";
            string queueName002 = "TopicQueue002";
            string errorRoutingKey = "*.error";
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    // Declare the exchange.
                    channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);
                    // Declare the queue.
                    channel.QueueDeclare(queue: queueName002, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    // Bind the queue to the exchange.
                    channel.QueueBind(queue: queueName002, exchange: exchangeName, routingKey: errorRoutingKey, arguments: null);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var msg = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine($"the consumer 002 is receive : {msg} .");
                    };
                    // Start consuming; messages are acknowledged automatically.
                    channel.BasicConsume(queue: queueName002, autoAck: true, consumer: consumer);
                    Console.WriteLine($"the message is *.error .");

                    // Keep the channel and connection alive while messages are delivered;
                    // leaving this scope disposes both and cancels the consumer.
                    Console.ReadLine();
                }
            }
        }
    }
}

3. 结果

原文地址:https://www.cnblogs.com/Fletcher/p/14184541.html