Spring-Kafka —— AckMode介绍和手动提交分析总结

前言

本文主要讲述一下spring for kafka的consumer在spring.kafka.consumer.enable-auto-commit是false情况下,AckMode的选项,及手动提交分析总结。

AckMode

RECORD
每处理一条commit一次
BATCH(默认) 每次poll的时候批量提交一次,频率取决于每次poll的调用频率
TIME 每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)
COUNT 累积达到ackCount次的ack去commit
COUNT_TIME ackTime或ackCount哪个条件先满足,就commit
MANUAL listener负责ack,但是背后也是批量上去
MANUAL_IMMEDIATE listner负责ack,每调用一次,就立即commit

Manual Commit

  • 消费端手动提交offset代码如下:
  /**
     * 这是手动提交的消费方式
     * @param record
     * @param ack
     * @throws Exception
     */
    @KafkaListener(topics = TopicConstants.COMMON_PAY,groupId = "写自己的消费组 id")
    public void listenXXXPay(ConsumerRecord<String, String> record , Acknowledgment ack) throws Exception {
        String msg = JSONObject.parseObject(record.value(), String.class);
        System.out.println(msg);
        if (new Random().nextInt(100)<50){
            logger.info(String.format("kafka 综合收费消费消息成功---------------- listen1 topic = %s, offset = %d, value = %s ", record.topic(), record.offset(), record.value()));
            ack.acknowledge();
        }        
    }

前提要配置AckMode:

instance.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
  • 接下来问题来了, 如果代码中没有进行ack.acknowledge(),会怎么办呢??

消费者在消费消息的过程中,配置参数设置为不自动提交offset,在消费完数据之后如果不手动提交offset,那么在程序中和kafak中的数据会如何被处理呢?

1. 如果在消费kafka的数据过程中,一直没有提交offset,那么在此程序运行的过程中它不会重复消费。但是如果重启之后,就会重复消费之前没有提交offset的数据。

2. 如果在消费的过程中有几条或者一批数据数据没有提交offset,后面其他的消息消费后正常提交offset,那么服务端会更新为消费后最新的offset,不会重新消费,就算重启程序也不会重新消费。

3. 消费者如果没有提交offset,程序不会阻塞或者重复消费,除非在消费到这个你不想提交offset的消息时你尝试重新初始化一个客户端消费者,即可再次消费这个未提交offset的数据。因为客户端也记录了当前消费者的offset信息,所以程序会在每次消费了数据之后,自己记录offset,而手动提交到服务端的offset与这个并没有关系,所以程序会继续往下消费。在你重新初始化客户端消费者之后,会从服务端得到最新的offset信息记录到本地。所以说如果当前的消费的消息没有提交offset,此时在你重新初始化消费者之后,可得到这条未提交消息的offset,从此位置开始消费。

原文地址:https://www.cnblogs.com/caoweixiong/p/11181497.html