kafka学习笔记(六)kafka的controller模块

概述

今天我们主要看一下kafka的controller的代码,controller代码是kafka的非常重要的代码,需要我们深入学习。从某种意义上来说,它是kafka最核心的组件,一方面,他要为集群中的所有主题分区选取领导者副本;另一方面,它还承载着集群的全部元数据信息,并负责讲这些元数据信息同步到其他broker上。下面我们来一一讲解controller组件。

集群元数据

事实上,集群 Broker 是不会与 ZooKeeper 直接交互去获取元数据的。相反地,它们总是与 Controller 进行通信,获取和更新最新的集群数据。而且社区已经打算把 ZooKeeper“干掉”了,以后 Controller 将承担更重要的工作。我们总说元数据,那么,到底什么是集群的元数据,或者说,Kafka 集群的元数据都定义了哪些内容呢?我用一张图给你完整地展示一下,当前 Kafka 定义的所有集群元数据信息。

在了解具体的元数据之前,我要先介绍下 ControllerContext 类。刚刚我们提到的这些元数据信息全部封装在这个类里。应该这么说,这个类是 Controller 组件的数据容器类。

ControllerContext

它定义了前面提到的所有元数据信息,以及许多实用的工具方法。比如,获取集群上所有主题分区对象的 allPartitions 方法、获取某主题分区副本列表的 partitionReplicaAssignment 方法,等等。首先,我们来看下 ControllerContext 类的定义,如下所示:

 1 class ControllerContext {
 2   val stats = new ControllerStats // Controller统计信息类 
 3   var offlinePartitionCount = 0   // 离线分区计数器
 4   val shuttingDownBrokerIds = mutable.Set.empty[Int]  // 关闭中Broker的Id列表
 5   private val liveBrokers = mutable.Set.empty[Broker] // 当前运行中Broker对象列表
 6   private val liveBrokerEpochs = mutable.Map.empty[Int, Long]   // 运行中Broker Epoch列表
 7   var epoch: Int = KafkaController.InitialControllerEpoch   // Controller当前Epoch值
 8   var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion  // Controller对应ZooKeeper节点的Epoch值
 9   val allTopics = mutable.Set.empty[String]  // 集群主题列表
10   val partitionAssignments = mutable.Map.empty[String, mutable.Map[Int, ReplicaAssignment]]  // 主题分区的副本列表
11   val partitionLeadershipInfo = mutable.Map.empty[TopicPartition, LeaderIsrAndControllerEpoch]  // 主题分区的Leader/ISR副本信息
12   val partitionsBeingReassigned = mutable.Set.empty[TopicPartition]  // 正处于副本重分配过程的主题分区列表
13   val partitionStates = mutable.Map.empty[TopicPartition, PartitionState] // 主题分区状态列表 
14   val replicaStates = mutable.Map.empty[PartitionAndReplica, ReplicaState]  // 主题分区的副本状态列表
15   val replicasOnOfflineDirs = mutable.Map.empty[Int, Set[TopicPartition]]  // 不可用磁盘路径上的副本列表
16   val topicsToBeDeleted = mutable.Set.empty[String]  // 待删除主题列表
17   val topicsWithDeletionStarted = mutable.Set.empty[String]  // 已开启删除的主题列表
18   val topicsIneligibleForDeletion = mutable.Set.empty[String]  // 暂时无法执行删除的主题列表
19   ......
20 }

这些元数据理解起来还是比较简单的,掌握了它们之后,你在理解 MetadataCache,也就是元数据缓存的时候,就容易得多了。比如,接下来我要讲到的 liveBrokers 信息,就是 Controller 通过 UpdateMetadataRequest 请求同步给其他 Broker 的 MetadataCache 的。

ControllerStats

顾名思义,它表征的是 Controller 的一些统计信息。目前,源码中定义了两大类统计指标:UncleanLeaderElectionsPerSec 和所有 Controller 事件状态的执行速率与时间。其中,前者是计算 Controller 每秒执行的 Unclean Leader 选举数量,通常情况下,执行 Unclean Leader 选举可能造成数据丢失,一般不建议开启它。一旦开启,你就需要时刻关注这个监控指标的值,确保 Unclean Leader 选举的速率维持在一个很低的水平,否则会出现很多数据丢失的情况。后者是统计所有 Controller 状态的速率和时间信息,单位是毫秒。当前,Controller 定义了很多事件,比如,TopicDeletion 是执行主题删除的 Controller 事件、ControllerChange 是执行 Controller 重选举的事件。ControllerStats 的这个指标通过在每个事件名后拼接字符串 RateAndTimeMs 的方式,为每类 Controller 事件都创建了对应的速率监控指标。由于 Controller 事件有很多种,对应的速率监控指标也有很多,有一些 Controller 事件是需要你额外关注的。举个例子,IsrChangeNotification 事件是标志 ISR 列表变更的事件,如果这个事件经常出现,说明副本的 ISR 列表经常发生变化,而这通常被认为是非正常情况,因此,你最好关注下这个事件的速率监控指标。

offlinePartitionCount

该字段统计集群中所有离线或处于不可用状态的主题分区数量。所谓的不可用状态,就是“Leader=-1”的情况。ControllerContext 中的 updatePartitionStateMetrics 方法根据给定主题分区的当前状态和目标状态,来判断该分区是否是离线状态的分区。如果是,则累加 offlinePartitionCount 字段的值,否则递减该值。该方法首先要判断,此分区所属的主题当前是否处于删除操作的过程中。如果是的话,Kafka 就不能修改这个分区的状态,那么代码什么都不做,直接返回。否则,代码会判断该分区是否要转换到离线状态。如果 targetState 是 OfflinePartition,那么就将 offlinePartitionCount 值加 1,毕竟多了一个离线状态的分区。相反地,如果 currentState 是 offlinePartition,而 targetState 反而不是,那么就将 offlinePartitionCount 值减 1。

shuttingDownBrokerIds

顾名思义,该字段保存所有正在关闭中的 Broker ID 列表。当 Controller 在管理集群 Broker 时,它要依靠这个字段来甄别 Broker 当前是否已关闭,因为处于关闭状态的 Broker 是不适合执行某些操作的,如分区重分配(Reassignment)以及主题删除等。另外,Kafka 必须要为这些关闭中的 Broker 执行很多清扫工作,Controller 定义了一个 onBrokerFailure 方法,它就是用来做这个的。该方法接收一组已终止运行的 Broker ID 列表,首先是更新 Controller 元数据信息,将给定 Broker 从元数据的 replicasOnOfflineDirs 和 shuttingDownBrokerIds 中移除,然后为这组 Broker 执行必要的副本清扫工作,也就是 onReplicasBecomeOffline 方法做的事情。该方法主要依赖于分区状态机和副本状态机来完成对应的工作。在后面的课程中,我们会专门讨论副本状态机和分区状态机,这里你只要简单了解下它要做的事情就行了。后面等我们学完了这两个状态机之后,你可以再看下这个方法的具体实现原理。这个方法的主要目的是把给定的副本标记成 Offline 状态,即不可用状态。具体分为以下这几个步骤:利用分区状态机将给定副本所在的分区标记为 Offline 状态;将集群上所有新分区和 Offline 分区状态变更为 Online 状态;将相应的副本对象状态变更为 Offline。

liveBrokers

该字段保存当前所有运行中的 Broker 对象。每个 Broker 对象就是一个 <id,endpoint,机架信息>的三元组。ControllerContext 中定义了很多方法来管理该字段,如 addLiveBrokersAndEpochs、removeLiveBrokers 和 updateBrokerMetadata 等。我拿 updateBrokerMetadata 方法进行说明,每当新增或移除已有 Broker 时,ZooKeeper 就会更新其保存的 Broker 数据,从而引发 Controller 修改元数据,也就是会调用 updateBrokerMetadata 方法来增减 Broker 列表中的对象。

liveBrokerEpochs

该字段保存所有运行中 Broker 的 Epoch 信息。Kafka 使用 Epoch 数据防止 Zombie Broker,即一个非常老的 Broker 被选举成为 Controller。另外,源码大多使用这个字段来获取所有运行中 Broker 的 ID 序号,liveBrokerEpochs 的 keySet 方法返回 Broker 序号列表,然后从中移除关闭中的 Broker 序号,剩下的自然就是处于运行中的 Broker 序号列表了。

epoch & epochZkVersion

这两个字段一起说,因为它们都有“epoch”字眼,放在一起说,可以帮助你更好地理解两者的区别。epoch 实际上就是 ZooKeeper 中 /controller_epoch 节点的值,你可以认为它就是 Controller 在整个 Kafka 集群的版本号,而 epochZkVersion 实际上是 /controller_epoch 节点的 dataVersion 值。Kafka 使用 epochZkVersion 来判断和防止 Zombie Controller。这也就是说,原先在老 Controller 任期内的 Controller 操作在新 Controller 不能成功执行,因为新 Controller 的 epochZkVersion 要比老 Controller 的大。另外,你可能会问:“这里的两个 Epoch 和上面的 liveBrokerEpochs 有啥区别呢?”实际上,这里的两个 Epoch 值都是属于 Controller 侧的数据,而 liveBrokerEpochs 是每个 Broker 自己的 Epoch 值。

allTopics

该字段保存集群上所有的主题名称。每当有主题的增减,Controller 就要更新该字段的值。比如 Controller 有个 processTopicChange 方法,从名字上来看,它就是处理主题变更的。

partitionAssignments

该字段保存所有主题分区的副本分配情况。在我看来,这是 Controller 最重要的元数据了。事实上,你可以从这个字段衍生、定义很多实用的方法,来帮助 Kafka 从各种维度获取数据。比如,如果 Kafka 要获取某个 Broker 上的所有分区,

Controller如何管理发送请求

我们就正式进入到 Controller 请求发送管理部分的学习。你可能会问:“Controller 也会给 Broker 发送请求吗?”当然!Controller 会给集群中的所有 Broker(包括它自己所在的 Broker)机器发送网络请求。发送请求的目的,是让 Broker 执行相应的指令。当前,Controller 只会向 Broker 发送三类请求,分别是 LeaderAndIsrRequest、StopReplicaRequest 和 UpdateMetadataRequest。注意,这里我使用的是“当前”!我只是说,目前仅有这三类,不代表以后不会有变化。事实上,我几乎可以肯定,以后能发送的 RPC 协议种类一定会变化的。因此,你需要掌握请求发送的原理。毕竟,所有请求发送都是通过相同的机制完成的。

LeaderAndIsrRequest:最主要的功能是,告诉 Broker 相关主题各个分区的 Leader 副本位于哪台 Broker 上、ISR 中的副本都在哪些 Broker 上。在我看来,它应该被赋予最高的优先级,毕竟,它有令数据类请求直接失效的本领。试想一下,如果这个请求中的 Leader 副本变更了,之前发往老的 Leader 的 PRODUCE 请求是不是全部失效了?因此,我认为它是非常重要的控制类请求。

StopReplicaRequest:告知指定 Broker 停止它上面的副本对象,该请求甚至还能删除副本底层的日志数据。这个请求主要的使用场景,是分区副本迁移和删除主题。在这两个场景下,都要涉及停掉 Broker 上的副本操作。

UpdateMetadataRequest:顾名思义,该请求会更新 Broker 上的元数据缓存。集群上的所有元数据变更,都首先发生在 Controller 端,然后再经由这个请求广播给集群上的所有 Broker。在我刚刚分享的案例中,正是因为这个请求被处理得不及时,才导致集群 Broker 无法获取到最新的元数据信息。

现在,社区越来越倾向于将重要的数据结构源代码从服务器端的 core 工程移动到客户端的 clients 工程中。这三类请求 Java 类的定义就封装在 clients 中,它们的抽象基类是 AbstractControlRequest 类,这个类定义了这三类请求的公共字段。

1 public abstract class AbstractControlRequest extends AbstractRequest {
2     public static final long UNKNOWN_BROKER_EPOCH = -1L;
3     public static abstract class Builder<T extends AbstractRequest> extends AbstractRequest.Builder<T> {
4         protected final int controllerId;
5         protected final int controllerEpoch;
6         protected final long brokerEpoch;
7         ......
8 }

区别于其他的数据类请求,抽象类请求必然包含 3 个字段。controllerId:Controller 所在的 Broker ID。controllerEpoch:Controller 的版本信息。brokerEpoch:目标 Broker 的 Epoch。后面这两个 Epoch 字段用于隔离 Zombie Controller 和 Zombie Broker,以保证集群的一致性。

RequestSendThread

Kafka 源码非常喜欢生产者 - 消费者模式。该模式的好处在于,解耦生产者和消费者逻辑,分离两者的集中性交互。学完了“请求处理”模块,现在,你一定很赞同这个说法吧。还记得 Broker 端的 SocketServer 组件吗?它就在内部定义了一个线程共享的请求队列:它下面的 Processor 线程扮演 Producer,而 KafkaRequestHandler 线程扮演 Consumer。对于 Controller 而言,源码同样使用了这个模式:它依然是一个线程安全的阻塞队列,Controller 事件处理线程负责向这个队列写入待发送的请求,而一个名为 RequestSendThread 的线程负责执行真正的请求发送。Controller 会为集群中的每个 Broker 都创建一个对应的 RequestSendThread 线程。Broker 上的这个线程,持续地从阻塞队列中获取待发送的请求。那么,Controller 往阻塞队列上放什么数据呢?这其实是由源码中的 QueueItem 类定义的。

每个 QueueItem 的核心字段都是 AbstractControlRequest.Builder 对象。你基本上可以认为,它就是阻塞队列上 AbstractControlRequest 类型。需要注意的是这里的“<:”符号,它在 Scala 中表示上边界的意思,即字段 request 必须是 AbstractControlRequest 的子类,也就是上面说到的那三类请求。这也就是说,每个 QueueItem 实际保存的都是那三类请求中的其中一类。如果使用一个 BlockingQueue 对象来保存这些 QueueItem,那么,代码就实现了一个请求阻塞队列。这就是 RequestSendThread 类做的事情。接下来,我们就来学习下 RequestSendThread 类的定义。我给一些主要的字段添加了注释。

 1 class RequestSendThread(val controllerId: Int, // Controller所在Broker的Id
 2     val controllerContext: ControllerContext, // Controller元数据信息
 3     val queue: BlockingQueue[QueueItem], // 请求阻塞队列
 4     val networkClient: NetworkClient, // 用于执行发送的网络I/O类
 5     val brokerNode: Node, // 目标Broker节点
 6     val config: KafkaConfig, // Kafka配置信息
 7     val time: Time, 
 8     val requestRateAndQueueTimeMetrics: Timer,
 9     val stateChangeLogger: StateChangeLogger,
10     name: String) extends ShutdownableThread(name = name) {
11     ......
12 }

其实,RequestSendThread 最重要的是它的 doWork 方法,也就是执行线程逻辑的方法:

总体上来看,doWork 的逻辑很直观。它的主要作用是从阻塞队列中取出待发送的请求,然后把它发送出去,之后等待 Response 的返回。在等待 Response 的过程中,线程将一直处于阻塞状态。当接收到 Response 之后,调用 callback 执行请求处理完成后的回调逻辑。需要注意的是,RequestSendThread 线程对请求发送的处理方式与 Broker 处理请求不太一样。它调用的 sendAndReceive 方法在发送完请求之后,会原地进入阻塞状态,等待 Response 返回。只有接收到 Response,并执行完回调逻辑之后,该线程才能从阻塞队列中取出下一个待发送请求进行处理。

ControllerChannelManager

了解了 RequestSendThread 线程的源码之后,我们进入到 ControllerChannelManager 类的学习。这个类和 RequestSendThread 是合作共赢的关系。在我看来,它有两大类任务。管理 Controller 与集群 Broker 之间的连接,并为每个 Broker 创建 RequestSendThread 线程实例;将要发送的请求放入到指定 Broker 的阻塞队列中,等待该 Broker 专属的 RequestSendThread 线程进行处理。由此可见,它们是紧密相连的。ControllerChannelManager 类最重要的数据结构是 brokerStateInfo。

这是一个 HashMap 类型,Key 是 Integer 类型,其实就是集群中 Broker 的 ID 信息,而 Value 是一个 ControllerBrokerStateInfo。你可能不太清楚 ControllerBrokerStateInfo 类是什么,我先解释一下。它本质上是一个 POJO 类,仅仅是承载若干数据结构的容器。

它有三个非常关键的字段。

brokerNode:目标 Broker 节点对象,里面封装了目标 Broker 的连接信息,比如主机名、端口号等。

messageQueue:请求消息阻塞队列。你可以发现,Controller 为每个目标 Broker 都创建了一个消息队列。

requestSendThread:Controller 使用这个线程给目标 Broker 发送请求。

你一定要记住这三个字段,因为它们是实现 Controller 发送请求的关键因素。为什么呢?我们思考一下,如果 Controller 要给 Broker 发送请求,肯定需要解决三个问题:发给谁?发什么?怎么发?“发给谁”就是由 brokerNode 决定的;messageQueue 里面保存了要发送的请求,因而解决了“发什么”的问题;最后的“怎么发”就是依赖 requestSendThread 变量实现的。

Controller 主要通过 ControllerChannelManager 类来实现与其他 Broker 之间的请求发送。其中,ControllerChannelManager 类中定义的 RequestSendThread 是主要的线程实现类,用于实际发送请求给集群 Broker。除了 RequestSendThread 之外,ControllerChannelManager 还定义了相应的管理方法,如添加 Broker、移除 Broker 等。通过这些管理方法,Controller 在集群扩缩容时能够快速地响应到这些变化,完成对应 Broker 连接的创建与销毁。

ControllerEventManager

在 0.11.0.0 版本之前,Controller 组件的源码非常复杂。集群元数据信息在程序中同时被多个线程访问,因此,源码里有大量的 Monitor 锁、Lock 锁或其他线程安全机制,这就导致,这部分代码读起来晦涩难懂,改动起来也困难重重,因为你根本不知道,变动了这个线程访问的数据,会不会影响到其他线程。同时,开发人员在修复 Controller Bug 时,也非常吃力。鉴于这个原因,自 0.11.0.0 版本开始,社区陆续对 Controller 代码结构进行了改造。其中非常重要的一环,就是将多线程并发访问的方式改为了单线程的事件队列方式。这里的单线程,并非是指 Controller 只有一个线程了,而是指对局部状态的访问限制在一个专属线程上,即让这个特定线程排他性地操作 Controller 元数据信息。这样一来,整个组件代码就不必担心多线程访问引发的各种线程安全问题了,源码也可以抛弃各种不必要的锁机制,最终大大简化了 Controller 端的代码结构。这部分源码非常重要,它能够帮助你掌握 Controller 端处理各类事件的原理,这将极大地提升你在实际场景中处理 Controller 各类问题的能力。因此,我建议你多读几遍,彻底了解 Controller 是怎么处理各种事件的。

基本术语和概念

Controller 端有多个线程向事件队列写入不同种类的事件,比如,ZooKeeper 端注册的 Watcher 线程、KafkaRequestHandler 线程、Kafka 定时任务线程,等等。而在事件队列的另一端,只有一个名为 ControllerEventThread 的线程专门负责“消费”或处理队列中的事件。这就是所谓的单线程事件队列模型。参与实现这个模型的源码类有 4 个。

ControllerEventProcessor:Controller 端的事件处理器接口。

ControllerEvent:Controller 事件,也就是事件队列中被处理的对象。

ControllerEventManager:事件处理器,用于创建和管理 ControllerEventThread。

ControllerEventThread:专属的事件处理线程,唯一的作用是处理不同种类的 ControllEvent。这个类是 ControllerEventManager 类内部定义的线程类。

ControllerEventProcessor

这个接口位于 controller 包下的 ControllerEventManager.scala 文件中。它定义了一个支持普通处理和抢占处理 Controller 事件的接口,该接口定义了两个方法,分别是 process 和 preempt。process:接收一个 Controller 事件,并进行处理。preempt:接收一个 Controller 事件,并抢占队列之前的事件进行优先处理。目前,在 Kafka 源码中,KafkaController 类是 Controller 组件的功能实现类,它也是 ControllerEventProcessor 接口的唯一实现类。对于这个接口,你要重点掌握 process 方法的作用,因为它是实现 Controller 事件处理的主力方法。你要了解 process 方法处理各类 Controller 事件的代码结构是什么样的,而且还要能够准确地找到处理每类事件的子方法。至于 preempt 方法,你仅需要了解,Kafka 使用它实现某些高优先级事件的抢占处理即可,毕竟,目前在源码中只有两类事件(ShutdownEventThread 和 Expire)需要抢占式处理,出镜率不是很高。

ControllerEvent

这就是前面说到的 Controller 事件,在源码中对应的就是 ControllerEvent 接口。该接口定义在 KafkaController.scala 文件中,本质上是一个 trait 类型,每个 ControllerEvent 都定义了一个状态。Controller 在处理具体的事件时,会对状态进行相应的变更。这个状态是由源码文件 ControllerState.scala 中的抽象类 ControllerState 定义的,每类 ControllerState 都定义一个 value 值,表示 Controller 状态的序号,从 0 开始。另外,rateAndTimeMetricName 方法是用于构造 Controller 状态速率的监控指标名称的。比如,TopicChange 是一类 ControllerState,用于表示主题总数发生了变化。为了监控这类状态变更速率,代码中的 rateAndTimeMetricName 方法会定义一个名为 TopicChangeRateAndTimeMs 的指标。当然,并非所有的 ControllerState 都有对应的速率监控指标,比如,表示空闲状态的 Idle 就没有对应的指标。

ControllerEventManager

在 Kafka 中,Controller 事件处理器代码位于 controller 包下的 ControllerEventManager.scala 文件下。该文件主要由 4 个部分组成。

ControllerEventManager Object:保存一些字符串常量,比如线程名字。

ControllerEventProcessor:前面讲过的事件处理器接口,目前只有 KafkaController 实现了这个接口。

QueuedEvent:表征事件队列上的事件对象。

ControllerEventManager Class:ControllerEventManager 的伴生类,主要用于创建和管理事件处理线程和事件队列。就像我前面说的,这个类中定义了重要的 ControllerEventThread 线程类,还有一些其他值得我们学习的重要方法。

QueuedEvent

每个 QueuedEvent 对象实例都裹挟了一个 ControllerEvent。另外,每个 QueuedEvent 还定义了 process、preempt 和 awaitProcessing 方法,分别表示处理事件、以抢占方式处理事件,以及等待事件处理。其中,process 方法和 preempt 方法的实现原理,就是调用给定 ControllerEventProcessor 接口的 process 和 preempt 方法,非常简单。在 QueuedEvent 对象中,我们再一次看到了 CountDownLatch 的身影。Kafka 源码非常喜欢用 CountDownLatch 来做各种条件控制,比如用于侦测线程是否成功启动、成功关闭,等等。

ControllerEventThread

了解了 QueuedEvent,我们来看下消费它们的 ControllerEventThread 类。首先是这个类的定义代码:

1 class ControllerEventThread(name: String) extends ShutdownableThread(name = name, isInterruptible = false) {
2   logIdent = s"[ControllerEventThread controllerId=$controllerId] "
3   ......
4 }

这个类就是一个普通的线程类,继承了 ShutdownableThread 基类,而后者是 Kafka 为很多线程类定义的公共父类。该父类是 Java Thread 类的子类,其线程逻辑方法 run 的主要代码如下:

 1 def doWork(): Unit
 2 override def run(): Unit = {
 3   ......
 4   try {
 5     while (isRunning)
 6       doWork()
 7   } catch {
 8     ......
 9   }
10   ......
11 }

可见,这个父类会循环地执行 doWork 方法的逻辑,而该方法的具体实现则交由子类来完成。作为 Controller 唯一的事件处理线程,我们要时刻关注这个线程的运行状态。因此,我们必须要知道这个线程在 JVM 上的名字,这样后续我们就能有针对性地对其展开监控。这个线程的名字是由 ControllerEventManager Object 中 ControllerEventThreadName 变量定义的。

现在我们看看 ControllerEventThread 类的 doWork 是如何实现的。

 1 override def doWork(): Unit = {
 2   // 从事件队列中获取待处理的Controller事件,否则等待
 3   val dequeued = queue.take()
 4   dequeued.event match {
 5     // 如果是关闭线程事件,什么都不用做。关闭线程由外部来执行
 6     case ShutdownEventThread =>
 7     case controllerEvent =>
 8       _state = controllerEvent.state
 9       // 更新对应事件在队列中保存的时间
10       eventQueueTimeHist.update(time.milliseconds() - dequeued.enqueueTimeMs)
11       try {
12         def process(): Unit = dequeued.process(processor)
13         // 处理事件,同时计算处理速率
14         rateAndTimeMetrics.get(state) match {
15           case Some(timer) => timer.time { process() }
16           case None => process()
17         }
18       } catch {
19         case e: Throwable => error(s"Uncaught error processing event $controllerEvent", e)
20       }
21       _state = ControllerState.Idle
22   }
23 }

大体上看,执行逻辑很简单。首先是调用 LinkedBlockingQueue 的 take 方法,去获取待处理的 QueuedEvent 对象实例。注意,这里用的是 take 方法,这说明,如果事件队列中没有 QueuedEvent,那么,ControllerEventThread 线程将一直处于阻塞状态,直到事件队列上插入了新的待处理事件。一旦拿到 QueuedEvent 事件后,线程会判断是否是 ShutdownEventThread 事件。当 ControllerEventManager 关闭时,会显式地向事件队列中塞入 ShutdownEventThread,表明要关闭 ControllerEventThread 线程。如果是该事件,那么 ControllerEventThread 什么都不用做,毕竟要关闭这个线程了。相反地,如果是其他的事件,就调用 QueuedEvent 的 process 方法执行对应的处理逻辑,同时计算事件被处理的速率。

Controller选举

在一个 Kafka 集群中,某段时间内只能有一台 Broker 被选举为 Controller。随着时间的推移,可能会有不同的 Broker 陆续担任过 Controller 的角色,但是在某一时刻,Controller 只能由一个 Broker 担任。那选择哪个 Broker 充当 Controller 呢?当前,Controller 的选举过程依赖 ZooKeeper 完成。ZooKeeper 除了扮演集群元数据的“真理之源”角色,还定义了 /controller 临时节点(Ephemeral Node),以协助完成 Controller 的选举。下面这段代码展示的是一个双 Broker 的 Kafka 集群上的 ZooKeeper 中 /controller 节点:

 1 {"version":1,"brokerid":0,"timestamp":"1585098432431"}
 2 cZxid = 0x1a
 3 ctime = Wed Mar 25 09:07:12 CST 2020
 4 mZxid = 0x1a
 5 mtime = Wed Mar 25 09:07:12 CST 2020
 6 pZxid = 0x1a
 7 cversion = 0
 8 dataVersion = 0
 9 aclVersion = 0
10 ephemeralOwner = 0x100002d3a1f0000
11 dataLength = 54
12 numChildren = 0

有两个地方的内容,你要重点关注一下。Controller Broker Id 是 0,表示序号为 0 的 Broker 是集群 Controller。ephemeralOwner 字段不是 0x0,说明这是一个临时节点。既然是临时节点,那么,一旦 Broker 与 ZooKeeper 的会话终止,该节点就会消失。Controller 选举就依靠了这个特性。每个 Broker 都会监听 /controller 节点随时准备应聘 Controller 角色。

集群上所有的 Broker 都在实时监听 ZooKeeper 上的这个节点。这里的“监听”有两个含义。监听这个节点是否存在。倘若发现这个节点不存在,Broker 会立即“抢注”该节点,即创建 /controller 节点。创建成功的那个 Broker,即当选为新一届的 Controller。监听这个节点数据是否发生了变更。同样,一旦发现该节点的内容发生了变化,Broker 也会立即启动新一轮的 Controller 选举。

KafkaController 文件的代码结构如下图所示:

整体而言,该文件大致由五部分组成。

选举触发器(ElectionTrigger):这里的选举不是指 Controller 选举,而是指主题分区副本的选举,即为哪些分区选择 Leader 副本。后面在学习副本管理器和分区管理器时,我们会讲到它。

KafkaController Object:KafkaController 伴生对象,仅仅定义了一些常量和回调函数类型。

ControllerEvent:定义 Controller 事件类型。上节课我们详细学习过 Controller 事件以及基于事件的单线程事件队列模型。这部分的代码看着很多,但实际上都是千篇一律的。你看懂了一个事件的定义,其他的也就不在话下了。

各种 ZooKeeper 监听器:定义 ZooKeeper 监听器,去监听 ZooKeeper 中各个节点的变更。今天,我们重点关注监听 /controller 节点的那个监听器。

KafkaController Class:定义 KafkaController 类以及实际的处理逻辑。这是我们今天的重点学习对象。

KafkaController 类

这个类大约有 1900 行代码,里面定义了非常多的变量和方法。这些方法大多是处理不同 Controller 事件的。后面讲到选举流程的时候,我会挑一些有代表性的来介绍。我希望你能举一反三,借此吃透其他方法的代码。毕竟,它们做的事情大同小异,至少代码风格非常相似。在学习重要的方法之前,我们必须要先掌握 KafkaController 类的定义。接下来,我们从 4 个维度来进行学习,分别是原生字段、辅助字段。弄明白了这些字段的含义之后,再去看操作这些字段的方法,会更加有的放矢,理解起来也会更加容易。

原生字段首先来看原生字段。所谓的原生字段,是指在创建一个 KafkaController 实例时,需要指定的字段。先来看下 KafkaController 类的定义代码:

 1 // 字段含义:
 2 // config:Kafka配置信息,通过它,你能拿到Broker端所有参数的值
 3 // zkClient:ZooKeeper客户端,Controller与ZooKeeper的所有交互均通过该属性完成
 4 // time:提供时间服务(如获取当前时间)的工具类
 5 // metrics:实现指标监控服务(如创建监控指标)的工具类
 6 // initialBrokerInfo:Broker节点信息,包括主机名、端口号,所用监听器等
 7 // initialBrokerEpoch:Broker Epoch值,用于隔离老Controller发送的请求
 8 // tokenManager:实现Delegation token管理的工具类。Delegation token是一种轻量级的认证机制
 9 // threadNamePrefix:Controller端事件处理线程名字前缀
10 class KafkaController(val config: KafkaConfig,
11                       zkClient: KafkaZkClient,
12                       time: Time,
13                       metrics: Metrics,
14                       initialBrokerInfo: BrokerInfo,
15                       initialBrokerEpoch: Long,
16                       tokenManager: DelegationTokenManager,
17                       threadNamePrefix: Option[String] = None)
18   extends ControllerEventProcessor with Logging with KafkaMetricsGroup {
19   ......
20 }

KafkaController 实现了 ControllerEventProcessor 接口,因而也就实现了处理 Controller 事件的 process 方法。这里面比较重要的字段有 3 个。

config:KafkaConfig 类实例,里面封装了 Broker 端所有参数的值。

zkClient:ZooKeeper 客户端类,定义了与 ZooKeeper 交互的所有方法。

initialBrokerEpoch:Controller 所在 Broker 的 Epoch 值。Kafka 使用它来确保 Broker 不会处理老 Controller 发来的请求。

其他字段要么是像 time、metrics 一样,是工具类字段,要么是像 initialBrokerInfo、tokenManager 字段一样,使用场景很有限,我就不展开讲了。

辅助字段除了原生字段之外,KafkaController 还定义了很多辅助字段,帮助实现 Controller 的各类功能。我们来看一些重要的辅助字段:

 1 ......
 2 // 集群元数据类,保存集群所有元数据
 3 val controllerContext = new ControllerContext
 4 // Controller端通道管理器类,负责Controller向Broker发送请求
 5 var controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
 6   stateChangeLogger, threadNamePrefix)
 7 // 线程调度器,当前唯一负责定期执行Leader重选举
 8 private[controller] val kafkaScheduler = new KafkaScheduler(1)
 9 // Controller事件管理器,负责管理事件处理线程
10 private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,
11   controllerContext.stats.rateAndTimeMetrics)
12 ......
13 // 副本状态机,负责副本状态转换
14 val replicaStateMachine: ReplicaStateMachine = new ZkReplicaStateMachine(config, stateChangeLogger, controllerContext, zkClient,
15   new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
16 // 分区状态机,负责分区状态转换
17 val partitionStateMachine: PartitionStateMachine = new ZkPartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient,
18   new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
19 // 主题删除管理器,负责删除主题及日志
20 val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
21   partitionStateMachine, new ControllerDeletionClient(this, zkClient))
22 ......

其中,有 7 个字段是重中之重。controllerContext:集群元数据类,保存集群所有元数据。controllerChannelManager:Controller 端通道管理器类,负责 Controller 向 Broker 发送请求。kafkaScheduler:线程调度器,当前唯一负责定期执行分区重平衡 Leader 选举。eventManager:Controller 事件管理器,负责管理事件处理线程。replicaStateMachine:副本状态机,负责副本状态转换。partitionStateMachine:分区状态机,负责分区状态转换。topicDeletionManager:主题删除管理器,负责删除主题及日志。

Controller 选举流程

说完了 ControllerChangeHandler 源码,我们来看下 Controller 的选举。所谓的 Controller 选举,是指 Kafka 选择集群中一台 Broker 行使 Controller 职责。整个选举过程分为两个步骤:触发选举和开始选举。

触发选举

我先用一张图展示下可能触发 Controller 选举的三个场景。

这三个场景是:集群从零启动时;Broker 侦测 /controller 节点消失时;Broker 侦测到 /controller 节点数据发生变更时。这三个场景殊途同归,最后都要执行选举 Controller 的动作。我来一一解释下这三个场景,然后再介绍选举 Controller 的具体操作。

场景一:集群从零启动集群首次启动时,Controller 尚未被选举出来。于是,Broker 启动后,首先将 Startup 这个 ControllerEvent 写入到事件队列中,然后启动对应的事件处理线程和 ControllerChangeHandler ZooKeeper 监听器,最后依赖事件处理线程进行 Controller 的选举。在源码中,KafkaController 类的 startup 方法就是做这些事情的。当 Broker 启动时,它会调用这个方法启动 ControllerEventThread 线程。值得注意的是,每个 Broker 都需要做这些事情,不是说只有 Controller 所在的 Broker 才需要执行这些逻辑。

首先,startup 方法会注册 ZooKeeper 状态变更监听器,用于监听 Broker 与 ZooKeeper 之间的会话是否过期。接着,写入 Startup 事件到事件队列,然后启动 ControllerEventThread 线程,开始处理事件队列中的 Startup 事件。接下来,我们来学习下 KafkaController 的 process 方法处理 Startup 事件的方法:

从这段代码可知,process 方法调用 processStartup 方法去处理 Startup 事件。而 processStartup 方法又会调用 zkClient 的 registerZNodeChangeHandlerAndCheckExistence 方法注册 ControllerChangeHandler 监听器。值得注意的是,虽然前面的三个场景是并列的关系,但实际上,后面的两个场景必须要等场景一的这一步成功执行之后,才能被触发。这三种场景都要选举 Controller,因此,我们最后统一学习 elect 方法的代码实现。总体来说,集群启动时,Broker 通过向事件队列“塞入”Startup 事件的方式,来触发 Controller 的竞选。

场景二:/controller 节点消失Broker 检测到 /controller 节点消失时,就意味着,此时整个集群中没有 Controller。因此,所有检测到 /controller 节点消失的 Broker,都会立即调用 elect 方法执行竞选逻辑。你可能会问:“Broker 是怎么侦测到 ZooKeeper 上的这一变化的呢?”实际上,这是 ZooKeeper 监听器提供的功能,换句话说,这是 Apache ZooKeeper 自己实现的功能,所以我们才说,Kafka 依赖 ZooKeeper 完成 Controller 的选举。讲到这里,我说点题外话,社区最近正在酝酿彻底移除 ZooKeeper 依赖。具体到 Controller 端的变化,就是在 Kafka 内部实现一个类似于 Raft 的共识算法来选举 Controller。我会在后面的特别放送里详细讲一下社区移除 ZooKeeper 的全盘计划。

场景三:/controller 节点数据变更Broker 检测到 /controller 节点数据发生变化,通常表明,Controller“易主”了,这就分为两种情况:

如果 Broker 之前是 Controller,那么该 Broker 需要首先执行卸任操作,然后再尝试竞选;

如果 Broker 之前不是 Controller,那么,该 Broker 直接去竞选新 Controller。

具体到代码层面,maybeResign 方法形象地说明了这两种情况。你要注意方法中的 maybe 字样,这表明,Broker 可能需要执行卸任操作,也可能不需要。Kafka 源码非常喜欢用 maybe*** 来命名方法名,以表示那些在特定条件下才需要执行的逻辑。以下是 maybeResign 的实现:

 1 private def maybeResign(): Unit = {
 2   // 非常关键的一步!这是判断是否需要执行卸任逻辑的重要依据!
 3   // 判断该Broker之前是否是Controller
 4   val wasActiveBeforeChange = isActive
 5   // 注册ControllerChangeHandler监听器  
 6   zkClient.registerZNodeChangeHandlerAndCheckExistence(
 7     controllerChangeHandler)
 8   // 获取当前集群Controller所在的Broker Id,如果没有Controller则返回-1
 9   activeControllerId = zkClient.getControllerId.getOrElse(-1)
10   // 如果该Broker之前是Controller但现在不是了
11   if (wasActiveBeforeChange && !isActive) {
12     onControllerResignation() // 执行卸任逻辑
13   }
14 }

代码的第一行非常关键,它是决定是否需要执行卸任的重要依据。毕竟,如果 Broker 之前不是 Controller,那何来“卸任”一说呢?之后代码要注册 ControllerChangeHandler 监听器,获取当前集群 Controller 所在的 Broker ID,如果没有 Controller,则返回 -1。有了这些数据之后,maybeResign 方法需要判断该 Broker 是否之前是 Controller 但现在不是了。如果是这种情况的话,则调用 onControllerResignation 方法执行 Controller 卸任逻辑。说到“卸任”,你可能会问:“卸任逻辑是由哪个方法执行的呢?”实际上,这是由 onControllerResignation 方法执行的,它主要是用于清空各种数据结构的值、取消 ZooKeeper 监听器、关闭各种状态机以及管理器,等等。

选举 Controller

讲完了触发场景,接下来,我们就要学习 Controller 选举的源码了。前面说过了,这三种选举场景最后都会调用 elect 方法来执行选举逻辑。我们来看下它的实现:

 1 private def elect(): Unit = {
 2     // 第1步:获取当前Controller所在Broker的序号,如果Controller不存在,显式标记为-1
 3     activeControllerId = zkClient.getControllerId.getOrElse(-1)
 4 
 5     // 第2步:如果当前Controller已经选出来了,直接返回即可
 6     if (activeControllerId != -1) {
 7       debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
 8       return
 9     }
10 
11     try {
12       // 第3步:注册Controller相关信息
13       // 主要是创建/controller节点
14       val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
15       controllerContext.epoch = epoch
16       controllerContext.epochZkVersion = epochZkVersion
17       activeControllerId = config.brokerId
18 
19       info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
20         s"and epoch zk version is now ${controllerContext.epochZkVersion}")
21 
22       // 第4步:执行当选Controller的后续逻辑
23       onControllerFailover()
24     } catch {
25       case e: ControllerMovedException =>
26         maybeResign()
27 
28         if (activeControllerId != -1)
29           debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
30         else
31           warn("A controller has been elected but just resigned, this will result in another round of election", e)
32 
33       case t: Throwable =>
34         error(s"Error while electing or becoming controller on broker ${config.brokerId}. " +
35           s"Trigger controller movement immediately", t)
36         triggerControllerMove()
37     }
38   }

该方法首先检查 Controller 是否已经选出来了。要知道,集群中的所有 Broker 都要执行这些逻辑,因此,非常有可能出现某些 Broker 在执行 elect 方法时,Controller 已经被选出来的情况。如果 Controller 已经选出来了,那么,自然也就不用再做什么了。相反地,如果 Controller 尚未被选举出来,那么,代码会尝试创建 /controller 节点去抢注 Controller。一旦抢注成功,就调用 onControllerFailover 方法,执行选举成功后的动作。这些动作包括注册各类 ZooKeeper 监听器、删除日志路径变更和 ISR 副本变更通知事件、启动 Controller 通道管理器,以及启动副本状态机和分区状态机。如果抢注失败了,代码会抛出 ControllerMovedException 异常。这通常表明 Controller 已经被其他 Broker 抢先占据了,那么,此时代码调用 maybeResign 方法去执行卸任逻辑。

Controller作用

作为核心组件,Controller 提供的功能非常多。除了集群成员管理,主题管理也是一个极其重要的功能。今天,我就带你深入了解下它们的实现代码。可以说,这是 Controller 最核心的两个功能,它们几乎涉及到了集群元数据中的所有重要数据。掌握了这些,之后你在探索 Controller 的其他代码时,会更加游刃有余。

集群成员管理

首先,我们来看 Controller 管理集群成员部分的代码。这里的成员管理包含两个方面:

成员数量的管理,主要体现在新增成员和移除现有成员;

单个成员的管理,如变更单个 Broker 的数据等。

成员数量管理

每个 Broker 在启动的时候,会在 ZooKeeper 的 /brokers/ids 节点下创建一个名为 broker.id 参数值的临时节点。举个例子,假设 Broker 的 broker.id 参数值设置为 1001,那么,当 Broker 启动后,你会在 ZooKeeper 的 /brokers/ids 下观测到一个名为 1001 的子节点。该节点的内容包括了 Broker 配置的主机名、端口号以及所用监听器的信息(注意:这里的监听器和上面说的 ZooKeeper 监听器不是一回事)。当该 Broker 正常关闭或意外退出时,ZooKeeper 上对应的临时节点会自动消失。基于这种临时节点的机制,Controller 定义了 BrokerChangeHandler 监听器,专门负责监听 /brokers/ids 下的子节点数量变化。一旦发现新增或删除 Broker,/brokers/ids 下的子节点数目一定会发生变化。这会被 Controller 侦测到,进而触发 BrokerChangeHandler 的处理方法,即 handleChildChange 方法。我给出 BrokerChangeHandler 的代码。

该方法的作用就是向 Controller 事件队列写入一个 BrokerChange 事件。事实上,Controller 端定义的所有 Handler 的处理逻辑,都是向事件队列写入相应的 ControllerEvent,真正的事件处理逻辑位于 KafkaController 类的 process 方法中。

成员信息管理

了解了 Controller 管理集群成员数量的机制之后,接下来,我们要重点学习下 Controller 如何监听 Broker 端信息的变更,以及具体的操作。和管理集群成员类似,Controller 也是通过 ZooKeeper 监听器的方式来应对 Broker 的变化。这个监听器就是 BrokerModificationsHandler。一旦 Broker 的信息发生变更,该监听器的 handleDataChange 方法就会被调用,向事件队列写入 BrokerModifications 事件。KafkaController 类的 processBrokerModification 方法负责处理这类事件。该方法首先获取 ZooKeeper 上最权威的 Broker 数据,将其与元数据缓存上的数据进行比对。如果发现两者不一致,就会更新元数据缓存,同时调用 onBrokerUpdate 方法执行更新逻辑。

主题管理

除了维护集群成员之外,Controller 还有一个重要的任务,那就是对所有主题进行管理,主要包括主题的创建、变更与删除。掌握了前面集群成员管理的方法,在学习下面的内容时会轻松很多。因为它们的实现机制是一脉相承的,几乎没有任何差异。

主题创建 / 变更

我们重点学习下主题是如何被创建的。实际上,主题变更与创建是相同的逻辑,因此,源码使用了一套监听器统一处理这两种情况。你一定使用过 Kafka 的 kafka-topics 脚本或 AdminClient 创建主题吧?实际上,这些工具仅仅是向 ZooKeeper 对应的目录下写入相应的数据而已,那么,Controller,或者说 Kafka 集群是如何感知到新创建的主题的呢?这当然要归功于监听主题路径的 ZooKeeper 监听器:TopicChangeHandler。

主题删除

和主题创建或变更类似,删除主题也依赖 ZooKeeper 监听器完成。Controller 定义了 TopicDeletionHandler,用它来实现对删除主题的监听,代码如下:

1 class TopicDeletionHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
2   // ZooKeeper节点:/admin/delete_topics
3   override val path: String = DeleteTopicsZNode.path
4   // 向事件队列写入TopicDeletion事件
5   override def handleChildChange(): Unit = eventManager.put(TopicDeletion)
6 }

这里的 DeleteTopicsZNode.path 指的是 /admin/delete_topics 节点。目前,无论是 kafka-topics 脚本,还是 AdminClient,删除主题都是在 /admin/delete_topics 节点下创建名为待删除主题名的子节点。比如,如果我要删除 test-topic 主题,那么,Kafka 的删除命令仅仅是在 ZooKeeper 上创建 /admin/delete_topics/test-topic 节点。一旦监听到该节点被创建,TopicDeletionHandler 的 handleChildChange 方法就会被触发,Controller 会向事件队列写入 TopicDeletion 事件。处理 TopicDeletion 事件的方法是 processTopicDeletion,代码如下:

 1 private def processTopicDeletion(): Unit = {
 2   if (!isActive) return
 3   // 从ZooKeeper中获取待删除主题列表
 4   var topicsToBeDeleted = zkClient.getTopicDeletions.toSet
 5   debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")
 6   // 找出不存在的主题列表
 7   val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
 8   if (nonExistentTopics.nonEmpty) {
 9     warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
10     zkClient.deleteTopicDeletions(nonExistentTopics.toSeq, controllerContext.epochZkVersion)
11   }
12   topicsToBeDeleted --= nonExistentTopics
13   // 如果delete.topic.enable参数设置成true
14   if (config.deleteTopicEnable) {
15     if (topicsToBeDeleted.nonEmpty) {
16       info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
17       topicsToBeDeleted.foreach { topic =>
18         val partitionReassignmentInProgress = controllerContext.partitionsBeingReassigned.map(_.topic).contains(topic)
19         if (partitionReassignmentInProgress)
20           topicDeletionManager.markTopicIneligibleForDeletion(
21             Set(topic), reason = "topic reassignment in progress")
22       }
23       // 将待删除主题插入到删除等待集合交由TopicDeletionManager处理
24       topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
25     }
26   } else { // 不允许删除主题
27     info(s"Removing $topicsToBeDeleted since delete topic is disabled")
28     // 清除ZooKeeper下/admin/delete_topics下的子节点
29     zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq, controllerContext.epochZkVersion)
30   }
31 }

首先,代码从 ZooKeeper 的 /admin/delete_topics 下获取子节点列表,即待删除主题列表。

之后,比对元数据缓存中的主题列表,获知压根不存在的主题列表。如果确实有不存在的主题,删除 /admin/delete_topics 下对应的子节点就行了。同时,代码会更新待删除主题列表,将这些不存在的主题剔除掉。

接着,代码会检查 Broker 端参数 delete.topic.enable 的值。如果该参数为 false,即不允许删除主题,代码就会清除 ZooKeeper 下的对应子节点,不会做其他操作。反之,代码会遍历待删除主题列表,将那些正在执行分区迁移的主题暂时设置成“不可删除”状态。

最后,把剩下可以删除的主题交由 TopicDeletionManager,由它执行真正的删除逻辑。这里的 TopicDeletionManager 是 Kafka 专门负责删除主题的管理器,下节课我会详细讲解它的代码实现。

总结

以后关于kafka系列的总结大部分来自Geek Time的课件,大家可以自行关键字搜索。

原文地址:https://www.cnblogs.com/boanxin/p/13618431.html