RabbitMQ 工作队列

1.工作队列

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace Worker
{
    class Program
    {
        public static void Main(string[] args)
        {
            //1.prefetchCount  每个接收者任务上限设置
            //2.autoAck 是否自动确认任务
            Console.WriteLine("接收消息服务启动:");
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                //durable 持久性
                channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//告诉RabbitMQ一次不要给工人一个以上的消息
                Console.WriteLine(" [*] Waiting for messages.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] Received {0}", message);

                    int dots = message.Split('.').Length - 1;
                    Thread.Sleep(dots * 1000);

                    Console.WriteLine(" [x] Done");
                    // Note: it is possible to access the channel via
                    //       ((EventingBasicConsumer)sender).Model here
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//确认当前任务完成 即队列中会删除该条任务
                };
                channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);//autoAck 是否自动确认任务 并删除
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }

            Console.WriteLine("结束");
        }

    }
}
using RabbitMQ.Client;
using System;
using System.Text;

namespace NewTask
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                //durable 持久性  设置为true rabbitmq重启后该条消息还会存在,如果为false,重启后将不会保存这条消息
                channel.QueueDeclare(queue: "task_queue1", durable: true, exclusive: false, autoDelete: false, arguments: null);
                var message = GetMessage(args);
                var body = Encoding.UTF8.GetBytes(message);

                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;

                channel.BasicPublish(exchange: "",
                                     routingKey: "task_queue1",
                                     basicProperties: properties,
                                     body: body);
            }
        }
        private static string GetMessage(string[] args)
        {
            return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
        }
    }
}

 1.循环调度

     默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环
2.消息确认

    autoAck: true 时RabbitMQ一旦向消费者传递了一条消息,便立即将其标记为删除。在这种情况下,如果您杀死一个程序,我们将丢失正在处理的消息。我们还将丢失发送给该特定工作人员但尚未处理的所有消息。
    autoAck:false时用channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//确认当前任务完成

3.讯息持久性

     我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是,如果RabbitMQ服务器停止,我们的任务仍然会丢失。

RabbitMQ退出或崩溃时,它将忘记队列和消息,除非您告知不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久性。

//durable 持久性  设置为true rabbitmq重启后该条消息还会存在,如果为false,重启后将不会保存这条消息
                channel.QueueDeclare(queue: "task_queue1", durable: true, exclusive: false, autoDelete: false, arguments: null);

4.公平派遣

    在有两名工人的情况下,当所有奇怪的消息都很重,甚至消息很轻时,一位工人将一直忙碌而另一位工人将几乎不做任何工作。好吧,RabbitMQ对此一无所知,并且仍将平均分派消息。

发生这种情况是因为RabbitMQ在消息进入队列时才调度消息。它不会查看消费者的未确认消息数。它只是盲目地将每第n条消息发送给第n个使用者。

为了更改此行为,我们可以将BasicQos方法与 prefetchCount = 1设置一起使用。这告诉RabbitMQ一次不要给工人一个以上的消息。换句话说,在处理并确认上一条消息之前,不要将新消息发送给工作人员。而是将其分派给不忙的下一个工作程序。

channel.BasicQos(0, 1, false);
原文地址:https://www.cnblogs.com/lbonet/p/14462045.html