【RocketMQ】RocketMQ生产者和消费者端的幂等性怎么保证?

RocketMQ 幂等性主要分为生产端和消费端幂等性

备注:这里只讨论生产者 和消费者集群部署下的情况

生产者端幂等性保证:

  1RocketMQ 为消息生产者提供了消息查询的API,在消息发送之前,可以查询该条消息是否发送过,注意但是该方法在2020年5月之后的版本,已经被废掉了;

  eg:

public class DefaultMQProducer extends ClientConfig implements MQProducer { 
    @Deprecated
    @Override
    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
        throws MQClientException, InterruptedException {
        return this.defaultMQProducerImpl.queryMessage(withNamespace(topic), key, maxNum, begin, end);
    }
}

  2:引入第三方存储,例如Redis,在消息发送之后,将发送记录存储在Redis中,下次发送消息之前,在Redis中查询消息是否发送过

消费端幂等性保证:(处理必须唯一) 无论这个业务请求被(consumer)执行多少次,我们的数据库的结果都是唯一的,不可变的。

  1.去重策略:去重表机制,业务拼接去重策略(比如唯一流水号)

  2.建立一个消息表,拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键(primary key)或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突。

    高并发下去重:采用Redis去重(key天然支持原子性并要求不可重复),但是由于不在一个事务,要求有适当的补偿策略,但是对于很重要的业务,不应该支持补偿

  3..利用redis事务,主键(我们必须把全量的操作数据都存放在redis里,然后定时去和数据库做数据同步)—-即消费处理后,该处理本来应该保存在数据库的,先保存在redis,再通过一定业务方式从redis中取数据进行db持久化

  4.利用redis和关系型数据库一起做去重机制

  5.拿到这个消息做redis的set的操作.redis就是天然幂等性 

  6.准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将 < id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

  eg: redis唯一key保证幂等性代码如下:

  String idempotentValue = RedisUtil.get(RedisConstant.IDEMPOTENT.concat(msgId), String.class);
   if (!StringUtils.isEmpty(idempotentValue)) {
       log.info(">>>>>>>>>>>>>>>>>该消息已经被消费:【{}】", msgBody);
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
//业务代码.............
 //幂等处理
RedisUtil.setEx(RedisConstant.IDEMPOTENT.concat(msgId), "1", 10, TimeUnit.DAYS);

参考:https://www.cnblogs.com/chx9832/p/12325871.html

原文地址:https://www.cnblogs.com/july-sunny/p/14983312.html