spark1.1.0源码阅读-taskScheduler

1. sparkContext中设置createTaskScheduler

 1       case "yarn-standalone" | "yarn-cluster" =>
 2         if (master == "yarn-standalone") {
 3           logWarning(
 4             ""yarn-standalone" is deprecated as of Spark 1.0. Use "yarn-cluster" instead.")
 5         }
 6         val scheduler = try {
 7           val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
 8           val cons = clazz.getConstructor(classOf[SparkContext])
 9           cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
10         } catch {
11           // TODO: Enumerate the exact reasons why it can fail
12           // But irrespective of it, it means we cannot proceed !
13           case e: Exception => {
14             throw new SparkException("YARN mode not available ?", e)
15           }
16         }
17         val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
18         scheduler.initialize(backend) //调用实现类的initialize函数
19         scheduler

在taskSchedulerImpl.scala中

 1   def initialize(backend: SchedulerBackend) {
 2     this.backend = backend
 3     // temporarily set rootPool name to empty
 4     rootPool = new Pool("", schedulingMode, 0, 0)
 5     schedulableBuilder = {
 6       schedulingMode match {
 7         case SchedulingMode.FIFO =>
 8           new FIFOSchedulableBuilder(rootPool)
 9         case SchedulingMode.FAIR =>
10           new FairSchedulableBuilder(rootPool, conf)
11       }
12     }
13     schedulableBuilder.buildPools()
14   }

2. submitTasks

 1   override def submitTasks(taskSet: TaskSet) {
 2     val tasks = taskSet.tasks
 3     logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
 4     this.synchronized {
 5       val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
 6       activeTaskSets(taskSet.id) = manager
 7       schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
 8 
 9       if (!isLocal && !hasReceivedTask) {
10         starvationTimer.scheduleAtFixedRate(new TimerTask() {
11           override def run() {
12             if (!hasLaunchedTask) {
13               logWarning("Initial job has not accepted any resources; " +
14                 "check your cluster UI to ensure that workers are registered " +
15                 "and have sufficient memory")
16             } else {
17               this.cancel()
18             }
19           }
20         }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
21       }
22       hasReceivedTask = true
23     }
24     backend.reviveOffers()
25   }

3. CoarseGrainedSchedulerBackend的reviveOffers

1   override def reviveOffers() {
2     driverActor ! ReviveOffers  //将msg发给CoarseGrainedSchedulerBackend的driverActor
3   }
1       case ReviveOffers =>
2         makeOffers()
1     // Make fake resource offers on all executors
2     def makeOffers() {
3       launchTasks(scheduler.resourceOffers(
4         executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
5     }
1 /**
2  * Represents free resources available on an executor.
3  */
4 private[spark]
5 case class WorkerOffer(executorId: String, host: String, cores: Int)
 1   /**
 2    * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 3    * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 4    * that tasks are balanced across the cluster.
 5    */
 6   def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
 7     SparkEnv.set(sc.env)
 8 
 9     // Mark each slave as alive and remember its hostname
10     for (o <- offers) {
11       executorIdToHost(o.executorId) = o.host
12       if (!executorsByHost.contains(o.host)) {
13         executorsByHost(o.host) = new HashSet[String]()
14         executorAdded(o.executorId, o.host)
15       }
16     }
17 
18     // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
19     val shuffledOffers = Random.shuffle(offers)
20     // Build a list of tasks to assign to each worker.
21     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
22     val availableCpus = shuffledOffers.map(o => o.cores).toArray
23     val sortedTaskSets = rootPool.getSortedTaskSetQueue
24     for (taskSet <- sortedTaskSets) {
25       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
26         taskSet.parent.name, taskSet.name, taskSet.runningTasks))
27     }
28 
29     // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
30     // of locality levels so that it gets a chance to launch local tasks on all of them.
31     var launchedTask = false
32     for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
33       do {
34         launchedTask = false
35         for (i <- 0 until shuffledOffers.size) {
36           val execId = shuffledOffers(i).executorId
37           val host = shuffledOffers(i).host
38           if (availableCpus(i) >= CPUS_PER_TASK) {
39             for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
40               tasks(i) += task
41               val tid = task.taskId
42               taskIdToTaskSetId(tid) = taskSet.taskSet.id
43               taskIdToExecutorId(tid) = execId
44               activeExecutorIds += execId
45               executorsByHost(host) += execId
46               availableCpus(i) -= CPUS_PER_TASK
47               assert (availableCpus(i) >= 0)
48               launchedTask = true
49             }
50           }
51         }
52       } while (launchedTask)
53     }
54 
55     if (tasks.size > 0) {
56       hasLaunchedTask = true
57     }
58     return tasks
59   }

4. launchTasks

1     // Launch tasks returned by a set of resource offers
2     def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
3       for (task <- tasks.flatten) {
4         freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
5         executorActor(task.executorId) ! LaunchTask(task)
6       }
7     }
 1 class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
 2   extends SchedulerBackend with Logging
 3 {
 4   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
 5   var totalCoreCount = new AtomicInteger(0)
 6   val conf = scheduler.sc.conf
 7   private val timeout = AkkaUtils.askTimeout(conf)
 8 
 9   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
10     private val executorActor = new HashMap[String, ActorRef]
11     private val executorAddress = new HashMap[String, Address]
12     private val executorHost = new HashMap[String, String]
13     private val freeCores = new HashMap[String, Int]
14     private val totalCores = new HashMap[String, Int]
15     private val addressToExecutorId = new HashMap[Address, String]
1   // Driver to executors
2   case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
 1 private[spark] class TaskDescription(
 2     val taskId: Long,
 3     val executorId: String,
 4     val name: String,
 5     val index: Int,    // Index within this task's TaskSet
 6     _serializedTask: ByteBuffer)
 7   extends Serializable {
 8 
 9   // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
10   private val buffer = new SerializableBuffer(_serializedTask)
11 
12   def serializedTask: ByteBuffer = buffer.value
13 
14   override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
15 }

5. CoarseGrainedSchedulerBackend收到executor的注册之后,记录executor

 1     def receive = {
 2       case RegisterExecutor(executorId, hostPort, cores) =>
 3         Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
 4         if (executorActor.contains(executorId)) {
 5           sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
 6         } else {
 7           logInfo("Registered executor: " + sender + " with ID " + executorId)
 8           sender ! RegisteredExecutor(sparkProperties)
 9           executorActor(executorId) = sender
10           executorHost(executorId) = Utils.parseHostPort(hostPort)._1
11           totalCores(executorId) = cores
12           freeCores(executorId) = cores
13           executorAddress(executorId) = sender.path.address
14           addressToExecutorId(sender.path.address) = executorId
15           totalCoreCount.addAndGet(cores)
16           makeOffers()
17         }

executor先向CoarseGrainedSchedulerBackend注册,然后CoarseGrainedSchedulerBackend发task(序列化后)到这个executor上去。

6. CoarseGrainedExecutorBackend跟CoarseGrainedSchedulerBackend通信。

 1 private[spark] class CoarseGrainedExecutorBackend(
 2     driverUrl: String,
 3     executorId: String,
 4     hostPort: String,
 5     cores: Int,
 6     sparkProperties: Seq[(String, String)])
 7   extends Actor with ActorLogReceive with ExecutorBackend with Logging {
 8 
 9   Utils.checkHostPort(hostPort, "Expected hostport")
10 
11   var executor: Executor = null
12   var driver: ActorSelection = null
13 
14   override def preStart() {
15     logInfo("Connecting to driver: " + driverUrl)
16     driver = context.actorSelection(driverUrl)
17     driver ! RegisterExecutor(executorId, hostPort, cores) //注册
18     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
19   }
20 
21   override def receiveWithLogging = {
22     case RegisteredExecutor =>
23       logInfo("Successfully registered with driver")
24       // Make this host instead of hostPort ?
25       executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
26         false)
27 
28     case RegisterExecutorFailed(message) =>
29       logError("Slave registration failed: " + message)
30       System.exit(1)
31 
32     case LaunchTask(data) =>  //收到task
33       if (executor == null) {
34         logError("Received LaunchTask command but executor was null")
35         System.exit(1)
36       } else {
37         val ser = SparkEnv.get.closureSerializer.newInstance()
38         val taskDesc = ser.deserialize[TaskDescription](data.value)
39         logInfo("Got assigned task " + taskDesc.taskId)
40         executor.launchTask(this, taskDesc.taskId, taskDesc.name, taskDesc.serializedTask)
41       }

 7. executor.launchTask

1   def launchTask(
2       context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
3     val tr = new TaskRunner(context, taskId, taskName, serializedTask)
4     runningTasks.put(taskId, tr)
5     threadPool.execute(tr)
6   }

且听下回分解

原文地址:https://www.cnblogs.com/Torstan/p/4158650.html