RabbitMQ消息队列实现30分钟订单自动取消功能(C#)

目录:

功能介绍

消息队列简介及原理

代码与实现

消息队列常见问题

功能介绍

一 . 简单介绍一下要做的功能,用户前台下单之后,如果用户未支付,30分钟后订单会自动取消,订单状态和库存变回原来状态和库存,我们的后台使用asp.net core 2.0开发,而asp.net core后台的定时任务 需要添加服务 services.AddHostedService<DeadListener>(); 实现类直接继承IHostedService接口,接口会调用 启动 方法 StartAsync 。好了,不多介绍启动定时任务,(主要介绍的是消息队列RabbitMQ)然后 简单的逻辑就是 用户下单会把一条消息插入生产队列中,当然消息队列的配置是30分钟,30分钟之内如果用户支付,就会调用消费者接口,将消息消费掉,如果30分钟没有支付,超时消息会到死信队列中,然后后台任务会检查到死信队列中的消息,将消息消费掉,过程中会改订单状态等

消息队列简介及原理

二 . 再简单介绍一下消息队列吧,我也是第一次用,网上太多帖子了,哈哈

  • ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用;
  • Channel(信道):消息推送使用的通道;
  • Exchange(交换器):用于接受、分配消息;
  • Queue(队列):用于存储生产者的消息;
  • RoutingKey(路由键):用于把生成者的数据分配到交换器上;
  • BindingKey(绑定键):用于把交换器的消息绑定到队列上;

 

三. 工作机制

生产者、消费者和代理

在了解消息通讯之前首先要了解3个概念:生产者、消费者和代理。

生产者:消息的创建者,负责创建和推送数据到消息服务器;

消费者:消息的接收方,用于处理数据和确认消息;

代理:就是RabbitMQ本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。

消息发送原理

首先你必须连接到Rabbit才能发布和消费消息,那怎么连接和发送消息的呢?

你的应用程序和Rabbit Server之间会创建一个TCP连接,一旦TCP打开,并通过了认证,认证就是你试图连接Rabbit之前发送的Rabbit服务器连接信息和用户名和密码,有点像程序连接数据库,使用Java有两种连接认证的方式,后面代码会详细介绍,一旦认证通过你的应用程序和Rabbit就创建了一条AMQP信道(Channel)。

信道是创建在“真实”TCP上的虚拟连接,AMQP命令都是通过信道发送出去的,每个信道都会有一个唯一的ID,不论是发布消息,订阅队列或者介绍消息都是通过信道完成的。

为什么不通过TCP直接发送命令?

对于操作系统来说创建和销毁TCP会话是非常昂贵的开销,假设高峰期每秒有成千上万条连接,每个连接都要创建一条TCP会话,这就造成了TCP连接的巨大浪费,而且操作系统每秒能创建的TCP也是有限的,因此很快就会遇到系统瓶颈。

如果我们每个请求都使用一条TCP连接,既满足了性能的需要,又能确保每个连接的私密性,这就是引入信道概念的原因。

四.消息持久化

Rabbit队列和交换器有一个不可告人的秘密,就是默认情况下重启服务器会导致消息丢失,那么怎么保证Rabbit在重启的时候不丢失呢?答案就是消息持久化。

当你把消息发送到Rabbit服务器的时候,你需要选择你是否要进行持久化,但这并不能保证Rabbit能从崩溃中恢复,想要Rabbit消息能恢复必须满足3个条件:

  1. 投递消息的时候durable设置为true,消息持久化,代码:channel.queueDeclare(x, true, false, false, null),参数2设置为true持久化;
  2. 设置投递模式deliveryMode设置为2(持久),代码:channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x),参数3设置为存储纯文本到磁盘;
  3. 消息已经到达持久化交换器上;
  4. 消息已经到达持久化的队列;

持久化工作原理

Rabbit会将你的持久化消息写入磁盘上的持久化日志文件,等消息被消费之后,Rabbit会把这条消息标识为等待垃圾回收。

持久化的缺点

消息持久化的优点显而易见,但缺点也很明显,那就是性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量,尽管使用SSD硬盘可以使事情得到缓解,但他仍然吸干了Rabbit的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。

所以使用者要根据自己的情况,选择适合自己的方式。

代码与实现

1.声明队列的参数说明

1 //声明队列
2  channel.QueueDeclare
3  (
4      queue: QueueName, //队列名称
5      durable: false, //队列是否持久化.false:队列在内存中,服务器挂掉后,队列就没了;true:服务器重启后,队列将会重新生成.注意:只是队列持久化,不代表队列中的消息持久化!!!!
6       exclusive: false, //队列是否专属,专属的范围针对的是连接,也就是说,一个连接下面的多个信道是可见的.对于其他连接是不可见的.连接断开后,该队列会被删除.注意,不是信道断开,是连接断开.并且,就算设置成了持久化,也会删除.
7      autoDelete: true, //如果所有消费者都断开连接了,是否自动删除.如果还没有消费者从该队列获取过消息或者监听该队列,那么该队列不会删除.只有在有消费者从该队列获取过消息后,该队列才有可能自动删除(当所有消费者都断开连接,不管消息是否获取完)
8      arguments: null //队列的配置
9  );
1 //加载消息队列(订单超时)
2 //定时任务触发器
3  services.AddHostedService<DeadListener>();
4 或者
5  services.AddTransient<IHostedService, DeadListener>();
View Code

2.消息队列(生产者)

 1        /// <summary>
 2         /// 订单超时未处理消息队列(生产者)
 3         /// </summary>
 4         /// <param name="routeKey"></param>
 5         /// <returns></returns>
 6         public Task PublisherOrder(string routeKey)
 7         {
 8             const string routingKeyDead = "queue-dead-routing-jd"; //死信队列路由
 9             var routingKeyDelay = "queue-delay-" + routeKey;//消息队列路由
10             const string orderQueueName = "zzhelloJd"; //定义消息队列名
11             const string orderQueueDeadName = "zzhello_dead_Jd"; //定义一个死信消息队列名
12 
13             var factory = new ConnectionFactory
14             {
15                 UserName = _configuration["RabbitMQConfig:RabbitUserName"],//用户名
16                 Password = _configuration["RabbitMQConfig:RabbitPassword"],//密码
17                 HostName = _configuration["RabbitMQConfig:RabbitHost"],//rabbitmq ip
18             };
19             using (var connection = factory.CreateConnection())
20             {
21                 using (var channel = connection.CreateModel())
22                 {
23                     //定义死信交换机
24                     channel.ExchangeDeclare("exchange-D", ExchangeType.Direct, true, false, null);
25                     //创建一个名叫"zzhello_dead"的消息队列
26                     channel.QueueDeclare(orderQueueDeadName, true, false, false, null);
27                     //将死信队列绑定到死信交换机
28                     channel.QueueBind(orderQueueDeadName, "exchange-D", routingKeyDead);
29                     var dic = new Dictionary<string, object>
30                     {
31                         {"x-message-ttl", 1800000},//队列上消息过期时间,应小于队列过期时间 60000 1800000
32                         //{"x-message-ttl", 120000},//队列上消息过期时间,应小于队列过期时间 60000 1800000
33                         {"x-dead-letter-exchange", "exchange-D"},//过期消息转向路由
34                         {"x-dead-letter-routing-key", routingKeyDead}//过期消息转向路由相匹配routingkey
35                     };
36                     channel.ExchangeDeclare("exchange-L", ExchangeType.Direct, true, false, null);//定义一个Direct类型交换机
37                     //创建一个名叫"zzhello"的消息队列
38                     channel.QueueDeclare(orderQueueName, true, false, false, dic);
39                     //将队列绑定到交换机
40                     channel.QueueBind(orderQueueName, "exchange-L", routingKeyDelay, dic);
41                     var body = Encoding.UTF8.GetBytes(routeKey.ToString());
42                     //向该消息队列发送消息message
43                     channel.BasicPublish("exchange-L",
44                             routingKeyDelay,
45                             null,
46                             body);
47                 }
48             }
49             return Task.CompletedTask;
50         }

3.消息队列(消费者)

 1      /// <summary>
 2         /// 支付成功后处理消费者
 3         /// </summary>
 4         /// <returns></returns>
 5         [Obsolete]
 6         public Task ConsumerOrder(string routeKey)
 7         {
 8             const string orderQueueName = "zzhelloJd"; //定义消息队列名
 9             var routingKeyDelay = "queue-delay-" + routeKey;//消息队列路由
10             const string routingKeyDead = "queue-dead-routing-jd"; //死信队列路由
11             var factory = new ConnectionFactory
12             {
13                 UserName = _configuration["RabbitMQConfig:RabbitUserName"],//用户名
14                 Password = _configuration["RabbitMQConfig:RabbitPassword"],//密码
15                 HostName = _configuration["RabbitMQConfig:RabbitHost"],//rabbitmq ip
16             };
17             using (var connection = factory.CreateConnection())
18             {
19                 using (var channel = connection.CreateModel())
20                 {
21                     var dic = new Dictionary<string, object>
22                     {
23                         {"x-message-ttl", 1800000},//队列上消息过期时间,应小于队列过期时间 60000 1800000
24                         {"x-dead-letter-exchange", "exchange-D"},//过期消息转向路由
25                         {"x-dead-letter-routing-key", routingKeyDead}//过期消息转向路由相匹配routingkey
26                     };
27                     channel.ExchangeDeclare("exchange-L", ExchangeType.Direct, true, false, null);//定义一个Direct类型交换机
28                     //创建一个名叫"zzhello"的消息队列
29                     channel.QueueDeclare(orderQueueName, true, false, false, dic);
30                     //将队列绑定到交换机
31                     channel.QueueBind(orderQueueName, "exchange-L", routingKeyDelay, dic);
32                     //回调,当consumer收到消息后会执行该函数
33                     //var consumer = new EventingBasicConsumer(channel);
34                     //consumer.Received += (model, ea) =>
35                     //{
36                     //    var body = ea.Body;
37                     //    var message = Encoding.UTF8.GetString(body);
38                     //};
39 
40                     ////消费队列"hello"中的消息
41                     //channel.BasicConsume(queue: name,
42                     //                     autoAck: true,
43                     //                     consumer: consumer);
44 
45                     var consumer = new QueueingBasicConsumer(channel);
46                     //消费队列,并设置应答模式为程序主动应答
47                     channel.BasicConsume(orderQueueName, false, consumer);
48 
49                     //阻塞函数,获取队列中的消息
50                     var ea = consumer.Queue.Dequeue();
51                     var bytes = ea.Body;
52                     var str = Encoding.UTF8.GetString(bytes);
53                     Console.WriteLine("队列消息:" + str);
54                     //回复确认
55                     channel.BasicAck(ea.DeliveryTag, false);
56 
57                 }
58             }
59             return Task.CompletedTask;
60         }

4.消费死信队列

 1     public class DeadListener : RabbitListener
 2     {
 3 
 4         #region Fileds
 5 
 6         // 因为Process函数是委托回调,直接将其他Service注入的话两者不在一个scope,
 7         // 这里要调用其他的Service实例只能用IServiceProvider CreateScope后获取实例对象
 8         private readonly IServiceProvider _services;
 9         private readonly ILogger<RabbitListener> _logger;
10 
11         #endregion
12 
13 
14         #region Ctors
15 
16         public DeadListener(IServiceProvider services, IConfiguration configuration, ILogger<RabbitListener> logger) : base(configuration)
17         {
18             RouteKey = "queue-dead-routing-jd";
19             QueueName = "zzhello_dead_Jd";
20             _logger = logger;
21             _services = services;
22         }
23 
24         #endregion
25 
26 
27         #region Methods
28 
29         protected override bool Process(string message)
30         {
31             var taskMessage = message;
32             if (taskMessage == null)
33             {
34                 // 返回false 的时候回直接驳回此消息,表示处理不了
35                 return false;
36             }
37             try
38             {
39                 using (var scope = _services.CreateScope())
40                 {
41                     var xxxService = scope.ServiceProvider.GetRequiredService<IOrderService>();
42                     //_logger.LogInformation($"开始更新订单状态:UpdateOrderCancel,message:{message}");
43                     //LoggerHelper.Write($"开始更新订单状态:UpdateOrderCancel,message:{message}");
44                     var re= xxxService.UpdateOrderCancel(Guid.Parse(taskMessage)).Result;
45                     //_logger.LogInformation($"结束更新订单状态:UpdateOrderCancel,message:{message},result:{re}");
46                     //LoggerHelper.Write($"结束更新订单状态:UpdateOrderCancel,message:{message},result:{re}");
47                     if (re)
48                     {
49                         return true;
50                     }
51                     else
52                     {
53                         return false;
54                     }
55                 }
56             }
57             catch (Exception ex)
58             {
59                 _logger.LogInformation($"Process fail,error:{ex.Message},stackTrace:{ex.StackTrace},message:{message}");
60                 _logger.LogError(-1, ex, "Process fail");
61                 LoggerHelper.Write($"DeadListener 自动更新订单状态报错,错误提示  :{ex}");
62                 return false;
63             }
64         }
65         #endregion
66     }
 1     public class RabbitListener : IHostedService
 2     {
 3         private readonly IConnection _connection;
 4         private readonly IModel _channel;
 5 
 6         protected RabbitListener(IConfiguration configuration)
 7         {
 8             try
 9             {
10                 var factory = new ConnectionFactory
11                 {
12                     // 这是我这边的配置,自己改成自己用就好
13                     UserName = configuration["RabbitMQConfig:RabbitUserName"],//用户名
14                     Password = configuration["RabbitMQConfig:RabbitPassword"],//密码
15                     HostName = configuration["RabbitMQConfig:RabbitHost"]//rabbitmq ip
16                     //Port = options.Value.RabbitPort,
17                 };
18                 _connection = factory.CreateConnection();
19                 _channel = _connection.CreateModel();
20             }
21             catch (Exception ex)
22             {
23                 Console.WriteLine($"RabbitListener init error,ex:{ex.Message}");
24             }
25         }
26 
27         public Task StartAsync(CancellationToken cancellationToken)
28         {
29             Register();
30             return Task.CompletedTask;
31         }
32         protected string RouteKey;
33         protected string QueueName;
34 
35         // 处理消息的方法
36         protected virtual bool Process(string message)
37         {
38             throw new NotImplementedException();
39         }
40 
41         // 注册消费者监听在这里
42         private void Register()
43         {
44             Console.WriteLine($"RabbitListener register,routeKey:{RouteKey}");
45             // channel.ExchangeDeclare(exchange: "exchange-D", type: "topic");
46             _channel.ExchangeDeclare("exchange-D", ExchangeType.Direct, true, false, null);
47             _channel.QueueDeclare(QueueName, true, false, false, null);
48             _channel.QueueBind(QueueName, "exchange-D", RouteKey);
49 
50             //启用QoS,每次预取10条,避免消费不过来导致消息堆积在本地缓存
51             _channel.BasicQos(0, 10, false);
52             var consumer = new EventingBasicConsumer(_channel);
53             consumer.Received += (model, ea) =>
54             {
55                 var body = ea.Body;
56                 var message = Encoding.UTF8.GetString(body);
57                 var result = Process(message);
58                 if (result)
59                 {
60                     _channel.BasicAck(ea.DeliveryTag, false);//启用手动ack机制后,没有及时ack导致的队列异常(Unacked过多)
61                 }
62                 else
63                 {
64                     _channel.BasicNack(ea.DeliveryTag, false, true);// 启用nack+重入队 机制后,导致的死循环(Ready过多)
65                 }
66 
67             };
68             _channel.BasicConsume(queue: QueueName, consumer: consumer);
69         }
70 
71         public void DeRegister()
72         {
73             _connection.Close();
74         }
75 
76 
77         public Task StopAsync(CancellationToken cancellationToken)
78         {
79             _connection.Close();
80             return Task.CompletedTask;
81         }
82     }

消息队列常见问题

  在这里我先说一下我遇到的问题吧!不知道什么原因会产生异常消息,也就是业务失败产生的unasked消息,这个问题该如何处理

  处理方式是启用nack+重入队 机制后,但是这种方式会 导致的死循环(Ready过多),所以要启用Qos和ack机制后,没有及时ack导致的队列堵塞

  启用QoS,每次预取5条消息,避免消息处理不过来,全部堆积在本地缓存里

  channel.BasicQos(0, 5, false);

  开启QoS,当RabbitMQ的队列达到5条Unacked消息时,不会再推送消息给Consumer;

  这样问题就解决了!!!!!

  其他常见问题参考     https://www.cnblogs.com/sw008/p/11054331.html

原文地址:https://www.cnblogs.com/zhao987/p/12532295.html