RocketMQ消费者-重复消费的问题解决

重复消费的问题的一个可能的问题:消费者消费消息时产生了异常,并没有返回CONSUME_SUCCESS标志。

重复消费的消息和第一次消费的消息不同,多了一些重复消费的信息:
reconsumeTimes=1,2,…10
REAL_TOPIC也会是:%RETRY%XXXXX
这就是因为消息处理异常导致的消息重新消费,无路时重启服务端,还是通过mqadmin删除都没用,RocketMQ可以很好的保持消息,一定要消费成功才可以!


官方对comsumerMessage方法的实现建议是:

It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure

无论如何,都不要抛出异常,如果需要重新消费,可以返回RECONSUME_LATER主动要求重新消费。

要加入catch Exception根据异常来捕获业务处理的异常。

 1 consumer.registerMessageListener(new MessageListenerConcurrently() {
 2                 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
 3                     ConsumeConcurrentlyContext context) {
 4                     logger.debug(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
 5                     MessagePack msgpack = new MessagePack();
 6                     for (MessageExt msg : msgs){
 7                         byte[] data = msg.getBody();
 8                         try {
 9                             RTMsgPack rtmsg = msgpack.read(data, RTMsgPack.class);
10                             logger.debug("Receive a message:" + rtmsg);
11                             anlysisRTMsgPack(rtmsg, engine);
12                         } catch (IOException e) {
13                             logger.error("Unpack RTMsg:", e);
14                         } catch (Exception e1){
15                             logger.warn("Unexcepted exception.", e1);
16                         }
17                     }
18                     logger.debug("RETURN CONSUME SUCCESS.");
19                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
20                 }
21             });

 

原文地址:https://www.cnblogs.com/penphy/p/12581288.html