使用redis实现MQ中的消息在生产者-消费者模式下被唯一消费

不论使用Nsq,还是使用RabbitMQ,直接使用消息队列会存在一条消息被多次消费的可能性,也存在生产重复消息的可能性,因此一定要做幂等性处理。

本文仅分析避免被多次消费的场景,对于此场景大家常用的解决方案是为消息生成一个全局唯一ID(简称MGuid),然后再将消息发送到消息队列,消费者根据消息的全局唯一ID状态判断是否进行处理。

解决方案1:使用redis来确保消息队列在生产者-消费者模式下消息被唯一消费

A.设计规则
1.使用redis数据类型hash来管理数据
2.hash类型的key表示topic
3.使用key的字典管理消息ID和消息state
4.key的字典field表示消息ID
5.field的value表示消息state
6.消息状态:0表示初始状态,1表示被第一个消费者锁定,-1表示结果成功,-2表示结果失败,其他数字无效
B.编码流程
1.生产者:为消息生成全局唯一ID
2.生产者:将消息存储到redis指定key的hash集合里,消息状态默认为整数0
3.生产者:将消息发送给MQ
4.消费者:依据自身处理能力,从MQ拉取消息
5.消费者:解析消息得到消息ID和消息有效期限,判断是否有效,无效告知redis删除消息(删除前判断状态是否大于0,不大于0才删除),有效继续
6.消费者:依据消息ID到redis尝试锁定消息获得独有权,即使用increment给消息state+1得到返回值,返回值等于1表示成功继续,不等于1表示失败退出
7.消费者:把消息分配给后台线程处理,自己再去拉取消息
8.线程:执行业务逻辑处理,处理完毕告知redis,修改消息state值,等于-1表示成功,-2表示失败。记录结果到日志系统。
9.生产者:定时从redis查询消息状态,并删除redis中的数据记录,输出结果
C.业务规则
1.大粒度:一个商品表示一个topic
2.小粒度:一个商家的商品表示一个topic,或一个商品的区域买家表示一个topic
3.成功:引导执行下一步,或完毕
4.失败:从日志系统读取原因,通知用户

解决方案2:使用关系数据库事务来确保消息队列在生产者-消费者模式下消息被唯一消费

1.创建数据表MQ_Table
2.整个设计与实现过程与方案1相同,请参考方案1。
3.特别说明:此方案是临时方案,不建议用于生产环境,因为会涉及硬盘IO操作,相比起内存操作的能效,硬盘IO操作不值一提。
 
源码是RabbitMQ与NSQ的Demo:https://github.com/kinbor/RabbitMQAndNSQ
原文地址:https://www.cnblogs.com/Jkinbor/p/12771388.html