Kafka的Consumer API有两种

High Level Consumer API
Low Level Consumer API
对于开发这来说,选用哪个,首先要弄清楚这两种API的工作原理什么样,分别开放了哪些功能。

High Level Consumer API说明

High Level Consumer API围绕着Consumer Group这个逻辑概念展开。

  • 它自动管理每个Topic的每个Partition的Offset(自动读取zookeeper中该Consumer group的last offset )
  • 在Broker失败转移以及增减Partition、Consumer时的负载均衡(当Partition和Consumer增减时,Kafka自动进行负载均衡)
    对于多个Partition,多个Consumer

注意kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数,如果consumer比partition多,是浪费。
简单来说一个某个topic下的某个partition只能被一个消费实例处理。

  • 如果consumer比partition少,一个consumer会对应于多个partitions。这里要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀。
  • 通常做法partiton数目是consumer数量的整数倍,所以partition数量很重要,比如取24,consumer数量可以是3或者6或者8等。
  • 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同。
  • 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
  • High-level接口中获取不到数据的时候是会block的
  • 关于Offset初始值的问题:先produce一些数据,然后再用consumer读的话,需要加上一句offset读取设置,即如果初始offset非法,那么从最小的offset开始。
props.put("auto.offset.reset", "smallest"); //必须要加,如果要读旧数据  
  • 1

因为初始的offset默认是非法的,这个配置只有当offset非法时才会执行,如果不配置,默认是largest,即最新,所以要读之前produce的数据,必须加上这个配置。并且offset初始化后再加smallest配置是没用的,因为此时offset是合法的,不会再被修正了,需要手工或用工具改重置offset。

Low Level Consumer API说明

Low Level Consumer API控制灵活性高,但使用也相对复杂。
Low Level Consumer API,作为底层的Consumer API,提供了消费Kafka Message更大的控制,如:

  • Read a message multiple times(重复读取)
  • Consume only a subset of the partitions in a topic in a process(跳读)
  • Manage transactions to make sure a message is processed once and only once(消息事务处理)

Low Level Consumer API提供更大灵活控制是以复杂性为代价的:

  • Offset不再透明
  • Broker自动失败转移需要处理
  • 增加Consumer、Partition、Broker需要自己做负载均衡

注意如下:

  • You must keep track of the offsets in your application to know where you left off consuming.(在自己的应用中管理Offset)
  • You must figure out which Broker is the lead Broker for a topic and partition(如果一个Partition有多个副本,那么Lead Partition所在的Broker就称为这个Partition的Lead Broker,得自己判断)
  • You must handle Broker leader changes(Broker Leader变更也得自己来)

使用Low Level Consumer API的步骤

  • Find an active Broker and find out which Broker is the leader for your topic and partition(找到一个活跃的broker,并且找出当前topic和partition的leader broker)
  • Determine who the replica Brokers are for your topic and partition(要决定哪些broker是当前topic和partition的leader broker的副本备份)
  • Build the request defining what data you are interested in(创建请求数据说明)
  • Fetch the data(拉取数据)
  • Identify and recover from leader changes(在leader broker变更后,重新标识和恢复)。

最后

从开放的功能服务看,highlevel的配置与使用相对简单,分布式管理功能由Kafka集群与zookeeper自行管理,消费者只要从头或者从最新处获取数据即可,服务重启,能从上次消费位置开始,leader故障,能够自行rebalance。而lowlevel更为为灵活也更复杂,这些都是开放给自己的应用来管理。

原文地址:https://www.cnblogs.com/liliuguang/p/14627513.html