Spark Message Communication Principles (3): Spark Runtime Message Communication

1. How a Spark application executes:
        In Spark, every submitted "job" is referred to as an application (Application), and every Application must have a SparkContext, which serves as the application's entry point, or its runtime environment. When a user (Client) submits an Application, that application's SparkContext sends an application registration message to the Master. The Master allocates Executor processes according to the resources the application needs; these Executors are distributed across the Worker machines and are created by each WorkerEndpoint using the command supplied by the Driver. Once an Executor has been created, it registers with the Driver as an executor of this application, and after a successful registration it sends a registration-success message to SparkContext. When an action operator is executed on an RDD of the SparkContext, execution is triggered: the Driver builds a DAG (directed acyclic graph) from the RDDs, the DAGScheduler splits the DAG into stages and converts each stage into a TaskSet (a TaskSet is a collection of Tasks), and the TaskScheduler then sends launch messages to the already registered Executors. When an Executor receives a task message, it starts and runs the task. Finally, when all tasks have finished, the Driver processes the computation results from the Executors and reclaims the resources.
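To make the trigger points concrete, here is a minimal user-side application written against the standard RDD API (a sketch; the master URL, input sizes and partition counts are placeholders). Creating the SparkContext is what registers the application with the Master, and the action at the end is what triggers the DAG/stage/task scheduling described above:

import org.apache.spark.{SparkConf, SparkContext}

object MinimalApp {
    def main(args: Array[String]): Unit = {
        // Creating the SparkContext registers this application with the Master
        val conf = new SparkConf().setAppName("MinimalApp").setMaster("spark://master:7077")
        val sc = new SparkContext(conf)

        val rdd = sc.parallelize(1 to 1000, numSlices = 4)    // 4 partitions -> 4 tasks per stage
        val total = rdd.map(_ * 2).reduce(_ + _)              // reduce is an action: the job is submitted here
        println(s"total = $total")

        sc.stop()    // unregister the application and release its executors
    }
}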

An Application is divided internally as follows:

        A Spark Application is associated with a SparkContext (as noted above, the Application interacts with the cluster nodes through its SparkContext). Each Application contains one or more jobs, which may run in parallel or serially. A job consists of multiple stages, divided at shuffle boundaries; each stage consists of multiple tasks (one task per partition), and these tasks together form a TaskSet. The task is the smallest unit of work.
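As a quick illustration of this hierarchy (a sketch using the standard RDD API; the input path is a placeholder), the job below is split into two stages because reduceByKey introduces a shuffle, and each stage runs one task per partition:

val lines = sc.textFile("hdfs:///tmp/input.txt", minPartitions = 4)    // hypothetical input path

// Stage 0: flatMap and map are narrow dependencies, so they stay in one stage;
// with 4 partitions this stage becomes a TaskSet of 4 tasks
val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))

// reduceByKey requires a shuffle, so everything after it forms Stage 1
val counts = pairs.reduceByKey(_ + _)

// collect() is the action that submits the job (Stage 0, then Stage 1) to the DAGScheduler
val result = counts.collect()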

2. Message communication during Spark execution


        Running an application requires starting a SparkContext. During SparkContext startup, a SchedulerBackend object is instantiated first (in Standalone mode this is a SparkDeploySchedulerBackend), and this backend is associated with two endpoints: the DriverEndpoint and the ClientEndpoint.
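A simplified sketch of how this wiring happens in Standalone mode, based on the Spark 1.x SparkContext.createTaskScheduler code path (details differ between versions):

// Inside SparkContext.createTaskScheduler, for a "spark://..." master URL (simplified)
case SPARK_REGEX(sparkUrl) =>
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    // SparkDeploySchedulerBackend extends CoarseGrainedSchedulerBackend (which owns the DriverEndpoint)
    // and internally creates an AppClient whose ClientEndpoint talks to the Master
    val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    (backend, scheduler)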

(1) ClientEndpoint registers the Application with MasterEndpoint
        As mentioned in Spark Message Communication Principles (2), the endpoints that talk to the Master all have a tryRegisterAllMasters method used to register something with the Master. In ClientEndpoint, tryRegisterAllMasters registers the Application with the Master.

private def tryRegisterAllMasters(): Array[JFuture[_]] = {
    for(masterAddress <- masterRpcAddresses) yield {
        // Submit a registration task to the thread pool; the task exits as soon as it sees the registered flag set to true
        registerMasterThreadPool.submit(new Runnable{
            override def run():Unit = try{
                // Check the registration-success flag
                if(registered.get){
                    return
                }
                // Get a reference to the Master endpoint, used to send the application registration message
                val masterRef = rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
                // Send the RegisterApplication message to the Master
                masterRef.send(RegisterApplication(appDescription, self))
            }catch{...}
        })
    }
}

(2) MasterEndpoint handles the Application registration request
        When the Master receives the application registration message, its registerApplication method does two things: 1. it records the application's information and appends it to the application list (applications are served in FIFO order); 2. once registration is done, the Master sends a RegisteredApplication message back to ClientEndpoint and calls startExecutorsOnWorkers, which sends LaunchExecutor messages telling the Workers to launch Executors.
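On the Master side, the receive handler for RegisterApplication looks roughly like this (a simplified sketch based on the Spark 1.x Master; schedule() is what eventually calls the startExecutorsOnWorkers method shown further below):

case RegisterApplication(description, driver) =>
    if (state == RecoveryState.STANDBY) {
        // a standby Master ignores registration attempts
    } else {
        // 1. Record the application and append it to waitingApps (FIFO)
        val app = createApplication(description, driver)
        registerApplication(app)
        persistenceEngine.addApplication(app)
        // 2. Acknowledge the registration and try to schedule resources for the application
        driver.send(RegisteredApplication(app.id, self))
        schedule()
    }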

When ClientEndpoint receives the RegisteredApplication message, it updates its local state:

case RegisteredApplication(appId_, masterRef) =>
    appId.set(appId_)
    registered.set(true)
    master = Some(masterRef)
    listener.connected(appId.get)

In startExecutorsOnWorkers, the Master first selects the worker nodes that are eligible to run the application, and then notifies each of them to launch the corresponding executors (one or more per worker):

private def startExecutorsOnWorkers() :Unit = {
    // FIFO scheduling: applications that registered first run first
    for(app <- waitingApps if app.coresLeft > 0){
        val coresPerExecutor:Option[Int] = app.desc.coresPerExecutor
 
        // Select alive workers whose free memory can hold an Executor and whose free cores are at least coresPerExecutor (default 1)
        val usableWorkers = workers.toArray.filter(_.state==WorkerState.ALIVE)
            .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree >= coresPerExecutor.getOrElse(1))
            .sortBy(_.coresFree).reverse
 
        // Decide which workers the application runs on and how many cores each of them contributes.
        // Two allocation strategies: 1. spread the application across as many workers as possible;
        // 2. pack it onto as few workers as possible
        val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
 
        // Notify each assigned worker to launch its executor(s)
        for(pos <- 0 until usableWorkers.length if assignedCores(pos) > 0){
            allocateWorkerResourceToExecutors(app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
        }
    }
}
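allocateWorkerResourceToExecutors then decides how many Executors to start on a given worker and sends the actual LaunchExecutor message; a simplified sketch based on the Spark 1.x Master (details may differ between versions):

private def allocateWorkerResourceToExecutors(
        app: ApplicationInfo,
        assignedCores: Int,
        coresPerExecutor: Option[Int],
        worker: WorkerInfo): Unit = {
    // If coresPerExecutor is not set, all assigned cores go to a single Executor
    val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    for (i <- 1 to numExecutors) {
        val exec = app.addExecutor(worker, coresToAssign)
        // launchExecutor sends LaunchExecutor to the WorkerEndpoint and
        // ExecutorAdded to the application's ClientEndpoint
        launchExecutor(worker, exec)
        app.state = ApplicationState.RUNNING
    }
}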

(3) The Worker creates a CoarseGrainedExecutorBackend object to launch the Executor process
        When the Worker receives the LaunchExecutor message from the Master, it first instantiates an ExecutorRunner. During this instantiation a process builder (ProcessBuilder) is created, and the ProcessBuilder launches a CoarseGrainedExecutorBackend process from the command carried in the application description; this object is the container in which the Executor runs. Finally, the Worker sends an ExecutorStateChanged message to the Master.

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
    if (masterUrl != activeMasterUrl){
        logWarning("InvalidMaster (" + masterUrl + ") attempted to launch executor.")
    }else{
        try{
            // Create the Executor's working directory
            val executorDir =  new File(workDir, appId + "/" + execId)
            if(!executorDir.mkdirs()){
                throw new IOException("Failed to create directory " + executorDir)
            }
 
            // Create the Executor's local directories and expose them through the SPARK_EXECUTOR_DIRS environment variable; the Worker deletes them when the application finishes
            val appLocalDirs = appDirectories.getOrElse(appId, Utils.getOrCreateLocalRootDirs(conf).map{ dir =>
                    val appDir = Utils.createDirectory(dir, namePrefix = "executor")
                    Utils.chmod700(appDir)
                    appDir.getAbsolutePath()
                }.toSeq)
            appDirectories(appId) = appLocalDirs
 
            // Create the ExecutorRunner; it launches the CoarseGrainedExecutorBackend using the command from the application description, which was built in the SchedulerBackend
            val manager = new ExecutorRunner(
                            appId,
                            execId,
                            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command,conf)),
                            cores_,
                            memory_,
                            self,
                            workerId,
                            host,
                            webUi.boundPort,
                            publicAddress,
                            sparkHome,
                            executorDir,
                            workerUri,
                            conf,
                            appLocalDirs,
                            ExecutorState.RUNNING)
            executors(appId + "/" + execId) = manager
            manager.start()
            coresUsed += cores_
            memoryUsed += memory_
 
            // Send ExecutorStateChanged to the Master, reporting that the Executor state is now ExecutorState.RUNNING
            sendToMaster(ExecutorStateChanged(appId,execId,manager.state,None,None))
        }catch{...}
    }
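Inside ExecutorRunner, the ProcessBuilder mentioned above is built from the application's command and used to start the CoarseGrainedExecutorBackend JVM; a trimmed-down sketch based on the Spark 1.x ExecutorRunner.fetchAndRunExecutor (logging and kill handling omitted):

private def fetchAndRunExecutor(): Unit = {
    // Build the ProcessBuilder from the command that was created in the SchedulerBackend
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
        memory, sparkHome.getAbsolutePath, substituteVariables)
    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))

    // This starts the CoarseGrainedExecutorBackend process on the worker machine
    process = builder.start()

    // Wait for the process to exit and report the final state back to the Worker,
    // which forwards it to the Master as another ExecutorStateChanged message
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    worker.send(ExecutorStateChanged(appId, execId, state, Some("exit code " + exitCode), Some(exitCode)))
}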

(4) DriverEndpoint handles the RegisterExecutor message
        As mentioned in step (3), the CoarseGrainedExecutorBackend object is the container for the Executor; it is created and started by the ExecutorRunner, and on startup it sends a RegisterExecutor message to DriverEndpoint. When the Driver receives the registration message, it first checks whether this Executor already exists in its registry: if it does, a RegisterExecutorFailed message is returned to the CoarseGrainedExecutorBackend; if it does not, the Driver records the Executor's information and sends back a RegisteredExecutor message. Finally, the Driver allocates resources for the tasks and sends LaunchTask messages.
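The executor-side registration mentioned here happens in CoarseGrainedExecutorBackend's onStart; a simplified sketch based on the Spark 1.x source (error handling trimmed):

override def onStart(): Unit = {
    logInfo("Connecting to driver: " + driverUrl)
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
        driver = Some(ref)
        // Ask the DriverEndpoint to register this executor; the reply (RegisteredExecutor
        // or RegisterExecutorFailed) is forwarded back to this endpoint as a message
        ref.ask[RegisterExecutorResponse](
            RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
    }(ThreadUtils.sameThread).onComplete {
        case Success(msg) => Option(self).foreach(_.send(msg))
        case Failure(e)   => logError(s"Cannot register with driver: $driverUrl", e)
    }(ThreadUtils.sameThread)
}

The Driver-side handling of the RegisterExecutor message described above: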

case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
    if(executorDataMap.contains(executorId)){
        // The executor ID already exists in the registry
        executorRef.send(RegisterExecutorFailed("Duplicate executor ID:" + executorId))
        context.reply(true)
    }else{
        ...
        // 1. Record the Executor's ID and the number of cores it provides
        addressToExecutorId(executorRef.address) = executorId
        totalCoreCount.addAndGet(cores)
        totalRegisteredExecutors.addAndGet(1)
        val data = new ExecutorData(executorRef, executorRef.address, host, cores, cores, logUrls)
        // 2. Map the Executor ID to its details (ExecutorData)
        CoarseGrainedSchedulerBackend.this.synchronized{
            executorDataMap.put(executorId, data)
            if(currentExecutorIdCounter < executorId.toInt){
                currentExecutorIdCounter = executorId.toInt    // track the largest executor ID seen so far
            }
            if(numPendingExecutors > 0){
                numPendingExecutors -= 1
            }
        }
        // 3. Send the RegisteredExecutor (success) message back to the CoarseGrainedExecutorBackend
        executorRef.send(RegisteredExecutor(executorAddress.host))
        // 4. Post a SparkListenerExecutorAdded event on the listener bus
        listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
 
        // 5. Allocate resources and send LaunchTask messages to the Executor
        makeOffers()
    }

(5) CoarseGrainedExecutorBackend instantiates the Executor object
        When CoarseGrainedExecutorBackend receives the RegisteredExecutor message from the Driver, it instantiates the Executor object. Once the Executor is up, it sends periodic heartbeats to the Driver. Looking at steps (3), (4) and (5), the Executor is not created directly by the WorkerEndpoint: the Worker first creates the CoarseGrainedExecutorBackend object, that object registers the Executor with the Driver, and only after registration succeeds does the CoarseGrainedExecutorBackend instantiate the Executor object, which is then managed by the Driver. The source for CoarseGrainedExecutorBackend's handling of the RegisteredExecutor message:

case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    // Instantiate (start) the Executor using the current environment
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

The executor sends heartbeats and waits for the Driver to assign tasks:

private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
private def startDriverHeartbeater() : Unit = {
    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
 
    // Wait a random initial delay so that heartbeats from different executors do not stay synchronized
    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
    val heartbeatTask = new Runnable(){
        override def run():Unit = Utils.logUncaughtExceptions(reportHeartBeat())
    }
 
    // Send heartbeats at a fixed rate
    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
}
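reportHeartBeat itself sends a Heartbeat message to the HeartbeatReceiver endpoint inside the Driver; a condensed sketch based on the Spark 1.x Executor (metrics collection omitted):

private def reportHeartBeat(): Unit = {
    // Collect the metrics of all tasks currently running on this executor (omitted here)
    val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()

    val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
    try {
        // HeartbeatReceiver is an endpoint registered in the Driver's SparkContext
        val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](message)
        if (response.reregisterBlockManager) {
            env.blockManager.reregister()
        }
    } catch {
        case NonFatal(e) => logWarning("Issue communicating with driver in heartbeater", e)
    }
}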

(6) DriverEndpoint sends LaunchTask messages to the Executor
        When the executor receives a LaunchTask message, it runs the task. To run it, the executor creates a TaskRunner and submits it to its threadPool, so that all tasks are scheduled by the Executor. When a task finishes, a status change is reported through the CoarseGrainedExecutorBackend to the Driver, and the executor then waits for its next assignment (before assigning further tasks, the Driver first processes the execution result).

case LaunchTask(data) =>
    if(executor == null){
        // The executor has not been instantiated (started) yet: log the error and shut down the backend
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
    }else{
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
 
        // Launch a TaskRunner for this task
        executor.launchTask(this, taskId=taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask)
    }
 
 
// The method that launches a TaskRunner
def launchTask(context:ExecutorBackend, taskId:Long, attemptNumber:Int, taskName:String, serializedTask:ByteBuffer) : Unit = {
    // Create a TaskRunner for the current task
    val tr = new TaskRunner(context, taskId=taskId, attemptNumber=attemptNumber, taskName, serializedTask)
    // Register the TaskRunner and submit it to the threadPool; the Executor schedules all tasks through this pool
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
}
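The status changes mentioned in (6) are reported from TaskRunner.run through the ExecutorBackend; a condensed sketch based on the Spark 1.x Executor.TaskRunner (result serialization and failure handling heavily simplified):

override def run(): Unit = {
    // Tell the CoarseGrainedExecutorBackend (and, through it, the Driver) that the task is running
    execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
    try {
        // Deserialize and run the task
        task = ser.deserialize[Task[Any]](serializedTask, Thread.currentThread.getContextClassLoader)
        val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber, metricsSystem = env.metricsSystem)
        val serializedResult = env.serializer.newInstance().serialize(value)
        // Report the final state; the backend wraps it in a StatusUpdate message to the DriverEndpoint
        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
    } catch {
        case t: Throwable =>
            execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(t.toString))
    }
}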

(7) The Driver handles StatusUpdate
        When DriverEndpoint receives the StatusUpdate message sent by an Executor, it calls TaskSchedulerImpl's statusUpdate method to process the result according to the task's final state; once the result has been handled, it continues sending LaunchTask messages to the Executor.

case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)    // scheduler is a reference to TaskSchedulerImpl
    if(TaskState.isFinished(state)){
        executorDataMap.get(executorId) match{
            case Some(executorInfo) =>
                executorInfo.freeCores += scheduler.CPUS_PER_TASK
                // Send further LaunchTask messages to this Executor; this is the same makeOffers
                // method the Driver calls in step (4) when handling RegisterExecutor
                makeOffers(executorId)    
            case None =>
        }
    }
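The single-executor variant of makeOffers used here builds a WorkerOffer for the freed cores and asks TaskSchedulerImpl which tasks to launch; a simplified sketch based on the Spark 1.x CoarseGrainedSchedulerBackend:

// Make resource offers on just one executor
private def makeOffers(executorId: String): Unit = {
    val executorData = executorDataMap(executorId)
    val workOffers = Seq(
        new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
    // resourceOffers returns the task descriptions to run; launchTasks serializes each one
    // and sends it to the executor as a LaunchTask message, closing the loop with step (6)
    launchTasks(scheduler.resourceOffers(workOffers))
}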

        Steps (6) and (7) then repeat until all tasks have finished.

Original article (Chinese): https://www.cnblogs.com/SysoCjs/p/11345395.html