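Spark Storage: SparkEnv

This post analyzes Spark's storage module. It is a set of notes on what I have read so far, to be tidied up later.

OK, let's start with the SparkEnv-related code in SparkContext: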

  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
  }

  private[spark] def env: SparkEnv = _env

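(As an aside, the code-insertion widget here does not seem to offer Scala as a language.)

SparkContext calls createDriverEnv on the SparkEnv object to create the SparkEnv. Starting from this entry point, let's see what SparkEnv does: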

  /**
   * Create a SparkEnv for the driver.
   */
  private[spark] def createDriverEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
    assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
    val hostname = conf.get("spark.driver.host")
    val port = conf.get("spark.driver.port").toInt
    create(
      conf,
      SparkContext.DRIVER_IDENTIFIER,
      hostname,
      port,
      isDriver = true,
      isLocal = isLocal,
      listenerBus = listenerBus,
      mockOutputCommitCoordinator = mockOutputCommitCoordinator
    )
  }

It first checks that the driver's host and port are set in conf and reads them, then calls create.

  private def create(
      conf: SparkConf,
      executorId: String,
      hostname: String,
      port: Int,
      isDriver: Boolean,
      isLocal: Boolean,
      listenerBus: LiveListenerBus = null,
      numUsableCores: Int = 0,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

    // Listener bus is only used on the driver
    if (isDriver) {
      assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
    }

    val securityManager = new SecurityManager(conf)

    // Create the ActorSystem for Akka and get the port it binds to.
    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
    val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager)
    val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem

    // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
    if (isDriver) {
      conf.set("spark.driver.port", rpcEnv.address.port.toString)
    } else {
      conf.set("spark.executor.port", rpcEnv.address.port.toString)
    }

    // Create an instance of the class with the given name, possibly initializing it with our conf
    def instantiateClass[T](className: String): T = {
      val cls = Utils.classForName(className)
      // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
      // SparkConf, then one taking no arguments
      try {
        cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
          .newInstance(conf, new java.lang.Boolean(isDriver))
          .asInstanceOf[T]
      } catch {
        case _: NoSuchMethodException =>
          try {
            cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
          } catch {
            case _: NoSuchMethodException =>
              cls.getConstructor().newInstance().asInstanceOf[T]
          }
      }
    }

    // Create an instance of the class named by the given SparkConf property, or defaultClassName
    // if the property is not set, possibly initializing it with our conf
    def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
      instantiateClass[T](conf.get(propertyName, defaultClassName))
    }

    val serializer = instantiateClassFromConf[Serializer](
      "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    logDebug(s"Using serializer: ${serializer.getClass}")

    val closureSerializer = instantiateClassFromConf[Serializer](
      "spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")

    def registerOrLookupEndpoint(
        name: String, endpointCreator: => RpcEndpoint):
      RpcEndpointRef = {
      if (isDriver) {
        logInfo("Registering " + name)
        rpcEnv.setupEndpoint(name, endpointCreator)
      } else {
        RpcUtils.makeDriverRef(name, conf, rpcEnv)
      }
    }

    val mapOutputTracker = if (isDriver) {
      new MapOutputTrackerMaster(conf)
    } else {
      new MapOutputTrackerWorker(conf)
    }

    // Have to assign trackerActor after initialization as MapOutputTrackerActor
    // requires the MapOutputTracker itself
    mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
      new MapOutputTrackerMasterEndpoint(
        rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

    // Let the user specify short names for shuffle managers
    val shortShuffleMgrNames = Map(
      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
      "tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager")
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

    val shuffleMemoryManager = ShuffleMemoryManager.create(conf, numUsableCores)

    val blockTransferService =
      conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
        case "netty" =>
          new NettyBlockTransferService(conf, securityManager, numUsableCores)
        case "nio" =>
          logWarning("NIO-based block transfer service is deprecated, " +
            "and will be removed in Spark 1.6.0.")
          new NioBlockTransferService(conf, securityManager)
      }

    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)

    // NB: blockManager is not valid until initialize() is called later.
    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
      numUsableCores)

    val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

    val cacheManager = new CacheManager(blockManager)

    val httpFileServer =
      if (isDriver) {
        val fileServerPort = conf.getInt("spark.fileserver.port", 0)
        val server = new HttpFileServer(conf, securityManager, fileServerPort)
        server.initialize()
        conf.set("spark.fileserver.uri", server.serverUri)
        server
      } else {
        null
      }

    val metricsSystem = if (isDriver) {
      // Don't start metrics system right now for Driver.
      // We need to wait for the task scheduler to give us an app ID.
      // Then we can start the metrics system.
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    } else {
      // We need to set the executor ID before the MetricsSystem is created because sources and
      // sinks specified in the metrics configuration file will want to incorporate this executor's
      // ID into the metrics they report.
      conf.set("spark.executor.id", executorId)
      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
      ms.start()
      ms
    }

    // Set the sparkFiles directory, used when downloading dependencies.  In local mode,
    // this is a temporary directory; in distributed mode, this is the executor's current working
    // directory.
    val sparkFilesDir: String = if (isDriver) {
      Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
    } else {
      "."
    }

    val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
      new OutputCommitCoordinator(conf, isDriver)
    }
    val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
      new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
    outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

    val executorMemoryManager: ExecutorMemoryManager = {
      val allocator = if (conf.getBoolean("spark.unsafe.offHeap", false)) {
        MemoryAllocator.UNSAFE
      } else {
        MemoryAllocator.HEAP
      }
      new ExecutorMemoryManager(allocator)
    }

    val envInstance = new SparkEnv(
      executorId,
      rpcEnv,
      serializer,
      closureSerializer,
      cacheManager,
      mapOutputTracker,
      shuffleManager,
      broadcastManager,
      blockTransferService,
      blockManager,
      securityManager,
      httpFileServer,
      sparkFilesDir,
      metricsSystem,
      shuffleMemoryManager,
      executorMemoryManager,
      outputCommitCoordinator,
      conf)

    // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
    // called, and we only need to do it for driver. Because driver may run as a service, and if we
    // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
    if (isDriver) {
      envInstance.driverTmpDirToDelete = Some(sparkFilesDir)
    }

    envInstance
  }

1. Create the ActorSystem and bind its port

  Every time a SparkEnv is created, an rpcEnv has to be created for communication. Here it is created with

val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager)

  The actor system name is fixed by convention for both the driver and the executor: 'sparkDriver' for the driver and 'sparkExecutor' for the executor. The rpcEnv itself is created by

  private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
    // Add more RpcEnv implementations here
    val rpcEnvNames = Map("akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory")
    val rpcEnvName = conf.get("spark.rpc", "akka")
    val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
    Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
  }

  def create(
      name: String,
      host: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager): RpcEnv = {
    // Using Reflection to create the RpcEnv to avoid to depend on Akka directly
    val config = RpcEnvConfig(conf, name, host, port, securityManager)
    getRpcEnvFactory(conf).create(config)
  }

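  The default RpcEnvFactory is 'org.apache.spark.rpc.akka.AkkaRpcEnvFactory'. The factory name is looked up in a map and instantiated by reflection, presumably for extensibility; the comment in create says it is there to avoid depending on Akka directly. Here is AkkaRpcEnvFactory: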

private[spark] class AkkaRpcEnvFactory extends RpcEnvFactory {

  def create(config: RpcEnvConfig): RpcEnv = {
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
      config.name, config.host, config.port, config.conf, config.securityManager)
    actorSystem.actorOf(Props(classOf[ErrorMonitor]), "ErrorMonitor")
    new AkkaRpcEnv(actorSystem, config.conf, boundPort)
  }
}

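  It creates an ActorSystem from the name, hostname, port, conf and securityManager, and returns an AkkaRpcEnv. Without digging deeper, this rpcEnv can be treated as a communication module; it carries the communication between the BlockManagers of the driver and the executors.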
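  The name lookup in getRpcEnvFactory can be tried in isolation. The following is a minimal sketch of just the resolution step, with the reflection left out. The object FactoryNames and the class com.example.MyRpcEnvFactory are made up for illustration; only the "akka" entry comes from the listing above.

```scala
// Sketch of the short-name resolution used by getRpcEnvFactory.
// FactoryNames is a hypothetical stand-in, not a Spark class.
object FactoryNames {
  // Only this entry is taken from the Spark listing.
  private val shortNames =
    Map("akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory")

  // Known short names (matched case-insensitively) are expanded; any other
  // value is assumed to already be a fully qualified class name.
  def resolve(configured: String): String =
    shortNames.getOrElse(configured.toLowerCase, configured)

  def main(args: Array[String]): Unit = {
    println(resolve("Akka"))
    println(resolve("com.example.MyRpcEnvFactory")) // hypothetical user class
  }
}
```

  Note that only the lookup key is lower-cased; a value that misses the map is passed through unchanged, so a custom class name keeps its original casing.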

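2. Set the RPC port in conf to the port that was actually bound (spark.driver.port on the driver, spark.executor.port on an executor)

3. Create a serializer and a closureSerializer (abstract class Serializer)

4. Create the MapOutputTracker (a MapOutputTrackerMaster on the driver, a MapOutputTrackerWorker on an executor)

5. Map short names to shuffle manager classes... though honestly, doing this with a Map does not feel all that convenient

6. Create the shuffle manager (private[spark] trait ShuffleManager) and the shuffle memory manager; both are later passed into the SparkEnv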
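The serializer in step 3 and the shuffle manager in step 6 both go through instantiateClass, which tries three constructor shapes in turn. Here is a self-contained sketch of that fallback. Conf, Plugin, TwoArg, OneArg and NoArg are hypothetical stand-ins for SparkConf and the pluggable classes, and plain Class.forName is used in place of Utils.classForName.

```scala
// Sketch of the constructor-fallback pattern from instantiateClass.
// Every type in this object is a hypothetical stand-in.
object InstantiateSketch {
  final class Conf(val settings: Map[String, String]) // stand-in for SparkConf

  trait Plugin { def describe: String }

  class TwoArg(conf: Conf, isDriver: Boolean) extends Plugin {
    def describe: String = s"TwoArg(isDriver=$isDriver)"
  }
  class OneArg(conf: Conf) extends Plugin {
    def describe: String = "OneArg"
  }
  class NoArg extends Plugin {
    def describe: String = "NoArg"
  }

  // Try (Conf, boolean), then (Conf), then the no-argument constructor.
  def instantiate[T](className: String, conf: Conf, isDriver: Boolean): T = {
    val cls = Class.forName(className)
    try {
      cls.getConstructor(classOf[Conf], java.lang.Boolean.TYPE)
        .newInstance(conf, java.lang.Boolean.valueOf(isDriver))
        .asInstanceOf[T]
    } catch {
      case _: NoSuchMethodException =>
        try {
          cls.getConstructor(classOf[Conf]).newInstance(conf).asInstanceOf[T]
        } catch {
          case _: NoSuchMethodException =>
            cls.getConstructor().newInstance().asInstanceOf[T]
        }
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new Conf(Map.empty)
    val names = Seq(classOf[TwoArg], classOf[OneArg], classOf[NoArg]).map(_.getName)
    names.foreach(n => println(instantiate[Plugin](n, conf, isDriver = true).describe))
  }
}
```

The upside of this design is that a plugin author only has to provide whichever constructor suits the class; the cost is that a wrong class name or signature only surfaces at runtime.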

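7. BlockManager-related initialization. This is the module the driver and the executors later use to communicate with each other, and it is also the resource management module.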

    def registerOrLookupEndpoint(
        name: String, endpointCreator: => RpcEndpoint):
      RpcEndpointRef = {
      if (isDriver) {
        logInfo("Registering " + name)
        rpcEnv.setupEndpoint(name, endpointCreator)
      } else {
        RpcUtils.makeDriverRef(name, conf, rpcEnv)
      }
    }

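Looking at this code again: when the role creating the SparkEnv is the driver, it calls setupEndpoint and starts the actor, here named BlockManagerMaster. When the role creating the SparkEnv is an executor, it goes through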

  def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
    val driverActorSystemName = SparkEnv.driverActorSystemName
    val driverHost: String = conf.get("spark.driver.host", "localhost")
    val driverPort: Int = conf.getInt("spark.driver.port", 7077)
    Utils.checkHost(driverHost, "Expected hostname")
    rpcEnv.setupEndpointRef(driverActorSystemName, RpcAddress(driverHost, driverPort), name)
  }

to obtain the driver's host and port, and then creates a Ref to the driver's endpoint.
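One detail worth noticing in registerOrLookupEndpoint is that endpointCreator is a by-name parameter (=> RpcEndpoint), so the endpoint object is only constructed on the driver branch. That is presumably also why the asInstanceOf[MapOutputTrackerMaster] cast in the create listing does no harm on an executor: the expression containing it is never evaluated there. The sketch below illustrates the mechanism with made-up names (ByNameSketch, Endpoint, newEndpoint); it does not use any Spark API.

```scala
// Sketch of by-name evaluation as used by registerOrLookupEndpoint.
// All names here are hypothetical; no Spark classes are involved.
object ByNameSketch {
  final class Endpoint(val name: String)

  // Counts how many endpoints were actually constructed.
  var created = 0

  def newEndpoint(name: String): Endpoint = {
    created += 1
    new Endpoint(name)
  }

  def registerOrLookup(isDriver: Boolean, name: String, creator: => Endpoint): String =
    if (isDriver) {
      val endpoint = creator // the by-name argument is evaluated here
      s"registered ${endpoint.name}"
    } else {
      // creator is never evaluated on this path
      s"reference to the driver's $name"
    }

  def main(args: Array[String]): Unit = {
    println(registerOrLookup(false, "BlockManagerMaster", newEndpoint("BlockManagerMaster")))
    println(created) // still 0: nothing was constructed on the executor path
    println(registerOrLookup(true, "BlockManagerMaster", newEndpoint("BlockManagerMaster")))
    println(created) // 1: constructed once on the driver path
  }
}
```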

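8. Create the broadcastManager, the cacheManager and the httpFileServer (driver only; it also sets spark.fileserver.uri in conf)

9. Create the metricsSystem.

  Spark's metrics module has three parts. The instance specifies who is using the metrics module. Spark has several roles, such as master, worker and client driver, and each uses the metrics module for monitoring; implementations currently exist for master, worker, executor, driver and applications. The source specifies where the data comes from: there are Spark-internal sources (such as MasterSource and WorkerSource, which collect Spark's internal state) and common sources (lower level, such as JvmSource, which collects low-level state; these are set up through configuration and loaded by reflection). The sink specifies where the metric data goes, and more than one sink can be specified.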
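  To make the instance / source / sink split concrete, here is a toy model of it. This is not Spark's MetricsSystem API; every name below (MetricsSketch, MiniMetricsSystem, FixedSource, MemorySink) is invented for illustration, and the real system is driven by a metrics configuration file rather than constructor arguments.

```scala
// Toy model of the instance / source / sink split. Hypothetical names only.
object MetricsSketch {
  import scala.collection.mutable.ArrayBuffer

  trait Source {
    def name: String
    def read(): Map[String, Long]
  }

  trait Sink {
    def report(instance: String, values: Map[String, Long]): Unit
  }

  // A source that always returns the same values.
  final class FixedSource(val name: String, values: Map[String, Long]) extends Source {
    def read(): Map[String, Long] = values
  }

  // A sink that keeps the reported lines in memory.
  final class MemorySink extends Sink {
    val lines: ArrayBuffer[String] = ArrayBuffer.empty[String]
    def report(instance: String, values: Map[String, Long]): Unit =
      values.toSeq.sortBy(_._1).foreach { case (k, v) => lines += s"$instance.$k=$v" }
  }

  // The instance name says who is reporting; every source is read and the
  // combined values are sent to every sink.
  final class MiniMetricsSystem(instance: String, sources: Seq[Source], sinks: Seq[Sink]) {
    def reportOnce(): Unit = {
      val values = sources.flatMap { s =>
        s.read().map { case (k, v) => s"${s.name}.$k" -> v }
      }.toMap
      sinks.foreach(_.report(instance, values))
    }
  }

  def main(args: Array[String]): Unit = {
    val sink = new MemorySink
    val system = new MiniMetricsSystem(
      "driver", Seq(new FixedSource("jvm", Map("heapUsed" -> 42L))), Seq(sink))
    system.reportOnce()
    sink.lines.foreach(println)
  }
}
```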

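10. Create sparkFilesDir: a temporary directory on the driver, the current working directory on an executor.

11. Create the executorMemoryManager

12. Build the SparkEnv from everything created above.

  As we can see, SparkEnv holds a lot: the metrics module, the block management module, even the serialization module. Skipping the rest for now, let's look at how the SparkEnv is used once it has been created, and how blocks are managed.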

    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()

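  The rest of the analysis is based on deploy mode. At this point the deploy-mode scheduler has been created: a TaskSchedulerImpl, bound to the single DAGScheduler and to a SparkDeploySchedulerBackend. SparkDeploySchedulerBackend in turn creates an AppClient, which acts as the driver-side client and keeps in contact with the Master. The CoarseGrainedSchedulerBackend it inherits from communicates with the executors. As the names suggest, the SchedulerBackend talks to executors at the task level, while the BlockManager communicates at the storage level. What exactly is the difference between the two? That should be the subject of the next post. Next, let's look at what happens when the SparkEnv is created on the executor side: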

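  First, SparkDeploySchedulerBackend creates an AppDescription, which holds the descriptive information of the app. This appDesc contains a command used to start the executor on the worker side; in deploy mode the default is: Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) (Command is a case class). In other words, the main class that gets launched is CoarseGrainedExecutorBackend. The AppDesc is sent to the Master along with the RegisterApplication message. After scheduling, the Master sends LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory) to tell a Worker to start an executor. On receiving this command, the Worker runs an ExecutorRunner to start the executor and returns an ExecutorStateChanged message to the Master to report that it has started.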
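  The message flow just described can be modeled with a few case classes. This is a deliberately simplified sketch: the message names match the ones mentioned above, but their fields are cut down, and the ids, the "RUNNING" state string and the example argument are assumptions made for illustration.

```scala
// Simplified model of the RegisterApplication -> LaunchExecutor ->
// ExecutorStateChanged exchange. Fields are reduced stand-ins, not the
// real Spark message definitions.
object LaunchFlowSketch {
  final case class Command(mainClass: String, arguments: Seq[String])
  final case class AppDescription(name: String, command: Command)

  sealed trait Message
  final case class RegisterApplication(desc: AppDescription) extends Message
  final case class LaunchExecutor(appId: String, execId: Int, desc: AppDescription) extends Message
  final case class ExecutorStateChanged(appId: String, execId: Int, state: String) extends Message

  // Master side: after "scheduling", ask a worker to launch one executor.
  // The ids are fixed here purely for illustration.
  def master(msg: RegisterApplication): LaunchExecutor =
    LaunchExecutor("app-0", 0, msg.desc)

  // Worker side: build the command line that would be handed to a
  // ProcessBuilder, and report back to the master.
  def worker(msg: LaunchExecutor): (Seq[String], ExecutorStateChanged) = {
    val commandLine = Seq("java", msg.desc.command.mainClass) ++ msg.desc.command.arguments
    (commandLine, ExecutorStateChanged(msg.appId, msg.execId, "RUNNING"))
  }

  def main(args: Array[String]): Unit = {
    val command = Command(
      "org.apache.spark.executor.CoarseGrainedExecutorBackend",
      Seq("--example-arg", "value")) // hypothetical arguments
    val launch = master(RegisterApplication(AppDescription("demo", command)))
    val (commandLine, state) = worker(launch)
    println(commandLine.mkString(" "))
    println(state)
  }
}
```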

  In ExecutorRunner, our executor is started through a ProcessBuilder, that is, private[spark] object CoarseGrainedExecutorBackend extends Logging{}. During startup it receives the details of the SparkConf:

      val driverConf = new SparkConf()
      for ((key, value) <- props) {
        // this is required for SSL in standalone mode
        if (SparkConf.isExecutorStartupConf(key)) {
          driverConf.setIfMissing(key, value)
        } else {
          driverConf.set(key, value)
        }
      }

  This gives the executor the host and port of the driver's actor. What follows is a SparkEnv creation almost identical to the driver's.
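  The loop above has a simple precedence rule: for "startup" keys the executor's own value wins if it already has one (setIfMissing), and for every other key the received value overwrites (set). Here is a sketch of that rule over plain maps. ConfMergeSketch and isStartupKey are hypothetical; in particular the spark.ssl prefix check is only an assumed stand-in for SparkConf.isExecutorStartupConf, suggested by the SSL comment in the listing.

```scala
// Sketch of the setIfMissing / set precedence rule, using plain maps
// instead of SparkConf. All names here are hypothetical.
object ConfMergeSketch {
  import scala.collection.mutable

  // Assumed stand-in for SparkConf.isExecutorStartupConf.
  def isStartupKey(key: String): Boolean = key.startsWith("spark.ssl")

  def merge(local: Map[String, String], received: Seq[(String, String)]): Map[String, String] = {
    val conf = mutable.Map.empty[String, String]
    conf ++= local
    for ((key, value) <- received) {
      if (isStartupKey(key)) {
        if (!conf.contains(key)) conf(key) = value // setIfMissing: local value wins
      } else {
        conf(key) = value // set: received value wins
      }
    }
    conf.toMap
  }

  def main(args: Array[String]): Unit = {
    val local = Map("spark.ssl.enabled" -> "true", "spark.app.name" -> "local")
    val received = Seq(
      "spark.ssl.enabled" -> "false",
      "spark.app.name" -> "fromDriver",
      "spark.driver.port" -> "7078")
    merge(local, received).toSeq.sortBy(_._1).foreach(println)
  }
}
```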

Original post: https://www.cnblogs.com/gaoze/p/5190149.html