Spark Executor Task Execution and Count

Basic Principles (YARN Mode)

Each stage has multiple partitions, and each partition is processed by one Task on an Executor.

The default number of partitions of a stage is taken from the spark.default.parallelism parameter; when that parameter is not set, the count is inherited from the parent stage.
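
As a quick illustration, here is a minimal sketch (the input path, the local master, and the value 8 are assumptions, not from the original post): a shuffle transformation such as reduceByKey falls back to spark.default.parallelism when no partition count is passed, while a narrow transformation keeps its parent's partition count.

  import org.apache.spark.{SparkConf, SparkContext}

  object ParallelismDemo extends App {
    val conf = new SparkConf()
      .setAppName("parallelism-demo")
      .setMaster("local[2]")                  // assumption: local run, for a self-contained demo
      .set("spark.default.parallelism", "8")
    val sc = new SparkContext(conf)

    val counts = sc.textFile("input.txt")     // partition count comes from the input splits
      .flatMap(_.split(" "))
      .map((_, 1))                            // narrow: same partition count as the parent
      .reduceByKey(_ + _)                     // shuffle: 8 partitions, from spark.default.parallelism
    println(counts.getNumPartitions)          // 8

    sc.stop()
  }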

The maximum number of Tasks that can run at the same time is determined by three parameters:

  • The number of Executors, set by spark.executor.instances or --num-executors; the default is 2
  • The number of cores per Executor, set by spark.executor.cores or --executor-cores; the default is 1
  • The number of cores required by each Task, set by spark.task.cpus; the default is 1

Choose the number of Executors and the number of cores per Executor with the actual machine and CPU counts in mind; both can, however, be configured larger than the physical machine and CPU counts.
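
Putting the three parameters together (the numbers below are made-up values for illustration), the cluster-wide concurrency limit works out as in this sketch:

  object MaxConcurrencySketch extends App {
    val numExecutors  = 4   // spark.executor.instances / --num-executors
    val executorCores = 8   // spark.executor.cores / --executor-cores
    val cpusPerTask   = 1   // spark.task.cpus

    // Each Executor runs at most executorCores / cpusPerTask Tasks at once,
    // so the application as a whole runs at most:
    val maxConcurrentTasks = numExecutors * (executorCores / cpusPerTask)
    println(maxConcurrentTasks)  // 32
  }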

Detailed Flow (YARN Mode)

In DAGScheduler

  • When a Job is submitted, the handleJobSubmitted function runs, and it calls submitStage
  • submitStage is recursive: starting from the final stage (the ResultStage created by the action), it keeps walking back through parent stages, parking every stage that still has missing parents in waitingStages, until it reaches the first stage, on which it calls submitMissingTasks
  • submitMissingTasks gets the stage's missing partitions, creates a task for each partition, and calls taskScheduler.submitTasks to submit all of the stage's tasks
  • When the stage completes, submitStage is called again on the corresponding stages in waitingStages

In TaskSchedulerImpl

  • The submitTasks function registers all the tasks and then calls backend.reviveOffers()

In CoarseGrainedSchedulerBackend

  • reviveOffers calls driverEndpoint.send(ReviveOffers)
  • The backend's inner class DriverEndpoint receives the message and calls makeOffers()
  • makeOffers calls scheduler.resourceOffers(workOffers) to allocate resources
  • For the tasks that were granted resources, it calls launchTasks(taskDescs)
  • launchTasks sends a LaunchTask message to the corresponding CoarseGrainedExecutorBackend
  • CoarseGrainedExecutorBackend is the actual Executor JVM process

In TaskSchedulerImpl

  • resourceOffers calls resourceOfferSingleTaskSet
  • resourceOfferSingleTaskSet checks whether an offer still has enough free CPUs for a task

In CoarseGrainedExecutorBackend

  • On receiving LaunchTask, it calls executor.launchTask to launch the task
  • Executor.launchTask wraps the task in a TaskRunner and hands it to a thread pool for execution

The maximum number of concurrent Tasks per Executor is therefore the Executor's core count (spark.executor.cores, default 1) divided by the per-Task core count (spark.task.cpus, default 1).
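
For example (a hedged sketch; the values are assumptions), raising spark.task.cpus lowers per-Executor concurrency:

  import org.apache.spark.SparkConf

  // With 4 cores per Executor and 2 CPUs per Task, each Executor runs at most
  // 4 / 2 = 2 Tasks at a time; resourceOfferSingleTaskSet enforces this with the
  // availableCpus(i) >= CPUS_PER_TASK check shown in the code below.
  val conf = new SparkConf()
    .set("spark.executor.cores", "4")
    .set("spark.task.cpus", "2")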

Code

// DAGScheduler.scala

  private[scheduler] def handleJobSubmitted(......) {
    ......
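    // finalStage is the ResultStage created for the action that triggered the job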
    submitStage(finalStage)
    ......
  }

  private def submitStage(stage: Stage) {
    ......
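        // Parent stages whose results are missing must run first; they are
        // submitted recursively and this stage is parked in waitingStages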
        val missing = getMissingParentStages(stage).sortBy(_.id)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
    ......
  }

  private def submitMissingTasks(stage: Stage, jobId: Int) {
    ......
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

    val tasks: Seq[Task[_]] = try {
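        // One ShuffleMapTask (for a shuffle map stage) or ResultTask (for the
        // final stage) is built per missing partition; construction is elided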
        ......
    }

    if (tasks.size > 0) {
      logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
        s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
    } else {
      ......
      submitWaitingChildStages(stage)
    }
  }

// TaskSchedulerImpl.scala

  override def submitTasks(taskSet: TaskSet) {
    ......
    backend.reviveOffers()
  }

// CoarseGrainedSchedulerBackend.scala

  override def reviveOffers() {
    driverEndpoint.send(ReviveOffers)
  }

  class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) {
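    // Note: the excerpt below is the single-executor overload; ReviveOffers
    // actually triggers the zero-argument makeOffers(), which offers the free
    // resources of every alive executor in the same way.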
    private def makeOffers(executorId: String) {
      // Make sure no executor is killed while some task is launching on it
      val taskDescs = withLock {
        // Filter out executors under killing
        if (executorIsAlive(executorId)) {
          val executorData = executorDataMap(executorId)
          val workOffers = IndexedSeq(
            new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores,
              Some(executorData.executorAddress.hostPort)))
          scheduler.resourceOffers(workOffers)
        } else {
          Seq.empty
        }
      }
      if (!taskDescs.isEmpty) {
        launchTasks(taskDescs)
      }
    }
  }

// TaskSchedulerImpl.scala

  def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    ......
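            // For each locality level (from most to least local), keep offering
            // resources until no task in the TaskSet can be launched at that level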
            launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
              currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)
    ......
  }

  private def resourceOfferSingleTaskSet(
    ......
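      // CPUS_PER_TASK is spark.task.cpus; offer i must still have at least
      // that many free cores for a task to be placed on it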
      if (availableCpus(i) >= CPUS_PER_TASK) {
        try {
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            tasks(i) += task
            val tid = task.taskId
            taskIdToTaskSetManager.put(tid, taskSet)
            taskIdToExecutorId(tid) = execId
            executorIdToRunningTaskIds(execId).add(tid)
            availableCpus(i) -= CPUS_PER_TASK
            assert(availableCpus(i) >= 0)
            // Only update hosts for a barrier task.
            if (taskSet.isBarrier) {
              // The executor address is expected to be non empty.
              addressesWithDescs += (shuffledOffers(i).address.get -> task)
            }
            launchedTask = true
          }
        }
      }
    ......
  }

// CoarseGrainedSchedulerBackend.scala

    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      ......
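          // Reserve the task's cores in the driver-side bookkeeping; freeCores
          // is what makeOffers later reports as this executor's available cores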
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK

          logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
            s"${executorData.executorHost}.")

          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
      ......
    }

// CoarseGrainedExecutorBackend.scala

  override def receive: PartialFunction[Any, Unit] = {
    ......
    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = TaskDescription.decode(data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskDesc)
      }
    ......
  }

// Executor.scala

  def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
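    // The pool is an unbounded cached thread pool ("Executor task launch worker"),
    // so per-Executor concurrency is capped by the scheduler's core accounting
    // (spark.executor.cores / spark.task.cpus), not by the pool itself.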
    val tr = new TaskRunner(context, taskDescription)
    runningTasks.put(taskDescription.taskId, tr)
    threadPool.execute(tr)
  }



Original post (in Chinese): https://www.cnblogs.com/moonlight-lin/p/13941440.html