Rabbitmq—基础

1.分布式异步队列

2.队列解读

优点:

以时间换空间:以更长的时间来处理堆集的业务逻辑;

  1. 异步处理;响应很快,增加服务器承载能力;
  2. 削峰,将流量高峰分解到不同的时间段来处理;
  3. 扩展性,UI和业务的解耦,就可以独立演化;
  4. 高可用,处理器发生故障以后,不会影响可用性;

缺点:

  1. 即时性降低,降低了用户的体验---无法避免;业务上来屈服;
  2. 更复杂;
  3. 更加依附于队列了;

3.路由方式

Direct Exchange

【直接交换(按RouteKey)】:
直接按照绑定的RouteKey生产消费。

img

Fanout Exchange

【分发】:
所有绑定fanout exchange队列发送的消息,会被所有绑定fanout exchange队列的消费者消费掉。不按RouteKey处理。

img

Topic Exchange

【模糊匹配】

可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“XiaoChen.#”能够匹配到“XiaoChen.pets.cat”,但是“XiaoChen.” 只会匹配到“XiaoChen.money”。

*: 一个单词

#: 多个单词

img

4.优先级(priority)

在RMQ中想要使用优先级特性需要的版本为3.5.0+。

然后我们只需做两件事情:

  1. 将队列声明为优先级队列,即在创建队列的时候添加参数 x-max-priority 以指定最大的优先级,值为0-255(整数)。

  2. 为优先级消息添加优先级。

生产者

        public static void Make(string queueName, string exchangeName, string routeKey)
        {
            //创建连接工厂
            var factory = new ConnectionFactory()
            {
                UserName = "tangsansan",//用户名
                Password = "123456",//密码
                HostName = "localhost"//rabbitmq ip
            };        
		//创建连接
        var connection = factory.CreateConnection();
        //创建通道
        var channel = connection.CreateModel();
        //定义一个队列
        //channel.QueueDeclare(queueName, false, false, false, null);
        channel.QueueDeclare(queueName, true, false, false, 
            new Dictionary<string, object>() {
                     {"x-max-priority",10 }  //指定队列要支持优先级设置
                   });

        //定义一个Direct类型交换机
        channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);

        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKey);

        Console.WriteLine("
RabbitMQ连接成功,请输入消息,输入exit退出!");

        string[] questionList =
            {
            "error::This is an error message 1.",
            "info:This is an info message 1.",
            "warning:This is a warning message 1.",
            "error:This is an error message 2.",
            "info:This is an info message 2.",
            "error:This is an error message 3."
        };

        //设置消息优先级
        IBasicProperties props = channel.CreateBasicProperties();
        foreach (string questionMsg in questionList)
        {
            if (questionMsg.StartsWith("error"))
            {
                props.Priority = 9;
                channel.BasicPublish(exchange: exchangeName,
                               routingKey: routeKey,
                               basicProperties: props,
                               body: Encoding.UTF8.GetBytes(questionMsg));
            }
            else
            {
                props.Priority = 1;
                channel.BasicPublish(exchange: exchangeName,
                               routingKey: routeKey,
                               basicProperties: props,
                               body: Encoding.UTF8.GetBytes(questionMsg));
            }

            Console.WriteLine($"{questionMsg} 已发送..!");
        }

        channel.Close();
        connection.Close();
    }

没使用消费者前,去看看 GetMessage...

error全部排到前面来了...

消费者

        public static void Consume(string queueName, string exchangeName, string routeKey)
        {
            //创建连接工厂
            var factory = new ConnectionFactory()
            {
                UserName = "tangsansan",//用户名
                Password = "123456",//密码
                HostName = "localhost"//rabbitmq ip
            };        
        //创建连接
        var connection = factory.CreateConnection();
        //创建通道
        var channel = connection.CreateModel();
        //定义一个队列
        //channel.QueueDeclare(queueName, true, false, false, null);

        //定义一个Direct类型交换机
        //channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);

        //channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKey);

        Console.WriteLine("
RabbitMQ连接成功,请输入消息,输入exit退出!");

        var consumer = new EventingBasicConsumer(channel);
        string input;
        do
        {
            input = Console.ReadLine();

            consumer.Received += (model, ea) =>
            {
                Console.WriteLine(Encoding.UTF8.GetString(ea.Body.ToArray()));
            };

            //消费消息
            channel.BasicConsume(queueName, true, consumer);

        } while (input.Trim().ToLower() != "exit");

        channel.Close();
        connection.Close();
    }

5.Tx事务模式

channel.TxSelect(); 开启一个事务
channel.TxCommit(); 提交事务
channel.TxRollback(); 事务回滚

try
{
    //开启事务机制
    channel.TxSelect();
    //发送消息
    //同时给多个队列发送消息;要么都成功;要么都失败;
    channel.BasicPublish(exchange: "TxQueueExchange", routingKey: "TxRouteKey01", 
                         basicProperties: null, body: body);
    channel.BasicPublish(exchange: "TxQueueExchange", routingKey: "TxRouteKey02", 
                         basicProperties: null, body: body);
    //事务提交
    channel.TxCommit();
    Console.WriteLine($"发送到Broke成功!");
}
catch (Exception ex)
{
    Console.WriteLine($"发送到Broker失败!");
    channel.TxRollback(); //事务回滚
    throw;
}

6.确认消息

6.1 生产者

保证生产者发送到 Broker

channel.ConfirmSelect() 开启确认模式

(1)WaitForConfirms()

消息发送以后,提供一个回执方法 WaitForConfirms() 返回一个bool 值;

(2)WaitForConfirmsOrDie()

全部执行完,如果失败,就报错。

6.2 消费者

保证消费者被正常消费

6.2.1 自动确认

autoAck: true 自动确认;

在自动确认模式下,消息在发送后立即被认为是发送成功。 这种模式可以提高吞吐量(只要消费者能够跟上),不过会降低投递和消费者处理的安全性。 这种模式通常被称为“发后即忘”。 与手动确认模式不同,如果消费者的TCP连接或信道在成功投递之前关闭,该消息则会丢失。

使用自动确认模式时需要考虑的另一件事是消费者过载。 手动确认模式通常与有限的信道预取一起使用,限制信道上未完成(“进行中”)传送的数量。 然而,对于自动确认,根据定义没有这样的限制。 因此,消费者可能会被交付速度所压倒,可能积压在内存中,堆积如山,或者被操作系统终止。 某些客户端库将应用TCP反压(直到未处理的交付积压下降超过一定的限制时才停止从套接字读取)。 因此,只建议当消费者可以有效且稳定地处理投递时才使用自动投递方式。

//处理消息 
//autoAck: true 自动确认
//channel.BasicConsume(queue: "ConsumptionACKConfirmQueue", autoAck: true, consumer: consumer);

消费者直接对这8条消息自动确认,不管是否处理,Broker 都会直接删除所有的这8条消息。

6.2.2 手动确认

//处理消息 
//autoAck: false  手动确认
channel.BasicConsume(queue: "ConsumptionACKConfirmQueue", autoAck: false, consumer: consumer);
void BasicAck(ulong deliveryTag, bool multiple);

void BasicReject(ulong deliveryTag, bool requeue);

void BasicNack(ulong deliveryTag, bool multiple, bool requeue);
  • deliveryTag:可以看作消息的编号,它是一个64位的长整型值,最大值是9223372036854775807。
  • requeue:如果requeue 参数设置为true,则RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果requeue 参数设置为false,则RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。
  • multiple:在BasicAck中,multiple 参数设置为true 则表示确认deliveryTag编号之前所有已被当前消费者确认的消息。在BasicNack中,multiple 参数设置为true 则表示拒绝deliveryTag 编号之前所有未被当前消费者确认的消息。
  • BasicReject命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack这个命令。
6.2.2.1 成功确认
 //手动确认  消息正常消费  告诉Broker:你可以把当前这条消息删除掉了
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
6.2.2.2 失败确认
//否定:告诉Broker,这个消息我没有正常消费;  
//requeue: true:重新写入到队列里去; false:你还是删除掉;
channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);

参考代码

using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    #region EventingBasicConsumer
                    //定义消费者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    int i = 0;
                    consumer.Received += (model, ea) =>
                    { 
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        //如果在这里处理消息的手,异常了呢? 
                        //Console.WriteLine($"接收到消息:{message}"); ;

                        if (i < 50)
                        {
                            //手动确认  消息正常消费  告诉Broker:你可以把当前这条消息删除掉了
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                            Console.WriteLine(message);
                        }
                        else
                        {
                            //否定:告诉Broker,这个消息我没有正常消费;  requeue: true:重新写入到队列里去; false:你还是删除掉;
                            channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
                        }
                        i++;
                    };
                    Console.WriteLine("消费者准备就绪...."); 
                    {
                        //处理消息 
                        //autoAck: true 自动确认; 
                        //channel.BasicConsume(queue: "ConsumptionACKConfirmQueue", autoAck: true, consumer: consumer);
                    } 
                    {
                        //处理消息 
                        //autoAck: false  手动确认 
                        channel.BasicConsume(queue: "ConsumptionACKConfirmQueue", autoAck: false, consumer: consumer);
                    }


                    Console.ReadKey();
                    #endregion
                }
            }

参考

分布式系统消息中间件——RabbitMQ的使用思考篇

原文地址:https://www.cnblogs.com/tangge/p/14146597.html