RabbitMQ Direct交换机代码实现

RabbitMQ交换机有四种类型:Direct,Fanout,Topic,Header

先简单介绍Direct交换机的代码实现

先创建连接

public class MQHelper
    {
        public IConnection GetConnection()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "guest";
            factory.Password = "guest";
            return factory.CreateConnection();
        }
    }

直流交换机代码:

public class DirectExchange
    {      
        public void DirectPublish()
        {
            MQHelper mh = new MQHelper();
            using (var conn = mh.GetConnection())
            {
                using(IModel channel = conn.CreateModel())
                {
                    //声明队列
                    channel.QueueDeclare(queue: "DirectLogAll", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queue: "DirectLogError", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    //声明交换机
                    channel.ExchangeDeclare(exchange: "DirectExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                    //日志类型
                    string[] logtypes = new string[] { "debug", "info", "warn", "error" };

                    foreach(string log in logtypes)
                    {
                        //绑定交换机和队列(所有日志类型队列)
                        channel.QueueBind(queue: "DirectLogAll", exchange: "DirectExchange", routingKey: log);
                    }

                    //错误日志队列
                    channel.QueueBind(queue: "DirectLogError", exchange: "DirectExchange", routingKey: "error");


                    List<LogModel> list = new List<LogModel>();
                    //100条消息
                    for(int i = 1; i <= 100; i++)
                    {
                        if (i % 4 == 0)
                        {
                            list.Add(new LogModel() { LogType = "debug", Msg = Encoding.UTF8.GetBytes($"debug第{i}条消息") });
                        }else if(i % 4 == 1)
                        {
                            list.Add(new LogModel() { LogType = "info", Msg = Encoding.UTF8.GetBytes($"info第{i}条消息") });
                        }
                        else if (i % 4 == 2)
                        {
                            list.Add(new LogModel() { LogType = "warn", Msg = Encoding.UTF8.GetBytes($"warn第{i}条消息") });
                        }
                        else
                        {
                            list.Add(new LogModel() { LogType = "error", Msg = Encoding.UTF8.GetBytes($"error第{i}条消息") });
                        }
                    }

                    //发送
                    foreach(var log in list)
                    {
                        channel.BasicPublish(exchange: "DirectExchange", routingKey: log.LogType, basicProperties: null, body: log.Msg);
                        //记录
                        Console.WriteLine($"{Encoding.UTF8.GetString(log.Msg)} 已发送");
                    }

                }
            }
        }  
    }

这是模拟发送100条日志信息到RabbitMQ,四种类型各25条,根据路由键routingKey分别传到两条队列。其中日志消息

public class LogModel
    {
        public string LogType { get; set; }     //日志类型

        public byte[] Msg { get; set; }        //消息正文
    }

最终在RabbitMQ的本地localhost:15672上可以看到

这是发送消息到了RabbitMQ上待消费。

添加Direct消费模块

public class DirectExchangeConsumer
    {
        public void DirectConsume()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "guest";
            factory.Password = "guest";
            
            using(var conn= factory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    //声明队列
                    channel.QueueDeclare(queue: "DirectLogAll", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    // channel.QueueDeclare(queue: "DirectLogError", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    //声明交换机
                    channel.ExchangeDeclare(exchange: "DirectExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                    //日志类型
                    string[] logtypes = new string[] { "debug", "info", "warn", "error" };

                    foreach (string log in logtypes)
                    {
                        //绑定交换机和队列(所有日志类型队列)
                        channel.QueueBind(queue: "DirectLogAll", exchange: "DirectExchange", routingKey: log);
                    }

                    //错误日志队列
                    // channel.QueueBind(queue: "DirectLogError", exchange: "DirectExchange", routingKey: "error");


                    //消费消息
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine($"[{message}],写入文本");
                    };
                    //处理消息
                    channel.BasicConsume(queue: "DirectLogAll", autoAck: true, consumer: consumer);
                }
            }
        }
    }

虽然这里重复声明了交换机和队列,但由于连接一样,所以不会重复存在。

调用方法之后可以看到消息全部都被消费

记录编程的点滴,体会学习的乐趣
原文地址:https://www.cnblogs.com/AduBlog/p/14891574.html