kafka消费者

1.消费方式

  consumer采用pull(拉)模式从broker中读取数据。

  push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能的以最快速度传递消息,但是这样很容易造成consumer来不及处理消息。典型的表现就是拒绝服务以及网络拥塞,而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

  pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时间为timeout。

2.分区分配策略

  一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。

  kafka有两种分配策略,一是RoundRobin,二是Range;

  2.1 分区分配的条件

    1.同一个消费组内消费者的新增,关闭或崩溃

    2.订阅的主题新增分区

  2.2 RoundRobin

    使用RoundRobin策略有两个前提条件必须满足:

      1.同一个consumer group里面的所有消费者的num.streams必须相等;

      2.每个消费者订阅的主题必须相同;

    RoundRobin策略的工作原理:将所有主题的分区组成TopicAndPartition列表,然后对TopicAndPartition列表按照hashCode进行排序,分发给每个消费者。(其实就是按分区名hash排序后平均分配给每一个消费者的线程)

                                                                             

  2.3 Range

    是对每个主题而言。首先按照分区序号排序,然后将消费者排序。分区数/消费者数=m,如果m!=0,前m个消费者多消费一个分区(每个主题)

    

3.offset的维护

  由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置继续消费,所以consumer需要实时记录自己消费到哪个offset,以便故障恢复后继续消费; 

  我们大家都知道,kafka消费者在会保存其消费的进度,也就是offset,存储的位置根据选用的kafka api不同而不同。

  首先来说说消费者如果是根据javaapi来消费,也就是【kafka.javaapi.consumer.ConsumerConnector】,我们会配置参数【zookeeper.connect】来消费。这种情况下,消费者的offset会更新到zookeeper的       

  【consumers/{group}/offsets/{topic}/{partition}】目录下,例如:

[zk: localhost(CONNECTED) 0] get /kafka/consumers/zoo-consumer-group/offsets/my-topic/0
1
cZxid = 0x20006d28a
ctime = Wed Apr 12 18:20:51 CST 2017
mZxid = 0x30132b0ed
mtime = Tue Aug 22 18:53:22 CST 2017
pZxid = 0x20006d28a
cversion = 0
dataVersion = 5758
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0

  kafka0.9版本之前,consumer消费者默认将offset保存到zookeeper中,0.9版本开始,consumer默认将offset保存到kafka一个内置的topic中,该topic为_consumer_offsets;

  

原文地址:https://www.cnblogs.com/wnwn/p/12396228.html