Creating the Spark Execution Environment SparkEnv

The Spark driver is the process that submits and runs the user's application; as part of SparkContext initialization it assembles the execution environment described below.

1. SparkConf

SparkConf is responsible for loading SparkContext's configuration parameters; internally it keeps the various `spark.*` properties in a ConcurrentHashMap.

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {

    import SparkConf._

    /** Create a SparkConf that loads defaults from system properties and the classpath */
    def this() = this(true)

    /**
     * Maintains a ConcurrentHashMap that stores the Spark configuration.
     */
    private val settings = new ConcurrentHashMap[String, String]()

    @transient private lazy val reader: ConfigReader = {
        val _reader = new ConfigReader(new SparkConfigProvider(settings))
        _reader.bindEnv(new ConfigProvider {
            override def get(key: String): Option[String] = Option(getenv(key))
        })
        _reader
    }

    if (loadDefaults) {
        loadFromSystemProperties(false)
    }

    /**
     * Load all spark.* properties from Java system properties.
     * @param silent whether to suppress logging while setting each key
     * @return this SparkConf
     */
    private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
        // Load any spark.* system properties; everything else is ignored
        for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
            set(key, value, silent)
        }
        this
    }
}
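As a quick illustration of the class above, here is a minimal usage sketch (app name and settings are hypothetical; every value ends up as a plain string in `settings`):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()          // loadDefaults = true: picks up spark.* system properties
      .setAppName("demo")               // stored under the key spark.app.name
      .setMaster("local[2]")            // stored under the key spark.master
      .set("spark.ui.enabled", "false")
    println(conf.get("spark.app.name"))                // -> demo
    println(conf.getBoolean("spark.ui.enabled", true)) // -> false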

2. SparkContext

2.1 Creating the Spark execution environment SparkEnv

SparkEnv is Spark's execution-environment object; it holds the many components that executor-side task execution depends on.

It is created mainly through SparkEnv.createSparkEnv; during SparkContext initialization only the driver-side SparkEnv is created.

  def isLocal: Boolean = Utils.isLocalMaster(_conf)

  // An asynchronous listener bus for Spark events
  // The listener pattern is used here to dispatch the handling of the various event types
  private[spark] val listenerBus = new LiveListenerBus(this)

  // This function allows components created by SparkEnv to be mocked in unit tests:
  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    // Create the driver-side SparkEnv
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
  }

Stepping into createDriverEnv shows that it simply delegates to the private create() method, which builds a SparkEnv for either the driver or an executor.
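An abridged sketch of that chain, with the assertions and I/O-encryption handling elided (parameter list shortened from the 2.x source):

    private[spark] def createDriverEnv(
        conf: SparkConf,
        isLocal: Boolean,
        listenerBus: LiveListenerBus,
        numCores: Int): SparkEnv = {
      val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
      val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
      val port = conf.get("spark.driver.port").toInt
      // To create(), the driver is just the "executor" named SparkContext.DRIVER_IDENTIFIER
      create(conf, SparkContext.DRIVER_IDENTIFIER, bindAddress, advertiseAddress,
        port, isLocal, numCores, ioEncryptionKey = None, listenerBus = listenerBus)
    }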

Searching for callers of the companion createExecutorEnv shows it is invoked from CoarseGrainedExecutorBackend, i.e. on the executor side.

Now let's walk through what create() actually does.

2.1.1 Creating the SecurityManager

    // Create the SecurityManager
    val securityManager = new SecurityManager(conf, ioEncryptionKey)
    ioEncryptionKey.foreach { _ =>
      if (!securityManager.isSaslEncryptionEnabled()) {
        logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
          "wire.")
      }
    }
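SecurityManager centralizes authentication and ACL checks, all driven by SparkConf. A small sketch of the standard knobs (the class is internal API, and the secret is of course a placeholder):

    val secureConf = new SparkConf()
      .set("spark.authenticate", "true")            // enable shared-secret authentication
      .set("spark.authenticate.secret", "changeme") // needed outside YARN deployments
    val sm = new SecurityManager(secureConf)
    println(sm.isAuthenticationEnabled())           // -> true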

2.1.2 Creating the RpcEnv

    val systemName = if (isDriver) driverSystemName else executorSystemName
    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
      securityManager, clientMode = !isDriver)
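RpcEnv is Spark's internal RPC abstraction (Netty-based in 2.x): components register RpcEndpoints on it and talk to them through RpcEndpointRefs. A minimal sketch against the private[spark] API from the 2.x source, reusing the rpcEnv built above:

    class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case msg: String => context.reply(s"echo: $msg")  // answers ask() calls
      }
    }
    val echoRef: RpcEndpointRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
    // A remote caller resolves the endpoint by name and asks it, e.g.
    // rpcEnv.setupEndpointRef(address, "echo"), then askSync[String]("hi") in 2.2+.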

2.1.3 Creating the serializer via reflection (JavaSerializer by default)

    // Create an instance of the class with the given name, possibly initializing it with our conf
    def instantiateClass[T](className: String): T = {
      val cls = Utils.classForName(className)
      // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
      // SparkConf, then one taking no arguments
      try {
        cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
          .newInstance(conf, new java.lang.Boolean(isDriver))
          .asInstanceOf[T]
      } catch {
        case _: NoSuchMethodException =>
          try {
            cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
          } catch {
            case _: NoSuchMethodException =>
              cls.getConstructor().newInstance().asInstanceOf[T]
          }
      }
    }

    // Create an instance of the class named by the given SparkConf property, or defaultClassName
    // if the property is not set, possibly initializing it with our conf
    def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
      instantiateClass[T](conf.get(propertyName, defaultClassName))
    }

    val serializer = instantiateClassFromConf[Serializer](
      "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    logDebug(s"Using serializer: ${serializer.getClass}")
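The reflective fallback means a serializer class only needs to provide one of the three constructor shapes. The same property is the standard way to swap in Kryo, and the resulting Serializer is used through SerializerInstance (conf and serializer below refer to the excerpt above):

    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val ser = serializer.newInstance()
    val buf = ser.serialize(Seq(1, 2, 3))        // returns a java.nio.ByteBuffer
    val back = ser.deserialize[Seq[Int]](buf)    // round-trips the value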

2.1.4 Creating the SerializerManager

    val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

    val closureSerializer = new JavaSerializer(conf)
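SerializerManager decides which serializer, compression, and encryption wrappers to apply to each block or stream. Note that closureSerializer is hard-coded to JavaSerializer regardless of spark.serializer: task closures are always Java-serialized, which is why capturing a non-serializable object in a closure fails at runtime. A toy illustration (hypothetical Handle class, sc an existing SparkContext):

    class Handle                      // not Serializable
    val h = new Handle
    // sc.parallelize(1 to 3).map(x => (x, h)).count()
    // -> org.apache.spark.SparkException: Task not serializable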

2.1.5 Creating the BroadcastManager

  val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
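BroadcastManager backs the user-facing broadcast API; in 2.x the only implementation is TorrentBroadcast, whose blocks executors fetch lazily on first access. The user-level counterpart (sc assumed to be an existing SparkContext):

    val lookup = sc.broadcast(Map(1 -> "a", 2 -> "b"))
    val labeled = sc.parallelize(Seq(1, 2, 3)).map(x => lookup.value.getOrElse(x, "?"))
    labeled.collect()                 // -> Array("a", "b", "?")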

2.1.6 Creating the MapOutputTracker

    def registerOrLookupEndpoint(
        name: String, endpointCreator: => RpcEndpoint):
      RpcEndpointRef = {
      if (isDriver) {
        logInfo("Registering " + name)
        rpcEnv.setupEndpoint(name, endpointCreator)
      } else {
        RpcUtils.makeDriverRef(name, conf, rpcEnv)
      }
    }

    // Create the MapOutputTracker; the driver and executors get different implementations
    val mapOutputTracker = if (isDriver) {
      // The driver side uses the BroadcastManager created above
      new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
    } else {
      new MapOutputTrackerWorker(conf)
    }

    // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
    // requires the MapOutputTracker itself
    mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
      new MapOutputTrackerMasterEndpoint(
        rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
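registerOrLookupEndpoint captures a pattern reused by several components: the driver hosts the real RPC endpoint, while executors only construct a reference to it. For the shuffle machinery that plays out roughly as:

    // Driver:   rpcEnv.setupEndpoint("MapOutputTracker", new MapOutputTrackerMasterEndpoint(...))
    // Executor: RpcUtils.makeDriverRef("MapOutputTracker", conf, rpcEnv)
    // During shuffle reads, MapOutputTrackerWorker asks the driver-side endpoint for
    // the locations and sizes of map outputs, then caches the answer locally.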

2.1.7 Creating the ShuffleManager

    // Let the user specify short names for shuffle managers
    val shortShuffleMgrNames = Map(
      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
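In this version both short names resolve to the same class: hash-based shuffle was removed in 2.0, and "tungsten-sort" survives only as an alias for SortShuffleManager. The two settings below are therefore equivalent:

    conf.set("spark.shuffle.manager", "sort")           // the default
    conf.set("spark.shuffle.manager", "tungsten-sort")  // same SortShuffleManager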

2.1.8 Creating the BlockManager

    val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
    val memoryManager: MemoryManager =
      if (useLegacyMemoryManager) {
        new StaticMemoryManager(conf, numUsableCores)
      } else {
        UnifiedMemoryManager(conf, numUsableCores)
      }

    val blockManagerPort = if (isDriver) {
      conf.get(DRIVER_BLOCK_MANAGER_PORT)
    } else {
      conf.get(BLOCK_MANAGER_PORT)
    }

    val blockTransferService =
      new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
        blockManagerPort, numUsableCores)

    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)

    // NB: blockManager is not valid until initialize() is called later.
    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, numUsableCores)
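The memory-manager choice is driven purely by configuration. A sketch of the relevant knobs with their 2.x defaults (values quoted from memory, treat as assumptions):

    conf.set("spark.memory.useLegacyMode", "false")  // false -> UnifiedMemoryManager
    conf.set("spark.memory.fraction", "0.6")         // unified execution + storage pool
    conf.set("spark.memory.storageFraction", "0.5")  // storage share protected from eviction

Also note the NB comment above: the BlockManager is unusable until SparkContext (on the driver) or Executor later calls initialize() with the application ID.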

2.1.9 Creating the MetricsSystem

    val metricsSystem = if (isDriver) {
      // Don't start metrics system right now for Driver.
      // We need to wait for the task scheduler to give us an app ID.
      // Then we can start the metrics system.
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    } else {
      // We need to set the executor ID before the MetricsSystem is created because sources and
      // sinks specified in the metrics configuration file will want to incorporate this executor's
      // ID into the metrics they report.
      conf.set("spark.executor.id", executorId)
      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
      ms.start()
      ms
    }
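Sinks and sources normally come from the metrics.properties file, but in 2.x they can also be injected through SparkConf keys carrying the spark.metrics.conf. prefix. A minimal sketch wiring up the built-in ConsoleSink:

    conf.set("spark.metrics.conf.*.sink.console.class",
      "org.apache.spark.metrics.sink.ConsoleSink")
    conf.set("spark.metrics.conf.*.sink.console.period", "10")  // report every 10 seconds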

2.1.10 Creating the SparkEnv instance

    val envInstance = new SparkEnv(
      executorId,
      rpcEnv,
      serializer,
      closureSerializer,
      serializerManager,
      mapOutputTracker,
      shuffleManager,
      broadcastManager,
      blockManager,
      securityManager,
      metricsSystem,
      memoryManager,
      outputCommitCoordinator,
      conf)
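Two details worth noting here. First, the excerpt omits where outputCommitCoordinator comes from: in the 2.x source it is created just before this point, together with an "OutputCommitCoordinator" endpoint registered through the same registerOrLookupEndpoint helper. Second, once built, the environment is published globally (SparkEnv companion-object API):

    SparkEnv.set(envInstance)   // SparkContext does this for the driver-side env
    val env = SparkEnv.get      // any component can then resolve the ambient environment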

2.1.11 Creating the driver temp directory

    // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
    // called, and we only need to do it for driver. Because driver may run as a service, and if we
    // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
    if (isDriver) {
      val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
      envInstance.driverTmpDir = Some(sparkFilesDir)
    }

  

Original (Chinese) source: https://www.cnblogs.com/chengbao/p/10604758.html