MetadataCache更新

MetadataCache什么时候更新

updateCache方法用来更新缓存的。

发起线程 controller-event-thread

controller选举的时候

CLASS_NAME METHOD_NAME LINE_NUM
kafka/controller/KafkaController sendUpdateMetadataRequest 1043
kafka/controller/KafkaController onControllerFailover 288
kafka/controller/KafkaController elect 1658
kafka/controller/KafkaController$Startup$ process 1581
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/metrics/KafkaTimer time 32
kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
kafka/utils/ShutdownableThread run 70

启动的时候选举,启动这个动作也是个事件


// KafkaController.scala
  case object Startup extends ControllerEvent {

    def state = ControllerState.ControllerChange

    override def process(): Unit = {
      registerSessionExpirationListener()
      registerControllerChangeListener()
      elect()
    }

  }

broker启动的时候

CLASS_NAME METHOD_NAME LINE_NUM
kafka/controller/KafkaController sendUpdateMetadataRequest 1043
kafka/controller/KafkaController onBrokerStartup 387
kafka/controller/KafkaController$BrokerChange process 1208
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/metrics/KafkaTimer time 32
kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
kafka/utils/ShutdownableThread run 70

topic删除的时候

CLASS_NAME METHOD_NAME LINE_NUM
kafka/controller/KafkaController sendUpdateMetadataRequest 1043
kafka/controller/TopicDeletionManager kafka$controller$TopicDeletionManager$$onTopicDeletion 268
kafka/controller/TopicDeletionManager$$anonfun$resumeDeletions$2 apply 333
kafka/controller/TopicDeletionManager$$anonfun$resumeDeletions$2 apply 333
scala/collection/immutable/Set$Set1 foreach 94
kafka/controller/TopicDeletionManager resumeDeletions 333
kafka/controller/TopicDeletionManager enqueueTopicsForDeletion 110
kafka/controller/KafkaController$TopicDeletion process 1280
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/metrics/KafkaTimer time 32
kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
kafka/utils/ShutdownableThread run 70

topic创建或者修改的时候

CLASS_NAME METHOD_NAME LINE_NUM
kafka/controller/ControllerBrokerRequestBatch updateMetadataRequestBrokerSet 291
kafka/controller/ControllerBrokerRequestBatch newBatch 294
kafka/controller/PartitionStateMachine handleStateChanges 105
kafka/controller/KafkaController onNewPartitionCreation 499
kafka/controller/KafkaController onNewTopicCreation 485
kafka/controller/KafkaController$TopicChange process 1237
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/metrics/KafkaTimer time 32
kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
kafka/utils/ShutdownableThread run 70

topic创建这个是从队列中拿到事件再处理的方式
队列是kafka.controller.ControllerEventManager.queue
放入过程如下,本质还是监听zk的path的child的变化:

CLASS_NAME METHOD_NAME LINE_NUM
kafka/controller/ControllerEventManagerput 44
kafka/controller/TopicChangeListener handleChildChange 1712
org/I0Itec/zkclient/ZkClient$10 run 848
org/I0Itec/zkclient/ZkEventThread run 85

注册监听器的代码如下:

// class KafkaController
  private def registerTopicChangeListener() = {
    zkUtils.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
  }

顺带说一下有6个地方订阅了zk的子节点的变化:

  • DynamicConfigManager.startup
  • registerTopicChangeListener
  • registerIsrChangeNotificationListener
  • registerTopicDeletionListener
  • registerBrokerChangeListener
  • registerLogDirEventNotificationListener

处理创建topic事件:

// ControllerChannelManager.scala  class ControllerBrokerRequestBatch
  def sendRequestsToBrokers(controllerEpoch: Int) {
  // .......
      val updateMetadataRequest = {
        val liveBrokers = if (updateMetadataRequestVersion == 0) {
          // .......
        } else {
          controllerContext.liveOrShuttingDownBrokers.map { broker =>
            val endPoints = broker.endPoints.map { endPoint =>
              new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port, endPoint.securityProtocol, endPoint.listenerName)
            }
            new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
          }
        }
        new UpdateMetadataRequest.Builder(updateMetadataRequestVersion, controllerId, controllerEpoch, partitionStates.asJava,
          liveBrokers.asJava)
      }
      updateMetadataRequestBrokerSet.foreach { broker =>
        controller.sendRequest(broker, ApiKeys.UPDATE_METADATA, updateMetadataRequest, null)
      }
      // .......
    }

topic创建时更新metadata再进一步的过程
构建发送请求事件放入发送队列等待发送线程发送
构建发送请求事件代码如下:

// ControllerChannelManager
  def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest],
                  callback: AbstractResponse => Unit = null) {
    brokerLock synchronized {
      val stateInfoOpt = brokerStateInfo.get(brokerId)
      stateInfoOpt match {
        case Some(stateInfo) =>
          stateInfo.messageQueue.put(QueueItem(apiKey, request, callback))
        case None =>
          warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
      }
    }
  }

调用栈:

CLASS_NAMEMETHOD_NAMELINE_NUM
kafka/controller/ControllerChannelManagersendRequest81
kafka/controller/KafkaControllersendRequest662
kafka/controller/ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2apply405
kafka/controller/ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2apply405
scala/collection/mutable/HashMap$$anonfun$foreach$1apply130
scala/collection/mutable/HashMap$$anonfun$foreach$1apply130
scala/collection/mutable/HashTable$classforeachEntry241
scala/collection/mutable/HashMapforeachEntry40
scala/collection/mutable/HashMapforeach130
kafka/controller/ControllerBrokerRequestBatchsendRequestsToBrokers502
kafka/controller/PartitionStateMachinehandleStateChanges105
kafka/controller/KafkaControlleronNewPartitionCreation499
kafka/controller/KafkaControlleronNewTopicCreation485
kafka/controller/KafkaController$TopicChangeprocess1237
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1apply$mcV$sp53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1apply53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1apply53
kafka/metrics/KafkaTimertime32
kafka/controller/ControllerEventManager$ControllerEventThreaddoWork64
kafka/utils/ShutdownableThreadrun70

发送线程发送请求:
代码如下:

// ControllerChannelManager.scala class RequestSendThread
  override def doWork(): Unit = {

    def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100))

    val QueueItem(apiKey, requestBuilder, callback) = queue.take()
    //...
    while (isRunning.get() && !isSendSuccessful) {
        // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
        // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
        try {
          if (!brokerReady()) {
            isSendSuccessful = false
            backoff()
          }
          else {
            val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,
              time.milliseconds(), true)
            clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
            isSendSuccessful = true
          }
        } catch {
          case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
            warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
              "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
                requestBuilder.toString, brokerNode.toString), e)
            networkClient.close(brokerNode.idString)
            isSendSuccessful = false
            backoff()
        }
      }
      // ......
  }

响应线程

CLASS_NAMEMETHOD_NAMELINE_NUM
kafka/server/MetadataCachekafka$server$MetadataCache$$addOrUpdatePartitionInfo150
kafka/utils/CoreUtils$inLock219
kafka/utils/CoreUtils$inWriteLock225
kafka/server/MetadataCacheupdateCache184
kafka/server/ReplicaManagermaybeUpdateMetadataCache988
kafka/server/KafkaApishandleUpdateMetadataRequest212
kafka/server/KafkaApishandle142
kafka/server/KafkaRequestHandlerrun72

线程信息: kafka-request-handler-5
partitionMetadataLock读写锁控制cache数据的读取与写入的线程安全。元数据信息在发送请求中已经构造好了。此处还涉live broker的更新等。

应该还要补充:leader切换和isr变化等

原文地址:https://www.cnblogs.com/simoncook/p/11809452.html