spark streaming 接收kafka消息之四 -- 运行在 worker 上的 receiver

使用分布式receiver来获取数据
使用 WAL 来实现 At least once 操作:
conf.set("spark.streaming.receiver.writeAheadLog.enable","true") // 开启 WAL
// 1、At most once - 每条数据最多被处理一次(0次或1次),这种语义下会出现数据丢失的问题;
// 2、At least once - 每条数据最少被处理一次 (1次或更多),这个不会出现数据丢失,但是会出现数据重复;
// 3、Exactly once - 每条数据只会被处理一次,没有数据会丢失,并且没有数据会被多次处理,这种语义是大家最想要的,但是也是最难实现的。

如果不做容错,将会带来数据丢失,因为Receiver一直在接收数据,在其没有处理的时候(已通知zk数据接收到),Executor突然挂掉(或是driver挂掉通知executor关闭),缓存在内存中的数据就会丢失。因为这个问题,Spark1.2开始加入了WAL(Write ahead log)开启 WAL,将receiver获取数据的存储级别修改为StorageLevel. MEMORY_AND_DISK_SER_2

1 // 缺点,不能自己维护消费 topic partition 的 offset
2 // 优点,开启 WAL,来确保 exactly-once 语义
3 val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](
4     ssc,kafkaParams,map,StorageLevel.MEMORY_AND_DISK_SER_2)

从Kafka 中读取数据

Driver 规划 receiver 运行的信息

org.apache.spark.streaming.StreamingContext#start中启动了 JobScheduler实例

 1 // private[streaming] val scheduler = new JobScheduler(this)
 2 
 3 // Start the streaming scheduler in a new thread, so that thread local properties
 4 // like call sites and job groups can be reset without affecting those of the
 5 // current thread.
 6 ThreadUtils.runInNewThread("streaming-start") { // 单独的一个daemon线程运行函数题
 7   sparkContext.setCallSite(startSite.get)
 8   sparkContext.clearJobGroup()
 9   sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
10 // 执行start 方法
11   scheduler.start()
12 }
13 state = StreamingContextState.ACTIVE

org.apache.spark.streaming.scheduler.JobScheduler#start 源码如下:

 1 def start(): Unit = synchronized {
 2   if (eventLoop != null) return // scheduler has already been started
 3 
 4   logDebug("Starting JobScheduler")
 5   eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
 6     override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
 7 
 8     override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
 9   }
10   eventLoop.start()
11 
12   // attach rate controllers of input streams to receive batch completion updates
13   for {
14     inputDStream <- ssc.graph.getInputStreams
15     rateController <- inputDStream.rateController
16   } ssc.addStreamingListener(rateController)
17 
18   listenerBus.start(ssc.sparkContext)
19   receiverTracker = new ReceiverTracker(ssc)
20   inputInfoTracker = new InputInfoTracker(ssc)
21   receiverTracker.start()
22   jobGenerator.start()
23   logInfo("Started JobScheduler")
24 }

ReceiverTracker 的类声明如下:

1 This class manages the execution of the receivers of ReceiverInputDStreams. Instance of this class must be created after all input streams have been added and StreamingContext.start() has been called because it needs the final set of input streams at the time of instantiation.
2 此类负责执行ReceiverInputDStreams的receiver。必须在添加所有输入流并调用StreamingContext.start()之后创建此类的实例,因为它在实例化时需要最终的输入流集。

其 start 方法如下:

 1 /** Start the endpoint and receiver execution thread. */
 2 def start(): Unit = synchronized {
 3   if (isTrackerStarted) {
 4     throw new SparkException("ReceiverTracker already started")
 5   }
 6 
 7   if (!receiverInputStreams.isEmpty) {
 8 // 建立rpc endpoint
 9     endpoint = ssc.env.rpcEnv.setupEndpoint( // 注意,这是一个driver的 endpoint
10       "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
11 // driver节点上发送启动 receiver 命令
12     if (!skipReceiverLaunch) launchReceivers()
13     logInfo("ReceiverTracker started")
14     trackerState = Started
15   }
16 }
17 
18 /**
19  * Get the receivers from the ReceiverInputDStreams, distributes them to the
20  * worker nodes as a parallel collection, and runs them.
21  */
22 // 从ReceiverInputDStreams 获取到 receivers,然后将它们分配到不同的 worker 节点并运行它们。
23 private def launchReceivers(): Unit = {
24   val receivers = receiverInputStreams.map(nis => {
25 // 未启用WAL 是KafkaReceiver,启动WAL后是ReliableKafkaReceiver
26     val rcvr = nis.getReceiver()
27     rcvr.setReceiverId(nis.id)
28     rcvr
29   })
30   // 运行一个简单的应用来确保所有的salve node都已经启动起来,避免所有的 receiver 任务都在同一个local node上
31   runDummySparkJob()
32 
33   logInfo("Starting " + receivers.length + " receivers")
34   endpoint.send(StartAllReceivers(receivers)) // 发送请求driver 转发 启动 receiver 的命令
35 }

Driver 端StartAllReceivers 的处理代码如下:

 1 override def receive: PartialFunction[Any, Unit] = {
 2   // Local messages
 3   case StartAllReceivers(receivers) =>
 4 // schduleReceiver
 5     val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
 6     for (receiver <- receivers) {
 7       val executors = scheduledLocations(receiver.streamId)
 8       updateReceiverScheduledExecutors(receiver.streamId, executors)
 9       receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
10       startReceiver(receiver, executors)
11     }
12 ……
13 }

getExecutors源码如下:

 1 /**
 2  * Get the list of executors excluding driver
 3  */
 4 // 如果是 local 模式,返回 本地线程; 如果是 yarn 模式,返回 非driver 节点上的 excutors
 5 private def getExecutors: Seq[ExecutorCacheTaskLocation] = {
 6   if (ssc.sc.isLocal) { // 如果在 local 模式下运行
 7     val blockManagerId = ssc.sparkContext.env.blockManager.blockManagerId
 8     Seq(ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId))
 9   } else { // 在 yarn 模式下,过滤掉 driver 的 executor
10     ssc.sparkContext.env.blockManager.master.getMemoryStatus.filter { case (blockManagerId, _) =>
11       blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER // Ignore the driver location
12     }.map { case (blockManagerId, _) =>
13       ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId)
14     }.toSeq
15   }
16 }

org.apache.spark.streaming.scheduler.ReceiverSchedulingPolicy#scheduleReceivers的解释如下:

1 Try our best to schedule receivers with evenly distributed. However, if the preferredLocations of receivers are not even, we may not be able to schedule them evenly because we have to respect them. Here is the approach to schedule executors:
2 First, schedule all the receivers with preferred locations (hosts), evenly among the executors running on those host.
3 Then, schedule all other receivers evenly among all the executors such that overall distribution over all the receivers is even.
4 This method is called when we start to launch receivers at the first time.
5 该方法就是确保receiver 能够在worker node 上均匀分布的。遵循以下两个原则:
6 1.使用 preferred location 分配 receiver 到这些node 上
7 2.将其他的未分配的receiver均匀分布均匀分布到 每一个 worker node 上 

org.apache.spark.streaming.scheduler.ReceiverTracker#updateReceiverScheduledExecutors 负责更新receiverid 和 receiver info 的映射关系,源码如下:

 1 private def updateReceiverScheduledExecutors(
 2     receiverId: Int, scheduledLocations: Seq[TaskLocation]): Unit = {
 3   val newReceiverTrackingInfo = receiverTrackingInfos.get(receiverId) match {
 4     case Some(oldInfo) =>
 5       oldInfo.copy(state = ReceiverState.SCHEDULED,
 6         scheduledLocations = Some(scheduledLocations))
 7     case None =>
 8       ReceiverTrackingInfo(
 9         receiverId,
10         ReceiverState.SCHEDULED,
11         Some(scheduledLocations),
12         runningExecutor = None)
13   }
14   receiverTrackingInfos.put(receiverId, newReceiverTrackingInfo)
15 }

Driver 发送分布式启动receiver job

startReceiver 负责启动 receiver,源码如下:

 1 /**
 2  * Start a receiver along with its scheduled executors
 3  */
 4 private def startReceiver(
 5     receiver: Receiver[_],
 6     scheduledLocations: Seq[TaskLocation]): Unit = {
 7   def shouldStartReceiver: Boolean = {
 8     // It's okay to start when trackerState is Initialized or Started
 9     !(isTrackerStopping || isTrackerStopped)
10   }
11 
12   val receiverId = receiver.streamId
13   if (!shouldStartReceiver) {
14     onReceiverJobFinish(receiverId)
15     return
16   }
17 
18   val checkpointDirOption = Option(ssc.checkpointDir)
19   val serializableHadoopConf =
20     new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
21 
22 // 在 worker node 上启动 receiver 的方法
23   val startReceiverFunc: Iterator[Receiver[_]] => Unit =
24     (iterator: Iterator[Receiver[_]]) => {
25       if (!iterator.hasNext) {
26         throw new SparkException(
27           "Could not start receiver as object not found.")
28       }
29       if (TaskContext.get().attemptNumber() == 0) {
30         val receiver = iterator.next()
31         assert(iterator.hasNext == false)
32         val supervisor = new ReceiverSupervisorImpl(
33           receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
34         supervisor.start()
35         supervisor.awaitTermination()
36       } else {
37         // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
38       }
39     }
40 
41   // Create the RDD using the scheduledLocations to run the receiver in a Spark job
42   val receiverRDD: RDD[Receiver[_]] =
43     if (scheduledLocations.isEmpty) {
44       ssc.sc.makeRDD(Seq(receiver), 1)
45     } else {
46       val preferredLocations = scheduledLocations.map(_.toString).distinct
47       ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
48     }
49   receiverRDD.setName(s"Receiver $receiverId")
50   ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
51   ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
52   // 提交分布式receiver 启动任务
53   val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
54     receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
55   // We will keep restarting the receiver job until ReceiverTracker is stopped
56   future.onComplete {
57     case Success(_) =>
58       if (!shouldStartReceiver) {
59         onReceiverJobFinish(receiverId)
60       } else {
61         logInfo(s"Restarting Receiver $receiverId")
62         self.send(RestartReceiver(receiver))
63       }
64     case Failure(e) =>
65       if (!shouldStartReceiver) {
66         onReceiverJobFinish(receiverId)
67       } else {
68         logError("Receiver has been stopped. Try to restart it.", e)
69         logInfo(s"Restarting Receiver $receiverId")
70         self.send(RestartReceiver(receiver))
71       }
72   }(submitJobThreadPool)
73   logInfo(s"Receiver ${receiver.streamId} started")
74 }

Worker节点启动 receiver监管服务

org.apache.spark.streaming.receiver.ReceiverSupervisorImpl#ReceiverSupervisorImpl 的 start 方法如下:

 1 /** Start the supervisor */
 2 def start() {
 3   onStart()
 4   startReceiver()
 5 }
 6 override protected def onStart() { // 启动 BlockGenerator 服务
 7   registeredBlockGenerators.foreach { _.start() }
 8 }
 9 // startReceiver 方法如下:
10 /** Start receiver */
11 def startReceiver(): Unit = synchronized {
12   try {
13     if (onReceiverStart()) { // 注册receiver 成功
14       logInfo("Starting receiver")
15       receiverState = Started
16       receiver.onStart() // 启动 receiver
17       logInfo("Called receiver onStart")
18     } else {
19       // The driver refused us
20       stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
21     }
22   } catch {
23     case NonFatal(t) =>
24       stop("Error starting receiver " + streamId, Some(t))
25   }
26 }

注册 receiver 到 driver节点

1 override protected def onReceiverStart(): Boolean = {
2   val msg = RegisterReceiver(
3     streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
4   trackerEndpoint.askWithRetry[Boolean](msg)
5 }

简单描述一下driver 端做的事情,主要负责将其纳入到org.apache.spark.streaming.scheduler.ReceiverTracker 的管理中来,具体streamid 和 ReceiverTrackingInfo 的映射关系保存在receiverTrackingInfos中。

org.apache.spark.streaming.scheduler.ReceiverTracker#registerReceiver关键代码如下:

 1 val name = s"${typ}-${streamId}"
 2 val receiverTrackingInfo = ReceiverTrackingInfo(
 3   streamId,
 4   ReceiverState.ACTIVE,
 5   scheduledLocations = None,
 6   runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
 7   name = Some(name),
 8   endpoint = Some(receiverEndpoint))
 9 receiverTrackingInfos.put(streamId, receiverTrackingInfo)
10 listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))

启动 receiver 线程

由于我们启用了 WAL, 所以 这里的receiver 是ReliableKafkaReceiver 的实例
receiver.onStart 即 org.apache.spark.streaming.kafka.ReliableKafkaReceiver#onStart, 源码如下:

 1 override def onStart(): Unit = {
 2   logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
 3 
 4   // Initialize the topic-partition / offset hash map.
 5 // 1. 负责维护消费的 topic-partition 和 offset 的映射关系
 6   topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
 7 
 8   // Initialize the stream block id / offset snapshot hash map.
 9 // 2. 负责维护 block-id 和 partition-offset 之间的映射关系
10   blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()
11 
12   // Initialize the block generator for storing Kafka message.
13 // 3. 负责保存 kafka message 的 block generator,入参是GeneratedBlockHandler 实例,这是一个负责监听 block generator事件的一个监听器
14 // Generates batches of objects received by a org.apache.spark.streaming.receiver.Receiver and puts them into appropriately named blocks at regular intervals. This class starts two threads, one to periodically start a new batch and prepare the previous batch of as a block, the other to push the blocks into the block manager. 
15   blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)
16   // 4. 关闭consumer 自动提交 offset 选项
17 // auto_offset_commit 应该是 false
18   if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
19     logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
20       "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
21   }
22 
23   val props = new Properties()
24   kafkaParams.foreach(param => props.put(param._1, param._2))
25   // Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true,
26   // we have to make sure this property is set to false to turn off auto commit mechanism in Kafka.
27   props.setProperty(AUTO_OFFSET_COMMIT, "false")
28 
29   val consumerConfig = new ConsumerConfig(props)
30 
31   assert(!consumerConfig.autoCommitEnable)
32 
33   logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
34 // 5. 初始化 consumer 对象
35 // consumerConnector 是ZookeeperConsumerConnector的实例
36   consumerConnector = Consumer.create(consumerConfig)
37   logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
38   // 6. 初始化zookeeper 的客户端
39   zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
40     consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
41    // 7. 创建线程池来处理消息流,池的大小是固定的,为partition 的总数,并指定线程池中每一个线程的name 的前缀,内部使用ThreadPoolExecutor,并且 创建线程的 factory类是guava 工具包提供的。
42   messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
43     topics.values.sum, "KafkaMessageHandler")
44    // 8. 启动 BlockGenerator内的两个线程
45   blockGenerator.start()
46 
47 // 9. 创建MessageStream对象
48   val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
49     .newInstance(consumerConfig.props)
50     .asInstanceOf[Decoder[K]]
51 
52   val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
53     .newInstance(consumerConfig.props)
54     .asInstanceOf[Decoder[V]]
55  
56   val topicMessageStreams = consumerConnector.createMessageStreams(
57     topics, keyDecoder, valueDecoder)
58 // 10. 将待处理的MessageHandler 放入 线程池中,等待执行
59   topicMessageStreams.values.foreach { streams =>
60     streams.foreach { stream =>
61       messageHandlerThreadPool.submit(new MessageHandler(stream))
62     }
63   }
64 }

其中, 第9 步,创建MessageStream对象,
kafka.consumer.ZookeeperConsumerConnector#createMessageStreams 方法如下:

1 def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
2     : Map[String, List[KafkaStream[K,V]]] = {
3   if (messageStreamCreated.getAndSet(true))
4     throw new MessageStreamsExistException(this.getClass.getSimpleName +
5                                  " can create message streams at most once",null)
6   consume(topicCountMap, keyDecoder, valueDecoder)
7 }

其调用了 consume 方法,源码如下:

def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
    : Map[String,List[KafkaStream[K,V]]] = {
  debug("entering consume ")
  if (topicCountMap == null)
    throw new RuntimeException("topicCountMap is null")
 // 1. 初始化 topicCount
  val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
 // 2. 获取 每一个topic 和 threadId 集合的映射关系
  val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic

  // make a list of (queue,stream) pairs, one pair for each threadId
// 3. 得到每一个 threadId 对应 (queue, stream) 的映射列表
  val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
    threadIdSet.map(_ => {
      val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
      val stream = new KafkaStream[K,V](
        queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
      (queue, stream)
    })
  ).flatten.toList
 // 4. 获取 groupId 在 zookeeper 中的path
  val dirs = new ZKGroupDirs(config.groupId)
// 5. 注册 consumer 到 groupId(在zk中)
  registerConsumerInZK(dirs, consumerIdString, topicCount)
// 6. 重新初始化 consumer
  reinitializeConsumer(topicCount, queuesAndStreams)
  // 7. 返回流 
  loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}

consumer消费kafka数据

在 kafka.consumer.ZookeeperConsumerConnector#consume方法中,有如下操作:

 1 // 得到每一个 threadId 对应 (queue, stream) 的映射列表
 2   val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
 3     threadIdSet.map(_ => {
 4       val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
 5       val stream = new KafkaStream[K,V](
 6         queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
 7       (queue, stream)
 8     })
 9   ).flatten.toList
10  // 获取 groupId 在 zookeeper 中的path
11   val dirs = new ZKGroupDirs(config.groupId)
12 // 注册 consumer 到 groupId(在zk中)
13   registerConsumerInZK(dirs, consumerIdString, topicCount)
14 // 重新初始化 consumer
15   reinitializeConsumer(topicCount, queuesAndStreams)

在上面的代码中,可以看到初始化的queue(LinkedBlockingQueue实例)除了被传入stream(KafkaStream)的构造函数被迭代器从中取数据,还和 stream 重组成Tuple2[LinkedBlockingQueue[FetchedDataChunk]的list,之后被传入reinitializeConsumer 方法中。
kafka.consumer.ZookeeperConsumerConnector#reinitializeConsume 其源码如下:

 1 private def reinitializeConsumer[K,V](
 2     topicCount: TopicCount,
 3     queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {
 4  // 1. 获取 该groupid 在 zk 中的路径
 5   val dirs = new ZKGroupDirs(config.groupId)
 6 
 7   // listener to consumer and partition changes
 8 // 2. 初始化loadBalancerListener,这个负载均衡listener 会时刻监控 consumer 和 partition 的变化
 9   if (loadBalancerListener == null) {
10     val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
11     loadBalancerListener = new ZKRebalancerListener(
12       config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
13   }
14 
15   // create listener for session expired event if not exist yet
16   // 3. 监控 session 过期的listner, 有新session注册初始化,会通知 loadBalancer
17 if (sessionExpirationListener == null)
18     sessionExpirationListener = new ZKSessionExpireListener(
19       dirs, consumerIdString, topicCount, loadBalancerListener)
20 
21   // create listener for topic partition change event if not exist yet
22 // 4. 初始化ZKTopicPartitionChangeListener实例,当topic partition 变化时,这个listener会通知 loadBalancer
23   if (topicPartitionChangeListener == null)
24     topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)
25  // 5. 将queuesAndStreams 的值经过一系列转换,并添加到loadBalancerListener.kafkaMessageAndMetadataStreams 中
26   val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
27 
28   // map of {topic -> Set(thread-1, thread-2, ...)}
29   val consumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]] =
30     topicCount.getConsumerThreadIdsPerTopic
31 
32   val allQueuesAndStreams = topicCount match {
33     case wildTopicCount: WildcardTopicCount => // 这里是WildcardTopicCount,走这个分支
34       /*
35        * Wild-card consumption streams share the same queues, so we need to
36        * duplicate the list for the subsequent zip operation.
37        */
38       (1 to consumerThreadIdsPerTopic.keySet.size).flatMap(_ => queuesAndStreams).toList
39     case statTopicCount: StaticTopicCount =>
40       queuesAndStreams
41   }
42 
43   val topicThreadIds = consumerThreadIdsPerTopic.map {
44     case(topic, threadIds) =>
45       threadIds.map((topic, _))
46   }.flatten
47 
48   require(topicThreadIds.size == allQueuesAndStreams.size,
49     "Mismatch between thread ID count (%d) and queue count (%d)"
50     .format(topicThreadIds.size, allQueuesAndStreams.size))
51   val threadQueueStreamPairs = topicThreadIds.zip(allQueuesAndStreams)
52 
53   threadQueueStreamPairs.foreach(e => {
54     val topicThreadId = e._1
55     val q = e._2._1
56     topicThreadIdAndQueues.put(topicThreadId, q)
57     debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
58     newGauge(
59       "FetchQueueSize",
60       new Gauge[Int] {
61         def value = q.size
62       },
63       Map("clientId" -> config.clientId,
64         "topic" -> topicThreadId._1,
65         "threadId" -> topicThreadId._2.threadId.toString)
66     )
67   })
68 
69   val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)
70   groupedByTopic.foreach(e => {
71     val topic = e._1
72     val streams = e._2.map(_._2._2).toList
73     topicStreamsMap += (topic -> streams)
74     debug("adding topic %s and %d streams to map.".format(topic, streams.size))
75   })
76 
77   // listener to consumer and partition changes
78 // 6. 使用 zkClient 注册sessionExpirationListener 实例
79   zkClient.subscribeStateChanges(sessionExpirationListener)
80  // 7. 使用 zkClient 注册loadBalancerListener 实例
81   zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
82  // 遍历每一个topic,使用zkClient 注册topicPartitionChangeListener 实例
83   topicStreamsMap.foreach { topicAndStreams =>
84     // register on broker partition path changes
85     val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
86     zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
87   }
88 
89   // explicitly trigger load balancing for this consumer
90 // 8. 使用 loadBalancerListener 同步做负载均衡
91   loadBalancerListener.syncedRebalance()
92 }

重点看 第 8 步,使用 loadBalancerListener 同步做负载均衡。
kafka.consumer.ZookeeperConsumerConnector.ZKRebalancerListener#syncedRebalance 源码如下:

 1 def syncedRebalance() {
 2   rebalanceLock synchronized {
 3     rebalanceTimer.time {
 4       if(isShuttingDown.get())  { // 如果ZookeeperConsumerConnector
 5 已经shutdown了,直接返回
 6         return
 7       } else {
 8         for (i <- 0 until config.rebalanceMaxRetries) { // 默认是 4 次
 9           info("begin rebalancing consumer " + consumerIdString + " try #" + i)
10           var done = false
11           var cluster: Cluster = null
12           try {
13             // 1. 根据zkClient 实例 获取并创建Cluster 对象,这个 cluster 实例包含了一个 Broker(broker的id,broker在zk中的路径) 列表
14             cluster = getCluster(zkClient) 
15             // 2. 在cluster中做 rebalance操作
16             done = rebalance(cluster)
17           } catch {
18             case e: Throwable =>
19               /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
20                 * For example, a ZK node can disappear between the time we get all children and the time we try to get
21                 * the value of a child. Just let this go since another rebalance will be triggered.
22                 **/
23               info("exception during rebalance ", e)
24           }
25           info("end rebalancing consumer " + consumerIdString + " try #" + i)
26           if (done) {
27             return
28           } else {
29             /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
30              * clear the cache */
31             info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
32           }
33           // stop all fetchers and clear all the queues to avoid data duplication
34           closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
35           Thread.sleep(config.rebalanceBackoffMs)
36         }
37       }
38     }
39   }
40 
41   throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries")
42 }

重点看 第2 步,在 cluster 中做 rebalance 操作,kafka.consumer.ZookeeperConsumerConnector.ZKRebalancerListener#rebalance 源码如下:

 1 private def rebalance(cluster: Cluster): Boolean = {
 2   // 1. 获取 group和 threadId 的Map 映射关系
 3   val myTopicThreadIdsMap = TopicCount.constructTopicCount(
 4     group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
 5   // 2. 获取kafka cluster 中所有可用的node
 6   val brokers = getAllBrokersInCluster(zkClient)
 7   if (brokers.size == 0) { // 如果可用节点为空,设置listener订阅,返回 true
 8     // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
 9     // We log an warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers
10     // are up.
11     warn("no brokers found when trying to rebalance.")
12     zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener)
13     true
14   }
15   else {
16     /**
17      * fetchers must be stopped to avoid data duplication, since if the current
18      * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
19      * But if we don't stop the fetchers first, this consumer would continue returning data for released
20      * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
21      */
22    // 3. 做rebalance 之前的准备工作
23    // 3.1. 关闭现有 fetcher 连接
24     closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
25    // 3.2 释放 partition 的所有权(主要是删除zk下的owner 节点的数据以及解除内存中的topic和 fetcher的关联关系)
26     releasePartitionOwnership(topicRegistry)
27    // 3.3. 重新给partition分配 fetcher
28     val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient)
29     val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext)
30     val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
31       valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))
32 
33     // fetch current offsets for all topic-partitions
34     // 3.4 获取当前fetcher对应的 partitions 的 offsets,这里的offset是指 consumer 下一个要消费的offset
35     val topicPartitions = partitionOwnershipDecision.keySet.toSeq
36 
37     val offsetFetchResponseOpt = fetchOffsets(topicPartitions)
38 
39     if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined)
40       false
41     else {
42       // 3.5 更新 partition 和 fetcher 的对应关系
43       val offsetFetchResponse = offsetFetchResponseOpt.get
44       topicPartitions.foreach(topicAndPartition => {
45         val (topic, partition) = topicAndPartition.asTuple
46 // requestInfo是OffsetFetchResponse实例中的成员变量,它是一个Map[TopicAndPartition, OffsetMetadataAndError]实例
47         val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
48         val threadId = partitionOwnershipDecision(topicAndPartition)
49         addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
50       })
51 
52       /**
53        * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
54        * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
55        */
56       if(reflectPartitionOwnershipDecision(partitionOwnershipDecision)) {
57         allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size
58 
59         partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
60                                   .foreach { case (topic, partitionThreadPairs) =>
61           newGauge("OwnedPartitionsCount",
62             new Gauge[Int] {
63               def value() = partitionThreadPairs.size
64             },
65             ownedPartitionsCountMetricTags(topic))
66         }
67         // 3.6 将已经新的 topic registry 覆盖旧的
68         topicRegistry = currentTopicRegistry
69 // 4. 更新 fetcher
70         updateFetcher(cluster)
71         true
72       } else {
73         false
74       }
75     }
76   }
77 }

其中addPartitionTopicInfo 源码如下:

 1 private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
 2                                     partition: Int, topic: String,
 3                                     offset: Long, consumerThreadId: ConsumerThreadId) {
 4 //如果map没有对应的 key,会使用valueFactory初始化键值对,并返回 对应的 value
 5     val partTopicInfoMap = currentTopicRegistry.getAndMaybePut(topic)
 6 
 7     val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
 8     val consumedOffset = new AtomicLong(offset)
 9     val fetchedOffset = new AtomicLong(offset)
10     val partTopicInfo = new PartitionTopicInfo(topic,
11                                                partition,
12                                                queue,
13                                                consumedOffset,
14                                                fetchedOffset,
15                                                new AtomicInteger(config.fetchMessageMaxBytes),
16                                                config.clientId)
17     // 1. 将其注册到新的 Topic注册中心中,即注册 partition 和 fetcher 的关系
18 partTopicInfoMap.put(partition, partTopicInfo)
19     debug(partTopicInfo + " selected new offset " + offset)
20 // 2. 更新consumer 的 已经消费的offset信息
21     checkpointedZkOffsets.put(TopicAndPartition(topic, partition), offset)
22   }
23 }

第4步, 更新 fetcher 源码如下:

 1 private def updateFetcher(cluster: Cluster) {
 2   // update partitions for fetcher
 3   var allPartitionInfos : List[PartitionTopicInfo] = Nil
 4   for (partitionInfos <- topicRegistry.values)
 5     for (partition <- partitionInfos.values)
 6       allPartitionInfos ::= partition
 7   info("Consumer " + consumerIdString + " selected partitions : " +
 8     allPartitionInfos.sortWith((s,t) => s.partitionId < t.partitionId).map(_.toString).mkString(","))
 9 
10   fetcher match {
11     case Some(f) =>
12       f.startConnections(allPartitionInfos, cluster)
13     case None =>
14   }
15 }

其中,f.startConnections方法真正执行 更新操作。此时引入一个新的类。即 fetcher 类,kafka.consumer.ConsumerFetcherManager。

kafka.consumer.ConsumerFetcherManager#startConnections 的源码如下:

 1 def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
 2 // LeaderFinderThread 在 topic 的leader node可用时,将 fetcher 添加到 leader 节点上
 3   leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread")
 4   leaderFinderThread.start()
 5 
 6   inLock(lock) {
 7 // 更新ConsumerFetcherManager 成员变量
 8     partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap
 9     this.cluster = cluster
10     noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId))
11     cond.signalAll()
12   }
13 }

ConsumerFetcherManager 有一个LeaderFinderThread 实例,该类的父类kafka.utils.ShutdownableThread ,run 方法如下:

 1 override def run(): Unit = {
 2   info("Starting ")
 3   try{
 4     while(isRunning.get()){
 5       doWork()
 6     }
 7   } catch{
 8     case e: Throwable =>
 9       if(isRunning.get())
10         error("Error due to ", e)
11   }
12   shutdownLatch.countDown()
13   info("Stopped ")
14 }

doWork其实就是一个抽象方法,其子类LeaderFinderThread的实现如下:

 1 // thread responsible for adding the fetcher to the right broker when leader is available
 2 override def doWork() {
 3 // 1. 获取 partition 和leader node的映射关系
 4   val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
 5   lock.lock()
 6   try {
 7     while (noLeaderPartitionSet.isEmpty) { // 这个字段在startConnections 已更新新值
 8       trace("No partition for leader election.")
 9       cond.await()
10     }
11 
12     trace("Partitions without leader %s".format(noLeaderPartitionSet))
13     val brokers = getAllBrokersInCluster(zkClient) // 获取所有可用broker 节点
14     // 获取kafka.api.TopicMetadata 序列,kafka.api.TopicMetadata 保存了 topic 和 partitionId,isr,leader,replicas 的信息
15 val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
16                                                         brokers,
17                                                         config.clientId,
18                                                         config.socketTimeoutMs,
19                                                         correlationId.getAndIncrement).topicsMetadata
20     if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))
21 // 2. 根据获取到的 partition 和 leader node 的关系更新noLeaderPartitionSet 和leaderForPartitionsMap 两个map集合,其中noLeaderPartitionSet 包含的是没有确定leader 的 partition 集合,leaderForPartitionsMap 是 已经确定了 leader 的 partition 集合。
22     topicsMetadata.foreach { tmd =>
23       val topic = tmd.topic
24       tmd.partitionsMetadata.foreach { pmd =>
25         val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
26         if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
27           val leaderBroker = pmd.leader.get
28           leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
29           noLeaderPartitionSet -= topicAndPartition
30         }
31       }
32     }
33   } catch {
34     case t: Throwable => {
35         if (!isRunning.get())
36           throw t /* If this thread is stopped, propagate this exception to kill the thread. */
37         else
38           warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
39       }
40   } finally {
41     lock.unlock()
42   }
43 
44   try {
45 // 3. 具体为 partition 分配 fetcher
46     addFetcherForPartitions(leaderForPartitionsMap.map{
47       case (topicAndPartition, broker) =>
48         topicAndPartition -> BrokerAndInitialOffset(broker, partitionMap(topicAndPartition).getFetchOffset())}
49     )
50   } catch {
51     case t: Throwable => {
52       if (!isRunning.get())
53         throw t /* If this thread is stopped, propagate this exception to kill the thread. */
54       else {
55         warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t)
56         lock.lock()
57         noLeaderPartitionSet ++= leaderForPartitionsMap.keySet
58         lock.unlock()
59       }
60     }
61   }
62   // 4. 关闭空闲fetcher线程
63   shutdownIdleFetcherThreads()
64   Thread.sleep(config.refreshLeaderBackoffMs)
65 }

重点看第3 步,具体为 partition 分配 fetcher,addFetcherForPartitions 源码如下:

 1 def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
 2   mapLock synchronized {
 3 // 获取 fetcher 和 partition的映射关系
 4     val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
 5       BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
 6     for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
 7 
 8       var fetcherThread: AbstractFetcherThread = null
 9       fetcherThreadMap.get(brokerAndFetcherId) match {
10         case Some(f) => fetcherThread = f
11         case None =>
12 // 根据brokerAndFetcherId 去初始化Fetcher并启动 fetcher
13           fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
14           fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
15           fetcherThread.start
16       }
17 
18       fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
19         topicAndPartition -> brokerAndInitOffset.initOffset
20       })
21     }
22   }
23 
24   info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) =>
25     "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
26 }

kafka.consumer.ConsumerFetcherManager#createFetcherThread的源码如下:

1 override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
2   new ConsumerFetcherThread(
3     "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
4     config, sourceBroker, partitionMap, this)
5 }

先来看ConsumerFetcherThread的构造方法声明:

 1 class ConsumerFetcherThread(name: String,
 2                             val config: ConsumerConfig,
 3                             sourceBroker: Broker,
 4                             partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
 5                             val consumerFetcherManager: ConsumerFetcherManager)
 6         extends AbstractFetcherThread(name = name, 
 7                                       clientId = config.clientId,
 8                                       sourceBroker = sourceBroker,
 9                                       socketTimeout = config.socketTimeoutMs,
10                                       socketBufferSize = config.socketReceiveBufferBytes,
11                                       fetchSize = config.fetchMessageMaxBytes,
12                                       fetcherBrokerId = Request.OrdinaryConsumerId,
13                                       maxWait = config.fetchWaitMaxMs,
14                                       minBytes = config.fetchMinBytes,
15                                       isInterruptible = true)

注意,partitionMap 中的value 是PartitionTopicInfo ,这个对象中封装了存放fetch结果值的BlockingQueue[FetchedDataChunk] 实例。
再来看 run 方法,其使用的是 kafka.utils.ShutdownableThread#run 方法,上面我们已经看过了,主要看该子类是如何重新 doWork方法的:

 1 override def doWork() {
 2   inLock(partitionMapLock) { // 加锁,执行,释放锁
 3     if (partitionMap.isEmpty) // 如果没有需要执行的 fetch 操作,等待200ms后返回
 4       partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
 5     partitionMap.foreach { // 将所有的 fetch 的信息添加到fetchRequestBuilder中
 6       case((topicAndPartition, offset)) =>
 7         fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
 8                          offset, fetchSize)
 9     }
10   }
11   // 构建批抓取的fetchRequest对象
12   val fetchRequest = fetchRequestBuilder.build()
13 // 处理 FetchRequest
14   if (!fetchRequest.requestInfo.isEmpty)
15     processFetchRequest(fetchRequest)
16 }

其中 kafka.server.AbstractFetcherThread#processFetchRequest 源码如下:

 1 private def processFetchRequest(fetchRequest: FetchRequest) {
 2   val partitionsWithError = new mutable.HashSet[TopicAndPartition]
 3   var response: FetchResponse = null
 4   try {
 5     trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
 6 // 发送请求,并获取返回值。
 7 // simpleConsumer  就是SimpleConsumer 实例,已作说明,不再赘述。
 8     response = simpleConsumer.fetch(fetchRequest)
 9   } catch {
10     case t: Throwable =>
11       if (isRunning.get) {
12         warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
13         partitionMapLock synchronized {
14           partitionsWithError ++= partitionMap.keys
15         }
16       }
17   }
18   fetcherStats.requestRate.mark()
19 
20   if (response != null) {
21     // process fetched data
22     inLock(partitionMapLock) { // 获取锁,执行处理response 操作,释放锁
23       response.data.foreach {
24         case(topicAndPartition, partitionData) =>
25           val (topic, partitionId) = topicAndPartition.asTuple
26           val currentOffset = partitionMap.get(topicAndPartition)
27           // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
28           if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
29             partitionData.error match { // 根据返回码来确定具体执行哪部分处理逻辑
30               case ErrorMapping.NoError => // 成功返回,没有错误
31                 try {
32                   val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
33                   val validBytes = messages.validBytes
34                   val newOffset = messages.shallowIterator.toSeq.lastOption match {
35                     case Some(m: MessageAndOffset) => m.nextOffset
36                     case None => currentOffset.get
37                   }
38                   partitionMap.put(topicAndPartition, newOffset)
39                   fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
40                   fetcherStats.byteRate.mark(validBytes)
41                   // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
42                   processPartitionData(topicAndPartition, currentOffset.get, partitionData)
43                 } catch {
44                   case ime: InvalidMessageException => // 消息获取不完整
45                     // we log the error and continue. This ensures two things
46                     // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
47                     // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
48                     //    should get fixed in the subsequent fetches
49                     logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
50                   case e: Throwable =>
51                     throw new KafkaException("error processing data for partition [%s,%d] offset %d"
52                                              .format(topic, partitionId, currentOffset.get), e)
53                 }
54               case ErrorMapping.OffsetOutOfRangeCode => // offset out of range error
55                 try {
56                   val newOffset = handleOffsetOutOfRange(topicAndPartition)
57                   partitionMap.put(topicAndPartition, newOffset)
58                   error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
59                     .format(currentOffset.get, topic, partitionId, newOffset))
60                 } catch {
61                   case e: Throwable =>
62                     error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
63                     partitionsWithError += topicAndPartition
64                 }
65               case _ =>
66                 if (isRunning.get) {
67                   error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
68                     ErrorMapping.exceptionFor(partitionData.error).getClass))
69                   partitionsWithError += topicAndPartition
70                 }
71             }
72           }
73       }
74     }
75   }
76 
77   if(partitionsWithError.size > 0) {
78     debug("handling partitions with error for %s".format(partitionsWithError))
79     handlePartitionsWithErrors(partitionsWithError)
80   }
81 }

其中processPartitionData 源码如下,它负责处理具体的返回消息:

 1 // process fetched data
 2 def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
 3 // partitionMap 是一个成员变量,在构造函数中作为入参
 4   val pti = partitionMap(topicAndPartition)
 5   if (pti.getFetchOffset != fetchOffset)
 6     throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
 7                               .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
 8 // 数据入队
 9   pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
10 }

可以看到,终于在这里,把从leader中fetch的消息放入了BlockingQueue[FetchedDataChunk] 缓冲堵塞队列中。

KafkaStream从queue中堵塞式获取数据

KafkaStream 是依赖于 LinkedBlockingQueue 的同理 KafkaStream 也会返回一个迭代器 kafka.consumer.ConsumerIterator,用于迭代访问 KafkaStream 中的数据。
kafka.consumer.ConsumerIterator 的主要源码如下:

 1 // 判断是否有下一个元素
 2 def hasNext(): Boolean = {
 3   if(state == FAILED)
 4     throw new IllegalStateException("Iterator is in failed state")
 5   state match {
 6     case DONE => false
 7     case READY => true
 8     case _ => maybeComputeNext()
 9   }
10 }
11 // 获取下一个元素,父类实现
12 def next(): T = {
13   if(!hasNext())
14     throw new NoSuchElementException()
15   state = NOT_READY
16   if(nextItem == null)
17     throw new IllegalStateException("Expected item but none found.")
18   nextItem
19 }
20 // 获取下一个元素,使用子类ConsumerIterator实现
21 override def next(): MessageAndMetadata[K, V] = {
22   val item = super.next() // 调用父类实现
23   if(consumedOffset < 0)
24     throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
25   currentTopicInfo.resetConsumeOffset(consumedOffset)
26   val topic = currentTopicInfo.topic
27   trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
28   consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
29   consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
30   item
31 }
32  // 或许有,尝试计算一下下一个
33 def maybeComputeNext(): Boolean = {
34   state = FAILED
35   nextItem = makeNext()
36   if(state == DONE) {
37     false
38   } else {
39     state = READY
40     true
41   }
42 }
43 // 创建下一个元素,这个在子类ConsumerIterator中有实现
44 protected def makeNext(): MessageAndMetadata[K, V] = {
45 // 首先channel 是 LinkedBlockingQueue实例, 是 KafkaStream 中的 queue 成员变量,queue 成员变量
46   var currentDataChunk: FetchedDataChunk = null
47   // if we don't have an iterator, get one
48   var localCurrent = current.get() 
49 // 如果没有迭代器或者是没有下一个元素了,需要从channel中取一个
50   if(localCurrent == null || !localCurrent.hasNext) {
51 // 删除并返回队列的头节点。
52     if (consumerTimeoutMs < 0)
53       currentDataChunk = channel.take // 阻塞方法,一直等待,直到有可用元素
54     else {
55       currentDataChunk = channel.poll(consumerTimeoutMs,  TimeUnit.MILLISECONDS) // 阻塞方法,等待指定时间,超时也会返回
56       if (currentDataChunk == null) { // 如果没有数据,重置状态为NOT_READY
57         // reset state to make the iterator re-iterable
58         resetState()
59         throw new ConsumerTimeoutException
60       }
61     }
62 // 关闭命令
63     if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
64       debug("Received the shutdown command")
65       return allDone // 该函数将状态设为DONE, 返回null
66     } else {
67       currentTopicInfo = currentDataChunk.topicInfo
68       val cdcFetchOffset = currentDataChunk.fetchOffset
69       val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
70       if (ctiConsumeOffset < cdcFetchOffset) {
71         error("consumed offset: %d doesn't match fetch offset: %d for %s;
 Consumer may lose data"
72           .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
73         currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
74       }
75       localCurrent = currentDataChunk.messages.iterator
76 
77       current.set(localCurrent)
78     }
79     // if we just updated the current chunk and it is empty that means the fetch size is too small!
80     if(currentDataChunk.messages.validBytes == 0)
81       throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
82                                              "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
83                                              .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
84   }
85   var item = localCurrent.next()
86   // reject the messages that have already been consumed
87   while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) {
88     item = localCurrent.next()
89   }
90   consumedOffset = item.nextOffset
91 
92   item.message.ensureValid() // validate checksum of message to ensure it is valid
93  // 返回处理封装好的 kafka 数据
94   new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder)
95 }

消费到的数据cache 到WAL中

我们再来看,org.apache.spark.streaming.kafka.ReliableKafkaReceiver#onStart 的第10 步相应的代码:

1 // 10. 将待处理的MessageHandler 放入 线程池中,等待执行
2   topicMessageStreams.values.foreach { streams =>
3     streams.foreach { stream =>
4       messageHandlerThreadPool.submit(new MessageHandler(stream))
5     }
6   }

其中 MessageHandler 是一个 Runnable 对象,其 run 方法如下:

 1 override def run(): Unit = {
 2   while (!isStopped) {
 3     try {
 4 // 1. 获取ConsumerIterator 迭代器对象
 5       val streamIterator = stream.iterator()
 6       // 2. 遍历迭代器中获取每一条数据,并且保存message和相应的 metadata 信息
 7 while (streamIterator.hasNext) {
 8         storeMessageAndMetadata(streamIterator.next)
 9       }
10     } catch {
11       case e: Exception =>
12         reportError("Error handling message", e)
13     }
14   }
15 }

其中第二步中关键方法,org.apache.spark.streaming.kafka.ReliableKafkaReceiver#storeMessageAndMetadata 方法如下:

1 /** Store a Kafka message and the associated metadata as a tuple. */
2 private def storeMessageAndMetadata(
3     msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
4   val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
5   val data = (msgAndMetadata.key, msgAndMetadata.message)
6   val metadata = (topicAndPartition, msgAndMetadata.offset)
7 // 添加数据到 block
8   blockGenerator.addDataWithCallback(data, metadata)
9 }

addDataWithCallback 源码如下:

 1 /**
 2  * Push a single data item into the buffer. After buffering the data, the
 3  * `BlockGeneratorListener.onAddData` callback will be called.
 4  */
 5 def addDataWithCallback(data: Any, metadata: Any): Unit = {
 6   if (state == Active) {
 7     waitToPush()
 8     synchronized {
 9       if (state == Active) {
10 // 1. 将数据放入 buffer 中,以便处理线程从中获取数据
11         currentBuffer += data
12 // 2. 在启动 receiver线程中,可以知道listener 是指GeneratedBlockHandler 实例
13         listener.onAddData(data, metadata)
14       } else {
15         throw new SparkException(
16           "Cannot add data as BlockGenerator has not been started or has been stopped")
17       }
18     }
19   } else {
20     throw new SparkException(
21       "Cannot add data as BlockGenerator has not been started or has been stopped")
22   }
23 }

第二步比较简单,先看一下第二步:
org.apache.spark.streaming.kafka.ReliableKafkaReceiver.GeneratedBlockHandler#onAddData的源码如下:

 1 def onAddData(data: Any, metadata: Any): Unit = {
 2   // Update the offset of the data that was added to the generator
 3   if (metadata != null) {
 4     val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
 5     updateOffset(topicAndPartition, offset)
 6   }
 7 }
 8 // 这里的 updateOffset 调用的是//org.apache.spark.streaming.kafka.ReliableKafkaReceiver#updateOffset,源码如下:
 9 /** Update stored offset */
10 private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
11   topicPartitionOffsetMap.put(topicAndPartition, offset)
12 }

第一步的原理如下:
在 BlockGenerator中有一个定时器,定时(200ms)去执行检查currentBuffer是否为empty任务, 若不为空,则执行如下操作并把它放入等待生成block 的队列中,有两外一个线程来时刻监听这个队列,有数据,则执行pushBlock 操作。
第一个定时器线程如下:

 1 private val blockIntervalTimer =
 2   new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
 3 
 4 // 其中,updateCurrentBuffer 方法如下
 5 /** Change the buffer to which single records are added to. */
 6 private def updateCurrentBuffer(time: Long): Unit = {
 7   try {
 8     var newBlock: Block = null
 9     synchronized {
10       if (currentBuffer.nonEmpty) {
11         val newBlockBuffer = currentBuffer
12         currentBuffer = new ArrayBuffer[Any]
13         val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
14         listener.onGenerateBlock(blockId)
15         newBlock = new Block(blockId, newBlockBuffer)
16       }
17     }
18 
19     if (newBlock != null) {
20       blocksForPushing.put(newBlock)  // put is blocking when queue is full
21     }
22   } catch {
23     case ie: InterruptedException =>
24       logInfo("Block updating timer thread was interrupted")
25     case e: Exception =>
26       reportError("Error in block updating thread", e)
27   }
28 }
29 
30 // listener.onGenerateBlock(blockId) 代码如下:
31 def onGenerateBlock(blockId: StreamBlockId): Unit = {
32   // Remember the offsets of topics/partitions when a block has been generated
33   rememberBlockOffsets(blockId)
34 }
35 // rememberBlockOffsets 代码如下:
36 /**
37  * Remember the current offsets for each topic and partition. This is called when a block is
38  * generated.
39  */
40 private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
41   // Get a snapshot of current offset map and store with related block id.
42   val offsetSnapshot = topicPartitionOffsetMap.toMap
43   blockOffsetMap.put(blockId, offsetSnapshot)
44   topicPartitionOffsetMap.clear()
45 }
46 // 可以看出,主要是清除 topic-partition-> offset 映射关系
47 // 建立 block 和topic-partition-> offset的映射关系

其中,blocksForPushing是一个有界阻塞队列,另外一个线程会一直轮询它。

 1 private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
 2 private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
 3 
 4 /** Keep pushing blocks to the BlockManager. */
 5 // 这个方法主要的作用就是一直不停地轮询blocksForPushing队列,并处理相应的push block 事件。
 6 private def keepPushingBlocks() {
 7   logInfo("Started block pushing thread")
 8 
 9   def areBlocksBeingGenerated: Boolean = synchronized {
10     state != StoppedGeneratingBlocks
11   }
12 
13   try {
14     // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
15     while (areBlocksBeingGenerated) { // 线程没有被停止,则一直循环
16 // 超时poll操作获取并删除头节点,超过时间(10ms)则返回
17       Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
18         case Some(block) => pushBlock(block) // 如果有数据则进行处理。
19         case None =>
20       }
21     }
22 
23     // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
24     logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
25     while (!blocksForPushing.isEmpty) { // 如果队列中还有数据,继续进行处理
26       val block = blocksForPushing.take() // 这是一个堵塞方法,不过现在会马上返回,因为队列里面有数据。
27       logDebug(s"Pushing block $block")
28       pushBlock(block) // 处理数据
29       logInfo("Blocks left to push " + blocksForPushing.size())
30     }
31     logInfo("Stopped block pushing thread")
32   } catch {
33     case ie: InterruptedException =>
34       logInfo("Block pushing thread was interrupted")
35     case e: Exception =>
36       reportError("Error in block pushing thread", e)
37   }
38 }

其中的pushBlock源码如下:

1 private def pushBlock(block: Block) {
2   listener.onPushBlock(block.id, block.buffer)
3   logInfo("Pushed block " + block.id)
4 }

其调用的listener(org.apache.spark.streaming.kafka.ReliableKafkaReceiver.GeneratedBlockHandler)的 onPushBlock 源码如下:

1 def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
2   // Store block and commit the blocks offset
3   storeBlockAndCommitOffset(blockId, arrayBuffer)
4 }

其中,storeBlockAndCommitOffset具体代码如下:

 1 /**
 2  * Store the ready-to-be-stored block and commit the related offsets to zookeeper. This method
 3  * will try a fixed number of times to push the block. If the push fails, the receiver is stopped.
 4  */
 5 private def storeBlockAndCommitOffset(
 6     blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
 7   var count = 0
 8   var pushed = false
 9   var exception: Exception = null
10   while (!pushed && count <= 3) { // 整个过程,总共允许3 次重试
11     try {
12       store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
13       pushed = true
14     } catch {
15       case ex: Exception =>
16         count += 1
17         exception = ex
18     }
19   }
20   if (pushed) { // 已经push block
21 // 更新 offset
22     Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
23 // 如果已经push 到 BlockManager 中,则不会再保留 block和topic-partition-> offset的映射关系
24     blockOffsetMap.remove(blockId)
25   } else {
26     stop("Error while storing block into Spark", exception)
27   }
28 }
29 // 其中,commitOffset源码如下:
30 /**
31  * Commit the offset of Kafka's topic/partition, the commit mechanism follow Kafka 0.8.x's
32  * metadata schema in Zookeeper.
33  */
34 private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
35   if (zkClient == null) {
36     val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
37     stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
38     return
39   }
40 
41   for ((topicAndPart, offset) <- offsetMap) {
42     try {
43 // 获取在 zk 中 comsumer 的partition的目录
44       val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
45       val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"
46       // 更新 consumer 的已消费topic-partition 的offset 操作
47       ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
48     } catch {
49       case e: Exception =>
50         logWarning(s"Exception during commit offset $offset for topic" +
51           s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
52     }
53 
54     logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
55       s"partition ${topicAndPart.partition}")
56   }
57 }

关键方法store 如下:

1 /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
2 def store(dataBuffer: ArrayBuffer[T]) {
3   supervisor.pushArrayBuffer(dataBuffer, None, None)
4 }

其调用了supervisor(org.apache.spark.streaming.receiver.ReceiverSupervisorImpl实例)的pushArrayBuffer方法,内部操作如下:

1 /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
2 def pushArrayBuffer(
3     arrayBuffer: ArrayBuffer[_],
4     metadataOption: Option[Any],
5     blockIdOption: Option[StreamBlockId]
6   ) {
7   pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
8 }

org.apache.spark.streaming.receiver.ReceiverSupervisorImpl#pushAndReportBlock 源码如下:

 1 /** Store block and report it to driver */
 2 def pushAndReportBlock(
 3     receivedBlock: ReceivedBlock,
 4     metadataOption: Option[Any],
 5     blockIdOption: Option[StreamBlockId]
 6   ) {
 7 // 1.准备blockId,time等信息
 8   val blockId = blockIdOption.getOrElse(nextBlockId)
 9   val time = System.currentTimeMillis
10 // 2. 执行存储 block 操作
11   val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
12   logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
13 // 3. 获取保存的message 的记录数
14   val numRecords = blockStoreResult.numRecords
15 // 4. 通知trackerEndpoint已经添加block,执行更新driver 的WAL操作
16   val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
17   trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
18   logDebug(s"Reported block $blockId")
19 }

其中,receivedBlockHandler 的赋值语句如下:

 1 private val receivedBlockHandler: ReceivedBlockHandler = {
 2   if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
 3     if (checkpointDirOption.isEmpty) {
 4       throw new SparkException(
 5         "Cannot enable receiver write-ahead log without checkpoint directory set. " +
 6           "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
 7           "See documentation for more details.")
 8     }
 9 // enable WAL并且checkpoint dir 不为空,即,在这里,返回WriteAheadLogBasedBlockHandler 对象,这个对象持有了 blockmanager,streamid,storagelevel,conf,checkpointdir 等信息
10     new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
11       receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
12   } else {
13     new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
14   }
15 }

ReceivedBlockHandler 的 storeBlock方法源码如下:

 1 /**
 2  * This implementation stores the block into the block manager as well as a write ahead log.
 3  * It does this in parallel, using Scala Futures, and returns only after the block has
 4  * been stored in both places.
 5  */
 6 // 并行地将block 存入 blockmanager 和 write ahead log,使用scala 的Future 机制实现的,当两个都写完毕之后,返回。
 7 def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
 8 
 9   var numRecords = None: Option[Long]
10   // Serialize the block so that it can be inserted into both
11 // 1. 将ReceivedBlock序列化(未使用压缩机制)成字节数组
12   val serializedBlock = block match { // serializedBlock 就是序列化后的结果
13     case ArrayBufferBlock(arrayBuffer) => // go this branch
14       numRecords = Some(arrayBuffer.size.toLong)
15       blockManager.dataSerialize(blockId, arrayBuffer.iterator)
16     case IteratorBlock(iterator) =>
17       val countIterator = new CountingIterator(iterator)
18       val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
19       numRecords = countIterator.count
20       serializedBlock
21     case ByteBufferBlock(byteBuffer) =>
22       byteBuffer
23     case _ =>
24       throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
25   }
26 
27   // 2. Store the block in block manager
28   val storeInBlockManagerFuture = Future {
29     val putResult =
30       blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
31     if (!putResult.map { _._1 }.contains(blockId)) {
32       throw new SparkException(
33         s"Could not store $blockId to block manager with storage level $storageLevel")
34     }
35   }
36 
37   // 3. Store the block in write ahead log
38   val storeInWriteAheadLogFuture = Future {
39     writeAheadLog.write(serializedBlock, clock.getTimeMillis())
40   }
41 
42   // 4. Combine the futures, wait for both to complete, and return the write ahead log record handle
43   val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
44 // 等待future任务结果返回。默认时间是 30s, 使用spark.streaming.receiver.blockStoreTimeout 参数来变更默认值
45   val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
46   // 返回cache之后的block 相关信息
47 WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)
48 }

将WAL的block信息发送给driver

注意WriteAheadLogBasedStoreResult 这个 WriteAheadLogBasedStoreResult 实例,后面 RDD 在处理的时候会使用到。
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl#pushAndReportBlock 通知driver addBlock 的源码如下:

1 // 4. 通知trackerEndpoint已经添加block,执行更新driver 的WAL操作
2   val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
3   trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
4   logDebug(s"Reported block $blockId")

Driver将WAL block数据写入到 driver 的WAL中

跳过中间的RPC操作,直接到 driver 端org.apache.spark.streaming.scheduler.ReceiverTracker.ReceiverTrackerEndpoint#receiveAndReply 中:

 1 case AddBlock(receivedBlockInfo) =>
 2   if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
 3     walBatchingThreadPool.execute(new Runnable {
 4       override def run(): Unit = Utils.tryLogNonFatalError {
 5         if (active) {
 6           context.reply(addBlock(receivedBlockInfo))
 7         } else {
 8           throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
 9         }
10       }
11     })
12   } else {
13     context.reply(addBlock(receivedBlockInfo))
14   }

其中 addBlock方法源码如下:

1 /** Add new blocks for the given stream */
2 private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
3   receivedBlockTracker.addBlock(receivedBlockInfo)
4 }

其中,org.apache.spark.streaming.scheduler.ReceivedBlockTracker#addBlock 源码如下:

 1 /** Add received block. This event will get written to the write ahead log (if enabled). */
 2 def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
 3   try {
 4     val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
 5     if (writeResult) {
 6       synchronized {
 7         getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
 8       }
 9       logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
10         s"block ${receivedBlockInfo.blockStoreResult.blockId}")
11     } else {
12       logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
13         s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
14     }
15     writeResult
16   } catch {
17     case NonFatal(e) =>
18       logError(s"Error adding block $receivedBlockInfo", e)
19       false
20   }
21 }
22 /** Write an update to the tracker to the write ahead log */
23 private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
24   if (isWriteAheadLogEnabled) {
25     logTrace(s"Writing record: $record")
26     try {
27       writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
28         clock.getTimeMillis())
29       true
30     } catch {
31       case NonFatal(e) =>
32         logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
33         false
34     }
35   } else {
36     true
37   }
38 }
39 /** Get the queue of received blocks belonging to a particular stream */
40 private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
41   streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
42 }

上述代码,主要是将BlockAdditionEvent写WAL和更新队列(其实就是mutable.HashMap[Int, ReceivedBlockQueue]),这个队列中存放的是streamId ->UnallocatedBlock 的映射关系

从WAL RDD 中读取数据

createStream 源码如下:

 1 /**
 2  * Create an input stream that pulls messages from Kafka Brokers.
 3  * @param ssc         StreamingContext object
 4  * @param kafkaParams Map of kafka configuration parameters,
 5  *                    see http://kafka.apache.org/08/configuration.html
 6  * @param topics      Map of (topic_name -> numPartitions) to consume. Each partition is consumed
 7  *                    in its own thread.
 8  * @param storageLevel Storage level to use for storing the received objects
 9  * @tparam K type of Kafka message key
10  * @tparam V type of Kafka message value
11  * @tparam U type of Kafka message key decoder
12  * @tparam T type of Kafka message value decoder
13  * @return DStream of (Kafka message key, Kafka message value)
14  */
15 def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
16     ssc: StreamingContext,
17     kafkaParams: Map[String, String],
18     topics: Map[String, Int],
19     storageLevel: StorageLevel
20   ): ReceiverInputDStream[(K, V)] = {
21 // 可以通过设置spark.streaming.receiver.writeAheadLog.enable参数为 true来开启WAL
22   val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
23   new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
24 }

创建的是KafkaInputDStream对象:

 1 /**
 2  * Input stream that pulls messages from a Kafka Broker.
 3  *
 4  * @param kafkaParams Map of kafka configuration parameters.
 5  *                    See: http://kafka.apache.org/configuration.html
 6  * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
 7  * in its own thread.
 8  * @param storageLevel RDD storage level.
 9  */
10 private[streaming]
11 class KafkaInputDStream[
12   K: ClassTag,
13   V: ClassTag,
14   U <: Decoder[_]: ClassTag,
15   T <: Decoder[_]: ClassTag](
16     ssc_ : StreamingContext,
17     kafkaParams: Map[String, String],
18     topics: Map[String, Int],
19     useReliableReceiver: Boolean,
20     storageLevel: StorageLevel
21   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
22 
23   def getReceiver(): Receiver[(K, V)] = {
24     if (!useReliableReceiver) { // 未启用 WAL,会使用 KafkaReceiver 对象
25       new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
26     } else { // 如果启用了WAL, 使用ReliableKafkaReceiver
27       new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
28     }
29   }
30 }

org.apache.spark.streaming.kafka.KafkaInputDStream 继承父类的 compute方法:

 1 /**
 2  * Generates RDDs with blocks received by the receiver of this stream. */
 3 override def compute(validTime: Time): Option[RDD[T]] = {
 4   val blockRDD = {
 5 
 6     if (validTime < graph.startTime) {
 7       // If this is called for any time before the start time of the context,
 8       // then this returns an empty RDD. This may happen when recovering from a
 9       // driver failure without any write ahead log to recover pre-failure data.
10       new BlockRDD[T](ssc.sc, Array.empty)
11     } else {
12       // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
13       // for this batch
14       val receiverTracker = ssc.scheduler.receiverTracker
15       val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
16 
17       // Register the input blocks information into InputInfoTracker
18       val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
19       ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
20 
21       // Create the BlockRDD
22       createBlockRDD(validTime, blockInfos)
23     }
24   }
25   Some(blockRDD)
26 }

getBlocksOfBatch 如下:

1 /** Get the blocks for the given batch and all input streams. */
2 def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = {
3   receivedBlockTracker.getBlocksOfBatch(batchTime)
4 }
5 调用:
6 /** Get the blocks allocated to the given batch. */
7 def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = synchronized {
8   timeToAllocatedBlocks.get(batchTime).map { _.streamIdToAllocatedBlocks }.getOrElse(Map.empty)
9 }

JobGenerator将WAL block 分配给一个batch,并生成job

取出WAL block 信息

在 org.apache.spark.streaming.scheduler.JobGenerator 中声明了一个定时器:

1 // timer 会按照批次间隔 生成 GenerateJobs 任务,并放入eventLoop 堵塞队列中
2 private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
3   longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

EventLoop 实例化代码如下:

1 eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
2   override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
3 
4   override protected def onError(e: Throwable): Unit = {
5     jobScheduler.reportError("Error in job generator", e)
6   }
7 }
8 eventLoop.start()

EventLoop里定义了一个LinkedBlockingDeque双端堵塞队列和一个执行daemon线程,daemon线程会不停从 双端堵塞队列中堵塞式取数据,一旦取到数据,会调 onReceive 方法,即 processEvent 方法:

 1 /** Processes all events */
 2 private def processEvent(event: JobGeneratorEvent) {
 3   logDebug("Got event " + event)
 4   event match {
 5     case GenerateJobs(time) => generateJobs(time)
 6     case ClearMetadata(time) => clearMetadata(time)
 7     case DoCheckpoint(time, clearCheckpointDataLater) =>
 8       doCheckpoint(time, clearCheckpointDataLater)
 9     case ClearCheckpointData(time) => clearCheckpointData(time)
10   }
11 }

由于是GenerateJobs 事件, 会继续调用generateJobs 方法:

 1 /** Generate jobs and perform checkpoint for the given `time`.  */
 2 private def generateJobs(time: Time) {
 3   // Set the SparkEnv in this thread, so that job generation code can access the environment
 4   // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
 5   // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
 6   SparkEnv.set(ssc.env)
 7   Try {
 8 // 1. 将 WAL block 信息 分配给batch(这些数据块信息是worker 节点cache 到WAL 之后发送给driver 端的)
 9     jobScheduler.receiverTracker.allocateBlocksToBatch(time)
10 // 2. 使用分配的block数据块来生成任务
11     graph.generateJobs(time) // generate jobs using allocated block
12   } match {
13     case Success(jobs) =>
14       val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
15       jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
16     case Failure(e) =>
17       jobScheduler.reportError("Error generating jobs for time " + time, e)
18   }
19 // 发布DoCheckpoint 事件,保存checkpoint操作,主要是将新的checkpoint 数据写入到 hdfs, 删除旧的 checkpoint 数据
20   eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
21 }

第一步中调用的
org.apache.spark.streaming.scheduler.ReceiverTracker#allocateBlocksToBatch方法如下:

1 /** Allocate all unallocated blocks to the given batch. */
2 def allocateBlocksToBatch(batchTime: Time): Unit = {
3   if (receiverInputStreams.nonEmpty) {
4     receivedBlockTracker.allocateBlocksToBatch(batchTime)
5   }
6 }

其中,org.apache.spark.streaming.scheduler.ReceivedBlockTracker#allocateBlocksToBatch 方法如下:

 1 def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
 2   if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
 3 // 遍历输入流,根据流的 streamId 获取未被分配的block队列,并返回[streamId, seq[receivedBlockInfo]],由此可知,到此为止,数据其实已经从receiver中读出来了。
 4    // 获取 streamid和 WAL的blocks 的映射关系
 5 val streamIdToBlocks = streamIds.map { streamId =>
 6         (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
 7     }.toMap
 8     val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
 9     if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
10       timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
11       lastAllocatedBatchTime = batchTime
12     } else {
13       logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
14     }
15   } else {
16     // This situation occurs when:
17     // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
18     // possibly processed batch job or half-processed batch job need to be processed again,
19     // so the batchTime will be equal to lastAllocatedBatchTime.
20     // 2. Slow checkpointing makes recovered batch time older than WAL recovered
21     // lastAllocatedBatchTime.
22     // This situation will only occurs in recovery time.
23     logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
24   }
25 }

其中,getReceivedBlockQueue的源码如下:

1 /** Get the queue of received blocks belonging to a particular stream */
2 private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
3   streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
4 }

可以看到,worker node 发送过来的block 数据被取出来了。

根据WAL block创建 RDD

org.apache.spark.streaming.dstream.ReceiverInputDStream#createBlockRDD 源码如下:

 1 private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
 2 
 3   if (blockInfos.nonEmpty) {
 4     val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
 5    // 所有的block已经有了WriteAheadLogRecordHandle, 创建一个WALBackedBlockRDD即可, 否则创建BlockRDD。
 6 // 其中,WriteAheadLogRecordHandle 是一个跟WAL 相关联的EntryInfo,实现类FileBasedWriteAheadLogSegment就包含了WAL segment 的path, offset 以及 length 信息。RDD 在真正需要数据时,根据这些handle信息从 WAL 中读取数据。
 7     // Are WAL record handles present with all the blocks
 8     val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
 9 
10     if (areWALRecordHandlesPresent) {
11       // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
12       val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
13       val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
14       new WriteAheadLogBackedBlockRDD[T](
15         ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
16     } else {
17       // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
18       // others then that is unexpected and log a warning accordingly.
19       if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
20         if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
21           logError("Some blocks do not have Write Ahead Log information; " +
22             "this is unexpected and data may not be recoverable after driver failures")
23         } else {
24           logWarning("Some blocks have Write Ahead Log information; this is unexpected")
25         }
26       }
27       val validBlockIds = blockIds.filter { id =>
28         ssc.sparkContext.env.blockManager.master.contains(id)
29       }
30       if (validBlockIds.size != blockIds.size) {
31         logWarning("Some blocks could not be recovered as they were not found in memory. " +
32           "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
33           "for more details.")
34       }
35       new BlockRDD[T](ssc.sc, validBlockIds)
36     }
37   } else {
38     // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
39     // according to the configuration
40     if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
41       new WriteAheadLogBackedBlockRDD[T](
42         ssc.sparkContext, Array.empty, Array.empty, Array.empty)
43     } else {
44       new BlockRDD[T](ssc.sc, Array.empty)
45     }
46   }
47 }

org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD#compute 的源码如下:

 1 /**
 2  * Gets the partition data by getting the corresponding block from the block manager.
 3  * If the block does not exist, then the data is read from the corresponding record
 4  * in write ahead log files.
 5  */
 6 override def compute(split: Partition, context: TaskContext): Iterator[T] = {
 7   assertValid()
 8   val hadoopConf = broadcastedHadoopConf.value
 9   val blockManager = SparkEnv.get.blockManager
10   val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
11   val blockId = partition.blockId
12 
13   def getBlockFromBlockManager(): Option[Iterator[T]] = {
14     blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
15   }
16 
17   def getBlockFromWriteAheadLog(): Iterator[T] = {
18     var dataRead: ByteBuffer = null
19     var writeAheadLog: WriteAheadLog = null
20     try {
21       // The WriteAheadLogUtils.createLog*** method needs a directory to create a
22       // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
23       // writing log data. However, the directory is not needed if data needs to be read, hence
24       // a dummy path is provided to satisfy the method parameter requirements.
25       // FileBasedWriteAheadLog will not create any file or directory at that path.
26       // FileBasedWriteAheadLog will not create any file or directory at that path. Also,
27       // this dummy directory should not already exist otherwise the WAL will try to recover
28       // past events from the directory and throw errors.
29       val nonExistentDirectory = new File(
30         System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
31       writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
32         SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
33       dataRead = writeAheadLog.read(partition.walRecordHandle)
34     } catch {
35       case NonFatal(e) =>
36         throw new SparkException(
37           s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
38     } finally {
39       if (writeAheadLog != null) {
40         writeAheadLog.close()
41         writeAheadLog = null
42       }
43     }
44     if (dataRead == null) {
45       throw new SparkException(
46         s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
47           s"read returned null")
48     }
49     logInfo(s"Read partition data of $this from write ahead log, record handle " +
50       partition.walRecordHandle)
51     if (storeInBlockManager) {
52       blockManager.putBytes(blockId, dataRead, storageLevel)
53       logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
54       dataRead.rewind()
55     }
56     blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
57   }
58  // 如果partition.isBlockIdValid 为true,则说明该 block 数据存在executors 中
59   if (partition.isBlockIdValid) {
60 // 先根据 BlockManager从 executor中读取数据, 如果没有,再从WAL 中读取数据
61 // BlockManager 从内存还是从磁盘上获取的数据 ?
62 blockManager 从 local 或 remote 获取 block,其中 local既可以从 memory 中获取也可以从 磁盘中读取, 其中remote获取数据是同步的,即在fetch block 过程中会一直blocking。
63     getBlockFromBlockManager().getOrElse { getBlockFromWriteAheadLog() }
64   } else {
65     getBlockFromWriteAheadLog()
66   }
67 }

至此,从启动 receiver,到receiver 接收数据并保存到WAL block,driver 接收WAL 的block 信息,直到spark streaming 通过WAL RDD 来获取数据等等都一一做了说明。

原文地址:https://www.cnblogs.com/johnny666888/p/11100334.html