目录:
- MetaData信息
- Kafka偏移量
- 客户端负载均衡
MetaData信息
客户端如何知道该往哪个节点发送请求来获取数据:通过元数据。
元数据(MetaData)是什么:topic、topic的分区、每个分区有哪些副本、哪个副本是leader等信息。
一般情况下客户端会缓存元数据,并直接往目标broker上发送生产和获取请求,并且客户端还会定时的刷新自己的元数据。
Kafka偏移量
1、Kafka GUI
说偏移量之前先介绍下Kafka GUI(Kafka graphical user interface),因官方没有提供,所以我们使用社区较为活跃的工具。
- Kafka Tool地址: http://www.kafkatool.com/download.html
- Kafka Manager地址: https://github.com/yahoo/kafka-manager
- KafkaOffsetMonitor地址: https://github.com/Morningstar/kafka-offset-monitor
KafkaOffsetMonitor配置:
KafkaOffsetMonitor启动脚本(如: kafkaoffset_monitor.sh,记得给脚本赋执行权限)
java -cp KafkaOffsetMonitor-assembly 0.46-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --offsetStorage kafka --kafkaBrokers ip1:port1,ip2:port2,ip3:port3 --zk ip1:port1,ip2:port2,ip3:port3 --port 8088 --refresh 10.seconds --retain 2.days
- --offsetStorage:指明offset信息由kafka来保存,而非zookeeper
- --refresh:多少秒刷新一次信息
- --retain:信息保存到数据库多少天
——————————————————————————————————————————————————————
2、消费指定偏移量的消息
// 指定分区信息 consumer.assign(Collections.singletonList(new TopicPartition("Topic-02", 0))); // 从头开始消费消息 consumer.seekToBeginning(Collections.singletonList(new TopicPartition("Topic-02", 0))); // 按照指定的偏移量消费消息 consumer.seek(new TopicPartition("Topic-05", 1), 9);
——————————————————————————————————————————————————————
客户端负载均衡
当消费者发生变化(加入新的消费者或原有消费者宕机)或topic发生变化时会出现再均衡现象(分区的所有权从一个消费者转到另一个消费者)。
再均衡现象会导致消息的重复处理和丢失。
- 当提交的偏移量小于客户端处理的偏移量时重复处理消息。
- 当提交的偏移量大于客户端处理的偏移量时会丢失消息。
——————————————————————————————————————————————————————
为了解决这一问题Kafka提供了再均衡监听器:ConsumerRebalanceListener
private static class CustomerRebalancer implements ConsumerRebalanceListener { /** * 再均衡开始之前和消费者停止读取消息之后被调用 * 如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了 */ @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 如果发生再均衡,我们要在即将失去分区所有权时提交偏移量 // 要注意,提交的是最近处理过的偏移量,而不是批次中还在处理的最后一个偏移量 System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets); consumer.commitSync(currentOffsets); } /** * 在重新分配分区之后和新的消费者开始读取消息之前被调用 */ @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { long committedOffset = -1; for (TopicPartition topicPartition : partitions) { // 获取该分区已经消费的偏移量 committedOffset = consumer.committed(topicPartition).offset(); // 重置偏移量到上一次提交的偏移量的下一个位置处开始消费 consumer.seek(topicPartition, committedOffset + 1); } } }