Spark源码走读4——Scheduler

用户提交的Job到DAGScheduler后,会封装成ActiveJob,同时启动JobWaiter监听作业的完成情况。同时依据job中RDD的dependency和dependency属性(NarrowDependency,ShufflerDependecy),DAGScheduler会根据依赖关系的先后产生出不同的stage DAG(result stage, shuffle map stage)。在每一个stage内部,根据stage产生出相应的task,包括ResultTask或是ShuffleMapTask,这些task会根据RDD中partition的数量和分布,产生出一组相应的task,并将其包装为TaskSet提交到TaskScheduler上去。

在上面有从作业提交、作业运行的例子上分析查看了源码。这一章从scheduler各个类的具体方法阅读源码。

DAGScheduler

DAGScheduler是高层级别的调度器。实现了stage-oriented调度。它计算一个DAG中stage的工作。并将这些stage输出落地物化。    最终提交stage以taskSet方式提交给TaskScheduler。DAGScheduler需要接收上下层的消息,它也是一个actor。这里主要看看他的一些事件处理。一下是的所处理的事件。


JobSubmitted



进入submitStage方法。submitStage提交stage,第一个提交的是没有父依赖关系的。


如果计算中发现当前的stage没有任何的依赖关系。则直接提交task。

源码中的getMissingParentStages是获取父stage。源码如下:


Ok继续submitStage,进入submitMissingTasks方法。该方法将stage根据parition拆分成task。然后生成TaskSet,并提交到TaskScheduler。该方法在之前有贴出来过,这里就不贴出来了。

DAGScheduler的主要功能:

1、接收用户提交的job。

2、以stage的形式划分job,并记录物化的stage。在stage内产生的task以taskSet的方式提交给taskScheduler。

TaskScheduler

TaskScheduler低级别的任务调度程序的接口,目前由TaskSchedulerImpl完全实现。该接口允许插入不同的任务调度。TaskScheduler接收DAGScheduler提交的taskSet,并负责发送任务到集群上运行。

TaskScheduler会根据部署方式而选择不同的SchedulerBackend来处理。针对不同部署方式会有不同的TaskScheduler与SchedulerBackend进行组合:

l  Local模式:TaskSchedulerImpl+ LocalBackend

l  Spark集群模式:TaskSchedulerImpl+ SparkDepolySchedulerBackend

l  Yarn-Cluster模式:YarnClusterScheduler + CoarseGrainedSchedulerBackend

l  Yarn-Client模式:YarnClientClusterScheduler + YarnClientSchedulerBackend

TaskScheduler类负责任务调度资源的分配,SchedulerBackend负责与Master、Worker通信收集Worker上分配给该应用使用的资源情况。

TaskSchedulerImpl

TaskSchedulerImpl类就是负责为Task分配资源的。在CoarseGrainedSchedulerBackend获取到可用资源后就会通过makeOffers方法通知TaskSchedulerImpl对资源进行分配。

TaskSchedulerImpl的resourceOffers方法就是负责为Task分配计算资源的,在为Task分配好资源后又会通过lauchTasks方法发送LaunchTask消息通知Worker上的Executor执行Task。

下面看下TaskSchedulerImpl中的几个方法。

initialize:


initialize方法主要就是初始化选择调度模式,这个可以由用户自己配置。

Start

submitTasks


TaskScheduler中实际执行task时会调用Backend.reviveOffers,在spark内有多个不同的backend:


Stage

 一个stage是一组由相同函数计算出来的任务集合,它运行spark上的job。这里所有的任务都有相同的shuffle依赖。每个stage都是map函数计算,shuffle随机产生的,在这种情况下,它的任务的结果被输给stage,或者其返回一个stage,在这种情况下,它的任务直接计算发起的作业的动作(例如,count()),save()等)。都是ShuffleMapStage我们也可以跟踪每个节点上的输出分区。

Stage的构造如下:


Task

Task: 一个执行单元,在Spark有两种实现:

org.apache.spark.scheduler.ShuffleMapTask

org.apache.spark.scheduler.ResultTask

一个Spark工作会包含一个或者多个stages。一个ResultTask执行任务,并将任务输出driver应用。一个ShuffleMapTask执行的任务,并把任务输出到多个buckets(基于任务的分区)

TaskSet

由TaskScheduler提交的一组Task集合

TaskSetManager

在TaskSchedulerImpl单内使用taskset调度任务.此类跟踪每个任务,重试任务如果失败(最多的有限次数),并经由延迟调度处理局部性感知调度此使用taskset。其主要接口有它resourceOffer,它要求使用taskset是否愿意在一个节点上运行一个任务,statusUpdate,它告诉它其任务之一状态发生了改变


方法addPendingTask:

添加一个任务的所有没有被执行的任务列表,它是PendingTask。源码如下。


resourceOffer

解决如何在taskset内部schedule一个task。源码如下:



Conf

Property Name

Default

Meaning

spark.task.cpus

1

Number of cores to allocate for each task.

spark.task.maxFailures

4

Number of individual task failures before giving up on the job. Should be greater than or equal to 1. Number of allowed retries = this value - 1.

spark.scheduler.mode

FIFO

The scheduling mode between jobs submitted to the same SparkContext. Can be set to FAIR to use fair sharing instead of queueing jobs one after another. Useful for multi-user services.

spark.cores.max

(not set)

When running on a standalone deploy cluster or aMesos cluster in "coarse-grained" sharing mode, the maximum amount of CPU cores to request for the application from across the cluster (not from each machine). If not set, the default will be spark.deploy.defaultCores on Spark's standalone cluster manager, or infinite (all available cores) on Mesos.

spark.mesos.coarse

false

If set to "true", runs over Mesos clusters in "coarse-grained" sharing mode, where Spark acquires one long-lived Mesos task on each machine instead of one Mesos task per Spark task. This gives lower-latency scheduling for short queries, but leaves resources in use for the whole duration of the Spark job.

spark.speculation

false

If set to "true", performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched.

spark.speculation.interval

100

How often Spark will check for tasks to speculate, in milliseconds.

spark.speculation.quantile

0.75

Percentage of tasks which must be complete before speculation is enabled for a particular stage.

spark.speculation.multiplier

1.5

How many times slower a task is than the median to be considered for speculation.

spark.locality.wait

3000

Number of milliseconds to wait to launch a data-local task before giving up and launching it on a less-local node. The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any). It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc. You should increase this setting if your tasks are long and see poor locality, but the default usually works well.

spark.locality.wait.process

spark.locality.wait

Customize the locality wait for process locality. This affects tasks that attempt to access cached data in a particular executor process.

spark.locality.wait.node

spark.locality.wait

Customize the locality wait for node locality. For example, you can set this to 0 to skip node locality and search immediately for rack locality (if your cluster has rack information).

spark.locality.wait.rack

spark.locality.wait

Customize the locality wait for rack locality.

spark.scheduler.revive.interval

1000

The interval length for the scheduler to revive the worker resource offers to run tasks (in milliseconds).

spark.scheduler.minRegisteredResourcesRatio

0.0 for Mesos and Standalone mode, 0.8 for YARN

The minimum ratio of registered resources (registered resources / total expected resources) (resources are executors in yarn mode, CPU cores in standalone mode) to wait for before scheduling begins. Specified as a double between 0.0 and 1.0. Regardless of whether the minimum ratio of resources has been reached, the maximum amount of time it will wait before scheduling begins is controlled by config spark.scheduler.maxRegisteredResourcesWaitingTime.

spark.scheduler.maxRegisteredResourcesWaitingTime

30000

Maximum amount of time to wait for resources to register before scheduling begins (in milliseconds).

spark.localExecution.enabled

false

Enables Spark to run certain jobs, such as first() or take() on the driver, without sending tasks to the cluster. This can make certain jobs execute very quickly, but may require shipping a whole partition of data to the driver.





原文地址:https://www.cnblogs.com/huwf/p/4273376.html