kafka学习笔记(三)kafka的使用技巧

概述

上一篇随笔主要介绍了kafka的基本使用包括集群参数,生产者基本使用,consumer基本使用,现在来介绍一下kafka的使用技巧。

分区机制

我们在使用 Apache Kafka 生产和消费消息的时候,肯定是希望能够将数据均匀地分配到所有服务器上。比如很多公司使用 Kafka 收集应用服务器的日志数据,这种数据都是很多的,特别是对于那种大批量机器组成的集群环境,每分钟产生的日志量都能以 GB 数,因此如何将这么大的数据量均匀地分配到 Kafka 的各个 Broker 上,就成为一个非常重要的问题。Kafka 有主题(Topic)的概念,它是承载真实数据的逻辑容器,而在主题之下还分为若干个分区,也就是说 Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。官网上的这张图非常清晰地展示了 Kafka 的三级结构,如下所示:

现在我抛出一个问题你可以先思考一下:你觉得为什么 Kafka 要做这样的设计?为什么使用分区的概念而不是直接使用多个主题呢?其实分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。

分区策略

下面我们说说 Kafka 生产者的分区策略。所谓分区策略是决定生产者将消息发送到哪个分区的算法。Kafka 为我们提供了默认的分区策略,同时它也支持你自定义分区策略。

如果要自定义分区策略,你需要显式地配置生产者端的参数partitioner.class。这个参数该怎么设定呢?方法很简单,在编写生产者程序时,你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。这个接口也很简单,只定义了两个方法:partition()和close(),通常你只需要实现最重要的 partition 方法。我们来看看这个方法的方法签名:

1 int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

这里的topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。只要你自己的实现类定义好了 partition 方法,同时设置partitioner.class参数为你自己实现类的 Full Qualified Name,那么生产者程序就会按照你的代码逻辑对消息进行分区。虽说可以有无数种分区的可能,但比较常见的分区策略也就那么几种,下面我来详细介绍一下。

轮训策略:

也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0,轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。

随机策略:

也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上。本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。

按消息建保存策略:

Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略。

其他分区策略:

上面这几种分区策略都是比较基础的策略,除此之外你还能想到哪些有实际用途的分区策略?其实还有一种比较常见的,即所谓的基于地理位置的分区策略。当然这种策略一般只针对那些大规模的 Kafka 集群,特别是跨城市、跨国家甚至是跨大洲的集群。

压缩算法

说起压缩(compression),我相信你一定不会感到陌生。它秉承了用时间去换空间的经典 trade-off 思想,具体来说就是用 CPU 时间去换磁盘空间或网络 I/O 传输量,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。在 Kafka 中,压缩也是用来做这件事的。今天我就来跟你分享一下 Kafka 中压缩的那些事儿。

怎么压缩

 Kafka 是如何压缩消息的呢?要弄清楚这个问题,就要从 Kafka 的消息格式说起了。目前 Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本。V2 版本是 Kafka 0.11.0.0 中正式引入的。不论是哪个版本,Kafka 的消息层次都分为两层:消息集合(message set)以及消息(message)。一个消息集合中包含若干条日志项(record item),而日志项才是真正封装消息的地方。Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka 通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。

那么社区引入 V2 版本的目的是什么呢?V2 版本主要是针对 V1 版本的一些弊端做了修正,和我们今天讨论的主题相关的修正有哪些呢?先介绍一个,就是把消息的公共部分抽取出来放到外层消息集合里面,这样就不用每条消息都保存这些信息了。

我来举个例子。原来在 V1 版本中,每条消息都需要执行 CRC 校验,但有些情况下消息的 CRC 值是会发生变化的。比如在 Broker 端可能会对消息时间戳字段进行更新,那么重新计算之后的 CRC 值也会相应更新;再比如 Broker 端在执行消息格式转换时(主要是为了兼容老版本客户端程序),也会带来 CRC 值的变化。鉴于这些情况,再对每条消息都执行 CRC 校验就有点没必要了,不仅浪费空间还耽误 CPU 时间,因此在 V2 版本中,消息的 CRC 校验工作就被移到了消息集合这一层。

V2 版本还有一个和压缩息息相关的改进,就是保存压缩消息的方法发生了变化。之前 V1 版本中保存压缩消息的方法是把多条消息进行压缩然后保存到外层消息的消息体字段中;而 V2 版本的做法是对整个消息集合进行压缩。显然后者应该比前者有更好的压缩效果。

何时压缩

 在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。

生产者程序中配置 compression.type 参数即表示启用指定类型的压缩算法。比如下面这段程序代码展示了如何构建一个开启 GZIP 的 Producer 对象:

1  Properties props = new Properties();
2  props.put("bootstrap.servers", "localhost:9092");
3  props.put("acks", "all");
4  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
5  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
6  // 开启GZIP压缩
7  props.put("compression.type", "gzip");
8  
9  Producer<String, String> producer = new KafkaProducer<>(props);

在生产者端启用压缩是很自然的想法,那为什么我说在 Broker 端也可能进行压缩呢?其实大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改,但这里的“大部分情况”也是要满足一定条件的。有两种例外情况就可能让 Broker 重新压缩消息。

情况一:Broker 端指定了和 Producer 端不同的压缩算法。

你看,这种情况下 Broker 接收到 GZIP 压缩消息后,只能解压缩然后使用 Snappy 重新压缩一遍。如果你翻开 Kafka 官网,你会发现 Broker 端也有一个参数叫 compression.type,和上面那个例子中的同名。但是这个参数的默认值是 producer,这表示 Broker 端会“尊重”Producer 端使用的压缩算法。可一旦你在 Broker 端设置了不同的 compression.type 值,就一定要小心了,因为可能会发生预料之外的压缩 / 解压缩操作,通常表现为 Broker 端 CPU 使用率飙升。

情况二:Broker 端发生了消息格式转换。

所谓的消息格式转换主要是为了兼容老版本的消费者程序。还记得之前说过的 V1、V2 版本吧?在一个生产环境中,Kafka 集群中同时保存多种版本的消息格式非常常见。为了兼容老版本的格式,Broker 端会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。一般情况下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外,它还让 Kafka 丧失了引以为豪的 Zero Copy 特性。

何时解压缩

有压缩必有解压缩!通常来说解压缩发生在消费者程序中,也就是说 Producer 发送压缩消息到 Broker 后,Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时,Broker 依然原样发送出去,当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息。

那么现在问题来了,Consumer 怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。Kafka 会将启用了哪种压缩算法封装进消息集合中,这样当 Consumer 读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。如果用一句话总结一下压缩和解压缩,那么我希望你记住这句话:Producer 端压缩、Broker 端保持、Consumer 端解压缩。除了在 Consumer 端解压缩,Broker 端也会进行解压缩。注意了,这和前面提到消息格式转换时发生的解压缩是不同的场景。每个压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,目的就是为了对消息执行各种验证。我们必须承认这种解压缩对 Broker 端性能是有一定影响的,特别是对 CPU 的使用率而言。

无消息丢失配置

一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。

第一个核心要素是“已提交的消息”。什么是已提交的消息?当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。那为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要有一个 Broker 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。不论哪种情况,Kafka 只对已提交的消息做持久化保证这件事情是不变的。

第二个核心要素就是“有限度的持久化保证”,也就是说 Kafka 不可能保证在任何情况下都做到不丢失消息。举个极端点的例子,如果地球都不存在了,Kafka 还能保存任何消息吗?显然不能!倘若这种情况下你依然还想要 Kafka 不丢消息,那么只能在别的星球部署 Kafka Broker 服务器了。现在你应该能够稍微体会出这里的“有限度”的含义了吧,其实就是说 Kafka 不丢消息是有前提条件的。假如你的消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中至少有 1 个存活。只要这个条件成立,Kafka 就能保证你的这条消息永远不会丢失。总结一下,Kafka 是能做到不丢失消息的,只不过这些消息必须是已提交的消息,而且还要满足一定的条件。当然,说明这件事并不是要为 Kafka 推卸责任,而是为了在出现该类问题时我们能够明确责任边界。

生产端丢失消息

目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。这种发送方式有个有趣的名字,叫“fire and forget”,翻译一下就是“发射后不管”。这个术语原本属于导弹制导领域,后来被借鉴到计算机领域中,它的意思是,执行完一个操作后不去管它的结果是否成功。调用 producer.send(msg) 就属于典型的“fire and forget”,因此如果出现消息丢失,我们是无法知晓的。这个发送方式挺不靠谱吧,不过有些公司真的就是在使用这个 API 发送消息。

实际上,解决此问题的方法非常简单:Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。不要小瞧这里的 callback(回调),它能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。

消费者丢失消息

Consumer 端丢失数据主要体现在 Consumer 端要消费的消息不见了。Consumer 程序有个“位移”的概念,表示的是这个 Consumer 当前消费到的 Topic 分区的位置。下面这张图来自于官网,它清晰地展示了 Consumer 端的位移数据。

比如对于 Consumer A 而言,它当前的位移值就是 9;Consumer B 的位移值是 11。Kafka 中 Consumer 端的消息丢失就是这么一回事。要对抗这种消息丢失,办法很简单:维持先消费消息(阅读),再更新位移(书签)的顺序即可。这样就能最大限度地保证消息不丢失。

如果Consumer 程序自动地向前更新位移。假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。这里的关键在于 Consumer 自动提交位移,与你没有确认书籍内容被全部读完就将书归还类似,你没有真正地确认消息是否真的被消费就“盲目”地更新了位移。这个问题的解决方案也很简单:如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移。在这里我要提醒你一下,单个 Consumer 程序使用多线程来消费消息说起来容易,写成代码却异常困难,因为你很难正确地处理位移的更新,也就是说避免无消费消息丢失很简单,但极易出现消息被消费了多次的情况。

最佳实践

看完这两个案例之后,我来分享一下 Kafka 无消息丢失的配置,每一个其实都能对应上面提到的问题。

不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。

设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。

设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。

设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。

设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。

确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。

确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

拦截器

Kafka 拦截器分为生产者拦截器和消费者拦截器。生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。值得一提的是,这两种拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑。举个例子,假设你想在生产消息前执行两个“前置动作”:第一个是为消息增加一个头信息,封装发送该消息的时间,第二个是更新发送消息数字段,那么当你将这两个拦截器串联在一起统一指定给 Producer 后,Producer 会按顺序执行上面的动作,然后再发送消息。

当前 Kafka 拦截器的设置方法是通过参数配置完成的。生产者和消费者两端有一个相同的参数,名字叫 interceptor.classes,它指定的是一组类的列表,每个类就是特定逻辑的拦截器实现类。拿上面的例子来说,假设第一个拦截器的完整类路径是 com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二个类是 com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,那么你需要按照以下方法在 Producer 端指定拦截器:

1 Properties props = new Properties();
2 List<String> interceptors = new ArrayList<>();
3 interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器1
4 interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器2
5 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
6 ……

现在问题来了,我们应该怎么编写 AddTimeStampInterceptor 和 UpdateCounterInterceptor 类呢?其实很简单,这两个类以及你自己编写的所有 Producer 端拦截器实现类都要继承 org.apache.kafka.clients.producer.ProducerInterceptor 接口。该接口是 Kafka 提供的,里面有两个核心的方法。

onSend:该方法会在消息发送之前被调用。如果你想在发送之前对消息“美美容”,这个方法是你唯一的机会。

onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用。还记得我在上一期中提到的发送回调通知 callback 吗?onAcknowledgement 的调用要早于 callback 的调用。值得注意的是,这个方法和 onSend 不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全哦。还有一点很重要,这个方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,否则你会发现你的 Producer TPS 直线下降。

同理,指定消费者拦截器也是同样的方法,只是具体的实现类要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,这里面也有两个核心方法。

onConsume:该方法在消息返回给 Consumer 程序之前调用。也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你。

onCommit:Consumer 在提交位移之后调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。

一定要注意的是,指定拦截器类时要指定它们的全限定名,即 full qualified name。通俗点说就是要把完整包名也加上,不要只有一个类名在那里,并且还要保证你的 Producer 程序能够正确加载你的拦截器类。

典型使用场景

Kafka 拦截器都能用在哪些地方呢?其实,跟很多拦截器的用法相同,Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景。

今天 Kafka 默认提供的监控指标都是针对单个客户端或 Broker 的,你很难从具体的消息维度去追踪集群间消息的流转路径。同时,如何监控一条消息从生产到最后消费的端到端延时也是很多 Kafka 用户迫切需要解决的问题。从技术上来说,我们可以在客户端程序中增加这样的统计逻辑,但是对于那些将 Kafka 作为企业级基础架构的公司来说,在应用代码中编写统一的监控逻辑其实是很难的,毕竟这东西非常灵活,不太可能提前确定好所有的计算逻辑。另外,将监控逻辑与主业务逻辑耦合也是软件工程中不提倡的做法。现在,通过实现拦截器的逻辑以及可插拔的机制,我们能够快速地观测、验证以及监控集群间的客户端性能指标,特别是能够从具体的消息层面上去收集这些数据。这就是 Kafka 拦截器的一个非常典型的使用场景。我们再来看看消息审计(message audit)的场景。设想你的公司把 Kafka 作为一个私有云消息引擎平台向全公司提供服务,这必然要涉及多租户以及消息审计的功能。作为私有云的 PaaS 提供方,你肯定要能够随时查看每条消息是哪个业务方在什么时间发布的,之后又被哪些业务方在什么时刻消费。一个可行的做法就是你编写一个拦截器类,实现相应的消息审计逻辑,然后强行规定所有接入你的 Kafka 服务的客户端程序必须设置该拦截器。

kafka如何管理TCP连接

Apache Kafka 的所有通信都是基于 TCP 的,而不是基于 HTTP 或其他协议。无论是生产者、消费者,还是 Broker 之间的通信都是如此。你可能会问,为什么 Kafka 不使用 HTTP 作为底层的通信协议呢?其实这里面的原因有很多,但最主要的原因在于 TCP 和 HTTP 之间的区别。从社区的角度来看,在开发客户端时,人们能够利用 TCP 本身提供的一些高级功能,比如多路复用请求以及同时轮询多个连接的能力。

所谓的多路复用请求,即 multiplexing request,是指将两个或多个数据流合并到底层单一物理连接中的过程。TCP 的多路复用请求会在一条物理连接上创建若干个虚拟连接,每个虚拟连接负责流转各自对应的数据流。其实严格来说,TCP 并不能多路复用,它只是提供可靠的消息交付语义保证,比如自动重传丢失的报文。

更严谨地说,作为一个基于报文的协议,TCP 能够被用于多路复用连接场景的前提是,上层的应用协议(比如 HTTP)允许发送多条消息。不过,我们今天并不是要详细讨论 TCP 原理,因此你只需要知道这是社区采用 TCP 的理由之一就行了。除了 TCP 提供的这些高级功能有可能被 Kafka 客户端的开发人员使用之外,社区还发现,目前已知的 HTTP 库在很多编程语言中都略显简陋。基于这两个原因,Kafka 社区决定采用 TCP 协议作为所有请求通信的底层协议。

Java生产者程序管理tcp

Kafka 的 Java 生产者 API 主要的对象就是 KafkaProducer。通常我们开发一个生产者的步骤有 4 步。

第 1 步:构造生产者对象所需的参数对象。

第 2 步:利用第 1 步的参数对象,创建 KafkaProducer 对象实例。

第 3 步:使用 KafkaProducer 的 send 方法发送消息。

第 4 步:调用 KafkaProducer 的 close 方法关闭生产者并释放各种系统资源。

上面这 4 步写成 Java 代码的话大概是这个样子:

1 Properties props = new Properties ();
2 props.put(“参数1”, “参数1的值”);
3 props.put(“参数2”, “参数2的值”);
4 ……
5 try (Producer<String, String> producer = new KafkaProducer<>(props)) {
6             producer.send(new ProducerRecord<String, String>(……), callback);
7   ……
8 }

这段代码使用了 Java 7 提供的 try-with-resource 特性,所以并没有显式调用 producer.close() 方法。无论是否显式调用 close 方法,所有生产者程序大致都是这个路数。现在问题来了,当我们开发一个 Producer 应用时,生产者会向 Kafka 集群中指定的主题(Topic)发送消息,这必然涉及与 Kafka Broker 创建 TCP 连接。那么,Kafka 的 Producer 客户端是如何管理这些 TCP 连接的呢?

要回答上面这个问题,我们首先要弄明白生产者代码是什么时候创建 TCP 连接的。

首先,生产者应用在创建 KafkaProducer 实例时是会建立与 Broker 的 TCP 连接的。其实这种表述也不是很准确,应该这样说:在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时首先会创建与 Broker 的连接。我截取了一段测试环境中的日志来说明这一点:

 你也许会问:怎么可能是这样?如果不调用 send 方法,这个 Producer 都不知道给哪个主题发消息,它又怎么能知道连接哪个 Broker 呢?难不成它会连接 bootstrap.servers 参数指定的所有 Broker 吗?嗯,是的,Java Producer 目前还真是这样设计的。我在这里稍微解释一下 bootstrap.servers 参数。它是 Producer 的核心参数之一,指定了这个 Producer 启动时要连接的 Broker 地址。请注意,这里的“启动时”,代表的是 Producer 启动时会发起与这些 Broker 的连接。因此,如果你为这个参数指定了 1000 个 Broker 连接信息,那么很遗憾,你的 Producer 启动时会首先创建与这 1000 个 Broker 的 TCP 连接。在实际使用过程中,我并不建议把集群中所有的 Broker 信息都配置到 bootstrap.servers 中,通常你指定 3~4 台就足以了。因为 Producer 一旦连接到集群中的任一台 Broker,就能拿到整个集群的 Broker 信息,故没必要为 bootstrap.servers 指定所有的 Broker。

针对 TCP 连接何时创建的问题,目前我们的结论是这样的:TCP 连接是在创建 KafkaProducer 实例时建立的。那么,我们想问的是,它只会在这个时候被创建吗?当然不是!TCP 连接还可能在两个地方被创建:一个是在更新元数据后,另一个是在消息发送时。为什么说是可能?因为这两个地方并非总是创建 TCP 连接。当 Producer 更新了集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。同样地,当要发送消息时,Producer 发现尚不存在与目标 Broker 的连接,也会创建一个。

接下来,我们来看看 Producer 更新集群元数据信息的两个场景。

场景一:当 Producer 尝试给一个不存在的主题发送消息时,Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息。

场景二:Producer 通过 metadata.max.age.ms 参数定期地去更新元数据信息。该参数的默认值是 300000,即 5 分钟,也就是说不管集群那边是否有变化,Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。

说完了 TCP 连接的创建,我们来说说它们何时被关闭。Producer 端关闭 TCP 连接的方式有两种:一种是用户主动关闭;一种是 Kafka 自动关闭。

我们先说第一种。这里的主动关闭实际上是广义的主动关闭,甚至包括用户调用 kill -9 主动“杀掉”Producer 应用。当然最推荐的方式还是调用 producer.close() 方法来关闭。第二种是 Kafka 帮你关闭,这与 Producer 端参数 connections.max.idle.ms 的值有关。默认情况下该参数值是 9 分钟,即如果在 9 分钟内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。用户可以在 Producer 端设置 connections.max.idle.ms=-1 禁掉这种机制。一旦被设置成 -1,TCP 连接将成为永久长连接。当然这只是软件层面的“长连接”机制,由于 Kafka 创建的这些 Socket 连接都开启了 keepalive,因此 keepalive 探活机制还是会遵守的。值得注意的是,在第二种方式中,TCP 连接是在 Broker 端被关闭的,但其实这个 TCP 连接的发起方是客户端,因此在 TCP 看来,这属于被动关闭的场景,即 passive close。被动关闭的后果就是会产生大量的 CLOSE_WAIT 连接,因此 Producer 端或 Client 端没有机会显式地观测到此连接已被中断。

Java消费者程序管理tcp

我们先从消费者创建 TCP 连接开始讨论。消费者端主要的程序入口是 KafkaConsumer 类。和生产者不同的是,构建 KafkaConsumer 实例时是不会创建任何 TCP 连接的,也就是说,当你执行完 new KafkaConsumer(properties) 语句后,你会发现,没有 Socket 连接被创建出来。这一点和 Java 生产者是有区别的,主要原因就是生产者入口类 KafkaProducer 在构建实例的时候,会在后台默默地启动一个 Sender 线程,这个 Sender 线程负责 Socket 连接的创建。从这一点上来看,我个人认为 KafkaConsumer 的设计比 KafkaProducer 要好。就像我在第 13 讲中所说的,在 Java 构造函数中启动线程,会造成 this 指针的逃逸,这始终是一个隐患。如果 Socket 不是在构造函数中创建的,那么是在 KafkaConsumer.subscribe 或 KafkaConsumer.assign 方法中创建的吗?严格来说也不是。我还是直接给出答案吧:TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的。再细粒度地说,在 poll 方法内部有 3 个时机可以创建 TCP 连接。

1.发起 FindCoordinator 请求时。

还记得消费者端有个组件叫协调者(Coordinator)吗?它驻留在 Broker 端的内存中,负责消费者组的组成员管理和各个消费者的位移提交管理。当消费者程序首次启动调用 poll 方法时,它需要向 Kafka 集群发送一个名为 FindCoordinator 的请求,希望 Kafka 集群告诉它哪个 Broker 是管理它的协调者。不过,消费者应该向哪个 Broker 发送这类请求呢?理论上任何一个 Broker 都能回答这个问题,也就是说消费者可以发送 FindCoordinator 请求给集群中的任意服务器。在这个问题上,社区做了一点点优化:消费者程序会向集群中当前负载最小的那台 Broker 发送请求。负载是如何评估的呢?其实很简单,就是看消费者连接的所有 Broker 中,谁的待发送请求最少。当然了,这种评估显然是消费者端的单向评估,并非是站在全局角度,因此有的时候也不一定是最优解。不过这不并影响我们的讨论。总之,在这一步,消费者会创建一个 Socket 连接。

2.连接协调者时。

Broker 处理完上一步发送的 FindCoordinator 请求之后,会返还对应的响应结果(Response),显式地告诉消费者哪个 Broker 是真正的协调者,因此在这一步,消费者知晓了真正的协调者后,会创建连向该 Broker 的 Socket 连接。只有成功连入协调者,协调者才能开启正常的组协调操作,比如加入组、等待组分配方案、心跳请求处理、位移获取、位移提交等。

3.消费数据时。

消费者会为每个要消费的分区创建与该分区领导者副本所在 Broker 连接的 TCP。举个例子,假设消费者要消费 5 个分区的数据,这 5 个分区各自的领导者副本分布在 4 台 Broker 上,那么该消费者在消费时会创建与这 4 台 Broker 的 Socket 连接。

通常来说,消费者程序会创建 3 类 TCP 连接:确定协调者和获取集群元数据。连接协调者,令其执行组成员管理操作。执行实际的消息获取。

和生产者类似,消费者关闭 Socket 也分为主动关闭和 Kafka 自动关闭。主动关闭是指你显式地调用消费者 API 的方法去关闭消费者,具体方式就是手动调用 KafkaConsumer.close() 方法,或者是执行 Kill 命令,不论是 Kill -2 还是 Kill -9;而 Kafka 自动关闭是由消费者端参数 connection.max.idle.ms 控制的,该参数现在的默认值是 9 分钟,即如果某个 Socket 连接上连续 9 分钟都没有任何请求“过境”的话,那么消费者会强行“杀掉”这个 Socket 连接。

不过,和生产者有些不同的是,如果在编写消费者程序时,你使用了循环的方式来调用 poll 方法消费消息,那么上面提到的所有请求都会被定期发送到 Broker,因此这些 Socket 连接上总是能保证有请求在发送,从而也就实现了“长连接”的效果。针对上面提到的三类 TCP 连接,你需要注意的是,当第三类 TCP 连接成功创建后,消费者程序就会废弃第一类 TCP 连接,之后在定期请求元数据时,它会改为使用第三类 TCP 连接。也就是说,最终你会发现,第一类 TCP 连接会在后台被默默地关闭掉。对一个运行了一段时间的消费者程序来说,只会有后面两类 TCP 连接存在。

从理论上说,Kafka Java 消费者管理 TCP 资源的机制我已经说清楚了,但如果仔细推敲这里面的设计原理,还是会发现一些问题。我们刚刚讲过,第一类 TCP 连接仅仅是为了首次获取元数据而创建的,后面就会被废弃掉。最根本的原因是,消费者在启动时还不知道 Kafka 集群的信息,只能使用一个“假”的 ID 去注册,即使消费者获取了真实的 Broker ID,它依旧无法区分这个“假”ID 对应的是哪台 Broker,因此也就无法重用这个 Socket 连接,只能再重新创建一个新的连接。为什么会出现这种情况呢?主要是因为目前 Kafka 仅仅使用 ID 这一个维度的数据来表征 Socket 连接信息。这点信息明显不足以确定连接的是哪台 Broker,也许在未来,社区应该考虑使用 < 主机名、端口、ID> 三元组的方式来定位 Socket 资源,这样或许能够让消费者程序少创建一些 TCP 连接。

幂等生产者和事务生产者

所谓的消息交付可靠性保障,是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种:

最多一次(at most once):消息可能会丢失,但绝不会被重复发送。

至少一次(at least once):消息不会丢失,但有可能被重复发送。

精确一次(exactly once):消息不会丢失,也不会被重复发送。

目前,Kafka 默认提供的交付可靠性保障是第二种,即至少一次。我们说过消息“已提交”的含义,即只有 Broker 成功“提交”消息且 Producer 接到 Broker 的应答才会认为该消息成功发送。不过倘若消息成功“提交”,但 Broker 的应答没有成功发送回 Producer 端(比如网络出现瞬时抖动),那么 Producer 就无法确定消息是否真的提交成功了。因此,它只能选择重试,也就是再次发送相同的消息。这就是 Kafka 默认提供至少一次可靠性保障的原因,不过这会导致消息重复发送。Kafka 也可以提供最多一次交付保障,只需要让 Producer 禁止重试即可。这样一来,消息要么写入成功,要么写入失败,但绝不会重复发送。我们通常不会希望出现消息丢失的情况,但一些场景里偶发的消息丢失其实是被允许的,相反,消息重复是绝对要避免的。此时,使用最多一次交付保障就是最恰当的。无论是至少一次还是最多一次,都不如精确一次来得有吸引力。大部分用户还是希望消息只会被交付一次,这样的话,消息既不会丢失,也不会被重复处理。或者说,即使 Producer 端重复发送了相同的消息,Broker 端也能做到自动去重。在下游 Consumer 看来,消息依然只有一条。那么问题来了,Kafka 是怎么做到精确一次的呢?简单来说,这是通过两种机制:幂等性(Idempotence)和事务(Transaction)。它们分别是什么机制?两者是一回事吗?要回答这些问题,我们首先来说说什么是幂等性。

幂等性

“幂等”这个词原是数学领域中的概念,指的是某些操作或函数能够被执行多次,但每次得到的结果都是不变的。我来举几个简单的例子说明一下。比如在乘法运算中,让数字乘以 1 就是一个幂等操作,因为不管你执行多少次这样的运算,结果都是相同的。再比如,取整函数(floor 和 ceiling)是幂等函数,那么运行 1 次 floor(3.4) 和 100 次 floor(3.4),结果是一样的,都是 3。相反地,让一个数加 1 这个操作就不是幂等的,因为执行一次和执行多次的结果必然不同。

在计算机领域中,幂等性的含义稍微有一些不同:在命令式编程语言(比如 C)中,若一个子程序是幂等的,那它必然不能修改系统状态。这样不管运行这个子程序多少次,与该子程序关联的那部分系统状态保持不变。在函数式编程语言(比如 Scala 或 Haskell)中,很多纯函数(pure function)天然就是幂等的,它们不执行任何的 side effect。

幂等性有很多好处,其最大的优势在于我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态。如果是非幂等性操作,我们还需要担心某些操作执行多次对状态的影响,但对于幂等性操作而言,我们根本无需担心此事。

在 Kafka 中,Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。它其实是 0.11.0.0 版本引入的新功能。在此之前,Kafka 向分区发送数据时,可能会出现同一条消息被发送了多次,导致消息重复的情况。在 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。

enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的重复去重。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。当然,实际的实现原理并没有这么简单,但你大致可以这么理解。

看上去,幂等性 Producer 的功能很酷,使用起来也很简单,仅仅设置一个参数就能保证消息不重复了,但实际上,我们必须要了解幂等性 Producer 的作用范围。首先,它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。那么你可能会问,如果我想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别!

事务

Kafka 的事务概念类似于我们熟知的数据库提供的事务。在数据库领域,事务提供的安全性保障是经典的 ACID,即原子性(Atomicity)、一致性 (Consistency)、隔离性 (Isolation) 和持久性 (Durability)。

 当然,在实际场景中各家数据库对 ACID 的实现各不相同。特别是 ACID 本身就是一个有歧义的概念,比如对隔离性的理解。大体来看,隔离性非常自然和必要,但是具体到实现细节就显得不那么精确了。通常来说,隔离性表明并发执行的事务彼此相互隔离,互不影响。经典的数据库教科书把隔离性称为可串行化 (serializability),即每个事务都假装它是整个数据库中唯一的事务。

提到隔离级别,这种歧义或混乱就更加明显了。很多数据库厂商对于隔离级别的实现都有自己不同的理解,比如有的数据库提供 Snapshot 隔离级别,而在另外一些数据库中,它们被称为可重复读(repeatable read)。好在对于已提交读(read committed)隔离级别的提法,各大主流数据库厂商都比较统一。所谓的 read committed,指的是当读取数据库时,你只能看到已提交的数据,即无脏读。同时,当写入数据库时,你也只能覆盖掉已提交的数据,即无脏写。Kafka 自 0.11 版本开始也提供了对事务的支持,目前主要是在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。下面我们就来看看 Kafka 中的事务型 Producer。

事务型Producer

事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。设置事务型 Producer 的方法也很简单,满足两个要求即可:和幂等性 Producer 一样,开启 enable.idempotence = true。设置 Producer 端参数 transactional. id。最好为其设置一个有意义的名字。此外,你还需要在 Producer 代码中做一些调整,如这段代码所示:

1 producer.initTransactions();
2 try {
3             producer.beginTransaction();
4             producer.send(record1);
5             producer.send(record2);
6             producer.commitTransaction();
7 } catch (KafkaException e) {
8             producer.abortTransaction();
9 }

这段代码能够保证 Record1 和 Record2 被当作一个事务统一提交到 Kafka,要么它们全部提交成功,要么全部写入失败。实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。因此在 Consumer 端,读取事务型 Producer 发送的消息也是需要一些变更的。修改起来也很简单,设置 isolation.level 参数的值即可。当前这个参数有两个取值:read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

消费者组

消费者组,即 Consumer Group,应该算是 Kafka 比较有亮点的设计了。那么何谓 Consumer Group 呢?用一句话概括就是:Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,这个 ID 被称为 Group ID。组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)。当然,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。个人认为,理解 Consumer Group 记住下面这三个特性就好了。

Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些。

Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。

Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费。

Consumer Group 之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。再加上 Broker 端的消息留存机制,Kafka 的 Consumer Group 完美地规避了上面提到的伸缩性差的问题。可以这么说,Kafka 仅仅使用 Consumer Group 这一种机制,却同时实现了传统消息引擎系统的两大模型:如果所有实例都属于同一个 Group,那么它实现的就是消息队列模型;如果所有实例分别属于不同的 Group,那么它实现的就是发布 / 订阅模型。

在了解了 Consumer Group 以及它的设计亮点之后,你可能会有这样的疑问:在实际使用场景中,我怎么知道一个 Group 下该有多少个 Consumer 实例呢?理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。举个简单的例子,假设一个 Consumer Group 订阅了 3 个主题,分别是 A、B、C,它们的分区数依次是 1、2、3,那么通常情况下,为该 Group 设置 6 个 Consumer 实例是比较理想的情形,因为它能最大限度地实现高伸缩性。你可能会问,我能设置小于或大于 6 的实例吗?当然可以!如果你有 3 个实例,那么平均下来每个实例大约消费 2 个分区(6 / 3 = 2);如果你设置了 8 个实例,那么很遗憾,有 2 个实例(8 – 6 = 2)将不会被分配任何分区,它们永远处于空闲状态。因此,在实际使用过程中一般不推荐设置大于总分区数的 Consumer 实例。设置多余的实例只会浪费资源,而没有任何好处。

Kafka 有新旧客户端 API 之分,那自然也就有新旧 Consumer 之分。老版本的 Consumer 也有消费者组的概念,它和我们目前讨论的 Consumer Group 在使用感上并没有太多的不同,只是它管理位移的方式和新版本是不一样的。老版本的 Consumer Group 把位移保存在 ZooKeeper 中。Apache ZooKeeper 是一个分布式的协调服务框架,Kafka 重度依赖它实现各种各样的协调管理。将位移保存在 ZooKeeper 外部系统的做法,最显而易见的好处就是减少了 Kafka Broker 端的状态保存开销。现在比较流行的提法是将服务器节点做成无状态的,这样可以自由地扩缩容,实现超强的伸缩性。Kafka 最开始也是基于这样的考虑,才将 Consumer Group 位移保存在独立于 Kafka 集群之外的框架中。

不过,慢慢地人们发现了一个问题,即 ZooKeeper 这类元框架其实并不适合进行频繁的写更新,而 Consumer Group 的位移更新却是一个非常频繁的操作。这种大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能,因此 Kafka 社区渐渐有了这样的共识:将 Consumer 位移保存在 ZooKeeper 中是不合适的做法。于是,在新版本的 Consumer Group 中,Kafka 社区重新设计了 Consumer Group 的位移管理方式,采用了将位移保存在 Kafka 内部主题的方法。这个内部主题就是让人既爱又恨的 __consumer_offsets。我会在专栏后面的内容中专门介绍这个神秘的主题。不过,现在你需要记住新版本的 Consumer Group 将位移保存在 Broker 端的内部主题中。最后,我们来说说 Consumer Group 端大名鼎鼎的重平衡,也就是所谓的 Rebalance 过程。我形容其为“大名鼎鼎”,从某种程度上来说其实也是“臭名昭著”,因为有关它的 bug 真可谓是此起彼伏,从未间断。这里我先卖个关子,后面我会解释它“遭人恨”的地方。我们先来了解一下什么是 Rebalance。

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。那么 Consumer Group 何时进行 Rebalance 呢?Rebalance 的触发条件有 3 个。

组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。

订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。

订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。

下面来说说reblance“遭人恨”的地方,首先,Rebalance 过程对 Consumer Group 消费过程有极大的影响。如果你了解 JVM 的垃圾回收机制,你一定听过万物静止的收集方式,即著名的 stop the world,简称 STW。在 STW 期间,所有应用线程都会停止工作,表现为整个应用程序僵在那边一动不动。Rebalance 过程也和这个类似,在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。这是 Rebalance 为人诟病的一个方面。其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。例如实例 A 之前负责消费分区 1、2、3,那么 Rebalance 之后,如果可能的话,最好还是让实例 A 继续消费分区 1、2、3,而不是被重新分配其他的分区。这样的话,实例 A 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。最后,Rebalance 实在是太慢了。曾经,有个国外用户的 Group 内有几百个 Consumer 实例,成功 Rebalance 一次要几个小时!这完全是不能忍受的。最悲剧的是,目前社区对此无能为力,至少现在还没有特别好的解决方案。所谓“本事大不如不摊上”,也许最好的解决方案就是避免 Rebalance 的发生吧。

避免消费者组平衡

具体来讲,Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。

所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。那么,Consumer Group 如何确定为它服务的 Coordinator 在哪台 Broker 上呢?答案就在Kafka 内部位移主题 __consumer_offsets 身上。

目前,Kafka 为某个 Consumer Group 确定 Coordinator 所在的 Broker 的算法有 2 个步骤。

第 1 步:确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。

第 2 步:找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。

好了,我们说回 Rebalance。既然我们今天要讨论的是如何避免 Rebalance,那就说明 Rebalance 这个东西不好,或者说至少有一些弊端需要我们去规避。那么,Rebalance 的弊端是什么呢?总结起来有以下 3 点:

Rebalance 影响 Consumer 端 TPS。总之就是,在 Rebalance 期间,Consumer 会停下手头的事情,什么也干不了。

Rebalance 很慢。如果你的 Group 下成员很多,就一定会有这样的痛点。还记得那个国外用户的例子吧?他的 Group 下有几百个 Consumer 实例,Rebalance 一次要几个小时。在那种场景下,Consumer Group 的 Rebalance 已经完全失控了。

Rebalance 效率不高。当前 Kafka 的设计机制决定了每次 Rebalance 时,Group 下的所有成员都要参与进来,而且通常不会考虑局部性原理,但局部性原理对提升系统性能是特别重要的。

关于第 3 点,我们来举个简单的例子。比如一个 Group 下有 10 个成员,每个成员平均消费 5 个分区。假设现在有一个成员退出了,此时就需要开启新一轮的 Rebalance,把这个成员之前负责的 5 个分区“转移”给其他成员。显然,比较好的做法是维持当前 9 个成员消费分区的方案不变,然后将 5 个分区随机分配给这 9 个成员,这样能最大限度地减少 Rebalance 对剩余 Consumer 成员的冲击。遗憾的是,目前 Kafka 并不是这样设计的。在默认情况下,每次 Rebalance 时,之前的分配方案都不会被保留。就拿刚刚这个例子来说,当 Rebalance 开始时,Group 会打散这 50 个分区(10 个成员 * 5 个分区),由当前存活的 9 个成员重新分配它们。显然这不是效率很高的做法。基于这个原因,社区于 0.11.0.0 版本推出了 StickyAssignor,即有粘性的分区分配策略。所谓的有粘性,是指每次 Rebalance 时,该策略会尽可能地保留之前的分配方案,尽量实现分区分配的最小变动。不过有些遗憾的是,这个策略目前还有一些 bug,而且需要升级到 0.11.0.0 才能使用,因此在实际生产环境中用得还不是很多。

就我个人经验而言,在真实的业务场景中,很多 Rebalance 都是计划外的或者说是不必要的。我们应用的 TPS 大多是被这类 Rebalance 拖慢的,因此避免这类 Rebalance 就显得很有必要了。下面我们就来说说如何避免 Rebalance。要避免 Rebalance,还是要从 Rebalance 发生的时机入手。

我们在前面说过,Rebalance 发生的时机有三个:

组成员数量发生变化

订阅主题数量发生变化

订阅主题的分区数发生变化

Consumer 程序时,实际上就向这个 Group 添加了一个新的 Consumer 实例。此时,Coordinator 会接纳这个新实例,将其加入到组中,并重新分配分区。通常来说,增加 Consumer 实例的操作都是计划内的,可能是出于增加 TPS 或提高伸缩性的需要。总之,它不属于我们要规避的那类“不必要 Rebalance”。我们更在意的是 Group 下实例数减少这件事。如果你就是要停掉某些 Consumer 实例,那自不必说,关键是在某些情况下,Consumer 实例会被 Coordinator 错误地认为“已停止”从而被“踢出”Group。如果是这个原因导致的 Rebalance,我们就不能不管了。Coordinator 会在什么情况下认为某个 Consumer 实例已挂从而要退组呢?这个绝对是需要好好讨论的话题,我们来详细说说。

当 Consumer Group 完成 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经“死”了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。Consumer 端有个参数,叫 session.timeout.ms,就是被用来表征此事的。该参数的默认值是 10 秒,即如果 Coordinator 在 10 秒之内没有收到 Group 下某 Consumer 实例的心跳,它就会认为这个 Consumer 实例已经挂了。可以这么说,session.timeout.ms 决定了 Consumer 存活性的时间间隔。

除了这个参数,Consumer 还提供了一个允许你控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance,因为,目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中。除了以上两个参数,Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。

kafka管理位移的主题

__consumer_offsets 在 Kafka 源码中有个更为正式的名字,叫位移主题,即 Offsets Topic。老版本 Consumer 的位移管理是依托于 Apache ZooKeeper 的,它会自动或手动地将位移数据提交到 ZooKeeper 中保存。当 Consumer 重启后,它能自动从 ZooKeeper 中读取位移数据,从而在上次消费截止的地方继续消费。这种设计使得 Kafka Broker 不需要保存位移数据,减少了 Broker 端需要持有的状态空间,因而有利于实现高伸缩性。

但是,ZooKeeper 其实并不适用于这种高频的写操作,因此,Kafka 社区自 0.8.2.x 版本开始,就在酝酿修改这种设计,并最终在新版本 Consumer 中正式推出了全新的位移管理机制,自然也包括这个新的位移主题。新版本 Consumer 的位移管理机制其实也很简单,就是将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中。可以这么说,__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。它要求这个提交过程不仅要实现高持久性,还要支持高频的写操作。显然,Kafka 的主题设计天然就满足这两个条件,因此,使用 Kafka 主题来保存位移这件事情,实际上就是一个水到渠成的想法了。

虽说位移主题是一个普通的 Kafka 主题,但它的消息格式却是 Kafka 自己定义的,用户不能修改,也就是说你不能随意地向这个主题写消息,因为一旦你写入的消息不满足 Kafka 规定的格式,那么 Kafka 内部无法成功解析,就会造成 Broker 的崩溃。事实上,Kafka Consumer 有 API 帮你提交位移,也就是向位移主题写消息。你千万不要自己写个 Producer 随意向该主题发送消息。你可能会好奇,这个主题存的到底是什么格式的消息呢?所谓的消息格式,你可以简单地理解为是一个 KV 对。Key 和 Value 分别表示消息的键值和消息体,在 Kafka 中它们就是字节数组而已。

位移主题的 Key 中应该保存 3 部分内容:<GROUP ID,主题名,分区号>。接下来,我们再来看看消息体的设计。也许你会觉得消息体应该很简单,保存一个位移值就可以了。实际上,社区的方案要复杂得多,比如消息体还保存了位移提交的一些其他元数据,诸如时间戳和用户自定义的数据等。保存这些元数据是为了帮助 Kafka 执行各种各样后续的操作,比如删除过期位移消息等。但总体来说,我们还是可以简单地认为消息体就是保存了位移值。

位移主题是 Kafka 自动创建的,那么该主题的分区数是 50,副本数是 3。创建位移主题当然是为了用的,那么什么地方会用到位移主题呢?我们前面一直在说 Kafka Consumer 提交位移时会写入该主题,那 Consumer 是怎么提交位移的呢?目前 Kafka Consumer 提交位移的方式有两种:自动提交位移和手动提交位移。

事实上,很多与 Kafka 集成的大数据框架都是禁用自动提交位移的,如 Spark、Flink 等。这就引出了另一种位移提交方式:手动提交位移,即设置 enable.auto.commit = false。一旦设置了 false,作为 Consumer 应用开发的你就要承担起位移提交的责任。Kafka Consumer API 为你提供了位移提交的方法,如 consumer.commitSync 等。当调用这些方法时,Kafka 会向位移主题写入相应的消息。如果你选择的是自动提交位移,那么就可能存在一个问题:只要 Consumer 一直启动着,它就会无限期地向位移主题写入消息。

Kafka 是怎么删除位移主题中的过期消息的呢?答案就是 Compaction。国内很多文献都将其翻译成压缩,我个人是有一点保留意见的。在英语中,压缩的专有术语是 Compression,它的原理和 Compaction 很不相同,我更倾向于翻译成压实,或干脆采用 JVM 垃圾回收中的术语:整理。

不管怎么翻译,Kafka 使用 Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。那么应该如何定义 Compact 策略中的过期呢?对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。我在这里贴一张来自官网的图片,来说明 Compact 过程。

Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner。很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查一下 Log Cleaner 线程的状态,通常都是这个线程挂掉了导致的。

consumer位移提交

Consumer 端有个位移的概念,它和消息在分区中的位移不是一回事儿,虽然它们的英文都是 Offset。今天我们要聊的位移是 Consumer 的消费位移,它记录了 Consumer 要消费的下一条消息的位移。这可能和你以前了解的有些出入,不过切记是下一条消息的位移,而不是目前最新消费消息的位移。提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。换句话说,位移提交是 Kafka 提供给你的一个工具或语义保障,你负责维持这个语义保障,即如果你提交了位移 X,那么 Kafka 会认为所有位移值小于 X 的消息你都已经成功消费了。

这一点特别关键。因为位移提交非常灵活,你完全可以提交任何位移值,但由此产生的后果你也要一并承担。假设你的 Consumer 消费了 10 条消息,你提交的位移值却是 20,那么从理论上讲,位移介于 11~19 之间的消息是有可能丢失的;相反地,如果你提交的位移值是 5,那么位移介于 5~9 之间的消息就有可能被重复消费。所以,我想再强调一下,位移提交的语义保障是由你来负责的,Kafka 只会“无脑”地接受你提交的位移。你对位移提交的管理直接影响了你的 Consumer 所能提供的消息语义保障。鉴于位移提交甚至是位移管理对 Consumer 端的巨大影响,Kafka,特别是 KafkaConsumer API,提供了多种提交位移的方法。从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。

我们先来说说自动提交和手动提交。所谓自动提交,就是指 Kafka Consumer 在后台默默地为你提交位移,作为用户的你完全不必操心这些事;而手动提交,则是指你要自己提交位移,Kafka Consumer 压根不管。开启自动提交位移的方法很简单。Consumer 端有个参数 enable.auto.commit,把它设置为 true 或者压根不设置它就可以了。因为它的默认值就是 true,即 Java Consumer 默认就是自动提交位移的。如果启用了自动提交,Consumer 端还有个参数就派上用场了:auto.commit.interval.ms。它的默认值是 5 秒,表明 Kafka 每 5 秒会为你自动提交一次位移。为了把这个问题说清楚,我给出了完整的 Java 代码。这段代码展示了设置自动提交位移的方法。有了这段代码做基础,今天后面的讲解我就不再展示完整的代码了。

 1 Properties props = new Properties();
 2      props.put("bootstrap.servers", "localhost:9092");
 3      props.put("group.id", "test");
 4      props.put("enable.auto.commit", "true");
 5      props.put("auto.commit.interval.ms", "2000");
 6      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 7      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 8      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
 9      consumer.subscribe(Arrays.asList("foo", "bar"));
10      while (true) {
11          ConsumerRecords<String, String> records = consumer.poll(100);
12          for (ConsumerRecord<String, String> record : records)
13              System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
14      }

上面的第 3、第 4 行代码,就是开启自动提交位移的方法。总体来说,还是很简单的吧。和自动提交相反的,就是手动提交了。开启手动提交位移的方法就是设置 enable.auto.commit 为 false。但是,仅仅设置它为 false 还不够,因为你只是告诉 Kafka Consumer 不要自动提交位移而已,你还需要调用相应的 API 手动提交位移。最简单的 API 就是 KafkaConsumer#commitSync()。该方法会提交 KafkaConsumer#poll() 返回的最新位移。从名字上来看,它是一个同步操作,即该方法会一直等待,直到位移被成功提交才会返回。如果提交过程中出现异常,该方法会将异常信息抛出。下面这段代码展示了 commitSync() 的使用方法:

 1 while (true) {
 2             ConsumerRecords<String, String> records =
 3                         consumer.poll(Duration.ofSeconds(1));
 4             process(records); // 处理消息
 5             try {
 6                         consumer.commitSync();
 7             } catch (CommitFailedException e) {
 8                         handle(e); // 处理提交失败异常
 9             }
10 }

可见,调用 consumer.commitSync() 方法的时机,是在你处理完了 poll() 方法返回的所有消息之后。如果你莽撞地过早提交了位移,就可能会出现消费数据丢失的情况。那么你可能会问,自动提交位移就不会出现消费数据丢失的情况了吗?它能恰到好处地把握时机进行位移提交吗?为了搞清楚这个问题,我们必须要深入地了解一下自动提交位移的顺序。一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费。

在默认情况下,Consumer 每 5 秒自动提交一次位移。现在,我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。反观手动提交位移,它的好处就在于更加灵活,你完全能够把控位移提交的时机和频率。但是,它也有一个缺陷,就是在调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。当然,你可以选择拉长提交间隔,但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。

鉴于这个问题,Kafka 社区为手动提交位移提供了另一个 API 方法:KafkaConsumer#commitAsync()。从名字上来看它就不是同步的,而是一个异步操作。调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等。下面这段代码展示了调用 commitAsync() 的方法:

while (true) {
            ConsumerRecords<String, String> records = 
  consumer.poll(Duration.ofSeconds(1));
            process(records); // 处理消息
            consumer.commitAsync((offsets, exception) -> {
  if (exception != null)
  handle(exception);
  });
}

commitAsync 是否能够替代 commitSync 呢?答案是不能。commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。显然,如果是手动提交,我们需要将 commitSync 和 commitAsync 组合使用才能到达最理想的效果,原因有两个:

我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。

我们不希望程序总处于阻塞状态,影响 TPS。

我们来看一下下面这段代码,它展示的是如何将两个 API 方法结合使用进行手动提交。

 1    try {
 2            while(true) {
 3                         ConsumerRecords<String, String> records = 
 4                                     consumer.poll(Duration.ofSeconds(1));
 5                         process(records); // 处理消息
 6                         commitAysnc(); // 使用异步提交规避阻塞
 7             }
 8 } catch(Exception e) {
 9             handle(e); // 处理异常
10 } finally {
11             try {
12                         consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
13   } finally {
14        consumer.close();
15 }
16 }

CommitFailedException异常

我相信用过 Kafka Java Consumer 客户端 API 的你一定不会感到陌生。所谓 CommitFailedException,顾名思义就是 Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常。如果异常是可恢复的瞬时错误,提交位移的 API 自己就能规避它们了,因为很多提交位移的 API 方法是支持自动错误重试的。

我们就来讨论下该异常是什么时候被抛出的。从源代码方面来说,CommitFailedException 异常通常发生在手动提交位移时,即用户显式调用 KafkaConsumer.commitSync() 方法时。从使用场景来说,有两种典型的场景可能遭遇该异常。

场景一

我们先说说最常见的场景。当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时,Kafka Consumer 端会抛出 CommitFailedException 异常。这是该异常最“正宗”的登场方式。你只需要写一个 Consumer 程序,使用 KafkaConsumer.subscribe 方法随意订阅一个主题,之后设置 Consumer 端参数 max.poll.interval.ms=5 秒,最后在循环调用 KafkaConsumer.poll 方法之间,插入 Thread.sleep(6000) 和手动提交位移,就可以成功复现这个异常了。

如果要防止这种场景下抛出异常,你需要简化你的消息处理逻辑。具体来说有 4 种方法。

1,缩短单条消息处理的时间。比如,之前下游系统消费一条消息的时间是 100 毫秒,优化之后成功地下降到 50 毫秒,那么此时 Consumer 端的 TPS 就提升了一倍。

2,增加 Consumer 端允许下游系统消费一批消息的最大时长。这取决于 Consumer 端参数 max.poll.interval.ms 的值。在最新版的 Kafka 中,该参数的默认值是 5 分钟。如果你的消费逻辑不能简化,那么提高该参数值是一个不错的办法。值得一提的是,Kafka 0.10.1.0 之前的版本是没有这个参数的,因此如果你依然在使用 0.10.1.0 之前的客户端 API,那么你需要增加 session.timeout.ms 参数的值。不幸的是,session.timeout.ms 参数还有其他的含义,因此增加该参数的值可能会有其他方面的“不良影响”,这也是社区在 0.10.1.0 版本引入 max.poll.interval.ms 参数,将这部分含义从 session.timeout.ms 中剥离出来的原因之一。

3,减少下游系统一次性消费的消息总数。这取决于 Consumer 端参数 max.poll.records 的值。当前该参数的默认值是 500 条,表明调用一次 KafkaConsumer.poll 方法,最多返回 500 条消息。可以说,该参数规定了单次 poll 方法能够返回的消息总数的上限。如果前两种方法对你都不适用的话,降低此参数值是避免 CommitFailedException 异常最简单的手段。

4,下游系统使用多线程来加速消费。这应该算是“最高级”同时也是最难实现的解决办法了。具体的思路就是,让下游系统手动创建多个消费线程处理 poll 方法返回的一批消息。之前你使用 Kafka Consumer 消费数据更多是单线程的,所以当消费速度无法匹及 Kafka Consumer 消息返回的速度时,它就会抛出 CommitFailedException 异常。如果是多线程,你就可以灵活地控制线程数量,随时调整消费承载能力,再配以目前多核的硬件条件,该方法可谓是防止 CommitFailedException 最高档的解决之道。事实上,很多主流的大数据流处理框架使用的都是这个方法,比如 Apache Flink 在集成 Kafka 时,就是创建了多个 KafkaConsumerThread 线程,自行处理多线程间的数据消费。不过,凡事有利就有弊,这个方法实现起来并不容易,特别是在多个线程间如何处理位移提交这个问题上,更是极容易出错。在专栏后面的内容中,我将着重和你讨论一下多线程消费的实现方案。

除了调整 max.poll.interval.ms 之外,你还可以选择调整 max.poll.records 值,减少每次 poll 方法返回的消息数。还拿刚才的例子来说,你可以设置 max.poll.records 值为 150,甚至更少,这样每批消息的总消费时长不会超过 300 秒(150*2=300),即 max.poll.interval.ms 的默认值 5 分钟。这种减少 max.poll.records 值的做法就属于上面提到的方法 3。

场景二

Kafka Java Consumer 端还提供了一个名为 Standalone Consumer 的独立消费者。它没有消费者组的概念,每个消费者实例都是独立工作的,彼此之间毫无联系。不过,你需要注意的是,独立消费者的位移提交机制和消费者组是一样的,因此独立消费者的位移提交也必须遵守之前说的那些规定,比如独立消费者也要指定 group.id 参数才能提交位移。你可能会觉得奇怪,既然是独立消费者,为什么还要指定 group.id 呢?没办法,谁让社区就是这么设计的呢?总之,消费者组和独立消费者在使用之前都要指定 group.id。

现在问题来了,如果你的应用中同时出现了设置相同 group.id 值的消费者组程序和独立消费者程序,那么当独立消费者程序手动提交位移时,Kafka 就会立即抛出 CommitFailedException 异常,因为 Kafka 无法识别这个具有相同 group.id 的消费者实例,于是就向它返回一个错误,表明它不是消费者组内合法的成员。

消费者消费进度监控

对于 Kafka 消费者来说,最重要的事情就是监控它们的消费进度了,或者说是监控它们消费的滞后程度。这个滞后程度有个专门的名称:消费者 Lag 或 Consumer Lag。所谓滞后程度,就是指消费者当前落后于生产者的程度。比方说,Kafka 生产者向某主题成功生产了 100 万条消息,你的消费者当前消费了 80 万条消息,那么我们就说你的消费者滞后了 20 万条消息,即 Lag 等于 20 万。通常来说,Lag 的单位是消息数,而且我们一般是在主题这个级别上讨论 Lag 的,但实际上,Kafka 监控 Lag 的层级是在分区上的。如果要计算主题级别的,你需要手动汇总所有主题分区的 Lag,将它们累加起来,合并成最终的 Lag 值。

既然消费进度这么重要,我们应该怎么监控它呢?简单来说,有 3 种方法。

使用 Kafka 自带的命令行工具 kafka-consumer-groups 脚本。

使用 Kafka Java Consumer API 编程。

使用 Kafka 自带的 JMX 监控指标。

接下来,我们分别来讨论下这 3 种方法。

kafka自带命令

我们先来了解下第一种方法:使用 Kafka 自带的命令行工具 bin/kafka-consumer-groups.sh(bat)。kafka-consumer-groups 脚本是 Kafka 为我们提供的最直接的监控消费者消费进度的工具。当然,除了监控 Lag 之外,它还有其他的功能。今天,我们主要讨论如何使用它来监控 Lag。使用 kafka-consumer-groups 脚本很简单。该脚本位于 Kafka 安装目录的 bin 子目录下,我们可以通过下面的命令来查看某个给定消费者的 Lag 值:

1 $ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>

Kafka 连接信息就是 < 主机名:端口 > 对,而 group 名称就是你的消费者程序中设置的 group.id 值。我举个实际的例子来说明具体的用法,请看下面这张图的输出:

在运行命令时,我指定了 Kafka 集群的连接信息,即 localhost:9092。另外,我还设置了要查询的消费者组名:testgroup。kafka-consumer-groups 脚本的输出信息很丰富。首先,它会按照消费者组订阅主题的分区进行展示,每个分区一行数据;其次,除了主题、分区等信息外,它会汇报每个分区当前最新生产的消息的位移值(即 LOG-END-OFFSET 列值)、该消费者组当前最新消费消息的位移值(即 CURRENT-OFFSET 值)、LAG 值(前两者的差值)、消费者实例 ID、消费者连接 Broker 的主机名以及消费者的 CLIENT-ID 信息。

Kafka Java Consumer API

下面这段代码展示了如何利用 Consumer 端 API 监控给定消费者组的 Lag 值:

 1 public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
 2         Properties props = new Properties();
 3         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
 4         try (AdminClient client = AdminClient.create(props)) {
 5             ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
 6             try {
 7                 Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
 8                 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
 9                 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
10                 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
11                 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
12                 try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
13                     Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
14                     return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
15                             entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
16                 }
17             } catch (InterruptedException e) {
18                 Thread.currentThread().interrupt();
19                 // 处理中断异常
20                 // ...
21                 return Collections.emptyMap();
22             } catch (ExecutionException e) {
23                 // 处理ExecutionException
24                 // ...
25                 return Collections.emptyMap();
26             } catch (TimeoutException e) {
27                 throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
28             }
29         }
30     }

这段代码送给你,你可以将 lagOf 方法直接应用于你的生产环境,以实现程序化监控消费者 Lag 的目的。不过请注意,这段代码只适用于 Kafka 2.0.0 及以上的版本,2.0.0 之前的版本中没有 AdminClient.listConsumerGroupOffsets 方法。

Kafka JMX 监控指标

上面这两种方式,都可以很方便地查询到给定消费者组的 Lag 信息。但在很多实际监控场景中,我们借助的往往是现成的监控框架。如果是这种情况,以上这两种办法就不怎么管用了,因为它们都不能集成进已有的监控框架中,如 Zabbix 或 Grafana。下面我们就来看第三种方法,使用 Kafka 默认提供的 JMX 监控指标来监控消费者的 Lag 值。当前,Kafka 消费者提供了一个名为 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指标,里面有很多属性。和我们今天所讲内容相关的有两组属性:records-lag-max 和 records-lead-min,它们分别表示此消费者在测试窗口时间内曾经达到的最大的 Lag 值和最小的 Lead 值。

试想一下,监控到 Lag 越来越大,可能只会给你一个感受,那就是消费者程序变得越来越慢了,至少是追不上生产者程序了,除此之外,你可能什么都不会做。毕竟,有时候这也是能够接受的。但反过来,一旦你监测到 Lead 越来越小,甚至是快接近于 0 了,你就一定要小心了,这可能预示着消费者端要丢消息了。

为什么?我们知道 Kafka 的消息是有留存时间设置的,默认是 1 周,也就是说 Kafka 默认删除 1 周前的数据。倘若你的消费者程序足够慢,慢到它要消费的数据快被 Kafka 删除了,这时你就必须立即处理,否则一定会出现消息被删除,从而导致消费者程序重新调整位移值的情形。这可能产生两个后果:一个是消费者从头消费一遍数据,另一个是消费者从最新的消息位移处开始消费,之前没来得及消费的消息全部被跳过了,从而造成丢消息的假象。

总结

以后关于kafka系列的总结大部分来自Geek Time的课件,大家可以自行关键字搜索。

原文地址:https://www.cnblogs.com/boanxin/p/13463659.html