kafka学习(三)

kafka 消费者-从kafka读取数据

 
消费者和消费者群里
kafka消费者从属于消费者群组。一个群组里的消费者订阅的是同一主题,每个消费者接受主题一部分分区的消息。如果我们往群组里添加更多的消费者,超过主题的分区数量,那么有一部分消费者就会被闲置,不会接受到任何消息。往群组里增加消费者是横向伸缩消费能力的主要方式。简而言之,为每一个需要获取一个或多个主题全部消息的应用程序创建一个消费者群组,然后往群组里添加消费者来伸缩读取能力和处理能力,群组里的每个消费者只处理一部分消息。
 
 

消费者群组和分区再均衡

群组里的消费者共同读取主题的分区。一个新的消费者加入群组时,它读取的原本由其他消费者读取的信息。当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群里的其他消费者来读取,在主题发生变化时,比如管理员添加了新的分区,会发生分区重分配,分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。
消费者通过向被指派为群组协调器的broker发送心跳来维护它们和群组的从属关系及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活泼的,说明它还在读取分区里的消息。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。
 
 

创建kafka消费者

在读取消息之前,需要先创建一个kafkaConsumer对象。创建kafkaConsumer对象与创建kafkaProducer相识。需要3个必要的属性:bootstrap.servers,key.deserializer 和value.deserializer
 
bootstrap.servers 指定了kafka集群连接字符串。
key.deserializer和value.deserializer 把使用指定的类把字节数组转成java对象。
group.id 不是必须的,不过我们现在姑且认为它是必须的,它指定了kafkaConsumer属于哪一个消费者群组。
 
 

订阅主题

subscribe()方法接受一个主题列表作为参数,示例:
consumer.subscribe(Collections.singleonList("test"));//主题为test
 
 

轮询

消费轮询是消费者API的核心,通过一个简单的轮询向服务器请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调,分区再均衡,发送心跳和获取数据,开发者只需要使用一组简单的API来处理从分区返回的数据。并且轮询不只是获取数据那么简单。在第一次调用新消费者的poll()方法时,它会负责查找GroupCoordinator,然后加入群组,接受分配的分区。如果发生了再均衡,整个过程也是在轮询期间进行的。当然,心跳也是从轮询里发送出去的。所以,我们要确保在轮询期间所做的任何处理都应该尽快完成。
 
 

消费者的配置

除了上述说的bootstrap.servers,group.id,key.deserializer 和value.deserializer之外。其他的参数也会影响到消费者的性能和可用性。
1.fetch.min.bytes 该属性指定了消费者从服务器获取记录的最小字节数。
 
2.fetch.max.wait.ms 我们通过fetch.min.bytes告诉kafka,等到有足够的数据是才把它返回给消费者。
 
3.max.partition.fetch.bytes 该属性指定了服务器从每个分区里返回给消费者的最大字节数。默认是1MB
 
4.session.timeoout.ms 该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是3s.
 
5.auto.offset.reset 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该如果处理。它的默认值是latest(从最新的消息记录读取数据),另一个值是earliest (从最早的位置开始读取记录)
 
6.enable.auto.commit 该属性指定了消费者是否自动提交偏移量。默认是true 一般是手动提交
 
7.partition.assignment.strategy 分配策略 决定哪些分区应该被分配给那个消费者
1.range 该策略会把主题的若干个连续的分区分配给消费者。
2.RoundRobin 该策略把主题的所有分区逐个分配给消费者。
默认是range.
 
8.client.id 该属性可以是任意字符串,broker用来标识从客户端发送过来的消息,通常被用在日志度量指标和配额里
 
9.max.poll.records 该属性用于控制单次调用call()方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。
 
10.receive.buffer.bytes 和send.buffer.bytes socket在读写数据时用到的TCP缓冲区也可以设置大小,如果他们被设置为-1,就使用操作系统的默认值。
 

提交和偏移量

 
每次调用poll()方法,它总会返回由生产者写入kafka但还没有被消费者读取过的几率,我们因此可以追踪到那些记录是被群组里的那个消费者读取的。如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
 
所以提交偏移量的方式对客户端会有很大的影响。kafkaConsumer API提供了很多种方式来提交偏移量。
1.自动提交,消费者会自动把从poll()方法接受到的最大偏移提交上去。提交时间间隔由auto.commit.interval.ms 默认是5s.
 
2.提交当前偏移量,设置不自动提交,使用commitSync() 提交偏移量最简单也最可靠,提交成功会马上返回,失败就抛异常 记住: commitSync() 将会提交由poll()返回的最新偏移量,所以在处理完所有记录后要确保调用了commitSync,否则还是会有丢失消息的风险。
 
3.异步提交
手动提交有一个不足之处,在broker对提交请求作为回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。
在成功提交或者碰到无法恢复的错误之前,commitSync()会一直重试,但是commitAsync()不会,这也是commitAsync()不好的地方。
 
4.同步和异步组合提交
一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交会是成功的。
 
这种组合保证偏移量一定提交成功,先用异步提交,如果发生异常在同步。
 
5.提交特定的偏移量,这个看业务需求
 

在均衡监听器

消费者在退出和进行分区再均衡之前,会做一些清理工作。
在订阅主题调用subscribe()方法的时候传进去一个ConsumerRebalanceListener实例就可以了,有两个需要实现的方法。
1.onpartitionsRevoked() 方法会在再均衡开始之前和消费者停止读取消息之后被调用。
2.onpartitionAssigned() 方法会在重新分配分区之后和消费者开始读取消息之前被调用。
 
 

从特定偏移量处开始处理记录

 
如果你想从起始位置开始读取消息,或者直接跳到分区的末尾开始读取消息。可以使用seektobeg() 和 sendToend() kafka也会为我们提供了用于查找特定偏移量的API。使用ConsumerRebalanceListener 接口实现。
 

如何退出

 
如果确定要退出,需要通过另一个线程调用consumer.wakeup()方法,原理线程抛出异常,线程结束,不处理异常信息。
原文地址:https://www.cnblogs.com/Seeasunnyday/p/9231970.html