spark核心原理之SparkContext原理(1)

Spark是最为流行的分布式计算框架,这篇文章简要介绍spark 1.X版本任务调度的基本部件及其原理,包括SparkContext,SparkEnv,Executor,TaskScheduler,DAGScheduler以及其他部件。本文参考自《深入理解Spark》(by耿嘉安)一书。

1  SparkContext

SparkDriver用于用户提交任务,SparkDriver初始化首先需要SparkContext初始化,SparkContext配置了整个任务需要的上下文信息,其中配置参数由SparkConf初始化,SparkConf维护了一个ConcurrentHashMap记录系统配置和用户配置。

SparkContext初始化过程主要包括:

1)创建Spark执行环境SparkEnv;

2)创建RDD清理器metadataCleaner;

3)创建并初始化Spark UI;

4)Hadoop相关配置及Executor环境变量的设置;

5)创建任务调度TaskScheduler;

6)创建和启动DAGScheduler;

7)TaskScheduler的启动;

8)初始化块管理器BlockManager

9)启动测量系统MetricsSystem;

10)创建和启动Executor分配管理器ExecutorAllocationManager;

11)ContextCleaner的创建与启动;

12)Spark环境更新;

13)创建DAGSchedulerSource和BlockManagerSource;

14)将SparkContext标记为激活。

SparkContext默认只有一个实例,用户可以自定义配置。

2 SparkEnv

SparkEnv用于保存Spark任务的执行环境,SparkEnv存在于Driver或者CoarseGrainedExecutorBackend进程中。此进程用于创建Executor。创建SparkEnv主要使用SparkEnv的createDriverEnv。其主要过程包括如下:

1)创建安全管理器SecurityManager;

SecurityManager主要对权限、账号进行设置,如果使用Hadoop YARN作为集群管理器,则需要使用证书生成secret key登录,最后给当前系统设置默认的口令认证实例,使用HTTP连接设置口令认证。

2)创建基于Akka的分布式消息系统ActorSystem;

ActorSystem是Akka的创建分布式消息系统的基础类,akka在spark2.0中被替换为netty。

actor是一个封装了状态和行为的对象,每个actor都通过message交流,从自己的mailbox中读取别的actor发送的消息。Actor system 可以看做多个actors 的协作整体。actors可以通过这个整体单元来共享一些通用组件。例如调度服务、配置、日志服务等等。配置不同的Actor system可以共存在同一个jvm中。ActorSystem是重量级的对象,会创建1...N个线程,所以一个application一个ActorSystem。

3)创建Map任务输出跟踪器mapOutputTracker;

mapOutputTracker用于记录Map阶段任务的输出状态,每一个map任务或reduce任务都用mapid或reduceid唯一标识,通过mapOutputTracker可以让reduce阶段的任务找到并拉取map阶段任务输出数据。MapOutputTrackerMaster通过mapStatuses维护记录map任务输出状态,其中Executor会通过akka机制向MapOutputTracker中的MasterActor发送消息维护更新map任务信息。所以Executor初始化MapOutputTrackerWorker,而Driver创建MapOutputTrackerMaster。

4)实例化ShuffleManager;

ShuffleManager通过反射生成的SortShuffleManager实例,管理本地或远程shuffer。spark.shuffle.manager属性可修改为使用HashShuffleManager。SortShuffleManager通过内聚IndexShuffleBlockManager调用BlockManager中的DiskBlockManager将map结果根据shuffleId、mapId写文件,对应map过程也可调用MapOutputTrackerMaster的mapStatuses从本地或者远程节点读取文件,对应包含shuffer的reduce过程。

5)创建ShuffleMemoryManager;

ShuffleMemoryManager通过维护thread-Memory这一hashMap来记录所有shuffer任务的占用内存字节数,shuffer所有线程的最大内存占用计算公式如下:Java运行时最大内存*Spark的shuffle最大内存占比*Spark的安全内存占比。

6)创建块传输服务BlockTransferService,默认为NettyBlockTransferService,用于文件块在节点之间的远程传输。

7)创建BlockManagerMaster;

负责管理Block相关操作,Driver端创建BlockManagerMasterActor,注册到Actor-System中。Executor则从ActorSystem中获取BlockManagerMasterActor。最终BlockManagerMaster获取到对BlockManagerMasterActor的引用,从而进行相关操作。

8)创建块管理器BlockManager,负责对Block进行管理。具体细节在存储系统中详细介绍。

9)创建广播管理器BroadcastManager;

此部件主要用于将配置信息和序列化后的RDD,job等信息本地存储,或广播到其他节点进行备份,是通过工厂模式,用反射的方式创建实例,用户可以可以配置属性spark.broadcast.factory指定。

10)创建缓存管理器CacheManager;

用于缓存RDD计算的中间结果,为了迭代计算的效率,spark将中间结果RDDcache下来,下一次就无需重新创建RDD。

11)创建HTTP文件服务器HttpFileServer,主要提供对jar及其他文件的http访问,服务器用jetty内嵌实现。

12)创建测量系统MetricsSystem;

至此。sparkEnv初始完毕。

原文地址:https://www.cnblogs.com/lichongjie/p/7136273.html