KafkaConsumer概念
消费者和消费者群组
Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。 往群组里增加消费者是横向伸缩消费能力的主要方式。
我们有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者。不要让消费者的数量超过主题分区的数量,多余的消费者只会被闲置。
除了通过增加消费者来横向伸缩单个应用程序外,还经常出现多个应用程序从同一个主题读取数据的情况。 Kafka 设计的主要目标之一 ,就是要让 Kafka 主题里的数据能
够满足企业各种应用场景的需求。在这些场景里,每个应用程序可以获取到所有的消息,而不只是其中的一部分。只要保证每个应用程序有自己的消费者群组,就可以让它们获取
到主题所有的消息。不同于传统的消息系统,横向伸缩 Kafka 消费者和消费者群组并不会对性能造成负面影响。
为每一个需要获取一个或多个主题全部消息的应用程序创建一个消费者群组,然后往群组里添加消费者来伸缩读取能力和处理能力,群组里的每个消费者只处理一部分消息。
消费者群组和分区再均衡
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡 。再均衡为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者),
不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消
费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
消费者通过向被指派为群组协调器的 broker (不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间
间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会
话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。
如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者
时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。
创建 Kafka消费者
bootstrap.servers 指定了 Kafka 集群的连接字符串。key.deserializer和 value .deserializer使用指定的类把字节数组转成 Java 对象。group.id不是必需的。它指定了KafkaConsumer属于哪一个消费者群组。
创建不属于任何一个群组的消费者也是可以的,只是这样做不太常见.
订阅主题
可以在调用 subscribe () 方法时传入一个正则表达式匹配多个主题, 如果有人创建了新的主题,并且主题的名字与正则表达式匹配 ,那么会立即触发一次
再均衡,消费者就可以读取新添加的主题。如果应用程序需要读取多个主题,并且可以处理不同类型的数据,那么这种订阅方式就很管用。在Kafka 和其他系统之间复制数据时,
使用正则表达式的方式订阅多个主题是很常见的做发。
要订阅所有与 test 相关的主题,可以这样做 :consumer.subscribe (“test.*");
轮询
消息轮询是消费者 API 的核心,通过一个简单的轮询向服务器请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据,
开发者只需要使用一组简单的 API 来处理从分区返回的数据。
- 消费者是一个长期运行的应用程序,它通过持续轮询向Kafka 请求数据。
- 就像鲨鱼停止移动就会死掉一样,消费者必须持续对 Kafka 进行轮询,否则会被认为己经死亡 ,它的分区会被移交给群组里的其他消费者。传给poll () 方法的参数是一个超时时间,
- 用于控制 poll () 方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。如果该参数被设为 0, poll () 会立即返回 ,否则
它会在指定的毫秒数内一直等待 broker 返回数据。 - poll () 方能返回一个记录列表。每条记录都包含了记录所属主题的信息、记录所在分区的信息 、 记录在分区里的偏移量 ,以及记录的键值对。我们一般会遍历这个列表 ,逐条处理这些记录。
poll () 方法有一个超时参数 , 它指定了方法在多久之后可以返回,不管有没有可用的数据都要返回。 超时时间的设置取决于应用程序对响应速度的要求,比如要在多长时间内把控制权归还给执行轮询的线程。
5.把结果保存起来或者对已有的记录进行更新。
6.在退出应用程序之前使用 close () 方法关闭消费者。网络连接和 socket 也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不再发送心跳并认定它已死亡,
因为那样需要更长的时间,导致整个群组在一段时间内无法读取消息
轮询不只是获取数据那么简单。在第一次调用新消费者的 poll () 方法时,它会负责查找GroupCoordinator , 然后加入群组,接受分配的分区。 如果发生了再均衡,整个过程也是
在轮询期间进行的。心跳也是从轮询里发出去的。所以,我们要确保在轮询期间所做的任何处理工作都应该尽快完成。
消费者的配置
与消费者的性能和可用性有很大关系属性。
1. fetch.min.bytes
该属性指定了消费者从服务器获取记录的最小字节数。 broker在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时
才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它们在主题不是很活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。如果没有很多可用
数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。
2. fetch.max.wait.ms
fetch.Max.wait.ms 则用于指定 broker 的等待时间,默认是 500ms 。如果没有足够的数据流入Kafka ,消费者获取最小数据量的要求就得不到满足,最终导致 500ms 的延迟。
如果要降低潜在的延迟(为了满足 SLA),可以把该参数值设置得小一些。如果 fetch.max.