Kafka 0.8源码分析—ZookeeperConsumerConnector

1.HighLevelApi

High Level Api是多线程的应用程序,以Topic的Partition数量为中心。消费的规则如下:

  • 一个partition只能被同一个ConsumersGroup的一个线程所消费.
  • 线程数小于partition数,某些线程会消费多个partition.
  • 线程数等于partition数,一个线程正好消费一个线程.
  • 当添加消费者线程时,会触发rebalance,partition的分配发送变化.
  • 同一个partition的offset保证消费有序,不同的partition消费不保证顺序.

image

关于与ZK的几个参数意思解释

  • zookeeper.connect: ZK连接。
  • group.id: Consumer消费ID。
  • zookeeper.session.timeout.ms: kafka节点与ZK会话的超时时间。
  • zookeeper.sync.time.ms: zk的follower与leader的同步时间间隔。
  • auto.commit.interval.ms: Consumer offset自动提交给Zookeeper的时间。

Note that since the commit frequency is time based instead of # of messages consumed, if an error occurs between updates to ZooKeeper on restart you will get replayed messages.(由于记录Offset是基于时间的,所以当Consumer发生错误的时候,有可能会收到重复的消息。)

消费者的代码

ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(threads));

Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumer.createMessageStreams(topicCountMap);

List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get(topic);
// now launch all the threads
executor = Executors.newFixedThreadPool(threads);

for (KafkaStream<byte[], byte[]> stream : streams) {
    executor.submit(new KafkaConsumerWorker(stream));
}

2.ZookeeperConsumerConnector

一个Consumer会创建一个ZookeeperConsumerConnector,代表一个消费者进程.

  • fetcher: 消费者获取数据, 使用ConsumerFetcherManager fetcher线程抓取数据
  • zkUtils: 消费者要和ZK通信, 除了注册自己,还有其他信息也会写到ZK中
  • topicThreadIdAndQueues: 消费者会指定自己消费哪些topic,并指定线程数, 所以topicThreadId都对应一个队列
  • messageStreamCreated: 消费者会创建消息流, 每个队列都对应一个消息流
  • offsetsChannel: offset可以存储在ZK或者kafka中,如果存在kafka里,像其他请求一样,需要和Broker通信
  • 还有其他几个Listener监听器,分别用于topicPartition的更新,负载均衡,消费者重新负载等

当Broker挂掉的时候,在这个Broker上的所有Partition都丢失了,而Partition是给消费者服务的.
所以Broker挂掉后在做迁移的时候,会将其上的Partition转移到其他Broker上,因此消费者要消费的Partition也跟着变化.

2.1 init

在创建ZookeeperConsumerConnector时,有几个初始化方法需要事先执行.

  • 消费者要和ZK通信,所以connectZk会确保连接上ZooKeeper
  • 消费者要消费数据,需要有抓取线程,所有的抓取线程交给ConsumerFetcherManager统一管理
  • 由消费者客户端自己保存offset,而消费者会消费多个topic的多个partition.
  • 多个partition的offset管理类OffsetManager是一个GroupCoordinator
  • 定时提交线程会使用OffsetManager建立的通道定时提交offset到zk或者kafka.
    image

2.2 createMessageStreams

ConsumerConnector创建消息流,需要指定解码器,因为要将日志反序列化(生产者写消息时对消息序列化到日志文件).

在kafka的运行过程中,会有其他的线程将数据放入partition对应的queue中. 而queue是用于KafkaStream的.
一旦数据添加到queue后,KafkaStream的阻塞队列就有数据了,消费者就可以从队列中消费消息.

  • createMessageStreams: 返回KafkaStream, 每个Topic都对应了多个KafkaStream. 数量和topicCount中的count一样.
例子解释

假设消费者C1声明了topic1:2, topic2:3. topicThreadIds=consumerThreadIdsPerTopicMap.
topicThreadIds.values = [ (C1_1,C1_2), (C1_1,C1_2,C1_3)]一共有5个线程,queuesAndStreams也有5个元素.

consumerThreadIdsPerTopicMap = {
    topic1: [C1_1, C1_2],
    topic2: [C1_1, C1_2, C1_3]
}
topicThreadIds.values = [
    [C1_1, C1_2],
    [C1_1, C1_2, C1_3]
]
threadIdSet循环[C1_1, C1_2]时, 生成两个queue->stream pair. 
threadIdSet循环[C1_1, C1_2, C1_3]时, 生成三个queue->stream pair. 
queuesAndStreams = [
    (LinkedBlockingQueue_1,KafkaStream_1),      //topic1:C1_1
    (LinkedBlockingQueue_2,KafkaStream_2),      //topic1:C1_2
    (LinkedBlockingQueue_3,KafkaStream_3),      //topic2:C1_1
    (LinkedBlockingQueue_4,KafkaStream_4),      //topic2:C1_2
    (LinkedBlockingQueue_5,KafkaStream_5),      //topic2:C1_3
]
  • 客户端关注的是我的每个线程都对应了一个队列,每个队列都是一个消息流就可以了.
  • 客户端的每个线程实际上是针对Partition级别的,一个线程对应一个或多个partition。

2.3 registerConsumerInZK

消费者需要向ZK注册一个临时节点,路径为:/consumers/[group_id]/ids/[consumer_id],内容为订阅的topic.

问题:什么时候这个节点会被删除掉呢? Consumer进程挂掉时,或者Session失效时删除临时节点. 重连时会重新创建.
由于是临时节点,一旦创建节点的这个进程挂掉了,临时节点就会自动被删除掉. 这是由zk机制决定的,不是由消费者完成的.

2.4 reinitializeConsumer listener

当前Consumer在ZK注册之后,需要重新初始化Consumer.对于全新的消费者,注册多个监听器,在zk的对应节点的注册事件发生时,会回调监听器的方法.

  • 将topic对应的消费者线程id及对应的LinkedBlockingQueue放入topicThreadIdAndQueues中,LinkedBlockingQueue是真正存放数据的queue
  1. 注册sessionExpirationListener,监听状态变化事件.在session失效重新创建session时调用
  2. 向/consumers/[group_id]/ids注册Child变更事件的loadBalancerListener,当消费组下的消费者发生变化时调用
  3. 向/brokers/topics/[topic]注册Data变更事件的topicPartitionChangeListener,在topic数据发生变化时调用
  • 显式调用loadBalancerListener.syncedRebalance(), 会调用reblance方法进行consumer的初始化工作
private def reinitializeConsumer[K,V](topicCount: TopicCount, 
  queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {
  val dirs = new ZKGroupDirs(config.groupId)
  // ② listener to consumer and partition changes
  if (loadBalancerListener == null) {
    val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
    loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString, 
      topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
  }
  // ① create listener for session expired event if not exist yet
  if (sessionExpirationListener == null) sessionExpirationListener = 
    new ZKSessionExpireListener(dirs, consumerIdString, topicCount, loadBalancerListener)
  // ③ create listener for topic partition change event if not exist yet
  if (topicPartitionChangeListener == null) 
    topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)

  // listener to consumer and partition changes
  zkUtils.zkClient.subscribeStateChanges(sessionExpirationListener)
  zkUtils.zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
  // register on broker partition path changes.
  topicStreamsMap.foreach { topicAndStreams => 
    zkUtils.zkClient.subscribeDataChanges(BrokerTopicsPath+"/"+topicAndStreams._1, topicPartitionChangeListener)
  }

  // explicitly trigger load balancing for this consumer
  loadBalancerListener.syncedRebalance()
}

ZKRebalancerListener传入ZKSessionExpireListener和ZKTopicPartitionChangeListener.它们都会使用ZKRebalancerListener完成自己的工作.

2.5 ZKSessionExpireListener

当Session失效时,新的会话建立时,立即进行rebalance操作.

2.6 ZKTopicPartitionChangeListener

当topic的数据变化时,通过触发的方式启动rebalance操作.

2.7 ZKRebalancerListener watcher

image

image

class ZKRebalancerListener(val group: String, val consumerIdString: String,
                           val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
  extends IZkChildListener {
  private var isWatcherTriggered = false
  private val lock = new ReentrantLock
  private val cond = lock.newCondition()

  private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
    override def run() {
      var doRebalance = false
      while (!isShuttingDown.get) {
          lock.lock()
          try {
            // 如果isWatcherTriggered=false,则不会触发syncedRebalance. 等待1秒后,继续判断
            if (!isWatcherTriggered)
              cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag
          } finally {
            // 不管isWatcherTriggered值是多少,在每次循环时,都会执行. 如果isWatcherTriggered=true,则会执行syncedRebalance
            doRebalance = isWatcherTriggered
            // 重新设置isWatcherTriggered=false, 因为其他线程触发一次后就失效了,想要再次触发,必须再次设置isWatcherTriggered=true
            isWatcherTriggered = false
            lock.unlock()
          }
          if (doRebalance) syncedRebalance        // 只有每次rebalanceEventTriggered时,才会调用一次syncedRebalance
      }
    }
  }
  watcherExecutorThread.start()

  // 触发rebalance开始进行, 修改isWatcherTriggered标志位,触发cond条件运行
  def rebalanceEventTriggered() {
    inLock(lock) {
      isWatcherTriggered = true
      cond.signalAll()
    }
  }

3.ZKRebalancerListener rebalance

因为消费者加入/退出时,消费组的成员会发生变化,而消费组中的所有存活消费者负责消费可用的partitions.
可用的partitions或者消费组中的消费者成员一旦发生变化,都要重新分配partition给存活的消费者.下面是一个示例.

当然分配partition的工作绝不仅仅是这么简单的,还要处理与之相关的线程,并重建必要的数据:

  1. 关闭数据抓取线程,获取之前为topic设置的存放数据的queue并清空该queue
  2. 释放partition的ownership,删除partition和consumer的对应关系
  3. 为各个partition重新分配threadid
    获取partition最新的offset并重新初始化新的PartitionTopicInfo(queue存放数据,两个offset为partition最新的offset)
  4. 重新将partition对应的新的consumer信息写入zookeeper
  5. 重新创建partition的fetcher线程

image

private def rebalance(cluster: Cluster): Boolean = {
  val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, 
    zkUtils, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
  val brokers = zkUtils.getAllBrokersInCluster()
  if (brokers.size == 0) {
    zkUtils.zkClient.subscribeChildChanges(BrokerIdsPath, loadBalancerListener)
    true
  } else {
    // ① 停止fetcher线程防止数据重复.如果当前调整失败了,被释放的partitions可能被其他消费者拥有.
    // 而没有先停止fetcher的话,原先的消费者仍然会和新的拥有者共同消费同一份数据.  
    closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
    // ② 释放topicRegistry中topic-partition的owner
    releasePartitionOwnership(topicRegistry)
    // ③ 为partition重新分配消费者....
    // ④ 为partition添加consumer owner
    if(reflectPartitionOwnershipDecision(partitionAssignment)) {
        allTopicsOwnedPartitionsCount = partitionAssignment.size
        topicRegistry = currentTopicRegistry
        // ⑤ 创建拉取线程
        updateFetcher(cluster)
        true
    }
  }
}

rebalance操作涉及了以下内容:

  • PartitionOwnership: Partition的所有者(ownership)的删除和重建
  • AssignmentContext: 分配信息上下文
  • PartitionAssignor: 为Partition分配Consumer的算法
  • PartitionAssignment: Partition分配之后的上下文
  • PartitionTopicInfo: Partition的最终信息
  • Fetcher: 完成了rebalance,消费者就可以重新开始抓取数据

3.1 核心:PartitionAssignor

将可用的partitions以及消费者线程排序, 将partitions处于线程数,表示每个线程(不是消费者数量)平均可以分到几个partition.

如果除不尽,剩余的会分给前面几个消费者线程. 比如有两个消费者,每个都是两个线程,一共有5个可用的partitions: (p0-p4).

每个消费者线程(一共四个线程)可以获取到至少一共partition(5/4=1),剩余一个(5%4=1)partition分给第一个线程.
最后的分配结果为: p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1

image

关闭Fetcher时要注意:

  • 先提交offset,然后才停止消费者. 因为在停止消费者的时候当前的数据块中还会有点残留数据.
  • 因为这时候还没有释放partiton的ownership(即partition还归当前consumer所有),强制提交offset,
    这样拥有这个partition的下一个消费者线程(rebalance后),就可以使用已经提交的offset了,确保不中断.
  • 因为fetcher线程已经关闭了(stopConnections),这是消费者能得到的最后一个数据块,以后不会有了,直到平衡结束,fetcher重新开始
  1. topicThreadIdAndQueues来自于topicThreadIds,所以它的topic应该都在relevantTopicThreadIdsMap的topics中.
    为什么还要过滤呢? 注释中说到在本次平衡之后,只需要清理可能不再属于这个消费者的队列(部分的topicPartition抓取队列).

  2. 问题:新创建的ZKRebalancerListener中kafkaMessageAndMetadataStreams(即这里的messageStreams)为空的Map.
    如何清空里面的数据? 实际上KafkaStream只是一个迭代器,在运行过程中会有数据放入到这个流中,这样流就有数据了.

4.ConsumerFetcherManager

Fetcher线程要抓取数据关心的是PartitionTopicInfo,首先要找出Partition Leader(因为只向Leader Partition发起抓取请求).
初始时假设所有topicInfos(PartitionTopicInfo)都找不到Leader,即同时加入partitionMap和noLeaderPartitionSet.
在LeaderFinderThread线程中如果找到Leader,则从noLeaderPartitionSet中移除.

ConsumerFetcherManager管理了当前Consumer的所有Fetcher线程.

image

5.小结

high level的Consumer Rebalance的控制策略是由每一个Consumer通过在Zookeeper上注册Watch完成的。
每个Consumer被创建时会触发Consumer Group的Rebalance,具体的启动流程是:

  1. (High Level)Consumer启动时将其ID注册到其Consumer Group下 (registerConsumerInZK)
  2. 在/consumers/[group_id]/ids上和/brokers/ids上分别注册Watch (reinitializeConsumer->Listener)
  3. 强制自己在其Consumer Group内启动Rebalance流程 (ZKRebalancerListener.rebalance)

在这种策略下,每一个Consumer或者Broker的增加或者减少都会触发Consumer Rebalance。
因为每个Consumer只负责调整自己所消费的Partition,为了保证整个Consumer Group的一致性,
当一个Consumer触发了Rebalance时,该Consumer Group内的其它所有其它Consumer也应该同时触发Rebalance。

该方式有如下缺陷:

  • Herd effect(羊群效应): 任何Broker或者Consumer的增减都会触发所有的Consumer的Rebalance
  • Split Brain(脑裂): 每个Consumer分别单独通过Zookeeper判断哪些Broker和Consumer 宕机了,
    那么不同Consumer在同一时刻从Zookeeper“看”到的View就可能不一样,这是由Zookeeper的特性决定的,这就会造成不正确的Reblance尝试。
  • 调整结果不可控: 所有的Consumer都并不知道其它Consumer的Rebalance是否成功,这可能会导致Kafka工作在一个不正确的状态。

根据Kafka官方文档,Kafka作者正在考虑在还未发布的0.9.x版本中使用中心协调器(coordinator)。大体思想是选举出一个broker作为coordinator,由它watch Zookeeper,从而判断是否有partition或者consumer的增减,然后生成rebalance命令,并检查是否这些rebalance在所有相关的consumer中被执行成功,如果不成功则重试,若成功则认为此次rebalance成功.

参考这篇博文,将这个类好好总结一下。
涉及到元数据信息的不一致问题,还有rebalance的问题。

原文地址:https://www.cnblogs.com/byrhuangqiang/p/6372114.html