spark作业运行过程之--DAGScheduler

DAGScheduler--stage划分和创建以及stage的提交

本篇,我会从一次spark作业的运行为切入点,将spark运行过程中涉及到的各个步骤,包括DAG图的划分,任务集的创建,资源分配,任务序列化,任务分发到各个executor,任务执行,任务结果回传driver等等各个环节串联起来,以整个任务运行的调用链为线索,将spark-core中的各个基础设施联系起来,这样我们就能对spark的各个基础设施模块的作用有一个整体的认识,然后有了对spark整体框架的印象,再对其中的各个模块各个击破,分别深入研究,通过这种循序渐进的方式,最后才能对spark-core有一个比较深入而全面的掌握。当然,这篇文章的主要目的是理清spark作业的整个运行流程。

入口:SparkContext.runJob

我们知道spark中的作业执行时懒执行的,懒执行最大的好处是可以把一些算子向流水线一样chain在一起,从而形成流式的计算模式,个人认为这个特点也是spark比mapreduce性能高的一种重要原因,至于后来的一些基于mapreduce优化的框架如tez, mahout等实际上一个重要的优化手段也是把一些能够流水线式执行的算子chain在一起,避免中间多次落盘。扯远了,我们回到这个方法,通过方法注释可以看出来,这个方法是spark中所有行动算子的入口。

/**
* Run a function on a given set of partitions in an RDD and pass the results to the given
* handler function. This is the main entry point for all actions in Spark.
*
* @param rdd           target RDD to run tasks on
* @param func          a function to run on each partition of the RDD
* @param partitions    set of partitions to run on; some jobs may not want to compute on all
*                      partitions of the target RDD, e.g. for operations like `first()`
* @param resultHandler callback to pass each result to
*/

def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
// 调用DAGScheduler的runJob方法
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
// 更新控制台打印的进度条信息
progressBar.foreach(_.finishAll())
// 处理RDD的checkpoint
rdd.doCheckpoint()
}

  • 首先清除闭包的一些不必要的引用,这一步主要是为了方便序列化,因为一些不必要的引用可能引用了不可序列化的对象,这会导致函数不可序列化。很多时候,用户写的代码并不是很靠谱,spark考虑到这一点,所以这也是为了尽量减少用户的开发难度。
  • 调用DAGScheduler执行提交任务的逻辑

这个方法很简单,不必赘述。

DAGScheduler.submitJob

经过一些调用,最终会调用到这个方法。

def submitJob[T, U](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  callSite: CallSite,
  resultHandler: (Int, U) => Unit,
  properties: Properties): JobWaiter[U] = {
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
// 检查是否有非法的partition
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
  throw new IllegalArgumentException(
    "Attempting to access a non-existent partition: " + p + ". " +
      "Total number of partitions: " + maxPartitions)
}

// nextJobId每次自增1
val jobId = nextJobId.getAndIncrement()
// 如果要运行的分区数为0,那么就没必要运行,直接返回成功就行了
if (partitions.size == 0) {
  // Return immediately if the job is running 0 tasks
  return new JobWaiter[U](this, jobId, 0, resultHandler)
}

assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
// 向DAG的事件处理器投递一个任务提交的事件
eventProcessLoop.post(JobSubmitted(
  jobId, rdd, func2, partitions.toArray, callSite, waiter,
  SerializationUtils.clone(properties)))
waiter

}

这个方法的逻辑也很简单。首先做一些检查,然后向DAG调度器内部的一个事件处理器投递一个作业提交的事件。DAGScheduler自己有一个事件处理器,是很常规的事件循环处理,使用单线程的方法循环处理事件队列中的事件,逻辑很简单,所以这里不再展开。投递任务提交任务后,最终会调用DAGScheduler的handleJobSubmitted方法。我们可以看到,DAGScheduler中还有很多其他类似的处理方法,对应了不同的事件类型,事件分发逻辑在DAGSchedulerEventProcessLoop.doOnReceive方法中,不再展开。我们仍然回到作业运行这条主线上来,继续看handleJobSubmitted。

handleJobSubmitted

private[scheduler] def handleJobSubmitted(jobId: Int,
  finalRDD: RDD[_],
  func: (TaskContext, Iterator[_]) => _,
  partitions: Array[Int],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties) {
var finalStage: ResultStage = null
try {
  // New stage creation may throw an exception if, for example, jobs are run on a
  // HadoopRDD whose underlying HDFS files have been deleted.
  // 创建最后一个stage
  finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
  case e: Exception =>
    logWarning("Creating new stage failed due to exception - job: " + jobId, e)
    listener.jobFailed(e)
    return
}

// 设置活跃的任务
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
  job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))

// 更新一些簿记量
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
// 向事件总线中投递一个事件
listenerBus.post(
  SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
// 提交最后一个stage
submitStage(finalStage)

}

  • 涉及到的一些簿记量的更新就不再展开了。

  • 创建最后一个stage,这一步其实会根据shuffle依赖关系对整个RDD的计算关系图(DAG)进行划分,形成不同的stage, 最后一步行动算子会创建ResultStage, 然后提交最后一个stage。

接下来的小结我们重点分析一下DAG图的划分以及stage的创建,这也是DAGScheduler的主要功能。

stage的划分和创建

DAGScheduler.createResultStage

private def createResultStage(
  rdd: RDD[_],
  func: (TaskContext, Iterator[_]) => _,
  partitions: Array[Int],
  jobId: Int,
  callSite: CallSite): ResultStage = {
// 首先创建依赖的父stage
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
// 有了父stage,就可以创建最后一个stage了
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}

重点在于创建父stage。

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
getShuffleDependencies(rdd).map { shuffleDep =>
  getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}

getShuffleDependencies

这个方法用一个栈实现对rdd的深度优先遍历,可以看到在找到shuffle依赖时就记录下来,并且不再继续寻找shuffle依赖前面的依赖。
所以这个方法只会在整个DAG图上找到这个rdd的上一级所有的shuffle依赖,而不会跨越多级shuffle依赖。
private[scheduler] def getShuffleDependencies(
rdd: RDD[]): HashSet[ShuffleDependency[, , ]] = {
val parents = new HashSet[ShuffleDependency[
, , ]]
val visited = new HashSet[RDD[
]]
// 用栈实现深度优先遍历
val waitingForVisit = new ArrayStack[RDD[
]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
// 如果是shuffle依赖,记录下来,并没有继续向上寻找shuffle依赖的依赖
case shuffleDep: ShuffleDependency[
, _, _] =>
parents += shuffleDep
case dependency =>
// 对于窄依赖,
waitingForVisit.push(dependency.rdd)
}
}
}
parents
}

getOrCreateShuffleMapStage

我们继续看另一个重要的方法,创建shuffle的stage。

private def getOrCreateShuffleMapStage(
  shuffleDep: ShuffleDependency[_, _, _],
  firstJobId: Int): ShuffleMapStage = {
shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
  case Some(stage) =>
    stage

  case None =>
    // Create stages for all missing ancestor shuffle dependencies.
    // 获取所有还没有创建stage的祖先shuffle依赖
    getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
      // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
      // that were not already in shuffleIdToMapStage, it's possible that by the time we
      // get to a particular dependency in the foreach loop, it's been added to
      // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
      // SPARK-13902 for more information.
      if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
        createShuffleMapStage(dep, firstJobId)
      }
    }
    // Finally, create a stage for the given shuffle dependency.
    createShuffleMapStage(shuffleDep, firstJobId)
}
}

可以看到,这个方法会将所有还没创建stage的祖先shuffle依赖全部创建出来。
我们看一下,创建ShuffleMapStage的具体过程:

def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
// 这里可以看出,一个ShuffleStage的rdd是shuffle输入侧的rdd
val rdd = shuffleDep.rdd
val numTasks = rdd.partitions.length
// 这里调用了获取父Stage的方法,实际上这几个方法会形成递归调用
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
// 一个Stage就是对一些引用的封装,其中比较重要的是mapOutputTracker
val stage = new ShuffleMapStage(
  id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)

// 更新一些簿记量
stageIdToStage(id) = stage
shuffleIdToMapStage(shuffleDep.shuffleId) = stage
updateJobIdStageIdMaps(jobId, stage)

// 在map输出追踪器中注册这个shuffle
if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
  // Kind of ugly: need to register RDDs with the cache and map output tracker here
  // since we can't do it in the RDD constructor because # of partitions is unknown
  logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
  mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
}
stage
}

其中比较关键的步骤有:

  • 创建所有的父stage
  • 封装一个ShuffleMapStage对象,比较重要的是mapOutputTracker对象的引用。这个对象主要作用是追踪shuffle过程中map阶段的输出的位置信息,后面我们会讲到map输出是通过shuffleManager对map输出数据进行分区和排序处理并序列化,然后blockManager进行存储,而map输出的位置信息是通过blockId标识,并且都会传回driver,在driver中有一个MapOutputTrackerMaster组件专门负责维护所有stage的所有map任务的输出的位置信息。
  • 在mpOutputTrackerMaster注册新创建的stage,其实就是在映射结构里加一条数据

小结

对于stage的创建过程做一个小结:这里涉及到几个方法形成的递归调用;在遍历rdd依赖的过程中按深度优先遍历,每遇到一个shuffle依赖就创建一个stage,所有上游的stage创建完成后,最后再创建一个ResultStage。

stage提交

接下来,我们看一下在作业运行的过程中DAGScheduler负责的最后一步:stage提交

submitStage

首先是submitStage方法。

private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
  logDebug("submitStage(" + stage + ")")
  if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
    val missing = getMissingParentStages(stage).sortBy(_.id)
    logDebug("missing: " + missing)
    if (missing.isEmpty) {
      logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
      submitMissingTasks(stage, jobId.get)
    } else {
      for (parent <- missing) {
        submitStage(parent)
      }
      waitingStages += stage
    }
  }
} else {
  abortStage(stage, "No active job for stage " + stage.id, None)
}
}

这个方法比较简单:

  • 首先是提交还没有运行过多父stage,把自身放到等待队列中

  • 如果父stage都已经运行完成了,或者不存在父stage,那么提交当前stage,即调用submitMissingTasks

submitMissingTasks

private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")

// First figure out the indexes of partition ids to compute.
// 首先是找出还没有计算的partition有哪些
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
// with this Stage
val properties = jobIdToActiveJob(jobId).properties

// 更新簿记量
runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
// outputCommitCoordinator内部簿记量的更新
stage match {
  case s: ShuffleMapStage =>
    outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
  case s: ResultStage =>
    outputCommitCoordinator.stageStart(
      stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
}

// 找出每个Task的偏向位置,对于一般的shuffle stage,通过mapOutputTracker来计算Task的偏向位置
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
    case s: ResultStage =>
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
} catch {
  case NonFatal(e) =>
    stage.makeNewStageAttempt(partitionsToCompute.size)
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
    abortStage(stage, s"Task creation failed: $e
${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage
    return
}

// 更新stage最近一次的尝试的信息
stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

// If there are tasks to execute, record the submission time of the stage. Otherwise,
// post the even without the submission time, which indicates that this stage was
// skipped.
if (partitionsToCompute.nonEmpty) {
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
}
// 向事件总线投递一个事件
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
// Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
// the serialized copy of the RDD and for each task we will deserialize it, which means each
// task gets a different copy of the RDD. This provides stronger isolation between tasks that
// might modify state of objects referenced in their closures. This is necessary in Hadoop
// where the JobConf/Configuration object is not thread-safe.
// 对任务进行序列化,这里对RDD和ShuffleDependency对象进行序列化
var taskBinary: Broadcast[Array[Byte]] = null
try {
  // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
  // For ResultTask, serialize and broadcast (rdd, func).
  val taskBinaryBytes: Array[Byte] = stage match {
    case stage: ShuffleMapStage =>
      JavaUtils.bufferToArray(
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
    case stage: ResultStage =>
      JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
  }

  // RDD和ShuffleDependency的序列化数据是通过广播变量传输到executor端的
  // 广播变量实际上也是先将数据通过blockManager写入内存或磁盘,然后executor端通过rpc远程拉取数据
  taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
  // In the case of a failure during serialization, abort the stage.
  case e: NotSerializableException =>
    abortStage(stage, "Task not serializable: " + e.toString, Some(e))
    runningStages -= stage

    // Abort execution
    return
  case NonFatal(e) =>
    abortStage(stage, s"Task serialization failed: $e
${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage
    return
}

val tasks: Seq[Task[_]] = try {
  // 对任务运行的统计量累加器对象的序列化
  // 累加器对象序列化有一个比较有意思的地方,在readObject方法中,可以看一下
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    case stage: ShuffleMapStage =>
      stage.pendingPartitions.clear()
      // 每个分区创建一个Task
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        stage.pendingPartitions += id
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId)
      }

    case stage: ResultStage =>
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, id, properties, serializedTaskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
      }
  }
} catch {
  case NonFatal(e) =>
    abortStage(stage, s"Task creation failed: $e
${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage
    return
}

if (tasks.size > 0) {
  logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
    s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
  // 从这里DAGScheduler把接力棒交给了Task调度器
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
} else {
  // Because we posted SparkListenerStageSubmitted earlier, we should mark
  // the stage as completed here in case there are no tasks to run
  markStageAsFinished(stage, None)

  val debugString = stage match {
    case stage: ShuffleMapStage =>
      s"Stage ${stage} is actually done; " +
        s"(available: ${stage.isAvailable}," +
        s"available outputs: ${stage.numAvailableOutputs}," +
        s"partitions: ${stage.numPartitions})"
    case stage : ResultStage =>
      s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
  }
  logDebug(debugString)

  submitWaitingChildStages(stage)
}
}

这个方法比较长,但是应该说是在DAG调度器提交作业的过程中最重要的方法了。主要做的事情其实就是根据要提交的stage创建一个任务集,每个partition创建一个Task,所有要计算的Task形成一个任务集。

  • 更新一些簿记量
  • 找出每个Task的偏向位置,对于一般的shuffle stage,通过mapOutputTracker来计算Task的偏向位置
  • 向事件总线投递一个stage提交的事件
  • 对RDD和ShuffleDependency或者ResultStage的计算函数func进行序列化,以用于传输
  • 序列化任务运行统计量的累加器对象,加器对象序列化有一个比较有意思的地方,在readObject方法中,可以看一下
  • 对每个要计算的分区创建一个Task,根据stage类型分为ShuffleMapTask和ResultTask两种
  • 最后调用TaskScheduler的方法提交任务

至此,DAGScheduler完成了他的使命,成功将接力棒交给了TaskScheduler,接下来就是TaskScheduler的表演了。
下一篇,我们会继续分析TaskSchedulerImpl这个类对于任务提交所做的一些工作,主要是资源分配的工作,需要考虑本地性,黑名单,均衡性等问题。

遗留的问题

  • 如何计算任务的偏向位置?
  • outputCommitCoordinator的作用?
  • 广播变量的底层机制是什么?这个后面会专门分析广播变量,其实就是利用块管理器blockManager(块管理器应该是最重要的基础设施了)
原文地址:https://www.cnblogs.com/zhuge134/p/10961742.html