kafka partiton迁移方法与原理

在kafka中增加新的节点后,数据是不会自动迁移到新的节点上的,需要我们手动将数据迁移(或者成为打散)到新的节点上

1 迁移方法

kafka为我们提供了用于数据迁移的脚本。我们可以用这些脚本完成数据的迁移。

1.1 生成partiton分配表

1.1.1 创建json文件topic-to-move.json

{
  "topics": [{"topic": "testTopic"}],
  "version": 1
}

1.1.2 生成partiton分配表

运行

$ ./kafka-reassign-partitions --zookeeper ${zk_address} --topics-to-move-json-file  topic-to-move.json --broker-list "140,141" --generate

其中${zk_address}是kafka所连接zk地址,"140,141"为该topic将要迁移到的目标节点。

生成结果如下:

Current partition replica assignment

{"version":1,"partitions":[{"topic":"testTopic","partition":1,"replicas":[61,62]},{"topic":"testTopic","partition":0,"replicas":[62,61]}]}

Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"testTopic","partition":1,"replicas":[140,141]},{"topic":"testTopic","partition":0,"replicas":[141,140]}]}

其中上半部分是当前的partiton分布情况,下半部分是迁移成功后的partion分布情况。那么我们就用下部分的json进行迁移。另外,也可以自己构造个类似的json文件,同样可以进行迁移。这里我们使用脚本为我们生成的json文件,将下半部分的json保存为expand-cluster-reassignment.json

1.2 执行迁移

$ ./kafka-reassign-partitions --zookeeper ${zk_address}  --reassignment-json-file expand-cluster-reassignment.json --execute

1.3 查看迁移进度

$ ./kafka-reassign-partitions --zookeeper ${zk_address} --reassignment-json-file expand-cluster-reassignment.json --verify

2 源码分析

2.1 脚本调用

kafka-reassign-partitions.sh会调用kafka.admin.ReassignPartitionsCommand.scala,在代码运行过程中抛出的任何异常都会通过标准输出打印出来,所以如果执行该脚本报错,可以看下这块代码来定位问题。

def main(args: Array[String]): Unit = {
    // 略
    try {
      if(opts.options.has(opts.verifyOpt)) // 校验
        verifyAssignment(zkUtils, opts)
      else if(opts.options.has(opts.generateOpt)) // 生成json
        generateAssignment(zkUtils, opts)
      else if (opts.options.has(opts.executeOpt)) // 执行迁移
        executeAssignment(zkUtils, opts)
    } catch {
      case e: Throwable =>
        println("Partitions reassignment failed due to " + e.getMessage)
        println(Utils.stackTrace(e))
    } finally {
      val zkClient = zkUtils.zkClient
      if (zkClient != null)
        zkClient.close()
    }

2.1.1 executeAssignment

executeAssignment 用于执行迁移。

  def executeAssignment(zkUtils: ZkUtils,reassignmentJsonString: String){
    // 略,做一些校验和去重等工作

    // 获取当前的partition分布情况
    zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic))
    println("Current partition replica assignment

%s

Save this to use as the --reassignment-json-file option during rollback"
      .format(zkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment)))

      // 重点,执行迁移,z即将json写到zk上,准确的说是写到"/admin/reassign_partitions"下
    // start the reassignment
    if(reassignPartitionsCommand.reassignPartitions())
      println("Successfully started reassignment of partitions %s".format(zkUtils.formatAsReassignmentJson(partitionsToBeReassigned.toMap)))
    else
      println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
  }

executeAssignment将json写到zk上后,brokerwatch到节点数据变化就开始进行迁移了

2.1.2 verifyAssignment

verifyAssignment用于校验迁移进度

def verifyAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
    // 略
    println("Status of partition reassignment:")
    val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkUtils, partitionsToBeReassigned) // 重点
    reassignedPartitionsStatus.foreach { partition =>
      partition._2 match {
        case ReassignmentCompleted =>
          println("Reassignment of partition %s completed successfully".format(partition._1))
        case ReassignmentFailed =>
          println("Reassignment of partition %s failed".format(partition._1))
        case ReassignmentInProgress =>
          println("Reassignment of partition %s is still in progress".format(partition._1))
      }
    }
  }

  private def checkIfReassignmentSucceeded(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
  :Map[TopicAndPartition, ReassignmentStatus] = {
    val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas) // 从zk节点"/admin/reassign_partitions"读取迁移信息
    partitionsToBeReassigned.map { topicAndPartition =>
      (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkUtils,topicAndPartition._1,
        topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned))
    }
  }

  def checkIfPartitionReassignmentSucceeded(zkUtils: ZkUtils, topicAndPartition: TopicAndPartition,
                                            reassignedReplicas: Seq[Int],
                                            partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
                                            partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
    val newReplicas = partitionsToBeReassigned(topicAndPartition)
    partitionsBeingReassigned.get(topicAndPartition) match {
      case Some(partition) => ReassignmentInProgress // 如果tp对应的数据存在则说明还在迁移
      case None =>  // 否则可能是成功了
        // check if the current replica assignment matches the expected one after reassignment
        val assignedReplicas = zkUtils.getReplicasForPartition(topicAndPartition.topic, topicAndPartition.partition)
        if(assignedReplicas == newReplicas) // 重点,如果节点不存在了,但是迁移后的replica列表和预期不一致,则报错
          ReassignmentCompleted
        else { // 经常遇到的报错
          println(("ERROR: Assigned replicas (%s) don't match the list of replicas for reassignment (%s)" +
            " for partition %s").format(assignedReplicas.mkString(","), newReplicas.mkString(","), topicAndPartition))
          ReassignmentFailed
        }
    }
  }

从源码中可以看出判断迁移是否完成是根据"/admin/reassign_partitions"是否存在来判断。如果节点不存在了,并且迁移后的AR和预期一致,则才算成功。

注意:在实际迁移中遇到过好几次报错类似如下,即上面代码的打印的日志

don't match the list of replicas for reassignment

从代码中可以看到出现这个错误的原因是"/admin/reassign_partitions"不存在了,但是当前topic的AR和预期的不一致。这个原因一般是由于迁移的时候broker那边报错了,然后将节点删除了,并没有进行迁移。具体原因需要看下broker的controller的日志。

2.2 broker如何进行迁移

2.2.1 入口

broker的controller节点负责partiton的迁移工作,在broker被选为controller节点的时候会watch "/admin/reassign_partitions" 节点的变化。

private def registerReassignedPartitionsListener() = {
    zkUtils.zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
  }

所以迁移的工作主要在partitionReassignedListener中,controller watch到"/admin/reassign_partitions"节点数据变化后,会读取该数据内容,并跳过正在删除的partiton,进行迁移工作。

class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging {
  this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: "
  val zkUtils = controller.controllerContext.zkUtils
  val controllerContext = controller.controllerContext

  @throws(classOf[Exception])
  def handleDataChange(dataPath: String, data: Object) {
    val partitionsReassignmentData = zkUtils.parsePartitionReassignmentData(data.toString) // 读取"/admin/reassign_partitions"节点内的数据,封装成[TopicAndPartition, relipcs] 的形式
    val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
      partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
    }
    partitionsToBeReassigned.foreach { partitionToBeReassigned => // 迁移每一个partiton
      inLock(controllerContext.controllerLock) {
        if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) { // 正在删除的则跳过
          error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
            .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
          controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
        } else {
          val context = new ReassignedPartitionsContext(partitionToBeReassigned._2) // 将目标replica列表封装成ReassignedPartitionsContext
          controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context) // 重点,以上都是读取,这里才是真正的迁移工作
        }
      }
    }
  }

2.2.2 KafkaController#initiateReassignReplicasForTopicPartition()

initiateReassignReplicasForTopicPartition进行迁移工作。但是他主要做一些校验工作,该方法中会watch该partiton的ISR变化情况,即监听“/brokers/topics/{topic}/partitions/{partiton}/state” 节点的变化, 这和迁移的原理有关系。

def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
                                        reassignedPartitionContext: ReassignedPartitionsContext) {
    val newReplicas = reassignedPartitionContext.newReplicas
    val topic = topicAndPartition.topic
    val partition = topicAndPartition.partition
    val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) // 根据broker的存活进行过滤
    try {
        // 重点, 从controllerContext中读取partition的AR
      val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
      assignedReplicasOpt match {
        case Some(assignedReplicas) =>
        // 如果ontrollerContext中AR和目标迁移列表相同,则抛异常。注意他们都是Seq类型,相同是指顺序也相同。
          if(assignedReplicas == newReplicas) {
            throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
              " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
          } else {
            if(aliveNewReplicas == newReplicas) { // 目标列表里的replic的broker都存活才能进行迁移
        
              watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext) // 重点,后面会分析
              controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
            
              deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
              onPartitionReassignment(topicAndPartition, reassignedPartitionContext) // 重点,真正干活的,做迁移工作的
            } else { // 有不存活的,则抛出异常。
              // some replica in RAR is not alive. Fail partition reassignment
              throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
                " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
                "Failing partition reassignment")
            }
          }
        case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
          .format(topicAndPartition))
      }
    } catch {
      case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
      // remove the partition from the admin path to unblock the admin client
      removePartitionFromReassignedPartitions(topicAndPartition) // 重点,一点迁移出问题,抛出异常,则会将"/admin/reassign_partitions"里的相应信息清空或者删除节点
    }
  }

2.2.3 KafkaController#onPartitionReassignment

真正的迁移步骤是在onPartitionReassignment完成的

def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
    val reassignedReplicas = reassignedPartitionContext.newReplicas
    areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match { // 判断目标迁移的replic是不是都在ISR中,都在的意思是目标迁移的replic列表是ISR的一个子集。例如ISR列表是[1,2,3],目标迁移列表是[2,3],则为true。

    // 如果是false,则会将AR和目标replic列表做个并集,类似于增加该partiton的副本数。例如AR是[1,2,3], 目标是[2,3,4],则会将partiton的AR设置为[1, 2, 3, 4], 相当于partiton增加了一个新的replic。
      case false =>
        val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
        val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet // AR和目标replica列表求并集
        // 更新AR到zk和缓存
        updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
        // 发送LeaderAndIsr
        updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
          newAndOldReplicas.toSeq)
        // 让新增加的replic上线,使其开始从leader同步数据。
        startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
        info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
          "reassigned to catch up with the leader")

          // 如果是true,则将不再目标列表中的AR中的replic去掉,例如目标迁移是[2,3],AR是[1,2,3], 则将1下线
      case true =>
        //4. Wait until all replicas in RAR are in sync with the leader.
        val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
        //5. replicas in RAR -> OnlineReplica
        reassignedReplicas.foreach { replica =>
          replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
            replica)), OnlineReplica)
        }
        
        // 目标列表中没有leader则需要重新选下leader
        moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
       
       // 以下是停用掉下掉的replica的一些工作,例如更新AR,更新zk,发送meta请求等。另外删除"/admin/reassign_partitions"节点数据
        stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
        
        updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)

        removePartitionFromReassignedPartitions(topicAndPartition)
        info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
        controllerContext.partitionsBeingReassigned.remove(topicAndPartition)

        sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
    
        deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
    }
  }

可以到这里会有个疑问,目标迁移列表不是ISR的子集,就只是增加了replic,并没有去掉replic的步骤啊。这里的关键是在initiateReassignReplicasForTopicPartition中watch了partiton的ISR情况,即调用了watchIsrChangesForReassignedPartition方法。

2.2.4 watchIsrChangesForReassignedPartition

该方法监听/brokers/topics/{topic}/partitions/{partiton}/state” 节点的变化。如果目标迁移列表已经跟上leader了,那么就会将不在目标迁移列表里的replic下线,完成迁移

def handleDataChange(dataPath: String, data: Object) {
    inLock(controllerContext.controllerLock) {
      debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
      val topicAndPartition = TopicAndPartition(topic, partition)
      try {
        controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
          case Some(reassignedPartitionContext) =>
            val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topic, partition)
            newLeaderAndIsrOpt match {
              case Some(leaderAndIsr) => // check if new replicas have joined ISR
                val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet // 求并集ISR和目标迁移列表的并集
                if(caughtUpReplicas == reassignedReplicas) { // 目标迁移列表全部跟上了,则再次调用KafkaController#onPartitionReassignment,这次会走true那个判断分支了,会将不再目标replic列表中的replic下线。
                  // resume the partition reassignment process
                  info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
                    .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
                    "Resuming partition reassignment")
                  controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
                }
                else {
                  info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
                    .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
                    "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
                }
              case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
                .format(topicAndPartition, reassignedReplicas.mkString(",")))
            }
          case None =>
        }
      } catch {
        case e: Throwable => error("Error while handling partition reassignment", e)
      }
    }
  }

3 broker 处理迁移的思路总结

从以上分析我们可以看出,broker会watch "/admin/reassign_partitions"节点。当发现有迁移任务的时候,会将partiton的AR进行扩展,例如原先partiton的AR是[1, 2], 现在要迁移到[2, 3],那么partiton会先将AR扩展到[1, 2, 3],并监控ISR的变化。

当replica-2和replica-3都跟上后,即在ISR中的时候,表明新的repica-3已经和leader数据同步了。这个时候就可以将replica-1剔除了,最后得到迁移结果是[2, 3]。即迁移是一个先增加再减少的过程。

4 可能遇到的问题

4.1 报错

Assigned replicas (0,1) don't match the list of replicas for reassignment (1,0) for partition [testTopic,0] Reassignment of partition [testTopic,0] failed 

在2.1.2中已经说明了,该错误是由于"/admin/reassign_partitions"节点已经被删除了,但是AR和目标迁移列表不相同报的错,一般需要看下controller的日志,看下controller在迁移过程中是不是抛出了异常。

4.2 迁移一直在进行中,不能完成

迁移需要等目标迁移列表中的replic都跟上了leader才能完成,目前迁移列表一直跟不上,那么就不会完成。可以看下zk中“/brokers/topics/{topic}/partitions/{partiton}/state”,注意下目标迁移列表是不是在isr中,如果不在说明要迁移的replic还没有完成从leader拉取数据。具体为甚么没有拉取成功,可能是数据量比较大,拉取需要一定的时间;也可能是其他原因比如集群宕机了等,需要具体分析下

原文地址:https://www.cnblogs.com/set-cookie/p/9614241.html