SpringKafka——消息监听

前言

Spring-Kafka中消息监听大致分为两种类型,一种是单条数据消费,一种是批量消费;两者的区别只是在于监听器一次性获取消息的数量。
GenericMessageListener是我们实现消息监听的一个接口,向上扩展的接口有非常多,
比如:单数据消费的MessageListener、批量消费的BatchMessageListener、还有具备ACK机制的AcknowledgingMessageListener和BatchAcknowledgingMessageListener等等。
 
 

GenericMessageListener

这里可以看到GenericMessageListener使用注解标明这是一个函数式接口,默认实现了三种不同参数的onMessage方法。
data就是我们需要接收的数据,Consumer则是消费者类,Acknowledgment则是用来实现Ack机制的类。
这里需要注意一下的是,Consumer对象并不是线程安全的。
@FunctionalInterface
public interface GenericMessageListener<T> {
    void onMessage(T var1);

    default void onMessage(T data, Acknowledgment acknowledgment) {
        throw new UnsupportedOperationException("Container should never call this");
    }

    default void onMessage(T data, Consumer<?, ?> consumer) {
        throw new UnsupportedOperationException("Container should never call this");
    }

    default void onMessage(T data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        throw new UnsupportedOperationException("Container should never call this");
    }
}

接下来先浏览一下继承了GenericMessageListener接口的类。前缀为Batch的接口都是批处理类型的消息监听接口,里面的参数也都讲解过了

public interface MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data);
}

public interface AcknowledgingMessageListener<K, V> { 
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

public interface BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data);
}

public interface BatchAcknowledgingMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

@KafkaListener参数讲解

@KafkaListener(id = "demo", topics = "topic.quick.demo")
public void listen(String msgData) {
    log.info("demo receive : "+msgData);
}
使用@KafkaListener这个注解并不局限于这个监听容器是单条数据消费还是批量消费,区分单数据还是多数据消费只需要配置一下注解的containerFactory属性即可,先讲解一下这个监听方法都能接收写什么参数吧。
  • data : 对于data值的类型其实并没有限定,根据KafkaTemplate所定义的类型来决定。data为List集合的则是用作批量消费。
  • ConsumerRecord:具体消费数据类,包含Headers信息、分区信息、时间戳等
  • Acknowledgment:用作Ack机制的接口
  • Consumer:消费者类,使用该类我们可以手动提交偏移量、控制消费速率等功能
public void listen1(String data) 

public void listen2(ConsumerRecord<K,V> data) 

public void listen3(ConsumerRecord<K,V> data, Acknowledgment acknowledgment) 

public void listen4(ConsumerRecord<K,V> data, Acknowledgment acknowledgment, Consumer<K,V> consumer) 

public void listen5(List<String> data) 

public void listen6(List<ConsumerRecord<K,V>> data) 

public void listen7(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment) 

public void listen8(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment, Consumer<K,V> consumer)

接下来在看看@KafkaListener的注解都提供了什么属性。

  • id:消费者的id,当GroupId没有被配置的时候,默认id为GroupId
  • containerFactory:上面提到了@KafkaListener区分单数据还是多数据消费只需要配置一下注解的containerFactory属性就可以了,这里面配置的是监听容器工厂,也就是ConcurrentKafkaListenerContainerFactory,配置BeanName
  • topics:需要监听的Topic,可监听多个
  • topicPartitions:可配置更加详细的监听信息,必须监听某个Topic中的指定分区,或者从offset为200的偏移量开始监听
  • errorHandler:监听异常处理器,配置BeanName
  • groupId:消费组ID
  • idIsGroup:id是否为GroupId
  • clientIdPrefix:消费者Id前缀
  • beanRef:真实监听容器的BeanName,需要在 BeanName前加 "__"
public @interface KafkaListener {
    String id() default "";

    String containerFactory() default "";

    String[] topics() default {};

    String topicPattern() default "";

    TopicPartition[] topicPartitions() default {};

    String containerGroup() default "";

    String errorHandler() default "";

    String groupId() default "";

    boolean idIsGroup() default true;

    String clientIdPrefix() default "";

    String beanRef() default "__listener";
}

使用ConsumerRecord类消费

用ConsumerRecord类接收的好处是什么呢,ConsumerRecord类里面包含分区信息、消息头、消息体等内容,如果业务需要获取这些参数时,使用ConsumerRecord会是个不错的选择。

如果使用具体的类型接收消息体则更加方便,比如说用String类型去接收消息体。

这里我们编写一个consumerListener方法,监听"topic.quick.consumer" Topic,并把ConsumerRecord里面所包含的内容打印到控制台中

@Component
public class SingleListener {

    private static final Logger log = LoggerFactory.getLogger(SingleListener.class);

    @KafkaListener(id = "consumer", topics = "topic.quick.consumer")
    public void consumerListener(ConsumerRecord<Integer, String> record) {
        log.info("topic.quick.consumer receive : " + record.toString());
    }
}

批量消费

  • 重新创建一份新的消费者配置,配置为一次拉取5条消息
  • 创建一个监听容器工厂,设置其为批量消费并设置并发量为5,这个并发量根据分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态
  • 创建一个分区数为8的Topic
  • 创建监听方法,设置消费id为batch,clientID前缀为batch,监听topic.quick.batch,使用batchContainerFactory工厂创建该监听容器
@Component
public class BatchListener {

    private static final Logger log= LoggerFactory.getLogger(BatchListener.class);

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        //一次拉取消息数量
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean("batchContainerFactory")
    public ConcurrentKafkaListenerContainerFactory listenerContainer() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
        //设置并发量,小于或等于Topic的分区数
        container.setConcurrency(5);
        //设置为批量监听
        container.setBatchListener(true);
        return container;
    }

    @Bean
    public NewTopic batchTopic() {
        return new NewTopic("topic.quick.batch", 8, (short) 1);
    }


    @KafkaListener(id = "batch",clientIdPrefix = "batch",topics = {"topic.quick.batch"},containerFactory = "batchContainerFactory")
    public void batchListener(List<String> data) {
        log.info("topic.quick.batch  receive : ");
        for (String s : data) {
            log.info(  s);
        }
    }

}

注意:设置的并发量不能大于partition的数量,如果需要提高吞吐量,可以通过增加partition的数量达到快速提升吞吐量的效果。

注解方式获取消息头及消息体

当你接收的消息包含请求头,以及你监听方法需要获取该消息非常多的字段时可以通过这种方式,毕竟get方法代码量还是稍多点的。这里使用的是默认的监听容器工厂创建的,如果你想使用批量消费,把对应的类型改为List即可,比如List<String> data , List<Integer> key。

  • @Payload:获取的是消息的消息体,也就是发送内容
  • @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY):获取发送消息的key
  • @Header(KafkaHeaders.RECEIVED_PARTITION_ID):获取当前消息是从哪个分区中监听到的
  • @Header(KafkaHeaders.RECEIVED_TOPIC):获取监听的TopicName
  • @Header(KafkaHeaders.RECEIVED_TIMESTAMP):获取时间戳
    @KafkaListener(id = "anno", topics = "topic.quick.anno")
    public void annoListener(@Payload String data,
                             @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
                             @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                             @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                             @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
        log.info("topic.quick.anno receive : 
"+
            "data : "+data+"
"+
            "key : "+key+"
"+
            "partitionId : "+partition+"
"+
            "topic : "+topic+"
"+
            "timestamp : "+ts+"
"
        );

    }

使用Ack机制确认消费

Kafka的Ack机制相对于RabbitMQ的Ack机制差别比较大,刚入门Kafka的时候我也被搞蒙了,不过能弄清楚Kafka是怎么消费消息的就能理解Kafka的Ack机制了。

我先说说RabbitMQ的Ack机制,RabbitMQ的消费可以说是一次性的,也就是你确认消费后就立刻从硬盘或内存中删除,

而且RabbitMQ粗糙点来说是顺序消费,像排队一样,一个个顺序消费,未被确认的消息则会重新回到队列中,等待监听器再次消费。

但Kafka不同,Kafka是通过最新保存偏移量进行消息消费的,而且确认消费的消息并不会立刻删除,所以我们可以重复的消费未被删除的数据,

当第一条消息未被确认,而第二条消息被确认的时候,Kafka会保存第二条消息的偏移量,也就是说第一条消息再也不会被监听器所获取,除非是根据第一条消息的偏移量手动获取。

使用Kafka的Ack机制比较简单,只需简单的三步即可:

  1. 设置ENABLE_AUTO_COMMIT_CONFIG=false,禁止自动提交
  2. 设置AckMode=MANUAL_IMMEDIATE
  3. 监听方法加入Acknowledgment ack 参数

怎么拒绝消息呢,只要在监听方法中不调用ack.acknowledge()即可

@Component
public class AckListener {

    private static final Logger log= LoggerFactory.getLogger(AckListener.class);

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean("ackContainerFactory")
    public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
        return factory;
    }


    @KafkaListener(id = "ack", topics = "topic.quick.ack",containerFactory = "ackContainerFactory")
    public void ackListener(ConsumerRecord record, Acknowledgment ack) {
        log.info("topic.quick.ack receive : " + record.value());
        ack.acknowledge();
    }
}

重复消费未被Ack的消息

在这段章节开头之初我就讲解了Kafka机制会出现的一些情况,导致没办法重复消费未被Ack的消息,解决办法有如下:

1、重新将消息发送到队列中,这种方式比较简单而且可以使用Headers实现第几次消费的功能,用以下次判断

    @KafkaListener(id = "ack", topics = "topic.quick.ack", containerFactory = "ackContainerFactory")
    public void ackListener(ConsumerRecord record, Acknowledgment ack, Consumer consumer) {
        log.info("topic.quick.ack receive : " + record.value());

        //如果偏移量为偶数则确认消费,否则拒绝消费
        if (record.offset() % 2 == 0) {
            log.info(record.offset()+"--ack");
            ack.acknowledge();
        } else {
            log.info(record.offset()+"--nack");
            kafkaTemplate.send("topic.quick.ack", record.value());
        }
    }

2、使用Consumer.seek方法,重新回到该未ack消息偏移量的位置重新消费,这种可能会导致死循环,原因出现于业务一直没办法处理这条数据,但还是不停的重新定位到该数据的偏移量上。

    @KafkaListener(id = "ack", topics = "topic.quick.ack", containerFactory = "ackContainerFactory")
    public void ackListener(ConsumerRecord record, Acknowledgment ack, Consumer consumer) {
        log.info("topic.quick.ack receive : " + record.value());

        //如果偏移量为偶数则确认消费,否则拒绝消费
        if (record.offset() % 2 == 0) {
            log.info(record.offset()+"--ack");
            ack.acknowledge();
        } else {
            log.info(record.offset()+"--nack");
            consumer.seek(new TopicPartition("topic.quick.ack",record.partition()),record.offset() );
        }
    }


引用:

https://www.jianshu.com/p/a64defb44a23

https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#message-listeners

原文地址:https://www.cnblogs.com/caoweixiong/p/12988237.html