Kafka权威指南 读书笔记之(四)Kafka 消费者一一从 Kafka读取数据

KafkaConsumer概念
消费者和消费者群组
  Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费接收主题部分分区的消息。 往群组里增加消费者是横向伸缩消费能力的主要方式。
我们有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者。不要让消费者的数量超过主题分区的数量,多余的消费者只会被闲置。  
除了通过增加消费者来横向伸缩单个应用程序外,还经常出现多个应用程序从同主题读取数据的情况。 Kafka 设计的主要目标之,就是要让 Kafka 主题里的数据能
够满足企业各种应用场景的需求。在这些场景里,每个应用程序可以获取到所有的消息,而不只是其中的部分。只要保证每个应用程序有自己的消费者群组,就可以让它们获取
到主题所有的消息。不同于传统的消息系统,横向伸缩 Kafka 消费者和消费者群组并不对性能造成负面影响。

为每一个需要获取个或多个主题全部消息的应用程序创建个消费者群组,然后往群组里添加消费者来伸缩读取能力和处理能力,群组里的每个消费者只处理部分消息。

消费者群组和分区再均衡
  分区的所有权从个消费者转移到另个消费者,这样的行为被称为再均衡 。再均衡为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者),
不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组小段时间的不可用。另外,当分区被重新分配给另个消费者时,消
费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢用程序。
  消费者通过向被指派为群组协调器的 broker (不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间
间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会
话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。

  如果个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认它死亡了才触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者
时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。

创建 Kafka消费者

bootstrap.servers 指定了 Kafka 集群的连接字符串。key.deserializervalue .deserializer使用指定的类把字节数组转成 Java 对象group.id不是必需的。它指定了KafkaConsumer属于哪个消费者群组。
创建不属于任何个群组的消费者也是可以的,只这样不太常见.

订阅主题

可以在调用 subscribe () 方法时传入个正则表达式匹配多个主题, 如果有人创建了新的主题,并且主题的名字与正则表达式匹配 ,那么会立即触发
再均衡,消费者就可以读取新添加的主题。如果应用程序需要读取多个主题,并且可以处理不同类型的数据,那么这种订阅方式就很管用。在Kafka 和他系统之间复制数据时,
使用正表达式的方式订阅多个主题是很常见的做发
要订阅所有test 相关的主题,可以这样做 consumer.subscribe (“test.*")

轮询
  消息轮询是消费者 API 的核心,通过个简单的轮询向服务器请求数据。一旦消费者订阅了主题,轮询就处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据,
发者只需要使用组简单的 API 来处理从分区返回的数据。

  1. 消费者是个长期运行的应用程序,它通过持轮询Kafka 请求数据。
  2. 就像鲨鱼停止移动就会死掉一样,消费者必须持续对 Kafka 行轮询,否则会被认为己经死亡 ,分区会被移交给群组里的其他消费者。传给poll () 方法参数是一个超时时间,
  3. 用于控制 poll () 方法的阻塞时间(在消费者的冲区里没有可用数据会发生阻塞)。如果该参数被设为 0, poll () 会立即返回 ,否则
    它会在指定的毫秒数一直等待 broker 返回数据。
  4. poll () 方能返回个记录列表。每条记录都包含了记录所属主题的信息、记在分区的信息 记录在分区里的偏移量 ,以及记录的键值对。我们般会遍历这个列表 ,逐条处理这些记录。

     poll () 方法有参数 它指定了方法在多久之后可以返回,不管有没有可用数据都要返回。 超时时间的设置取决于应用程序对响应速度的要求,比如要在多长内把权归还给行轮询的线程

         5.把结果保存起来或者对已有的记录进行更新。 

         6.在退出应用程序之前使close () 方法关闭消费者。网络连接和 socket 也会随之关闭,即触发次再均衡,而不是等待群组协调器发现它不再发送心跳并认定它已死亡,
            因为那样需要更长的间,导致整个群组在
间内无法读取消息
轮询不只是获取数据那么简单。在第次调用新消费者的 poll () 方法时,它会负责查找GroupCoordinator , 然后加入群组,接受分配分区。 果发生了再均衡,整个过程也
在轮询期间进行轮询里发出去的。所以,我们要确保在轮期间所做的任何处理工作都应该尽快完成。

消费者的配置

与消费者的性能和可用性有很大关系属性。
1. fetch.min.bytes
  该属性指定了消费者从服务器获取记录的最小字节数。 broker在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时
才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它在主不是很跃的时候(或者天里的低谷时段)就不需要来来回回地处理消息。如果没有很多可用
数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。

2. fetch.max.wait.ms
   fetch.Max.wait.ms 则用于指定 broker 的等待时间,默认是 500ms 。如果没有足够的数据流Kafka ,消费者获取最小数据量的要求就得不到满足,最终导致 500ms 延迟。

果要降潜在的延迟(为了满足 SLA),可以把该参数值设置得小些。如果 fetch.max.wait.ms 被设为 100ms,并且fetch.min.bytes 被设为1MB ,那么 Kafka 在收到消费者的请求后,

要么返回 1MB 数据,要么在 100ms 后返回所有可用的数据 就看哪个条件先得到满足。

3. max.patition.fetch.bytes
  该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB ,KafkaConsumer. poll () 方法从每个分区里返回的记录最多不超过 Max.partition.fecth.bytes
指定的字节。如果个主题有 20 个分区和 5 个消费者,那么每个费者需要至少 4MB 的可用内存来接收记录。在为消费者分配内存时,可给它多分配一些,因
为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。 max.partition.fetch.bytes 的值必须比 broker 能够接收的最大消息的节数大, 则消费者可能无法读取这些消息,导致消费者一直挂起重试。 
在设置该属性时,另个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用 poll () 方法来避免会话过期和发生分区再均衡,如果单次调用 poll () 返回的数据太多,消费者需要更
多的时间处理,可能无法及时进行下个轮询来避免会话过期。如果出现这种情况, 以把max.patition.fetch.bytes值改,或者延长会话过期时间。

4 . session.timeout.ms
  该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。如果消费者没有在session.timeout.ms指定的时间内发送心跳给群组协调器,就被认
已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。该属性与heartbeat.interval.ms 紧密相关 。heartbeat.interval.ms 指定了 poll () 方法向协调
发送心跳的频率,session.timeout.ms则指定了消费者可以多久不发送心跳。所以, 般需要同时修改这两个属性, heartbeat.interval.ms必须比 session.timeout.ms 小,
般是 session.timeout.ms分之。如果 session.timeout.ms是 3s ,那么 heartbeat.interval.ms 应该是 1s 把 session.timeout.ms值设置比默认值小,可以更快地检测和恢
复崩溃节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设置得大些,可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。

5. auto.offset.reset
  该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。它的默认值是 latest ,
思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。另一个值是 earliest ,意思是说,在偏移量无效的情况下,消费者将从
起始位置读取分区的记录。


6. enable.auto.commit
  该属性指定了消费者是否自动提交偏量,默认值是 true。为了尽量避免出现重复数据和数据丢失,可以把它设为 false ,由
己控制何时提交偏移量。如果把它设为 true ,还可以通过配置 auto.commit.interval.ms属性来控制提交的频率。

7. partition.assignment.strategy
  分区会被分配给群组里的消费者。partition.assignment.strategy根据给定的消费者和题,决定哪些分区应该被分配给哪个消费者。 Kafka 有两个默认的分配策略
  Range
    该策略会主题若干连续的分区分配给消费者。假设悄费者C1和消费者C2同时订阅了主题 T1和主题 T2 ,并且每个主题有 3 个分区。那么消费者 C1有可能分配到过
    两个主题的分区0和分区1,而消费者 C2 分配到这两个主题的分区2。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第个消费者最后分配到比第个消
    费者更多的分区。只要使用了 Range 策略,而且分区数量无法被消费者数量整除,就出现这种情况。

  RoundRobin
    该策略把主题的所有分区逐个分配给消费者。如果使用 RoundRobin 策略来给消费者 C1和消费者 C2 分配分区,那么消费者 C1将分到主题 Tl 的分区 0 和分区 2 以及主题 T2
    的分区 1 ,消费者 C2 将分配到主题T1的分区1以及主题T1的分区0和分区2。一般来说 ,如果所有消费者都订阅相同的主题(这种情况很常见) , RoundRobin 策略会给所
    有消费者分配相同数量的分区(或最多就差个分区)。

可以通过设置 partition.assignment.strategy 来选择分区策略。默认使用的是org.apache.kafka.clients.consumer.RangeAssignor,
这个类实现了 Range 策略,不过可以把它改成 。org.apache .kafka . clients.consumer.RoundRobinAssignor。我还可以使用自定
义策略,
 partition.assignment.strategy 属性的值就是自定义类的名字。

8. client.id
  该属性可以是任意字符串 , broker 用它来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额里。

9. max.poll.records
  该属性用于控制单次调用 call () 方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。

10. receive.buffer.bytes 和 send.buffer.bytes
  socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为-1 ,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心内,可以适当增大这
些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽

提交和偏移量
  每次调用 poll () 方法,它总是返回由生产者写入 Kafka 但还没有被消费者读取过的记录 我们因此可以追踪到哪些记录是被群组里的哪个消费者读取的。Kafka
不会像其他 JMS 队列那样需要得到消费者的确认,这是 Kafka 的个独特之处。消费者可以使用 Kafka 来追踪消息在分区里的位置(偏移量)。

我们把更新分区当前位置的操作叫作提交

那么消费者是如何提交偏移量的呢?消费者往一个叫作_consumer_offset 的特殊主题发送消息,消息里包含每个分区的偏移量如果消费者一直处于运行状态,那么偏移量就没有
什么用处。如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续
之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
如果提交的偏移量小于客户端处理的最后一个消息的偏移量 ,那么处于两个偏移量之间的消息就会被重复处理 。
如果提交的偏移量大于客户端处理的最后个消息的偏移量,那么处于两个偏移量之间的
消息将会丢失。

自动提交
  最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设为 true ,那么每过5s钟,消费者会自动把从 poll () 方告接收到的最大偏移量提交上去。提交时间间隔
auto.commit.interval.ms 控制,默认值是 5s。自动提交是在轮询里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是就会提交从上一次轮询返回的偏移量。
在使用这种简便的方式之前,需要知道它将会带来怎样的结果。假设使用5s 提交时间间隔,在最近次提交之后的 3s 发生了再均衡,再
均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s ,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地
提交偏移量,减小可能出现重复消息的时间窗 
  在使用自动提交,每次调用轮询方法上一次调的偏移量交上去,它并不知道具体哪些消息已经被处理了,所以再次调用之前最好有当调用返回消息
都已经处理完毕(在调用 close () 方位之前也行自动提交)。 下不会有么问题,在处理异常或提前退出轮询小心
自动提交虽然方便 但并没有为开发者留有余来避免重复息。

提交当前偏移量
  大部分开发者通过控制偏移量提交时间来消除丢失消息的可性,并在发生再均衡减少重复消息的数量。消费者 API 提供了另一种提交偏移量的方式 开发者可要的时候
提交当前偏移量,而不是基于时间间隔。把 auto.commit.offset 设为 false ,让应用程序决定时提交移量。使用commitSync()提交偏移量最简单最可靠。
这个 API 会提交由 poll () 方能返回最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。在理完所有记录后要确保调用了 commitSync(),否则还是会有丢失消息的风险。

异步提交
  手动提交有一个不足之处,在 broker 对提交请求作出回应之前,应用程序会直阻塞,这样会限制应用程序的吞吐量。可以通过降低提交频率来提升吞吐率,但如果发生了
均衡, 会增重复消息的数量。这个时候可以使用异步提交 API只管发送提交请求,无需等待 broker 的响应。
  在成功提交或碰到无怯恢复的错误之前, commitSync () 直重试,但是 commitAsync()不会,这也是 commitAsync() 不好的个地方。它之所以不进行重试,是因为在它收到
服务的时候,可能有个更大的偏移量经提交成功。假设我们发出个请求用于提交偏移量 2000,这个时候发生了短暂的通信问题 ,服务器收不到请求,自然也不会
作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000。如commitAsync()() 重新尝试提交偏移量 2000 ,它有可能在偏移量 3000 之后提交成功。这个时
候如果发生再均衡,就会出现重复消息。

commitAsync ()也支持回调,在 broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标, 过如果你要用它来进行重试, 定要注意提交的顺序。

发送提交请求然后继续做其他事情,如果提交失败,错误信息和偏移量会被记录下来

同步和异步组合提交
  组合使用 commitAsync ()和 commitSync  ()

提交特定的偏移量
  调用 commitSync ()或 commitAsync () 它们只会提交最后个偏移量,而此时该批次里的消息还没有处理完。

消费者 API 允许在调用 commitSync ()和 commitAsync ()方法时传进去希望提交分区和偏移量的 map。假设你处理了半个批次的消息 最后个来自主题“customers”
分区 3 的消息的偏移量是 5000 , 你可以调用 commitSync () 方法来提交它。因为消费者可能不只读取个分区, 你需要跟踪所有分区的偏移量,所以在这个层面上控制偏移
的提交会让代码变复杂。

这里调用的是 commitAsync () ,调用 commitSync()也是完全可以的。当然,在提特定偏移量时,仍然要处理可能发生的错误。

再均衡监听器


  消费者在退出和进行分区再均衡之前,会做些清理工作 在为消费者分配新分区或移除旧分区时,可以通过消费者 API 执行些应用程序代码,
调用 subscribe () 方法时传进去一 个 ConsumerRebalancelistener实例就可以了。ConsumerRebalancelistener有两个需要实现的方法。

(1) public void onPartitionsRevoked(Collection<TopicPartition> partitions )再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下个接管分区的消费者就知道该从哪里开始读取了。

(2) public  void onPartitionsAssigned(Collection<TopicPartition> partitions )方法会在重新分配分区之后和消费者开始读取消息之前被调用。

从特定偏移量处开始处理记录

  从分区的起始位置开始读取消息,或者直接跳到分区的末尾开始读取消息, 用 seekToBeginning(Collection<TopicPartition> tp ) seekToEnd ( Collection<TopicPartitiontp ) 这两个方法。
Kafka 为我们提供了用于查找特定偏移量的 API
应用程序从 Kafka 读取事件(可能是网站的用户点击件流 ),对它们进行处理(可能是使用自动程序清理点击操作井添加会话信息),然后把结果保存到数据库。不想丢失任何数据,也不想在数据库
里多次保存相同的结果。 可以每处理条记录就提交一次偏移量。尽管如此, 在记录被保存到数据库之后以及偏移量被提交之前 ,应用程序仍然有可能发生崩溃,导致重复处理数据,数据库里
就会出现重复记录。
可以在同一个事务里把记录和偏移量都写到数据库 ,在消费者启动或分配到新分区时 ,可以使用 seek ()方告找保存在数据库里的偏移量。  
下面的例子大致说明了如何使用 ConsumerRebalancelistener和 seek () 法确保我们是从数据库里保存的偏移量所指定的位置开始处理消息的。

  1. 使个虚构方能来提交数据库事务。大想怯是这样的 处理完记录之后,将偏移量插数据库,然后在将失去分区有权之前提交事务,确保成功保存了这些信息
  2. 使用另个虚构方怯来数据库获取偏移量 在分配到新分的时候,使用 seek()方住定位到那些录。
  3. 订阅主题之后 开始启动费者 调用poll () 方法,让消费者加入到消费群组里,并获取分配到分区 然后马上调用 seek () 方也定位分区的偏移量。

    seek () 方能只更新我正在使位置,在下次调用 poll () 时就可以获得正确的如果 seek () 发生错误( 如偏移量不存在) , poll () 就会抛出异常。

         4.另个虚构的方法 这次要更新是数据库里于保存偏移量的表。

通过偏移量和记录保存到同部系统来实现单次语义可以有很多种方式,不过它们都需要结合使用 ConsulmerRebalancelistener seek () 方法来确保能够及时保存偏移量,
并保证消费者总是正确位置开始读取消息。


如何退出
  轮询时如果要退出循环,要通过另个线程调用 consumer.wakeup ( )方法。如果循环运行在主线程里,可ShutdownHook 里调用该方法。consumer.wakeup () 是消费
唯一 一个可以从线程里安全调的方法。调用 consumer.wakeup () 可以退出 poll() ,出 wakeupException 异常,或者果调用 consumer.wakeup () 时线程没有等待轮询,
么异常将在下轮调用 poll ()时抛出。我们不需要处理 WakeupException ,因为它只是用出循环的方式。 在退出线程之前调用 consumer.close () 是很有必要的,
会提交任何还没有提交的东西 并向群组调器发送消息,告知自己要离开群组,接下来就会发再均衡 ,而不需要等待会话

独立消费者一一为什么以及怎样使用没有群组的消费者
  如果只需要消费者题的所有分区或者某个特定分区读取数据。这个候就不需要消费者群组均衡了, 只需要把题或者区分配给消费者,然后开始读取消息并提交移量。
如果是这样的话,就不需要订阅题, 而代是为自己分配分区。消费者可以订阅主题(井加入消费者群组),或者为自己分配分区 但不时做这两件事情。
下面的子演示了一个消费者是如何为自分区并分区里读取消息的:

除了不会发生再均衡,也不需要手分区 他的来一正常。不过要记,如果主题增加了新的分区,消费者并不会收到通知。要么周期性地调用 consume.partitionsFor()
方法来检查是否有新分区加入, 要么在添新分区后重启应用程序。



  



  
























 








 










 













原文地址:https://www.cnblogs.com/jixp/p/9715531.html