Spark资源调度分配内幕天机彻底解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结

本课主题

  • Master 资源调度的源码鉴赏  

资源调度管理

  1. 任务调度与资源是通过 DAGScheduler、TaskScheduler、SchedulerBackend 等进行的作业调度
  2. 资源调度是指应用程序如何获得资源
  3. 任务调度是在资源调度的基础上进行的,没有资源调度那么任务调度就成为了无源之水无本之木

Master 资源调度的源码鉴赏

  1. 因为 Master 负责资源管理和调度,所以资源调度方法 scheduer 位于 Master.scala 这个类中,当注册程序或者资源发送改变的时候都会导致 Scheduler 的调用,例如注册的时候。

    当这个应用程序向 Master 注册的时候,会把 ApplicationInfo 的信息放在一个 case class 里,过程中会新增新的 AppId,当注册的时候会分发给不同的数据结构记录起来,比如说 idToApp、endpointToApp、



  2. Scheduler 调用的时机,每次都有新的应用程序提交或者集群資源狀況发生改变的时候(包括 Executor 增加或者減少、Worker 增加或者減少等)具体代码运行顺序:scheduler( ) --> Random.shuffle( ) --> 有一个for循环过滤出ALIVE的Worker --> 过滤出付合Memory和Cores的Worker --> 然后调用 lanuchDriver( ) --> startExecutorsOnWorker( )

    WorkerState 有以下几种:ALIVE, DEAD, DECOMMISSIONED, UNKNOWN

    • 当前 Master 必需是 Alive 的方式才可以进行资源调度,一开始的时候会判断一下状态,如果不是 Alive 的状态会直接返回,也就是 StandByMaster 不会进行 Application 的资源调用
    • 使用 Random.shuffle 把 Master 中保留的集群中所有 Worker 的信息随机打乱;其算法內部是循环随机交换所有 Worker 在 Master 緩存的數数据结构中的位置
    • 接下来要判断所有 Worker 中哪些是 ALIVE 级別的 Worker 才能夠参与资源的分配工作
    • 当 SparkSubmit 指定 Driver 在 Cluster 模式的情況下,此时 Driver 会加入 waitingDrivers 等待列表中,在每个 DriverInfo 中的 DriverDescription 中要启动 Driver 时候对 Worker 的內存及 CPU 要求等內容:

    • 在符合资源要求的情況下然后采用随机打乱后的的一个 Worker 来启动 Driver,Master 发指令给 Worker 让远程的 Worker 启动 Driver,这就可以保证负载均衡。先启动 Driver 才会发生后续的一切的资源调度的模式
       

正式启动在Worker中启动Executor

Spark 默认为应用程序启动 Executor 的方式是 FIFO 的方式,也就是说所有的提交的应用程序都是放在调度的等待队列中的,先进先出,只有满足了前面应用程序的分配的基础上才能夠满足下一各应用程序资源的分配。正式启动在Worker中启动Executor:为应用程序具体分配 Executor 之前要判断应用程序是否还需要分配 core 如果不需要则不会会应用程序分配 Executor

  • 具体分配 Executor 之前要求 Worker 必需是 Alive 的状态且必需满足 Application 对每个 Executor 的內存和 Cores 的要求,并且在此基础上进行排序,谁的 Cores 多就排在前面。计算资源由大到小的 usableWorkers 数据库结构。把最好的资源放在前面。


    在 FIFO 的情況下默认是 spreadOutApps 来让应用程序尽可能多的运行在所有的 Node 上。
  • 然后调用 scheduleExecutorsOnWorkers 方法,为应用程序分配 executor 有两种情況,第一种方式是尽可能在集群的所有 Worker 上分配 Executor ,这种方式往往会来带潜在的更好的数据本地性。具体在集群上分配 Cores 的时候会尽可能的满足我们的要求,如果是每个 Worker 下面只能够为当前的应用程序分配一个 Executor 的话,每次是分配一个 Core! (每次为这个 Executor 增加一个 Core)。每次给 Executor 增加的时候都是增加一个 Core, 如果是 spreadout 的方式,循环一轮下一轮,假设有4个 Executors,如果 spreadout 的方式,它会在每个 Worker 中启动一个 Executor,  第一次为每个 Executor 分配一个线程,第二次再次循环后再分配一个线程。
  • 然后调用 allocateWorkerResourceToExecutors 方法
  • 然后会调用 addExecutor 方法 

  • 新增 Executor 后然后就真正的启动 Executor,准备具体要为当前应用程序分配的 Executor 信息后,Master 要通过远程通信发指令给 Worker 來具体启动 ExecutorBackend 进程,紧接给我们应用程序的 Driver 发送一个 ExecutorAdded 的信息。(Worker收到由Master发送LaunchExector信息之后如何处理可以参考我的下一篇博客!)


    LaunchExecutor case class 数据结构,Master 会把这个数据发送到 Worker 

    Master 会把这个数据发送到 Driver 

[总结部份]

更新中......

原文地址:https://www.cnblogs.com/sky-sql/p/9078719.html