spark的runJob函数2

上一篇我们讲到了spark的runJob方法提交job运行,runJob在提交时,需要RDD和一个函数,那么运行机制是什么呢?函数如何运行的呢?
首先job被提交后,需要切分stage,然后每个stage会划分成一组task提交executor运行。如何切分stage和task,需要另写一篇来解读。那么我们下面来分析如何运行task。
我们看下面代码

private[spark] class CoarseGrainedExecutorBackend(
    override val rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    hostPort: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv)
  extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

  var executor: Executor = null
  @volatile var driver: Option[RpcEndpointRef] = None


  //每个Executor都会对应一个CoarseGrainedExecutorBackend,核心代码是

  override def receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor(hostname) =>
      logInfo("Successfully registered with driver")
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

    case RegisterExecutorFailed(message) =>
      logError("Slave registration failed: " + message)
      System.exit(1)

    case LaunchTask(data) =>
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

    case KillTask(taskId, _, interruptThread) =>
      if (executor == null) {
        logError("Received KillTask command but executor was null")
        System.exit(1)
      } else {
        executor.killTask(taskId, interruptThread)
      }

    case StopExecutor =>
      logInfo("Driver commanded a shutdown")
      // Cannot shutdown here because an ack may need to be sent back to the caller. So send
      // a message to self to actually do the shutdown.
      self.send(Shutdown)

    case Shutdown =>
      executor.stop()
      stop()
      rpcEnv.shutdown()
  }


处理不同的事件,主要和driver进行交互。在这看到了LaunchTask函数,这里将启动运行一个task。
首先要反序列化task,得到taskdesc的描述信息,然后调用executor#launchTask 函数加载task。

  def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer): Unit = {
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }


没有什么稀奇的,创建一个TaskRunner,是一个Runnable,放入到线程池运行。从这里我们可以看出spark和MR的最大区别,MR是一个进程运行一个Task,而Spark是一个Executor进程可以以多线程的方式执行多个Task。

继续看一下TaskRunner的run方法。

   override def run(): Unit = {

         //初始化参数
      val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
      val deserializeStartTime = System.currentTimeMillis()
      Thread.currentThread.setContextClassLoader(replClassLoader)
      val ser = env.closureSerializer.newInstance()
      logInfo(s"Running $taskName (TID $taskId)")
      execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
      var taskStart: Long = 0
      startGCTime = computeTotalGcTime()

      //反序列化Task
      try {

        val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
        updateDependencies(taskFiles, taskJars)
        task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
        task.setTaskMemoryManager(taskMemoryManager)

        ......(忽略的代码)

        //此处是task运行的核心,调用task的run方法。并且的到返回的结果
        val (value, accumUpdates) = try {
          val res = task.run(
            taskAttemptId = taskId,
            attemptNumber = attemptNumber,
            metricsSystem = env.metricsSystem)
          threwException = false
          res
        }

        ...忽略的代码
    }

我们继续追踪Task#run方法

final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem)
  : (T, AccumulatorUpdates) = {

      //首先要创建taskContext,我们在runJob指定的function中有个参数就是TaskContext,就在这进行初始化。
    context = new TaskContextImpl(
      stageId,
      partitionId,
      taskAttemptId,
      attemptNumber,
      taskMemoryManager,
      metricsSystem,
      internalAccumulators,
      runningLocally = false)
    TaskContext.setTaskContext(context)
    context.taskMetrics.setHostname(Utils.localHostName())
    context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
    taskThread = Thread.currentThread()
    if (_killed) {
      kill(interruptThread = false)
    }
    try {

        //开始执行runTask方法,用刚才创建的TaskContext
      (runTask(context), context.collectAccumulators())
    } finally {
      context.markTaskCompleted()
      try {
        Utils.tryLogNonFatalError {
          // Release memory used by this thread for unrolling blocks
          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
          // Notify any tasks waiting for execution memory to be freed to wake up and try to
          // acquire memory again. This makes impossible the scenario where a task sleeps forever
          // because there are no other tasks left to notify it. Since this is safe to do but may
          // not be strictly necessary, we should revisit whether we can remove this in the future.
          val memoryManager = SparkEnv.get.memoryManager
          memoryManager.synchronized { memoryManager.notifyAll() }
        }
      } finally {
        TaskContext.unset()
      }
    }
  }


最后代码运行到runTask,参数是TaskContext


def runTask(context: TaskContext): T

这是一个抽象的方法,并没有实现,一个有两个子类来实现,一个是 ResultTask 另外一个是ShuffleMapTask。
从名字上来看,ShuffleMapTask是负责shuffle过程的,ResultTask是返回结果的。事实也是这么个情况。

只有最后一个stage创建的task才是ResultTask,其他的stage生成的task是ShuffleMapTask。

runJob提交的funciton

val func = (tc: TaskContext, it: Iterator[String]) => {}

作用到ResultTask的。

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val deserializeStartTime = System.currentTimeMillis()
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context))
  }

ResultTask#runTask,在这个函数中看到继续反序列化task,得到func方法,最后执行func方法,参数是创建的taskContext和计算好的iterator结果。

原文地址:https://www.cnblogs.com/luckuan/p/5252574.html