Spark作业执行原理(三)——提交调度阶段

        在上一篇划分调度阶段中的handleJobSubmitted方法中,提到finalStage的生成,在生成finalStage的同时,建立起所有Stage的依赖关系,然后通过finalStage生成一个作业实例,在该作业实例中按照顺序提交调度阶段进行执行,在执行过程中监听总线获取作业、阶段执行的情况。

回顾handleJobSubmitted方法中部分源码:

//根据最后一个阶段生成作业
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    
    ...
 
    //提交作业
    submitStage(finalStage)
    submitWaitingStages()

        作业的提交阶段从submitStage方法开始,在submitStage方法中调用getMissingParentStages获取finalStage的父调度阶段,如果不存在父调度阶段,则使用submitMissingTasks方法提交执行;如果存在,则把父调度阶段放进waitingStages列表中,通过递归的方式调用submitStage方法。通过这样的逻辑,就可以根据stage的依赖关系,从最前面的stage开始执行作业,一直到最后一个。

submitStage方法部分源码:

private def submitStage(stage: Stage){
    val jobId = activeJobForStage(stage)
    if(jobId.isDefined){
        logDebug("submitStage("+stage+")")
        if(!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)){
            //获取父调度阶段,但并不是通过调度阶段的依赖关系,而是通过Stage的判断依据来获取父调度阶段
            val missing = getMissingParentStages(stage).sortBy(_.id)
            if(missing.isEmpty){
                //如果不存在父调度阶段,调用submitMissingTasks()提交
                logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
                submitMissingTasks(stage, jobId.get)
            }else{
                //如果存在父调度阶段,将当前阶段放进等待列表,同时递归调用submitStage方法,直至找到最前面的没有父调度阶段的Stage
                for(parent <- missing){
                    submitStage(parent)
                }
                waitingStages += stage
            }
        }
    }else{
        abortStage(stage, "No active job for stage " + stage.id, None)
    }
}

        鉴于递归的逻辑,当最开始的调度阶段完成后,相继提交后续调度阶段,但注意一个问题,调度当前阶段时,必须依赖父调度阶段的状态,显然,父调度阶段的成功与否直接影响后续阶段的调度,所以,在调度后续阶段前,先判断当前调度阶段所依赖的父调度阶段的结果是否可用(即运行是否成功):如果可用,则提交当前调度阶段;如果不可用,则尝试提交结果不可用的父调度阶段。至于什么时候进行是否可用判断呢?这个工作交给是在ShuffleMapTask完成时(即已经交给executor执行了)进行,DAGScheduler会检查调度阶段的所有任务是否都完成:如果执行失败,则重新提交该阶段;如果所有任务成功,则扫描等待调度阶段列表,检查列表中的阶段的父调度阶段是否存在未完成,如果不存在,则表明该调度阶段准备就绪,生成实例并提交运行。

提交调度阶段流运行顺序:

为了方便理解,对上一篇最后的图添加了一个stage。

  1. 在submitStage方法中,先创建作业实例,然后判断该调度阶段是否存在父调度阶段,由于ResultStage3有两个父调度阶段ShuffleMapStage0和ShuffleMapStage2,所以ResultStage3会先放进waitingStages中;
  2. 然后递归调用submitStage,发现ShuffleMapStage0没有父调度阶段,而ShuffleMapStage2有一个父调度阶段ShuffleMapStage1,所以ShuffleMapStage2会被放进waitingStages中,再之,ShuffleMapStage1也没有父调度阶段,则ShuffleMapStage0和ShuffleMapStage1会被放到执行列表中,作为第一次调度使用submitMissingTasks方法,提交运行;
  3. Executor执行完成时会发送消息,通知DAGScheduler更新状态并检查运行情况,如果发现有任务执行失败,则重新提交调度阶段;如果所有任务执行成功,则继续提交下一次调度阶段。这里进入第二次调度阶段,首先扫描等待队列的stage是否有父调度阶段没有完成,显然ResultStage3还有ShuffleMapStage2没有完成,所以ResultStage3继续放在等待队列,ShuffleMapStage2则没有父调度阶段,可以放在运行队列中,作为第二次调度提交;
  4. 此时,ShuffleMapStage2执行完毕,ResultStage3已经没有父调度阶段,可以作为第三次调度提交。
原文地址:https://www.cnblogs.com/SysoCjs/p/11355900.html