Spark 1.60的executor schedule

第一次看源码还是Spark 1.02。这次看新源码发现调度方式有了一些新的特征,在这里随便写一下。

不变的是,master还是接收Appclient和worker的消息,并且在接收RegisterApplication等消息后会执行一遍schedule()。schedule()依旧会先找到空闲的worker用以执行waitingDrivers。但是调度Executor的方式有了一点变化。

 1   private def startExecutorsOnWorkers(): Unit = {
 2     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
 3     // in the queue, then the second app, etc.
 4     for (app <- waitingApps if app.coresLeft > 0) {
 5       val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
 6       // Filter out workers that don't have enough resources to launch an executor
 7       val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
 8         .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
 9           worker.coresFree >= coresPerExecutor.getOrElse(1))
10         .sortBy(_.coresFree).reverse
11       val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
12 
13       // Now that we've decided how many cores to allocate on each worker, let's allocate them
14       for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
15         allocateWorkerResourceToExecutors(
16           app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
17       }
18     }
19   }

我们可以看到,在这里虽然依旧是那个简单的fifo调度,但是不再是对核心进行逐个调度,会适应executor对核心数的要求,在寻找usableWorkers时会找到memory和cores都满足条件的worker。这一点是为了改变之前的一个bug,即:

cluster has 4 workers with 16 cores each. User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is allocated at a time, 12 cores from each worker would be assigned to each executor. Since 12 < 16, no executors would launch [SPARK-8881]

其中scheduleExecutorsOnWorkers是找到每个可用的worker分配给当前app的核心数量。

简单而言这还是一个简单的fifo调度,比起Yarn默认的capacity来讲少了许多功能,而google的Borg论文《Large-scale cluster management at Google with Borg》中写的调度细节更是复杂,不过胜在简单易懂,看起代码来轻松许多。。。

原文地址:https://www.cnblogs.com/gaoze/p/5191986.html