Spark作业执行原理(二)——划分调度阶段

        Spark调度阶段的划分是由DAGScheduler实现,DAGScheduler会从最后一个RDD出发,根据RDD的lineage使用广度优先算法遍历整个依赖树(总共使用了两次,一次是遍历区分ResultStage范围;另一次则是遍历获取ShuffleMapStage划分依据,用来划分每个ShuffleMapStage范围),从而划分调度阶段,调度阶段的划分依据是以是否进行shuffle操作进行的。

        真正的stage划分代码,是从handleJobSubmitted方法中根据最后一个RDD实例化ResultStage对象开始,实例化过程中,finalRDD使用getParentStages找出其依赖的祖先RDD是否存在Shuffle操作,如果没有存在Shuffle操作,则本次作业只有一个ResultStage;如果存在Shuffle操作,则本次作业除了一个ResultStage之外,还至少一个ShuffleMapStage。

handleJobSubmitted部分源码:

private[scheduler] def handleJobSubmitted(jobId:Int, finalRDD:RDD[_], func:(TaskContext, Iterator[_]) => _, partitions:Array[Int], callSite:CallSite, listener:JobListener, properties:Properties){
    //定义一个ResultStage类型对象,用于存储DAG划分出来的最后一个Stage
    val finalStage:ResultStage = null
    try{
        finalStage = new ResultStage(finalRDD, func, partitions, jobId, callSite)
    }catch{...}
 
    //根据最后一个阶段生成作业
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    
    ...
 
    //提交作业
    submitStage(finalStage)
    submitWaitingStages()
}

        上面代码在实例化ResultStage时,传入了一个finalRDD,其实这个finalRDD会被传到getParentStagesAndId的方法中,在该方法中调用getParentStages,生成最后一个调度阶段finalStage(这里第一次使用广度优先算法)。

private def getParentStages(rdd:RDD[_], firstJobId:Int):List[Stage] = {
    val parents = new HashSet[Stage]     //   parents是一个元素类型为Stage的HashSet集合
    val visited = new HashSet[RDD[_]]    //用于存放已经访问过的RDD
 
    //存放非ShuffleDependency的RDD
    val waitingForVisit = new Stack[RDD[_]]
 
    //广度优先遍历,根据当前所依赖的RDD类型,进行不同的操作
    def visit(r:RDD[_]){
        if(!visited(r)){
            visited += r    //将当前RDD标记为已访问,即存放到visited的HashSet集合里面
            for (dep <- r.dependencies){
                //当前RDD所依赖的父RDD类型为ShuffleDepedency时,需要向前遍历,获取ShuffleMapStage
                case shufDep:ShuffleDependency[_, _, _] =>
                    parents += getShuffleMapStage(shufDep, firstJobId)
                case _ =>
                    waitingForVisit.push(dep.rdd)
            }
        }
    }
 
    waitingFoVisit.push(rdd)    
    //开始遍历Stack中的rdd
    while(waitingForVisit.nonEmpty){
        visit(waitingForVisit.pop())
    }
    parents.toList    //返回parents
    
}

        上面代码显示,如果当前遍历的RDD,其所依赖的父RDD的类型是ShuffleDependency类型时,需要往前遍历,找出所有ShuffleMapStage(或者说找出所有划分ShuffleMapStage的依据——RDD),该算法也是用到了广度优先遍历算法,跟getParentStage类似,具体由getAncestorShuffleDependencies方法实现。

getAncestorShuffleDependencies方法部分源码:

private def getAncestorShuffleDependencies(rdd:RDD[_]):Stack[ShuffleDependency[_, _, _]] = {
    val parents = new Stack[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
 
    //用于存放非ShuffleDependency类型的RDD
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r:RDD[_]){
        if(!visited(r)){
            visited += r    //标记当前rdd已经被访问过,即加入visited 中
            for(dep <- r.dependencies){
                case shufDep:ShuffleDependency[_, _, _] =>
                    if(!shuffleToMapStage.contains(shufDep.shuffleId)){
                        parents.push(shufDep)    //shuffle依据放进Stack中
                    }
                case _ =>    //不操作
            }
        }
    }
 
    //向前遍历依赖树,获取所有的类型为ShuffleDependency的RDD,作为划分阶段的依据
    waitingForVisit.push(rdd)
    while(waitingForVisit.nonEmpty){
        visit(waitingForVisit.pop())
    }
    parents    //返回parents
}

        getAncestorShuffleDependencies方法其实只是找出了ShuffleDependency类型的RDD,而这些RDD就是划分各个ShuffleMapStage的依据。

        当所有阶段的划分操作完成后,这些阶段就会建立起依赖关系。该依赖关系是通过调度阶段属性parents:List[Stage]来定义,通过该属性可以获取当前阶段所有祖先阶段,可以根据这些信息按顺序提交调度阶段进行运行。

下面是一张Spark调度阶段的Stage划分图:

Spark调度阶段Stage划分流程:

  1. 在SparkContext中触发提交作业时,会调用DAGScheduler的handleJobSubmitted方法,在该方法中会先找到最后一个RDD(即RDD7),并调用getParentStages方法;
  2. 在getParentStages方法中判断RDD7所依赖的父RDD是否存在Shuffle操作,上图RDD6属于ShuffleDepedency类型,则对RDD6进行下一步操作;
  3. 通过getAncestorShuffleDependencies方法,对RDD6进行向前遍历,寻找所有的划分依据,向前遍历,发现只有RDD4,所以RDD3->RDD4被划分成一个ShuffleMapStage0,RDD5->RDD6被划分成ShuffleMapStage1;
  4. 最后,剩下的生成ResultStage2,一共3个阶段,在提交阶段按顺序运行。
原文地址:https://www.cnblogs.com/SysoCjs/p/11355800.html