Kafka学习笔记(五、Kafka偏移量)

目录:

  • MetaData信息
  • Kafka偏移量
  • 客户端负载均衡

MetaData信息

客户端如何知道该往哪个节点发送请求来获取数据:通过元数据。

元数据(MetaData)是什么:topic、topic的分区、每个分区有哪些副本、哪个副本是leader等信息。

一般情况下客户端会缓存元数据,并直接往目标broker上发送生产和获取请求,并且客户端还会定时的刷新自己的元数据

Kafka偏移量

1、Kafka GUI

说偏移量之前先介绍下Kafka GUI(Kafka graphical user interface),因官方没有提供,所以我们使用社区较为活跃的工具。

  • Kafka Tool地址: http://www.kafkatool.com/download.html
  • Kafka Manager地址: https://github.com/yahoo/kafka-manager
  • KafkaOffsetMonitor地址: https://github.com/Morningstar/kafka-offset-monitor

KafkaOffsetMonitor配置:

KafkaOffsetMonitor启动脚本(如: kafkaoffset_monitor.sh,记得给脚本赋执行权限)

java -cp KafkaOffsetMonitor-assembly 0.46-SNAPSHOT.jar 
         com.quantifind.kafka.offsetapp.OffsetGetterWeb
--offsetStorage kafka 
--kafkaBrokers ip1:port1,ip2:port2,ip3:port3 
--zk ip1:port1,ip2:port2,ip3:port3 
--port 8088
--refresh 10.seconds 
--retain 2.days
  • --offsetStorage:指明offset信息由kafka来保存,而非zookeeper
  • --refresh:多少秒刷新一次信息
  • --retain:信息保存到数据库多少天

——————————————————————————————————————————————————————

2、消费指定偏移量的消息

// 指定分区信息
consumer.assign(Collections.singletonList(new TopicPartition("Topic-02", 0)));
// 从头开始消费消息
consumer.seekToBeginning(Collections.singletonList(new TopicPartition("Topic-02", 0)));
// 按照指定的偏移量消费消息
consumer.seek(new TopicPartition("Topic-05", 1), 9);

——————————————————————————————————————————————————————

客户端负载均衡

消费者发生变化(加入新的消费者或原有消费者宕机)或topic发生变化时会出现再均衡现象(分区的所有权从一个消费者转到另一个消费者)。

再均衡现象会导致消息的重复处理丢失

  • 当提交的偏移量小于客户端处理的偏移量时重复处理消息。
  • 当提交的偏移量大于客户端处理的偏移量时会丢失消息。

——————————————————————————————————————————————————————

为了解决这一问题Kafka提供了再均衡监听器:ConsumerRebalanceListener

private static class CustomerRebalancer implements ConsumerRebalanceListener {
    /**
     * 再均衡开始之前和消费者停止读取消息之后被调用
     * 如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 如果发生再均衡,我们要在即将失去分区所有权时提交偏移量
        // 要注意,提交的是最近处理过的偏移量,而不是批次中还在处理的最后一个偏移量
        System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets);
    }

    /**
     * 在重新分配分区之后和新的消费者开始读取消息之前被调用
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        long committedOffset = -1;
        for (TopicPartition topicPartition : partitions) {
            // 获取该分区已经消费的偏移量
            committedOffset = consumer.committed(topicPartition).offset();
            // 重置偏移量到上一次提交的偏移量的下一个位置处开始消费
            consumer.seek(topicPartition, committedOffset + 1);
        }
    }
}
原文地址:https://www.cnblogs.com/bzfsdr/p/12221623.html