RabbitMQ (三) 工作队列之轮询分发

上一篇讲了简单队列,实际工作中,这种队列应该很少用到,因为生产者发送消息的耗时一般都很短,但是消费者收到消息后,往往伴随着对高消息的业务逻辑处理,是个耗时的过程,这势必会导致大量的消息积压在一个消费者手中,从而导致业务的积压.

所以我们需要多个消费者一起消费队列中的消息,模型如下:(为了方便讲解,暂时隐藏掉"交换机")

生产者

复制代码
    public class Producer
    {
        private const string QueueName = "test_work_queue";
        public static void Send()
        {
            //获取一个连接
            using (IConnection connection = ConnectionHelper.GetConnection())
            {
                //从连接中获取一个信道
                using (IModel channel = connection.CreateModel())
                {
                    //声明队列
                    channel.QueueDeclare(QueueName, false, false, false, null);

                    for (int i = 0; i < 50; i++)
                    {
                        //创建消息
                        string msg = "hello world " + i;
                        //发送消息
                        channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg));
                        Console.WriteLine($"{DateTime.Now} : send {msg}");
                    }
                }
            }
        }
    }
复制代码

消费者1

复制代码
    public class Consumer1
    {
        private const string QueueName = "test_work_queue";
        public static void Receive()
        {
            //获取一个连接
            IConnection connection = ConnectionHelper.GetConnection();

            //从连接中获取一个信道
            IModel channel = connection.CreateModel();

            //声明队列
            channel.QueueDeclare(QueueName, false, false, false, null);

            //添加消费者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //注册消费者收消息事件
            consumer.Received += (s, e) =>
            {
                byte[] bytes = e.Body;
                string str = Encoding.Default.GetString(bytes);
                Console.WriteLine("consumer1 receive : " + str);
                Thread.Sleep(500);//休息0.5秒
            };

            //开启消费者监听
            channel.BasicConsume(QueueName, true, "", false, false, null, consumer);
        }
    }
复制代码

消费者2

只有一点点区别:

                Console.WriteLine("consumer2 receive : " + str);
                Thread.Sleep(1000);//休息1秒

我们这里故意让两个消费者处理消息的耗时不一样,一个0.5秒,一个1秒.

我们来看看结果:

可以非常清楚的看到,尽管两个消费者处理消息的"耗时"不一样,但是处理的"数量"是一样的.

这里有几个细节要说明一下:

1.在生产者和两个消费者中都声明了同一个队列.其实,如果这个队列之前已经存在了,那么生产者和消费者都可以不用再声明了;

2.一定要先启动两个消费者,再启动生产者.原因是,我们上面的代码中,消费者的 BasicConsume 方法的第2个参数传入的是 true,

这个参数就是 autoAck :是否自动确认(上面文章有讲过).

所以如果先开启生产者,那么会瞬间发送完50条消息,这时候启动消费者1,那么会立刻"消费"掉这50条消息.有朋友肯定要问,不是"睡"了0.5秒么?

这里"睡"0.5秒,是对消息的业务逻辑处理耗时,而不是"消费"消息,消息已经在消费者启动的那一刻从队列中"拿"过来了;

同时,由于采用的是"自动确认",所以队列看到50条都被"确认"了,就会将这些消息从队列中移除.

这时候再启动消费者2,则不会收到任何消息.

原文地址:https://www.cnblogs.com/liujunjun/p/14140879.html