【原创】kafka controller源代码分析(一)

Kafka集群中的一个broker会被作为controller负责管理分区和副本的状态以及执行类似于重分配分区之类的管理任务。如果当前的controller失败了,会从剩下的broker中选出新的controller。

一、PartitionLeaderSelector.scala
顾名思义就是为分区选举出leader broker,该trait只定义了一个方法selectLeader,接收一个TopicAndPartition对象和一个LeaderAndIsr对象。TopicAndPartition表示要选leader的分区,而第二个参数表示zookeeper中保存的该分区的当前leader和ISR记录。该方法会返回一个元组包括了推举出来的leader和ISR以及需要接收LeaderAndISr请求的一组副本。
    该文件中还定义了五种leader选举器,在详细介绍每种选举器之前,我们先说下分区都有哪些状态。Kafka定义了4中分区:
  • NonExistentPartition —— 这个状态表示该分区要么没有被创建过或曾经被创建过但后面被删除了
  • NewPartition —— 分区创建之后就处于NewPartition状态。在这个状态中,分区应该已经分配了副本,但是还没有选举出leader和ISR
  • OnlinePartition —— 一旦分区的leader被推选出来,它就处于OnlinePartition状态
  • OfflinePartition —— 如果leader选举出来后,leader broker宕机了,那么该分区就处于OfflinePartition状态。
既然有四种状态就需要定义合法的状态转换:
NonExistentPartition -> NewPartition
1. 首先将第一个可用的副本broker作为leader broker并把所有可用的副本对象都装入ISR,然后写leader和ISR信息到zookeeper中保存
2. 对于这个分区而言,发送LeaderAndIsr请求到每个可用的副本broker,以及UpdateMetadata请求到每个可用的broker上
OnlinePartition, OfflinePartition -> OnlinePartition
1. 为该分区选取新的leader和ISR以及接收LeaderAndIsr请求的一组副本,然后写入leader和ISR信息到zookeeper中保存。
  • 如果是OfflinePartitionLeaderSelector的话,新的leader就是一个可用的副本(该部分最好不在ISR中),而新的ISR就是以前的ISR或者是刚刚选出来的leader,而接收LeaderAndIsr请求的副本集合就是当前可用的副本集合;
  • 如果是ReassignedPartitionLeaderSelector的话,新的leader就是一个可用的已分配的副本,新的ISR就是当前的ISR,而接收请求的副本集合就是重分配的副本集合;
  • 如果是PreferredReplicaPartitionLeaderSelector的话,新的leader就是第一个分配的副本,新的ISR就是当前的ISR,接收请求的副本就是已分配的副本集合;
  • 如果是ControlledShutdownLeaderSelector的话,新的leader就是ISR中没有被关闭的副本,新的ISR就是去除关闭状态副本的ISR,而接收副本就是当前可用的已分配副本
2. 对于这个分区而言,发送LeaderAndIsr请求给每个接收请求的副本并且发送UpdateMetadata请求给每个可用的broker
NewPartition, OnlinePartition -> OfflinePartition
标记分区状态为离线(offline),仅此而已
OfflinePartition -> NonExistentPartition
仅仅是标记分区状态为NonExistentPartition即可
 
下面逐一分析各个leader选举器类,先从NoOpLeaderSelector开始:
NoOpLeaderSelector类 —— 本质上什么都不做,只是返回当前的leader和ISR以及给定分区当前的AR(assigned replicas)
OfflinePartitionLeaderSelector类 —— 如果ISR中至少有一个可用的broker,则从ISR中选取一个broker作为新的leader,而可用的ISR就是新的ISR。如果没有可用的broker且没有启用unclean leader选取,那么就抛出异常NoReplicaOnlineException。否则就从AR中选出一个可用的broker作为新的leader和ISR。但如果连AR中都没有可用的broker,抛出异常。最后将可用的AR作为接收LeaderAndIsr请求的副本集合。一旦成功选举出leader之后保存到zookeeper中并更新缓存信息。
ReassignedPartitionLeaderSelector类 —— 从可用的ISR中选取第一个作为leader,把当前的ISR作为新的ISR,将重分配的副本集合作为接收LeaderAndIsr请求的副本集合。
PreferredReplicaPartitionLeaderSelector类 —— 如果AR中的第一个副本就是当前leader的话,抛出异常,否则就选举该副本为leader,把当前ISR当做新的ISR,令AR作为接收LeaderAndIsr请求的副本集合。
ControlledShutdownLeaderSelector类 —— 将ISR中那些处于关闭状态的副本去除掉作为新的ISR,然后选取第一个副本作为leader,然后令当前AR作为接收LeaderAndIsr请求的副本。
二、PartitionStateMachine.scala
这个就是分区的状态机。首先定义了一个trait封装了定义好的四种分区状态:NewPartition、OnlinePartition、OfflinePartition和NonExistentPartition。而PartitionStateMachine类就是分区的状态机类。在具体展开该类的字段方法之前,先说一下它内部定义的三个嵌套类,用于动态监听不同的分区操作:
1. TopicChangeListener监听类
这个类继承了Zookeeper的IZKChildListener接口,后者是znode子节点事件监听接口,当ZKClient接收到某个path节点变更或子节点变更事件时就会触发该listener。而TopicChangeListener类负责监听分区的所有可能的状态转换。具体实现也很简单就是实现接口的handleChildChange方法,具体逻辑如下:
  • 获取给定zk路径下所有子节点的集合,并与当前controller中保存的topic集合比较,找出新增topic集合和被删除的topic集合
  • 更新controller中保存的当前topic集合
  • 从zk的/brokers/topics/[topic]路径下找出那些新增topic对应的分区副本分配记录信息
  • 更新controller中保存的topic分区副本分配记录,去掉那些被删除topic的记录,并加入上一步中获取到的那些新增topic的分配记录
  • 如果新增了topic,那么调用onNewTopicCreation方法为新增topic注册分区变更监听器并设置分区状态,最后发送元数据更新请求通知各个broker
2. DeleteTopicsListener监听类
这个监听器负责监听topic的删除,主要包括1. 将要删除的topic加入到待删除topic缓存中——前提是这个topic必须存在;2. 如果存在要删除的topic,那么通知删除topic线程。它也实现了handleChildChange方法,这个方法会在topic被删除的时候被调用。具体逻辑如下:
  • 获取待删除topic集合(方法参数传过来的),与controller中保存的topic集合比较,找到那些不存在的topic
  • 如果确实存在不存在的topic集合,那么删除zk中/admin/delete_topics下对应的子节点
  • 从待删除的topic集合中去掉那些不存在的topic
  • 遍历更新后的待删除topic集合,判断每个topic当前是否正处于其他状态变更过程中,比如PreferredLeaderSelector或ReassignedLeaderSelector。如果是的话调用DeleteTopicManager的markTopicIneligibleForDelete方法标记该topic为暂时不能被删除状态
  • 将所有带删除的topic都加入到controller的topic删除列表中等待专有线程对其执行删除操作
3. AddPartitionsListener监听类
用于监听增加分区事件,也实现了handleDataChange方法,具体逻辑如下:
  • 获取zookeeper中新增分区topic对应的分区副本分配记录,与controller中保存的分区副本记录相比较,找出新增的分区记录
  • 如果这些新增分区所属的topic当前正在执行删除操作,那么直接记录一个日志错误返回,即跳过增加分区的操作,否则调用controller的onNewPartitionCreation方法来创建这些分区
okay,说完这些监听类之后我们可以梳理一下PartitionStateMachine类的字段和方法。先说字段:
1. controllerContext —— 就是KafkaController类的一个实例,封装了很多controller的方法
2. controllerId —— controller ID,由配置文件中的broker.id属性指定
3. zkClient —— 一个ZooKeeper的客户端,用于与zookeeper服务器交互
4. partitionState —— 保存的分区的状态信息
5. brokerRequestBatch —— 主要用于批量发送请求给Broker
6. hasStarted —— 主要用于标识该状态机是否开启
7. noOpPartitionLeaderSelector —— 一个默认的leader选举类,主要被当做默认值使用,本质上其实什么都不做
8. topicChangeListener/deleteTopicListener/addPartitionListener —— 三个监听类实例,用于监听状态变更、删除topic和增加新分区事件
9. stateChangeLogger —— 一个日志类
下面说说具体的方法:
1. initializePartitionState —— 在分区状态机启动时候会被调用,用于设置Zookeeper中所有现存分区的初始状态。具体逻辑是:首先获取zk中已有分区记录,遍历每一条分区记录,如果该分区没有leader和ISR的话则置于NewPartition状态,否则检查一下leader broker是否可用(即在可用broker列表中),如果可用的话将分区状态设置为OnlinePartition否则设置为OfflinePartition。
2. assertValidPreviousStates —— 在分区状态转换开始前验证一下转换前的状态是否支持进行这种转换。被允许的转换之前已经说过了,不再赘述。
3. assignReplicasToPartitions —— 在执行NonExistentPartition到NewPartition的转换时会调用该方法用来更新controller的保存的分区副本分配缓存信息。具体方法就是先从zookeeper中读出该分区的AR记录然后加入到controller保存的缓存中。
4. initializeLeaderAndIsrForPartition —— 在执行NewPartition到OnlinePartition转换时会调用该方法。如果一个分区处于NewPartition状态时,它并没有选举出leader和ISR。在转换到OnlinePartition状态后,会在Zookeeper上创建对应的leader和ISR记录。一旦处于OnlinePartition状态后分区永远不能退回到NewPartition状态,而只能是OfflinePartition状态。具体逻辑如下:
  • 获取目标分区对应的副本分配记录,并从中找出可用的已分配副本。如果当前没有可用的副本那么报错表示该状态转换失败;否则选举出第一个副本broker作为leader,把当前可用的副本集合作为ISR
  • 一旦确定了leader和ISR之后,在zookeeper上的/brokers/topics/[topic]/partitions/[partitionId]/state节点下创建出对应的leader和ISR路径
  • 更新controller的分区leader缓存信息,并且把LeaderAndIsr请求加入到brokerRequestBatch中
5. handleStateChange —— 这个方法是分区状态机的核心方法。该方法主要确保所有的状态转换都是合法的。具体逻辑如下:
  • 如果分区状态机本身没有启动,直接抛出异常退出
  • 查看目标分区在状态中的当前状态,如果没有任何记录设置当前状态为NonExistentPartition
  • 如果要转换到NewPartition状态,当前必须处于NonExistentPartition状态,然后更新controller中的分区副本分配缓存信息,并将分区状态设置为NewPartition
  • 如果要转换到OnlinePartition状态,当前必须处于NewPartition、OnlinePartition或OfflinePartition中的一种。如果当前状态是NewPartition,首先为该分区选举出leader和ISR并写入zookeeper中保存;如果是OnlinePartition或OfflinePartition,则使用electLeaderForPartition方法(后面会说到)重新为分区选举leader,最后设置分区状态为OnlinePartition
  • 如果要转换到OfflinePartition状态,当前必须处于NewPartition、OnlinePartition或OfflinePartition中的一种。当分区的leader不存在或不可用时会执行这样的转换。该转换也仅仅是变更分区状态为OfflinePartition
  • 如果要转换到NonExistentPartition状态,当前必须处于OfflinePartition,同样地,转换操作也仅仅是设置目标分区状态为NonExistentPartition
6. getLeaderIsrAndEpochOrThrowException —— 从Zookeeper中获取某个分区的leader和ISR信息,如果不存在则抛出异常。
7. electLeaderForPartition —— 为那些leader已不可用的目标离线分区选举新的leader,在执行Offlinepartition, OnlinePartition到OnlinePartition的转换时会被调用。具体逻辑是:
  • 从zookeeper中获取目标分区的leader和ISR,以及controller_epoch信息
  • 如果zk中的controller_epoch比当前controller保存的值大,说明当前controller曾经可能失败过并选举过别的controller且那个controller也干预过目标分区的leader选举。如果是这样的话直接终止正在进行中的leader选举
  • 否则,使用指定的leader选举器进行leader选举,获取leader、ISR和AR的信息并写入zookeeper中保存
  • 如果zookeeper更新失败(比如无法连接zookeeper等),从步骤1开始重试,直至成功为止
  • 更新controller缓存的leader、ISR信息,并将leader、ISR、AR信息加入到元数据请求队列中等待后面发送元数据更新请求
8. triggerOnlinePartitionStateChange —— 当Kafka集群成功选出controller或发生broker变更时就会调用该方法尝试将所有处于NewPartition或OfflinePartition状态的分区转换状态到OnlinePartition状态。具体做法就是遍历分区状态缓存中的所有分区,只要其所属的topic不在要删除topic队列中且分区处于NewPartition或OfflinePartition状态就调用handleStateChange方法进行到OnlinePartition的状态转换。然后发送更新LeaderAndIsr请求以及元数据请求给broker
9. partitionInState —— 从controller缓存中获取当前处于给定状态的所有分区,貌似这个方法没有被使用过
10. startup —— 启动状态机,先初始化各个分区的状态,然后设置启动标识位为true表明已启动,最后调用triggerOnlinePartitionStateChange方法将符合条件的分区都置为OnlinePartition状态
11. registerTopicChangeListener/deregisterTopicChangeListener —— 分别注册和取消Zookeeper的/brokers/topics上分区状态转换监听器
12. registerDeleteTopicListener/deregisterDeleteTopicListener —— 分别注册和取消zookeeper的/admin/delete_topics下的topic删除的监听器
13. registerPartitionChangeListener/deregisterPartitionChangeListener —— 分别注册和取消zookeeper的/brokers/topics/[topic]下的分区变更监听器
14. registerListeners —— 注册状态转换监听器,另外如果开启了topic删除的功能(设置delete.topic.enable属性为true),那么还要注册删除topic的监听器
15. deregisterListeners —— 取消状态转换监听器以及每个topic的分区变更监听器,清空topic-》分区变更监听器的缓存。类似地,如果开启了topic删除的功能(设置delete.topic.enable属性为true),还要取消删除topic的监听器
16. shutdown —— controller关闭时会调用该方法来关闭状态机:首先设置启动标识位为false,清空分区状态缓存并取消所有Zookeeper监听器
17. handleStateChanges —— 分区变更监听器会调用该方法为一组分区进行状态转换,具体做法就是遍历分区集合,为每一个分区调用handleStateChange方法设置分区状态,最后发送LeaderAndIsr请求以及更新元数据请求给broker
三、ReplicaStateMachine.scala
副本状态机类,定义了一个分区副本的所有状态集合:
NewReplica —— 创建topic或重分配分区时controller会创建新的副本,新创建的副本就是处于这个状态。在此状态中的副本只能接收"成为follower"的状态变更请求,可由NonExistentReplica状态转换而来
OnlineRelica —— 一旦启动了一个副本以及该分区AR副本集合中的一部分,那么就将设置该副本状态为OnlineReplica。在此状态中的副本可以接收"成为leader"或"成为follower"的状态变更请求。可由NewRelica、OnlineReplica或OfflineReplica状态转换而来
OfflineReplica —— 如果一个副本挂掉(保存该副本的broker宕机)将被置于OfflineReplica状态,可由NewReplica或OnlineReplica状态转换而来
ReplicaDeletionStarted —— 开启副本删除操作时会将副本状态置于ReplicaDeletionStarted状态,可由OfflineReplica状态转换而来
ReplicaDeletionSuccessful —— 如果副本删除请求成功,返回的响应没有错误的话,该副本会被置于ReplicaDeletionSuccessful状态,可由ReplicaDeletionStarted状态转换而来
ReplicaDeletionIneligible —— 如果副本删除失败,将被置于ReplicaDeletionIneligible状态,可由ReplicaDeletionStarted状态转换而来
NonExistentReplica —— 如果副本被成功删除将被置于NonExistentReplica状态,可由ReplicaDeletionSuccessful状态转换而来
在具体展开该类的字段方法之前,先说一下它内部定义的嵌套类,用于动态监听副本所有的状态变更:
BrokerChangeListener类
和很多监听器类一样,也实现了handleChildChange方法,具体逻辑如下:
  • 如果启动了状态机,首先开启leader选举计时器
  • 获取要进行状态变更的副本所在的所有broker Id,与controller中缓存的副本列表比较,找出新增加的broker
  • 在Zookeeper中获取这些新增加broker的元数据信息并封装到一个Broker列表中返回
  • 找出那些已被删除的broker,然后使用给定的broker列表更新controller中可用broker缓存
  • 分别调用addBroker和removeBroker方法更新对应的broker线程
  • 最后调用onBrokerStartup和onBrokerFailure回调函数分别处理新增加的broker和删除的broker
 
ReplicaStateMachine类与PartitionStateMachine类的代码结构很像,我们还是一个个地分析其定义的字段和方法:
类字段
1. controllerContext —— 就是KafkaController类的一个实例,封装了很多controller的方法
2. controllerId —— controller ID,由配置文件中的broker.id属性指定
3. zkClient —— 一个ZooKeeper的客户端,用于与zookeeper服务器交互
4. replicaState —— 保存的分区副本的状态信息缓存
5. brokerChangeListener —— zk监听器,用于监听副本的状态变更
6. hasStarted —— 主要用于标识该状态机是否开启
7. brokerRequestBatch —— 主要用于批量发送请求给Broker
8. stateChangeLogger —— 一个日志类,用于一些日志输出
类方法
1. initializeReplicaState —— 设置zookeeper中所有分区的副本的初始状态,在启动副本状态机时会调用该方法。具体逻辑如下:
  • 获取controller中保存的分区副本分配缓存记录
  • 找出每个分区的AR信息,遍历AR中的每个broker,如果这个broker可用(是否在controller的可用broker列表缓存中),则将该副本状态置于OnlineReplica,否则将副本状态置于ReplicaDeletionIneligible状态——controller failover时候broker会宕机,处于该broker的所有副本都要置于这个状态
2. partitionAssignedToBroker —— 找出目标broker上某个topic的所有分区信息,不过这个方法貌似没有被用到
3. assertValidPrevisousStates —— 给定目标转换状态,验证当前状态是否合法
4. handleStateChange —— 副本状态机的核心方法。它定义了合法的状态转换,包括:
  • NonExistentReplica -> NewReplica:将当前leader和ISR封装到一个LeaderAndIsr请求中发送给新的replica broker,并发送UpdateMetadata请求给每个当前可用的broker
  • NewReplica -> OnlineReplica:将新增副本加入到AR中
  • OnlineReplica, OfflineReplica -> OnlineReplica:将当前leader和ISR封装到一个LeaderAndIsr请求中发送给新的replica broker,并发送UpdateMetadata请求给每个当前可用的broker
  • NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible -> OfflineReplica:首先发送StopReplicaRequest请求给副本(但不删除副本),然后将该副本从ISR中移除,之后发送LeaderAndIsr请求给leader副本更新ISR并发送UpdateMetadata请求给所有可用的broker
  • OfflineReplica -> ReplicaDeletionStarted: 发送StopReplicaRequest请求给副本(同时删除该副本)
  • ReplicaDeletionStarted -> ReplicaDeletionSuccessful:在状态机中标记副本状态
  • ReplicaDeletionStarted -> ReplicaDeletionIneligble:在状态机中标记副本状态
  • ReplicaDeletionSuccessful -> NonExistentReplica:从controller的分区副本分配缓存中移除某个副本
具体逻辑如下:
  • 如果状态机没有启动,直接抛出异常退出
  • 获取controller中保存的给定副本的状态,如果无缓存记录,初始化状态为NonExistentReplica,然后获取controller中缓存的该分区的副本分配记录
  • 根据给定的目标状态进入不同的分支:
    • 如果要转换为NewReplica:必须验证当前状态是NonExistentReplica,然后从zookeeper中找出该副本分区的leader、ISR等信息。如果不存在这些信息,就等待leader选举之后发请求给该副本;如果存在这些信息,还要判断一些leader是否就是自己,如果是的话抛错因为被选举为leader的副本是不能进行状态转换到NewReplica的。如果以上步骤都没有抛错,就把发送LeaderAndIsr请求给该副本并发送UpdateMetadata请求给所有可用的broker。最后设置副本状态为NewReplica
    • 如果要转换为ReplicaDeletionStarted,必须验证当前状态是OfflineReplica,然后更新副本状态并发送停止副本的请求(通过回调函数来完成)
    • 如果要转换为ReplicaDeletionIneligible,说明该副本目前不能被删除,首先要验证当前状态必须是ReplicaDeletionStarted,然后更新状态为ReplicaDeletionIneligible即可
    • 如果要转换为NonExistentReplica,必须验证当前状态是ReplicaDeletionSuccessful,之后在controller缓存中把副本从所有分区的AR中移除并从状态缓存中移除该副本的记录
    • 如果要转换为OnlineReplica,必须验证当前状态是NewReplica、OnlineReplica、OfflineReplica或ReplicaDeletionIneligible中的一种。如果是NewReplica状态,找出当前分区的AR集合,如果该副本不在AR中则把它加入到AR;如果是其他状态的话则需要先检查是否存在leader,如果不存在的话意味着该分区从未处于OnlinePartition状态,也就是说broker从未开启过该分区的消息日志写入,因此在本方法中也就什么都不做。但如果存在leader信息,将发送LeaderAndIsr请求给该副本,并发送UpdateMetadata请求给所有可用的broker,最后设置状态即可
    • 如果要转换为OfflineReplica,必须验证当前状态是NewReplica、OnlineReplica、OfflineReplica或ReplicaDeletionIneligible中的一种。首先发送停止副本的请求使得副本不在从leader出获取消息,然后找出分区的leader和ISR。如果不存在则抛出异常,否则将分区副本从ISR中移除并发送LeaderAndIsr请求更新移除后的ISR,最后设置副本状态即可
5. handleStateChanges —— 状态机启动时以及broker发生变更时会调用这个方法,主要负责处理一组副本的状态变更。具体做法就是为每一个要处理的副本调用handleStateChange方法来进行状态转换,然后统一发送LeaderAndIsr请求列表中的所有请求
6. startup —— 启动状态机,先初始化副本状态,然后设置启动标识位,最后调用handleStateChanges方法将controller中保存所有当前副本设置为OnlineReplica
7. registerBrokerChangeListener/deregisterBrokerChangeListener —— 在zk上注册/取消用于监听副本状态变更的监听器
8. registerListeners/deregisterListeners —— 同上
9. shutdown —— 关闭controller时候调用该方法关闭状态机——设置启动标识位为false,清空副本状态缓存并取消所有zk上的监听器注册
10. areAllReplicasForTopicDeleted —— 查看某topic下的所有副本是否已经被删除
11. isAtLeastOneReplicaInDeletionStartedState —— 判断是否存在至少一个副本正处于被删除的过程中
12. replicasInState —— 返回某个topic下指定副本状态的所有副本
13. isAnyReplicaInState —— 判断某topic下是否存在指定状态的副本
14. replicasInDeletionStates —— 返回某topic下正处于删除操作过程中的所有副本
15. partitionAssignedToBroker —— 在一组topic中找出在给定broker有副本的topic及分区
原文地址:https://www.cnblogs.com/huxi2b/p/4501084.html