Kafka Source Code Analysis (1): Server Startup

 

1 Startup entry point: Kafka.scala

Kafka's startup entry point is the main() function in Kafka.scala:

  def main(args: Array[String]): Unit = {
    try {
      // Read the server properties from the command-line arguments
      val serverProps = getPropsFromArgs(args)
      val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

      // Register a JVM shutdown hook
      Runtime.getRuntime().addShutdownHook(new Thread() {
        override def run() = {
          kafkaServerStartable.shutdown
        }
      })

      kafkaServerStartable.startup
      kafkaServerStartable.awaitShutdown
    }
    catch {
      case e: Throwable =>
        fatal(e)
        System.exit(1)
    }
    System.exit(0)
  }

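The code above does the following:

  • reads the Kafka server startup parameters from the configuration file via getPropsFromArgs();
  • creates a KafkaServerStartable object;
  • registers a shutdown hook that calls shutdown() on the KafkaServerStartable object;
  • calls startup() on the KafkaServerStartable object;
  • calls awaitShutdown() on the KafkaServerStartable object, which blocks until the server has shut down.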
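The hook / startup / await sequence listed above can be sketched in isolation. This is a minimal illustration, not Kafka code: DemoServer and DemoMain are hypothetical names, and the latch-based awaitShutdown() is an assumption about how such a wrapper can block until shutdown.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical stand-in for KafkaServerStartable; not the real Kafka class.
final class DemoServer {
  private val shutdownLatch = new CountDownLatch(1)
  @volatile private var started = false

  def startup(): Unit = { started = true }

  // Releasing the latch lets awaitShutdown() return.
  def shutdown(): Unit = { started = false; shutdownLatch.countDown() }

  // Blocks the calling thread until shutdown() has been called.
  def awaitShutdown(): Unit = shutdownLatch.await()

  def isRunning: Boolean = started
}

object DemoMain {
  def run(server: DemoServer): Unit = {
    // Register the hook before starting, so that SIGTERM or Ctrl-C triggers shutdown().
    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
      override def run(): Unit = server.shutdown()
    }))
    server.startup()
    server.awaitShutdown()
  }
}
```

DemoMain.run blocks the calling thread until some other thread, or the JVM shutdown hook, calls shutdown(), which is why main() can safely end with System.exit(0) afterwards.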

2 KafkaServerStartable, the wrapper class around KafkaServer

  def startup() {
    try {
      server.startup()
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
        System.exit(1)
    }
  }

3 The class that does the actual startup: KafkaServer

The KafkaServer startup code is clearly layered and, together with its comments, is easy to follow:

  /**
    * Start up API for bringing up a single instance of the Kafka server.
    * Instantiates the LogManager, the SocketServer and the KafkaRequestHandlers.
    */
  def startup() {
    try {

      if (isShuttingDown.get)
        throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

      if (startupComplete.get)
        return

      val canStartup = isStartingUp.compareAndSet(false, true)
      if (canStartup) {
        brokerState.newState(Starting)

        /* start scheduler */
        kafkaScheduler.startup()

        /* setup zookeeper */
        zkUtils = initZk()

        /* Get or create cluster_id */
        _clusterId = getOrGenerateClusterId(zkUtils)
        info(s"Cluster ID = $clusterId")

        /* generate brokerId */
        config.brokerId = getBrokerId
        this.logIdent = "[Kafka Server " + config.brokerId + "], "

        /* create and configure metrics */
        val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter],
          Map[String, AnyRef](KafkaConfig.BrokerIdProp -> (config.brokerId.toString)).asJava)
        reporters.add(new JmxReporter(jmxPrefix))
        val metricConfig = KafkaServer.metricConfig(config)
        metrics = new Metrics(metricConfig, reporters, time, true)

        quotaManagers = QuotaFactory.instantiate(config, metrics, time)
        notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)

        /* start log manager */
        logManager = createLogManager(zkUtils.zkClient, brokerState)
        logManager.startup()

        metadataCache = new MetadataCache(config.brokerId)
        credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)

        socketServer = new SocketServer(config, metrics, time, credentialProvider)
        socketServer.startup()

        /* start replica manager */
        replicaManager = new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager,
          isShuttingDown, quotaManagers.follower)
        replicaManager.startup()

        /* start kafka controller */
        kafkaController = new KafkaController(config, zkUtils, brokerState, time, metrics, threadNamePrefix)
        kafkaController.startup()

        adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)

        /* start group coordinator */
        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
        groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
        groupCoordinator.startup()

        /* Get the authorizer and initialize it if one is specified.*/
        authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
          val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
          authZ.configure(config.originals())
          authZ
        }

        /* start processing requests */
        apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator,
          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
          clusterId, time)

        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
          config.numIoThreads)

        Mx4jLoader.maybeLoad()

        /* start dynamic config manager */
        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
          ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
          ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
          ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

        // Create the config manager. start listening to notifications
        dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
        dynamicConfigManager.startup()

        /* tell everyone we are alive */
        val listeners = config.advertisedListeners.map { endpoint =>
          if (endpoint.port == 0)
            endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
          else
            endpoint
        }
        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
          config.interBrokerProtocolVersion)
        kafkaHealthcheck.startup()

        // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
        checkpointBrokerId(config.brokerId)

        /* register broker metrics */
        registerStats()

        brokerState.newState(RunningAsBroker)
        shutdownLatch = new CountDownLatch(1)
        startupComplete.set(true)
        isStartingUp.set(false)
        AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
        info("started")
      }
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
        isStartingUp.set(false)
        shutdown()
        throw e
    }
  }
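The guard at the top of startup() is what makes repeated or concurrent calls safe. Below is a minimal sketch of just that guard. GuardedService and its init parameter are hypothetical, the isShuttingDown check is left out, and the failure handling is simplified: the real method also calls shutdown() before rethrowing.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical component; only the guard logic mirrors the excerpt above.
final class GuardedService(init: () => Unit) {
  private val isStartingUp = new AtomicBoolean(false)
  private val startupComplete = new AtomicBoolean(false)

  def startup(): Unit = {
    // A second call after a successful start is a no-op.
    if (startupComplete.get) return
    // Only the thread that flips isStartingUp from false to true runs the initialisation.
    if (isStartingUp.compareAndSet(false, true)) {
      try {
        init()
        startupComplete.set(true)
      } finally {
        // Reset the flag on success and on failure alike; an exception still propagates.
        isStartingUp.set(false)
      }
    }
  }

  def isStarted: Boolean = startupComplete.get
}
```

Calling startup() twice on the same instance runs init once. A thread that loses the compareAndSet race returns immediately without waiting for the winner to finish.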

3.1 KafkaScheduler

KafkaScheduler is a scheduler built on java.util.concurrent.ScheduledThreadPoolExecutor. Internally, a pool of threads named kafka-scheduler-xx (where xx is the thread sequence number) does the actual work.

  /**
    * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor.
    * It has a pool of kafka-scheduler-xx threads that do the actual work.
    *
    * @param threads          The number of threads in the thread pool
    * @param threadNamePrefix The name to use for scheduler threads; a number is appended to this prefix
    * @param daemon           If true, the scheduler threads are daemon threads and do not block JVM shutdown
    */
  @threadsafe
  class KafkaScheduler(val threads: Int,
                       val threadNamePrefix: String = "kafka-scheduler-",
                       daemon: Boolean = true) extends Scheduler with Logging {
    private var executor: ScheduledThreadPoolExecutor = null
    private val schedulerThreadId = new AtomicInteger(0)

    override def startup() {
      debug("Initializing task scheduler.")
      this synchronized {
        if (isStarted)
          throw new IllegalStateException("This scheduler has already been started!")
        executor = new ScheduledThreadPoolExecutor(threads)
        executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
        executor.setThreadFactory(new ThreadFactory() {
          def newThread(runnable: Runnable): Thread =
            Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
        })
      }
    }
    // (remaining methods omitted from this excerpt)
  }
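To see the thread naming and the periodic execution in action without the rest of Kafka, here is a small self-contained sketch. SchedulerSketch, newScheduler and demo are hypothetical names; only the JDK calls are real, and Utils.newThread is replaced by a plain Thread constructor.

```scala
import java.util.concurrent.{CountDownLatch, ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

object SchedulerSketch {
  // Builds an executor whose threads are named prefix + counter, in the spirit of KafkaScheduler.
  def newScheduler(threads: Int, prefix: String, daemon: Boolean): ScheduledThreadPoolExecutor = {
    val threadId = new AtomicInteger(0)
    val executor = new ScheduledThreadPoolExecutor(threads, new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, prefix + threadId.getAndIncrement())
        t.setDaemon(daemon)
        t
      }
    })
    // Pending periodic and delayed tasks are dropped once shutdown() is called.
    executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
    executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
    executor
  }

  // Runs one periodic task at least three times and returns the name of the thread that ran it.
  def demo(): String = {
    val executor = newScheduler(1, "kafka-scheduler-", daemon = true)
    val ticks = new CountDownLatch(3)
    val threadName = new AtomicReference[String]("")
    executor.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        threadName.set(Thread.currentThread().getName)
        ticks.countDown()
      }
    }, 0L, 10L, TimeUnit.MILLISECONDS)
    ticks.await(5, TimeUnit.SECONDS)
    executor.shutdownNow()
    threadName.get
  }
}
```

With a single pool thread, the task always runs on the thread named kafka-scheduler-0.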

3.2 ZooKeeper initialization

ZooKeeper initialization does two main things: it connects to the ZooKeeper server, and it creates the common paths.

    // Connect to the ZooKeeper server, then create the common paths
    val zkUtils = ZkUtils(config.zkConnect,
      sessionTimeout = config.zkSessionTimeoutMs,
      connectionTimeout = config.zkConnectionTimeoutMs,
      secureAclsEnabled)
    zkUtils.setupCommonPaths()

The common paths are:

  // These are the paths that should exist when a Kafka broker starts up
  val persistentZkPaths = Seq(ConsumersPath,
    BrokerIdsPath,
    BrokerTopicsPath,
    ConfigChangesPath,
    getEntityConfigRootPath(ConfigType.Topic),
    getEntityConfigRootPath(ConfigType.Client),
    DeleteTopicsPath,
    BrokerSequenceIdPath,
    IsrChangeNotificationPath)

3.3 The log manager: LogManager

LogManager is the Kafka subsystem responsible for creating, retrieving, and cleaning up logs. All read and write operations are delegated to the individual log instances.

  /**
    * Start the background tasks that flush, checkpoint, and clean up logs
    */
  def startup() {
    /* Schedule the cleanup task to delete old logs */
    if (scheduler != null) {
      info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
      scheduler.schedule("kafka-log-retention",
        cleanupLogs,
        delay = InitialTaskDelayMs,
        period = retentionCheckMs,
        TimeUnit.MILLISECONDS)
      info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
      scheduler.schedule("kafka-log-flusher",
        flushDirtyLogs,
        delay = InitialTaskDelayMs,
        period = flushCheckMs,
        TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-recovery-point-checkpoint",
        checkpointRecoveryPointOffsets,
        delay = InitialTaskDelayMs,
        period = flushCheckpointMs,
        TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-delete-logs",
        deleteLogs,
        delay = InitialTaskDelayMs,
        period = defaultConfig.fileDeleteDelayMs,
        TimeUnit.MILLISECONDS)
    }
    if (cleanerConfig.enableCleaner)
      cleaner.startup()
  }

3.4 SocketServer

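  /**
    * SocketServer is an NIO socket server.
    * The threading model is: 1 Acceptor thread that handles new connections; the Acceptor has N Processor
    * threads, each with its own selector, that read requests from sockets; and M Handler threads
    * that handle requests and produce responses back to the Processor threads for writing.
    */
  class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
    // (class body omitted from this excerpt)
  }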

3.5 The replica manager: ReplicaManager

Startup schedules the ISR maintenance tasks:

  def startup() {
    // start ISR expiration thread
    // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
    scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
    scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
  }
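The excerpt does not show maybeShrinkIsr itself, so the following is only a sketch of the idea, under the assumption that a follower is dropped once it has not caught up with the leader for longer than the configured maximum lag. IsrSketch, Follower and shrinkIsr are hypothetical names. Because the check runs every replicaLagTimeMaxMs / 2, a follower that crosses the limit just after one check is only noticed at the next one, which is where the 1.5x bound in the comment comes from.

```scala
object IsrSketch {
  // Hypothetical, simplified replica state: only the time the follower last caught up with the leader.
  final case class Follower(brokerId: Int, lastCaughtUpTimeMs: Long)

  // Returns the followers that stay in the ISR: those that caught up within maxLagMs of nowMs.
  def shrinkIsr(isr: Seq[Follower], nowMs: Long, maxLagMs: Long): Seq[Follower] =
    isr.filter(f => nowMs - f.lastCaughtUpTimeMs <= maxLagMs)
}
```

For example, with nowMs = 20000 and maxLagMs = 10000, a follower that last caught up at 15000 stays in the ISR and one that last caught up at 5000 is removed.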

3.6 The Kafka controller: KafkaController

startup() is invoked when the controller module of the Kafka server starts. It does not assume that the current broker is the controller; it only registers the session expiration listener and starts the controller elector.

  def startup() = {
    inLock(controllerContext.controllerLock) {
      info("Controller starting up")
      registerSessionExpirationListener()
      isRunning = true
      controllerElector.startup
      info("Controller startup complete")
    }
  }

Registering the session expiration listener:

  private def registerSessionExpirationListener() = {
    zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener())
  }

  // ZkClient side (Java): the listener is added to the set of state listeners
  public void subscribeStateChanges(final IZkStateListener listener) {
      synchronized (_stateListener) {
          _stateListener.add(listener);
      }
  }

  class SessionExpirationListener() extends IZkStateListener with Logging {
    this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
    @throws(classOf[Exception])
    def handleStateChanged(state: KeeperState) {
      // do nothing, since zkclient will do reconnect for us.
    }
    // (remaining callbacks omitted from this excerpt)
  }

The controller election:

  def startup {
    inLock(controllerContext.controllerLock) {
      controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
      elect
    }
  }

  def elect: Boolean = {
    val timestamp = SystemTime.milliseconds.toString
    val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))

    leaderId = getControllerID
    /*
     * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
     * it's possible that the controller has already been elected when we get here. This check will prevent the following
     * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
     */
    if(leaderId != -1) {
       debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
       return amILeader
    }

    try {
      val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
                                                      electString,
                                                      controllerContext.zkUtils.zkConnection.getZookeeper,
                                                      JaasUtils.isZkSecurityEnabled())
      zkCheckedEphemeral.create()
      info(brokerId + " successfully elected as leader")
      leaderId = brokerId
      onBecomingLeader()
    } catch {
      case e: ZkNodeExistsException =>
        // If someone else has written the path, then
        leaderId = getControllerID

        if (leaderId != -1)
          debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
        else
          warn("A leader has been elected but just resigned, this will result in another round of election")

      case e2: Throwable =>
        error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
        resign()
    }
    amILeader
  }

  def amILeader : Boolean = leaderId == brokerId
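The election boils down to "whoever creates the ephemeral node first wins; everyone else reads back the winner". The sketch below reproduces that logic against an in-memory stand-in for ZooKeeper. FakeZk, Elector and the default path "/controller" are assumptions made for illustration, and putIfAbsent only approximates the atomic create of an ephemeral znode.

```scala
import java.util.concurrent.ConcurrentHashMap

object ElectionSketch {
  // In-memory stand-in for ZooKeeper: putIfAbsent plays the role of creating an ephemeral znode.
  final class FakeZk {
    private val nodes = new ConcurrentHashMap[String, Integer]()

    // Returns true if this call created the node, false if it already existed.
    def createIfAbsent(path: String, brokerId: Int): Boolean =
      nodes.putIfAbsent(path, Integer.valueOf(brokerId)) == null

    // Returns the broker id stored at path, or -1 if the node does not exist.
    def read(path: String): Int = Option(nodes.get(path)).map(_.intValue).getOrElse(-1)

    // Simulates the ephemeral node disappearing, e.g. when the controller's session expires.
    def delete(path: String): Unit = { nodes.remove(path); () }
  }

  final class Elector(zk: FakeZk, brokerId: Int, path: String = "/controller") {
    @volatile private var leaderId = -1

    def amILeader: Boolean = leaderId == brokerId

    def elect(): Boolean = {
      leaderId = zk.read(path)
      if (leaderId == -1) {
        // Nobody holds the node: try to create it; whoever loses the race reads the winner back.
        if (zk.createIfAbsent(path, brokerId)) leaderId = brokerId
        else leaderId = zk.read(path)
      }
      amILeader
    }
  }
}
```

If two electors share one FakeZk, the first call to elect() wins and the second returns false. After the node is deleted, the next elect() call from any broker takes over, which corresponds to the handleDeleted callback path mentioned in the source comment.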

3.7 GroupCoordinator

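GroupCoordinator handles group membership and offset management. Each Kafka server instantiates one coordinator, which is responsible for a set of groups. Groups are assigned to coordinators based on their group names.

  def startup() {
    info("Starting up.")
    heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId)
    joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", brokerId)
    isActive.set(true)
    info("Startup complete.")
  }

Note: if both a group lock and the metadata lock are needed at the same time, always acquire the group lock first and the metadata lock second, to avoid deadlock.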
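One simple way to enforce such an ordering rule is to funnel every code path that needs both locks through a single helper. The sketch below shows the idea with two plain ReentrantLocks; LockOrderSketch and withGroupThenMetadata are hypothetical names and do not correspond to Kafka's own locking code.

```scala
import java.util.concurrent.locks.ReentrantLock

object LockOrderSketch {
  private val groupLock = new ReentrantLock()
  private val metadataLock = new ReentrantLock()

  // Every caller that needs both locks goes through here, so the order is always group, then metadata.
  def withGroupThenMetadata[T](body: => T): T = {
    groupLock.lock()
    try {
      metadataLock.lock()
      try body finally metadataLock.unlock()
    } finally groupLock.unlock()
  }
}
```

Because no thread ever holds the metadata lock while waiting for the group lock, the circular wait that a deadlock requires cannot form.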

3.8 KafkaApis, the request handling interface

  /**
   * Top-level method that handles all requests and multiplexes to the right api
   */
  def handle(request: RequestChannel.Request) {
    try{
      trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
        format(request.requestObj, request.connectionId, request.securityProtocol, request.session.principal))
      request.requestId match {
        case RequestKeys.ProduceKey => handleProducerRequest(request)
        case RequestKeys.FetchKey => handleFetchRequest(request)
        case RequestKeys.OffsetsKey => handleOffsetRequest(request)
        case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
        case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
        case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
        case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
        case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
        case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
        case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
        case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
        case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
        case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
        case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)
        case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request)
        case RequestKeys.ListGroupsKey => handleListGroupsRequest(request)
        case requestId => throw new KafkaException("Unknown api code " + requestId)
      }
    } catch {
      case e: Throwable =>
        if ( request.requestObj != null)
          request.requestObj.handleError(e, requestChannel, request)
        else {
          val response = request.body.getErrorResponse(request.header.apiVersion, e)
          val respHeader = new ResponseHeader(request.header.correlationId)

          /* If request doesn't have a default error response, we just close the connection.
             For example, when produce request has acks set to 0 */
          if (response == null)
            requestChannel.closeConnection(request.processor, request)
          else
            requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response)))
        }
        error("error when handling request %s".format(request.requestObj), e)
    } finally
      request.apiLocalCompleteTimeMs = SystemTime.milliseconds
  }
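The shape of handle() is a single pattern match on the request key, wrapped in a catch that turns any failure into an error response. A stripped-down sketch of that shape follows. DispatchSketch, Request and the string results are hypothetical, and the key values are illustrative only.

```scala
object DispatchSketch {
  // Hypothetical request keys; the numeric values are illustrative only.
  val ProduceKey: Short = 0
  val FetchKey: Short = 1

  final case class Request(requestId: Short, payload: String)

  // Routes the request to a handler by key; unknown keys become an error result instead of escaping.
  def handle(request: Request): String =
    try {
      request.requestId match {
        case ProduceKey => "produce:" + request.payload
        case FetchKey => "fetch:" + request.payload
        case other => throw new IllegalArgumentException("Unknown api code " + other)
      }
    } catch {
      case e: IllegalArgumentException => "error:" + e.getMessage
    }
}
```

Capitalised names such as ProduceKey are matched as stable identifiers (compared by value), while the lower-case `other` binds whatever key fell through, mirroring the final `case requestId =>` branch above.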

Take the handling of a producer request as an example:

  /**
   * Handle a produce request
   */
  def handleProducerRequest(request: RequestChannel.Request) {
    val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
    val numBytesAppended = produceRequest.sizeInBytes

    val (authorizedRequestInfo, unauthorizedRequestInfo) =  produceRequest.data.partition  {
      case (topicAndPartition, _) => authorize(request.session, Write, new Resource(Topic, topicAndPartition.topic))
    }

    // the callback for sending a produce response
    def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {

      val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.TopicAuthorizationCode, -1))

      var errorInResponse = false

      mergedResponseStatus.foreach { case (topicAndPartition, status) =>
        if (status.error != ErrorMapping.NoError) {
          errorInResponse = true
          debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
            produceRequest.correlationId,
            produceRequest.clientId,
            topicAndPartition,
            ErrorMapping.exceptionNameFor(status.error)))
        }
      }

      def produceResponseCallback(delayTimeMs: Int) {

        if (produceRequest.requiredAcks == 0) {
          // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
          // the request, since no response is expected by the producer, the server will close socket server so that
          // the producer client will know that some error has happened and will refresh its metadata
          if (errorInResponse) {
            val exceptionsSummary = mergedResponseStatus.map { case (topicAndPartition, status) =>
              topicAndPartition -> ErrorMapping.exceptionNameFor(status.error)
            }.mkString(", ")
            info(
              s"Closing connection due to error during produce request with correlation id ${produceRequest.correlationId} " +
                s"from client id ${produceRequest.clientId} with ack=0\n" +
                s"Topic and partition to exceptions: $exceptionsSummary"
            )
            requestChannel.closeConnection(request.processor, request)
          } else {
            requestChannel.noOperation(request.processor, request)
          }
        } else {
          val response = ProducerResponse(produceRequest.correlationId,
                                          mergedResponseStatus,
                                          produceRequest.versionId,
                                          delayTimeMs)
          requestChannel.sendResponse(new RequestChannel.Response(request,
                                                                  new RequestOrResponseSend(request.connectionId,
                                                                                            response)))
        }
      }

      // When this callback is triggered, the remote API call has completed
      request.apiRemoteCompleteTimeMs = SystemTime.milliseconds

      quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId,
                                                                   numBytesAppended,
                                                                   produceResponseCallback)
    }

    if (authorizedRequestInfo.isEmpty)
      sendResponseCallback(Map.empty)
    else {
      val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId

      // call the replica manager to append messages to the replicas
      replicaManager.appendMessages(
        produceRequest.ackTimeoutMs.toLong,
        produceRequest.requiredAcks,
        internalTopicsAllowed,
        authorizedRequestInfo,
        sendResponseCallback)

      // if the request is put into the purgatory, it will have a held reference
      // and hence cannot be garbage collected; hence we clear its data here in
      // order to let GC re-claim its memory since it is already appended to log
      produceRequest.emptyData()
    }
  }

 

3.9 Dynamic configuration management: DynamicConfigManager

ZooKeeper serves as the dynamic configuration center:

  /**
   * Begin watching for config changes
   */
  def startup() {
    zkUtils.makeSurePersistentPathExists(ZkUtils.EntityConfigChangesPath)
    zkUtils.zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener)
    processAllConfigChanges()
  }

  /**
   * Process all config changes
   */
  private def processAllConfigChanges() {
    val configChanges = zkUtils.zkClient.getChildren(ZkUtils.EntityConfigChangesPath)
    import JavaConversions._
    processConfigChanges((configChanges: mutable.Buffer[String]).sorted)
  }

  /**
   * Process the given list of config changes
   */
  private def processConfigChanges(notifications: Seq[String]) {
    if (notifications.size > 0) {
      info("Processing config change notification(s)...")
      val now = time.milliseconds
      for (notification <- notifications) {
        val changeId = changeNumber(notification)

        if (changeId > lastExecutedChange) {
          val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification

          val (jsonOpt, stat) = zkUtils.readDataMaybeNull(changeZnode)
          processNotification(jsonOpt)
        }
        lastExecutedChange = changeId
      }
      purgeObsoleteNotifications(now, notifications)
    }
  }
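The core of processConfigChanges is "sort the notifications, skip everything at or below the last executed change, and remember the new high-water mark". The sketch below isolates that logic as a pure function. ConfigChangeSketch and pending are hypothetical names, and the znode naming scheme (a config_change_ prefix followed by a zero-padded sequence number) is an assumption, since the excerpt does not show changeNumber. Unlike the loop above, the sketch never lets the high-water mark move backwards.

```scala
object ConfigChangeSketch {
  // Assumed naming scheme for notification znodes: fixed prefix + zero-padded sequence number.
  val Prefix = "config_change_"

  def changeNumber(name: String): Long = name.substring(Prefix.length).toLong

  // Returns the notifications that still need processing, in sequence order,
  // together with the new high-water mark.
  def pending(notifications: Seq[String], lastExecutedChange: Long): (Seq[String], Long) = {
    // Zero padding makes lexicographic order agree with numeric order.
    val sorted = notifications.sorted
    val todo = sorted.filter(n => changeNumber(n) > lastExecutedChange)
    val newLast = sorted.lastOption.map(changeNumber).fold(lastExecutedChange)(math.max(lastExecutedChange, _))
    (todo, newLast)
  }
}
```

Given notifications numbered 3, 1 and 2 with lastExecutedChange = 1, the function returns the notifications numbered 2 and 3, in that order, and a new high-water mark of 3.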

3.10 Health check: KafkaHealthcheck

Broker liveness is also maintained through ZooKeeper:

  def startup() {
    zkUtils.zkClient.subscribeStateChanges(sessionExpireListener)
    register()
  }

  /**
   * Register this broker as "alive" in zookeeper
   */
  def register() {
    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
    val updatedEndpoints = advertisedEndpoints.mapValues(endpoint =>
      if (endpoint.host == null || endpoint.host.trim.isEmpty)
        EndPoint(InetAddress.getLocalHost.getCanonicalHostName, endpoint.port, endpoint.protocolType)
      else
        endpoint
    )

    // the default host and port are here for compatibility with older client
    // only PLAINTEXT is supported as default
    // if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect
    val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null))
    zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort)
  }

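4 Summary

In Kafka, the KafkaServer class is the entry point for network handling, I/O handling, and so on. Its main components are:

ReplicaManager    Replica management.

KafkaApis    The proxy class that handles all requests; it picks the concrete handler based on the request key.

KafkaRequestHandlerPool    The thread pool that processes requests.

LogManager    Manages Kafka's file storage system; responsible for handling and storing the partition data of all Kafka topics.

TopicConfigManager    Watches the child nodes of the ZooKeeper node /config/changes/ and updates topic configuration through LogManager; topic-level configuration management.

KafkaHealthcheck    Listens for ZooKeeper session expiry and registers the broker's information in ZooKeeper so that other brokers and consumers can look it up.

KafkaController    The central controller of the Kafka cluster: controller election, leader election, and replica assignment.

KafkaScheduler    Schedules the background tasks for replica management, log management, and so on.

ZkClient    Registers ZooKeeper-related information.

BrokerTopicStats    Topic statistics and monitoring.

ControllerStats    Controller statistics and monitoring.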

Original article: https://www.cnblogs.com/zcjcsl/p/8619484.html