Spark应用程序-任务的调度

任务的调度

​ 关于任务的调度逻辑依然在submitMissingTasks方法中,在任务的划分之后,会生成一个任务的任务的集合,即:

val tasks: Seq[Task[_]]

​ 该集合中包含的正是当前阶段中所有的任务。任务划分之后,程序会继续向下执行:

if (tasks.nonEmpty) {
  logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
    s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
} else {
 	……
}

​ 如果任务集合不为空,那么会通过一个任务调度器进行任务的提交,提交执行会将任务集合进行包装生成一个TaskSet。最终提交的是一个TaskSet,继续查看submitTasks源码,直接点进去是一个特质TaskScheduler,查看真正的执行源码,还是要到它的具体实现类TaskSchedulerImpl中查看:

override def submitTasks(taskSet: TaskSet): Unit = {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
      ……

​ 此处有一个createTaskSetManager方法,用于创建taskSet管理器,查看它的源码:

private[scheduler] def createTaskSetManager(
    taskSet: TaskSet,
    maxTaskFailures: Int): TaskSetManager = {
  new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
}

​ 该方法又将TaskSet进行了包装,生成了TaskSetManager对象进行返回。所以manager是一个TaskSetManager对象。

​ 继续向下查看TaskSchedulerImpl类中的submitTasks方法的源码:

stageTaskSets(taskSet.stageAttemptId) = manager
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

​ 此处有一个调度构建器,用于将manager对象添加进去。它的声明是:

private var schedulableBuilder: SchedulableBuilder = null

​ 继续查看它的赋值:

def initialize(backend: SchedulerBackend): Unit = {
  this.backend = backend
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
        s"$schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}

​ 可以看出一共有两种调度器模式,一种是先进先出FIFO,一种是公平调度器模式,会根据不同的调度器模式,创建不同的调度器对象赋值给schedulableBuilder。而在默认情况下,调度器模式是FIFO,也就是先进先出的模式。

// default scheduler is FIFO
private val schedulingModeConf = conf.get(SCHEDULER_MODE)
val schedulingMode: SchedulingMode =
  try {
        SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
  } catch {
    case e: java.util.NoSuchElementException =>
      throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf")
  }

​ 继续查看schedulableBuilder.addTaskSetManager(manager,manager.taskSet.properties)的源码:

override def addTaskSetManager(manager: Schedulable, properties: Properties): Unit = {
  rootPool.addSchedulable(manager)
}

​ rootPool是一个任务池,该方法将manager放入了任务池中。这整个过程就是将划分出的任务进行一系列的包装,然后放入一个任务池中,那么什么时候取出来执行呢?继续向下观看submitTasks的源码,在方法内部的最后一行有一段代码:

backend.reviveOffers()

​ 进一步观察它的内部逻辑:

override def reviveOffers(): Unit = {
  driverEndpoint.send(ReviveOffers)
}

​ 进行了发送消息的操作,作用是告知消息接收方要取出任务。makeOffers就是具体的操作:

      case ReviveOffers =>
        makeOffers()
private def makeOffers(): Unit = {
  // Make sure no executor is killed while some task is launching on it
  val taskDescs = withLock {
    // Filter out executors under killing
    val activeExecutors = executorDataMap.filterKeys(isExecutorActive)
    val workOffers = activeExecutors.map {
      case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
          Some(executorData.executorAddress.hostPort),
          executorData.resourcesInfo.map { case (rName, rInfo) =>
            (rName, rInfo.availableAddrs.toBuffer)
          })
    }.toIndexedSeq
    scheduler.resourceOffers(workOffers)
  }
  if (taskDescs.nonEmpty) {
    launchTasks(taskDescs)
  }
}

​ 先生成对任务的描述,然后如果任务不为空,那么就启动任务。但是具体取出任务的过程是scheduler.resourceOffers(workOffers)这行代码完成的,首先查看一下该方法的注释介绍:

/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 */

​ 大致意思是,该方法是由集群管理器调用,用于给从属服务器提供资源。以轮循的方式为每个节点分发任务,以便在整个集群中平衡任务数量。但是它在具体的细节上会有优先级的差异,查看源码可以知道:

def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
		……			
  val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
    ……

​ getSortedTaskSetQueue方法在底层对任务池中的manager进行了调度算法的处理:

override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
  val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
  val sortedSchedulableQueue =
    schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
  for (schedulable <- sortedSchedulableQueue) {
    sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
  }
  sortedTaskSetQueue
}

​ taskSetSchedulingAlgorithm就是具体的调度算法:

private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
  schedulingMode match {
    case SchedulingMode.FAIR =>
      new FairSchedulingAlgorithm()
    case SchedulingMode.FIFO =>
      new FIFOSchedulingAlgorithm()
    case _ =>
      val msg = s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
      throw new IllegalArgumentException(msg)
  }
}

​ 不同的调度模式就使用不同的调度算法。这些算法本质就是对manager进行调度,看看哪些manager先执行,哪些后执行。

​ 经过调度算法的处理之后,返回一个TaskSetManager集合,然后会对这个集合进行轮循。在轮循的过程中,会涉及到一个重要的概念,也就是本地化级别:

for (taskSet <- sortedTaskSets) {
		……
    val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
    for (currentMaxLocality <- taskSet.myLocalityLevels) {	
		……
    }
    ……
}

​ 那么什么是本地化级别呢?通过一个案例来进行解释:

​ 假设一个应用程序,有一个Driver和两个Executor,如果Driver中有一个Task,那么这个Task到底发给哪个Executor,这就涉及到一个首选位置的概念,也就是说Task作为计算,那么首选发送的位置应该发给数据所处的Executor位置,这是比较理想的情况。但是这个Task未必会发送到数据所在的Executor节点。所以Task和Data的位置就存在不同的级别,这个级别称之为本地化级别。级别有以下几种:

​ (1)进程本地化:数据和计算在同一个进程中(最高级别,效率最高)

​ (2)节点本地化:数据和计算在同一个节点中

​ (3)机架本地化:数据和计算在同一个架构中

​ (4)任意:数据和计算的位置任意

​ 所以上面的代码会判断taskSet的级别,看看它应该发给哪个地方,然后进行关联。最终返回任务。

​ Spark中任务调度时,TaskScheduler在分发之前需要依据数据的位置来分发,最好将task分发到数据所在的节点上,如果TaskScheduler分发的task在默认3s依然无法执行的话,TaskScheduler会重新发送这个task到相同的Executor中去执行,会重试5次,如果依然无法执行,那么TaskScheduler会降低一级数据本地化的级别再次发送task。

​ 如上图中,会先尝试1,PROCESS_LOCAL数据本地化级别,如果重试5次每次等待3s,会默认这个Executor计算资源满了,那么会降低一级数据本地化级别到2,NODE_LOCAL,如果还是重试5次每次等待3s还是失败,那么还是会降低一级数据本地化级别到3,RACK_LOCAL。这样数据就会有网络传输,降低了执行效率。

​ 最后启动任务:

      if (taskDescs.nonEmpty) {
        launchTasks(taskDescs)
      }
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    //判断序列化的任务数量有没有超过限制  
    if (serializedTask.limit() >= maxRpcMessageSize) {
		……
	else{
        ……
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}

​ 取出所有的任务,进行遍历,判断序列化的任务数量是否超过限制,如果没有那么就会找到对应Executor终端,给它发启动Task的消息。所以最后一行的代码的任务就是从任务池中取出任务,然后进行序列化发送给远程的某个executor执行,而这个executor可以根据上面的本地化级别选出来。

参考文献:https://www.cnblogs.com/eric666666/p/11301266.html

原文地址:https://www.cnblogs.com/yxym2016/p/14254577.html