Spark Streaming listener execution order

trait StreamingListener {

  /** Called when the streaming has been started */
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { }

  /** Called when a receiver has been started */
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }

  /** Called when a receiver has reported an error */
  def onReceiverError(receiverError: StreamingListenerReceiverError) { }

  /** Called when a receiver has been stopped */
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }

  /** Called when a batch of jobs has been submitted for processing. */
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }

  /** Called when processing of a batch of jobs has started.  */
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }

  /** Called when processing of a batch of jobs has completed. */
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }

  /** Called when processing of a job of a batch has started. */
  def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted) { }

  /** Called when processing of a job of a batch has completed. */
  def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
}

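The callbacks fire in the following order (step 1 fires once at startup; steps 2 to 6 repeat for every batch):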

  • 1. onStreamingStarted
  • 2. onBatchSubmitted
  • 3. onBatchStarted
  • 4. onOutputOperationStarted
  • (your output operation runs here, i.e. the action you wrote)
  • 5. onOutputOperationCompleted
  • 6. onBatchCompleted

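onReceiverStarted(), onReceiverError(), and onReceiverStopped() are missing from this list because the test used Kafka 0.10.x, which has no receiver-based connection mode, so no receiver events are ever posted.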
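
One way to observe this order yourself is to register a listener that simply records the name of each callback. The sketch below is not from the original test: the names OrderLoggingListener and ListenerOrderDemo are made up for illustration, and it assumes Spark 2.2 or later (where onStreamingStarted exists) with spark-streaming on the classpath. It uses queueStream instead of Kafka so it can run locally, which also means no receiver callbacks are expected.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler._

// Hypothetical helper: records callback names in the order they are delivered.
class OrderLoggingListener extends StreamingListener {
  // Thread-safe, because callbacks run on the listener bus thread.
  val events = new ConcurrentLinkedQueue[String]()

  private def record(name: String): Unit = {
    events.add(name)
    println(s"[listener] $name")
  }

  override def onStreamingStarted(e: StreamingListenerStreamingStarted): Unit =
    record("onStreamingStarted")
  override def onBatchSubmitted(e: StreamingListenerBatchSubmitted): Unit =
    record("onBatchSubmitted")
  override def onBatchStarted(e: StreamingListenerBatchStarted): Unit =
    record("onBatchStarted")
  override def onOutputOperationStarted(e: StreamingListenerOutputOperationStarted): Unit =
    record("onOutputOperationStarted")
  override def onOutputOperationCompleted(e: StreamingListenerOutputOperationCompleted): Unit =
    record("onOutputOperationCompleted")
  override def onBatchCompleted(e: StreamingListenerBatchCompleted): Unit =
    record("onBatchCompleted")
}

object ListenerOrderDemo {
  def main(args: Array[String]): Unit = {
    // Assumed local settings, chosen only for this demo.
    val conf = new SparkConf().setMaster("local[2]").setAppName("listener-order-demo")
    val ssc = new StreamingContext(conf, Seconds(1))

    val listener = new OrderLoggingListener
    ssc.addStreamingListener(listener)

    // A queue-backed stream stands in for Kafka; it has no receiver.
    val queue = mutable.Queue[RDD[Int]](ssc.sparkContext.makeRDD(1 to 10))
    ssc.queueStream(queue).foreachRDD { rdd =>
      // This is the "action" that runs between steps 4 and 5.
      println(s"[action] count = ${rdd.count()}")
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}
```

Listener events are delivered asynchronously on a separate bus thread, so the "[action]" line on the console may not always sit exactly between the two output-operation lines, even though the events themselves are posted in the order listed above.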

Original post: https://www.cnblogs.com/deemoo/p/15771710.html