Kafka 0.8 Controller Design and State Changes

In a Kafka cluster, one broker acts as the central controller (Controller). It is responsible for managing the state of partitions and replicas and for carrying out administrative tasks such as partition reassignment.

The following sections describe how the controller drives partitions and replicas through their states.

Terminology

  • ISR: in-sync replica set
  • OfflinePartitionLeaderSelector: leader election for partitions that have gone offline
  • OAR: original assigned replicas
  • RAR: reassigned replicas, the new replica set produced by a reassignment
  • PartitionStateChange: the partition state machine

1 PartitionStateChange

1.1 Valid states:

  • NonExistentPartition: the partition either was never created, or was created and later deleted.
  • NewPartition: the partition enters this state right after creation. It has an assigned replica set, but no leader or ISR yet.
  • OnlinePartition: the partition enters this state once a leader has been elected for it.
  • OfflinePartition: after a successful leader election, if the leader crashes or goes down, the partition moves to this state.

1.2 Valid state transitions (a code sketch of these states and their legal transitions follows the list):

  • NonExistentPartition -> NewPartition: the controller loads the partition's assigned replica list from ZooKeeper into its cache. (Answering the original author's open question about the assignment rule: the assignment is computed at topic-creation time, when AdminUtils spreads the replicas evenly across brokers and writes the result to ZooKeeper; the controller only reads it here.)
  • NewPartition -> OnlinePartition:
    1. Assign the first live replica as the partition leader and all live replicas as the ISR, then write the leader and ISR to ZooKeeper.
    2. For this partition, send a LeaderAndIsr request to every live replica and an UpdateMetadata request to every live broker, so that every broker holds the full partition metadata.
  • OnlinePartition, OfflinePartition -> OnlinePartition:
    1. Select a new leader and ISR for the partition, plus the set of replicas that should receive the LeaderAndIsr request, then write the leader and ISR to ZooKeeper. The selection is delegated to one of the leader selectors:
      • OfflinePartitionLeaderSelector
      • ReassignedPartitionLeaderSelector
      • PreferredReplicaPartitionLeaderSelector
      • ControlledShutdownLeaderSelector
    2. For this partition, send a LeaderAndIsr request to every receiving replica and an UpdateMetadata request to every live broker.
  • NewPartition, OnlinePartition -> OfflinePartition: nothing happens beyond marking the partition as offline.
  • OfflinePartition -> NonExistentPartition: nothing happens beyond marking the partition as nonexistent.
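To make the transition table concrete, here is a minimal, self-contained sketch of how these states and their legal predecessors can be encoded. The state names follow the 0.8 source (kafka.controller.PartitionStateMachine), but the validPreviousStates sets and the transition check are simplified illustrations rather than the exact source:

object PartitionStates {
  sealed trait PartitionState {
    def state: Byte
    def validPreviousStates: Set[PartitionState]
  }
  case object NewPartition extends PartitionState {
    val state: Byte = 0
    // Only a nonexistent partition can be newly created.
    val validPreviousStates: Set[PartitionState] = Set(NonExistentPartition)
  }
  case object OnlinePartition extends PartitionState {
    val state: Byte = 1
    // A partition comes online after creation, after a leader re-election,
    // or when a new leader replaces a crashed one.
    val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
  }
  case object OfflinePartition extends PartitionState {
    val state: Byte = 2
    // Losing the leader can happen before or after the first election.
    val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition)
  }
  case object NonExistentPartition extends PartitionState {
    val state: Byte = 3
    // Deletion only proceeds from the offline state.
    val validPreviousStates: Set[PartitionState] = Set(OfflinePartition)
  }

  // Reject any transition that section 1.2 does not allow.
  def assertValidTransition(current: PartitionState, target: PartitionState): Unit =
    require(target.validPreviousStates.contains(current),
      s"illegal partition state transition: $current -> $target")
}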

2 ReplicaStateChange

2.1 Valid states:

  • NewReplica: replicas are created in this state during topic creation or partition reassignment. In this state a replica can only be a follower.
  • OnlineReplica: once a replica is started and is part of its partition's assigned replica set, it is in this state. In this state it can act as either leader or follower.
  • OfflineReplica: when the broker hosting a replica crashes or goes down, the replica moves to this state.
  • NonExistentReplica: when a replica is deleted, it moves to this state.

2.2 Valid state transitions (the OfflineReplica transition is sketched in code after this list):

  • NonExistentReplica -> NewReplica: send a LeaderAndIsr request carrying the partition's current leader and ISR to the new replica, and an UpdateMetadata request for the partition to every live broker.
  • NewReplica -> OnlineReplica: add the new replica to the partition's replica list if necessary.
  • OnlineReplica, OfflineReplica -> OnlineReplica: send a LeaderAndIsr request carrying the partition's current leader and ISR to the replica, and an UpdateMetadata request for the partition to every live broker.
  • NewReplica, OnlineReplica -> OfflineReplica:
    1. Send a StopReplica request to the replica (without deletion).
    2. Remove the replica from the ISR, send a LeaderAndIsr request (with the shrunken ISR) to the leader replica, and send an UpdateMetadata request for the partition to every live broker.
  • OfflineReplica -> NonExistentReplica: send a StopReplica request to the replica (with deletion).
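The two-step OfflineReplica transition is the most involved one, so here is a hedged, self-contained sketch of it. The case classes and the send* callbacks are hypothetical stand-ins for the controller's request batch in the real ReplicaStateMachine, not actual Kafka APIs:

object OfflineReplicaTransition {
  // Hypothetical minimal types, for illustration only.
  final case class TopicAndPartition(topic: String, partition: Int)
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  // Step 1: stop the replica without deleting its log. Step 2: shrink the
  // ISR, tell the leader via LeaderAndIsr, and refresh every live broker's
  // metadata. Returns the shrunken leader/ISR record to be written to ZK.
  def makeReplicaOffline(tp: TopicAndPartition,
                         replicaId: Int,
                         current: LeaderAndIsr,
                         liveBrokers: Set[Int],
                         sendStopReplica: (Int, TopicAndPartition, Boolean) => Unit,
                         sendLeaderAndIsr: (Int, TopicAndPartition, LeaderAndIsr) => Unit,
                         sendUpdateMetadata: (Int, TopicAndPartition) => Unit): LeaderAndIsr = {
    sendStopReplica(replicaId, tp, false)                   // deletePartition = false
    val shrunken = current.copy(isr = current.isr.filterNot(_ == replicaId))
    sendLeaderAndIsr(shrunken.leader, tp, shrunken)         // the leader learns the new ISR
    liveBrokers.foreach(b => sendUpdateMetadata(b, tp))     // metadata fan-out
    shrunken
  }
}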

3 KafkaController Operations

3.1 When a new topic is created, the controller invokes onNewPartitionCreation.

3.2 When new partitions are created (a paraphrase of the controller callback follows the list):

  1. Move the newly created partitions to the NewPartition state.
  2. Move all replicas of the new partitions to the NewReplica state.
  3. Move the new partitions to the OnlinePartition state.
  4. Move all replicas of the new partitions to the OnlineReplica state.
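These four steps map nearly line-for-line onto KafkaController.onNewPartitionCreation in the 0.8 source. The following paraphrase is reconstructed from memory, so treat the member names (partitionStateMachine, replicaStateMachine, controllerContext, offlinePartitionSelector) as approximate rather than a verbatim quote:

def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
  info("New partition creation callback for %s".format(newPartitions.mkString(",")))
  // Step 1: register the partitions with the partition state machine.
  partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
  // Step 2: register all of their replicas with the replica state machine.
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
  // Step 3: elect leaders and move the partitions online.
  partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
  // Step 4: mark all replicas online.
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
}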

3.3 When a broker fails or goes down (a condensed sketch of the callback follows the list):

  1. Move every partition whose leader was on the failed broker to the OfflinePartition state.
  2. Move the offline and new partitions back to the OnlinePartition state (using OfflinePartitionLeaderSelector).
  3. Move every replica on the failed broker to the OfflineReplica state.
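Condensed, the matching controller callback in 0.8 looks roughly like the sketch below (live-broker bookkeeping and channel-manager updates omitted; reconstructed from memory, so names are approximate):

def onBrokerFailure(deadBrokers: Seq[Int]) {
  val deadBrokersSet = deadBrokers.toSet
  // 1. Partitions whose leader lived on a dead broker are now leaderless.
  val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter { case (partition, leaderIsr) =>
    deadBrokersSet.contains(leaderIsr.leaderAndIsr.leader)
  }.keySet
  partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
  // 2. Try to bring all offline/new partitions back online; this runs the
  //    OfflinePartitionLeaderSelector for each of them.
  partitionStateMachine.triggerOnlinePartitionStateChange()
  // 3. Every replica hosted on a dead broker is now offline.
  val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
  replicaStateMachine.handleStateChanges(allReplicasOnDeadBrokers, OfflineReplica)
}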

3.4 When a broker starts up:

  1. Send an UpdateMetadata request covering all partitions to the newly started broker.
  2. Move the new broker's replicas to the OnlineReplica state.
  3. Move the offline and new partitions to the OnlinePartition state (using OfflinePartitionLeaderSelector).
  4. For every partition under reassignment that has a replica on the newly started broker, invoke onPartitionReassignment to resume the pending reassignment.

3.5 When partitions are reassigned (OAR: original assigned replicas; RAR: the new replica set produced by the reassignment; a worked example of the set arithmetic follows the list):

  1. Update the assigned replica (AR) list in ZooKeeper to OAR + RAR.
  2. While the AR is OAR + RAR, send a LeaderAndIsr request to every replica in OAR + RAR.
  3. Move the replicas in RAR - OAR to the NewReplica state.
  4. Wait until the new replicas have joined the ISR.
  5. Move all replicas in RAR to the OnlineReplica state.
  6. Set AR to RAR in memory.
  7. Send a LeaderAndIsr request with a potential new leader (if the current leader is not in RAR), the new assigned replica list (RAR), and the same ISR to every broker in RAR.
  8. Move the replicas in OAR - RAR to the OfflineReplica state (forcing them out of the ISR).
  9. Move the replicas in OAR - RAR to the NonExistentReplica state (forcing them to be deleted).
  10. Update the assigned replica list in ZooKeeper to RAR.
  11. Update the /admin/reassign_partitions path in ZooKeeper, removing this partition.
  12. Since leader election changed the replica and ISR information, resend the UpdateMetadata request to every broker.
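A tiny worked example of the replica-set arithmetic used above, with made-up broker ids, assuming a partition is moved from brokers {1, 2, 3} to {4, 5, 6}:

val OAR = Set(1, 2, 3) // original assigned replicas
val RAR = Set(4, 5, 6) // reassigned replicas

val arDuringMove     = OAR ++ RAR // steps 1-2: AR = OAR + RAR = {1,2,3,4,5,6}
val replicasToCreate = RAR -- OAR // step 3: replicas to create = {4,5,6}
val replicasToRetire = OAR -- RAR // steps 8-9: replicas to take offline and delete = {1,2,3}
val finalAR          = RAR        // steps 6 and 10: AR ends up as {4,5,6}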

3.6 When the controller fails over (a condensed code sketch follows the list):

  • replicaStateMachine.startup():
    1. Initialize each replica to either OfflineReplica or OnlineReplica, depending on whether its broker is live.
    2. Move each replica to the OnlineReplica state (forcing a LeaderAndIsr request to be sent to every replica).
  • partitionStateMachine.startup():
    1. Initialize each partition to the NewPartition, OfflinePartition, or OnlinePartition state.
    2. Move each OfflinePartition and NewPartition to the OnlinePartition state (forcing leader election).
  • Resume any in-flight partition reassignment.
  • Resume any pending preferred replica leader election.
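In code, this sequence corresponds roughly to the core of KafkaController.onControllerFailover (epoch bumping and ZooKeeper listener registration omitted; reconstructed from memory, so the helper names are approximate):

def onControllerFailover() {
  initializeControllerContext()           // rebuild the controller cache from ZooKeeper
  replicaStateMachine.startup()           // replicas -> Online/Offline, then force OnlineReplica
  partitionStateMachine.startup()         // partitions -> New/Online/Offline, then force elections
  maybeTriggerPartitionReassignment()     // resume any in-flight reassignment
  maybeTriggerPreferredReplicaElection()  // resume any pending preferred replica election
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
}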

3.7 When a preferred replica election is triggered:

Move the affected partitions to the OnlinePartition state (using PreferredReplicaPartitionLeaderSelector).

3.8 When a broker is shut down (controlled shutdown):

  1. For each partition whose leader is on the shutting-down broker, move the partition to the OnlinePartition state (using ControlledShutdownLeaderSelector), so that leadership moves elsewhere.
  2. For each replica on the shutting-down broker that is a follower, send a StopReplica request (without deletion).
  3. Move each such follower replica to the OfflineReplica state (forcing it out of the in-sync replica set).

4 Source Code Analysis

The KafkaController class defines many fields; the one to focus on first is the PartitionLeaderSelector trait, which elects a leader broker for a partition. The trait defines a single method, selectLeader, taking a TopicAndPartition and a LeaderAndIsr: TopicAndPartition identifies the partition whose leader is to be elected, and the second argument carries that partition's current leader and ISR as recorded in ZooKeeper. The method returns a tuple of the newly elected leader and ISR, together with the set of replicas that need to receive the LeaderAndIsrRequest.

trait PartitionLeaderSelector {
  /**
   * @return The leader and isr request, with the newly selected leader and isr, and the set of replicas to receive
   * the LeaderAndIsrRequest.
   */
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
}

As the code above suggests, KafkaController defines five selector implementations:

  1. NoOpLeaderSelector
  2. OfflinePartitionLeaderSelector
  3. ReassignedPartitionLeaderSelector
  4. PreferredReplicaPartitionLeaderSelector
  5. ControlledShutdownLeaderSelector

4.1 ReassignedPartitionLeaderSelector

Picks the first live, in-sync replica from the reassigned replica set as the new leader, keeps the current ISR as the new ISR, and uses the reassigned replica set as the set of replicas to receive the LeaderAndIsr request.
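Below is a hedged sketch of this policy written against the PartitionLeaderSelector trait shown earlier. The liveBrokers and reassignedReplicas constructor parameters are assumptions standing in for the controller-context lookups the real class performs, and the real selector also bumps the leader epoch and ZooKeeper version, which this sketch skips:

class ReassignedPartitionLeaderSelectorSketch(liveBrokers: Set[Int],
                                              reassignedReplicas: Seq[Int]) extends PartitionLeaderSelector {
  def selectLeader(topicAndPartition: TopicAndPartition,
                   currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    // New leader: the first reassigned replica that is alive and in the ISR.
    reassignedReplicas.find(r => liveBrokers.contains(r) && currentLeaderAndIsr.isr.contains(r)) match {
      case Some(leader) =>
        // Keep the current ISR; the reassigned replicas receive LeaderAndIsr.
        (currentLeaderAndIsr.copy(leader = leader), reassignedReplicas)
      case None =>
        throw new IllegalStateException(
          "None of the reassigned replicas for %s is alive and in sync".format(topicAndPartition))
    }
  }
}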

4.2 PreferredReplicaPartitionLeaderSelector

If the first replica in assignedReplicas (the preferred replica) is already the partition leader, it throws an exception, since no election is needed; otherwise it elects that first replica as the leader.

4.3 ControlledShutdownLeaderSelector

Removes the replicas that live on shutting-down brokers from the ISR to form the new ISR, elects the first remaining replica as the leader, and uses the current AR (assigned replicas) as the set of replicas to receive the LeaderAndIsr request.

4.4 NoOpLeaderSelector

Does nothing by design: it simply returns the current leader and ISR unchanged.

4.5 OfflinePartitionLeaderSelector

Picks a live broker from the ISR as the leader; if no replica in the ISR is alive, it falls back to picking a live replica from assignedReplicas (at the risk of data loss). Once the election succeeds, the new leader is written to ZooKeeper and all caches are updated.
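A simplified sketch of that decision, again written against the trait above. The liveBrokers and assignedReplicas inputs are assumed stand-ins for controller state; the real class also logs a data-loss warning when it must fall back past the ISR:

class OfflinePartitionLeaderSelectorSketch(liveBrokers: Set[Int],
                                           assignedReplicas: Seq[Int]) extends PartitionLeaderSelector {
  def selectLeader(topicAndPartition: TopicAndPartition,
                   currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    val liveAssigned = assignedReplicas.filter(liveBrokers.contains)
    val liveIsr      = currentLeaderAndIsr.isr.filter(liveBrokers.contains)
    val (newLeader, newIsr) =
      if (liveIsr.nonEmpty)           (liveIsr.head, liveIsr)                      // safe: no data loss
      else if (liveAssigned.nonEmpty) (liveAssigned.head, List(liveAssigned.head)) // unclean fallback
      else throw new IllegalStateException("No replica is alive for " + topicAndPartition)
    // The live assigned replicas receive the LeaderAndIsr request.
    (currentLeaderAndIsr.copy(leader = newLeader, isr = newIsr), liveAssigned)
  }
}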

Once leader selection has completed, the resulting requests must be routed to the corresponding handlers. Kafka does not abstract request handlers into separate classes; each handler is a method on the KafkaApis class:

  /**
   * Top-level method that handles all requests and multiplexes to the right api
   */
  def handle(request: RequestChannel.Request) {
      trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
        format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
      ApiKeys.forId(request.requestId) match {
        case ApiKeys.PRODUCE => handleProducerRequest(request)
        case ApiKeys.FETCH => handleFetchRequest(request)
        case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
        case ApiKeys.METADATA => handleTopicMetadataRequest(request)
        case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
        case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
        case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
        case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
        case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
        case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
        case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
        case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
        case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
        case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
        case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
        case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
        case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
        case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
        case requestId => throw new KafkaException("Unknown api code " + requestId)
      }
    }

4.6 case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)

To decide whether this broker becomes a leader or a follower for each affected partition, handleLeaderAndIsrRequest delegates to
kafka.server.ReplicaManager#becomeLeaderOrFollower; its flow chart is shown below.
[Figure: becomeLeaderOrFollower flow chart]

def becomeLeaderOrFollower(correlationId: Int, leaderAndISRRequest: LeaderAndIsrRequest,
                             metadataCache: MetadataCache,
                             onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
    leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
      stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
                                .format(localBrokerId, stateInfo, correlationId,
                                        leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
    }
    replicaStateChangeLock synchronized {
      val responseMap = new mutable.HashMap[TopicPartition, Short]
      if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
        leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
        stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
          "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
          correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
        }
        BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
      } else {
        val controllerId = leaderAndISRRequest.controllerId
        controllerEpoch = leaderAndISRRequest.controllerEpoch

        // First check partition's leader epoch
        val partitionState = new mutable.HashMap[Partition, PartitionState]()
        leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
          val partition = getOrCreatePartition(topicPartition.topic, topicPartition.partition)
          val partitionLeaderEpoch = partition.getLeaderEpoch()
          // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
          // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
          if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
            if(stateInfo.replicas.contains(config.brokerId))
              partitionState.put(partition, stateInfo)
            else {
              stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
                "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
                .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                  topicPartition.topic, topicPartition.partition, stateInfo.replicas.asScala.mkString(",")))
              responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
            }
          } else {
            // Otherwise record the error code in response
            stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
              "epoch %d for partition [%s,%d] since its associated leader epoch %d is old. Current leader epoch is %d")
              .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                topicPartition.topic, topicPartition.partition, stateInfo.leaderEpoch, partitionLeaderEpoch))
            responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code)
          }
        }

        val partitionsTobeLeader = partitionState.filter { case (partition, stateInfo) =>
          stateInfo.leader == config.brokerId
        }
        val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)

        val partitionsBecomeLeader = if (!partitionsTobeLeader.isEmpty)
          makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
        else
          Set.empty[Partition]
        val partitionsBecomeFollower = if (!partitionsToBeFollower.isEmpty)
          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
        else
          Set.empty[Partition]

        // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
        // have been completely populated before starting the checkpointing there by avoiding weird race conditions
        if (!hwThreadInitialized) {
          startHighWaterMarksCheckPointThread()
          hwThreadInitialized = true
        }
        replicaFetcherManager.shutdownIdleFetcherThreads()

        onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
        BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code)
      }
    }
  }

Code walkthrough:

  1. If the request's controllerEpoch is lower than the broker's current controllerEpoch, return Errors.STALE_CONTROLLER_EPOCH immediately.

  2. If the leader epoch in partitionStateInfo is greater than the leader epoch stored in the ReplicaManager for the corresponding (topic, partitionId) partition:

    2.1 If the current broker id (that is, the replica id) appears in partitionStateInfo's replica list, put the partition and its partitionStateInfo into a HashMap named partitionState; otherwise the broker is not in the partition's assigned replica list, so log a warning and record Errors.UNKNOWN_TOPIC_OR_PARTITION in the response.

  3. If the leader epoch in partitionStateInfo is not newer than the one stored in the ReplicaManager, record Errors.STALE_CONTROLLER_EPOCH in the response (the code above reuses this error code for a stale leader epoch).

  4. Split partitionState into the entries whose leader equals the current broker id (partitionsTobeLeader) and the rest (partitionsToBeFollower):

    • If partitionsTobeLeader is non-empty, call makeLeaders on it.
    • If partitionsToBeFollower is non-empty, call makeFollowers on it.
Original article (in Chinese): https://www.cnblogs.com/byrhuangqiang/p/6367849.html