Creating the RpcEnv

I thought this article was a good one.

2.1.2 Creating the RpcEnv

 -  RpcEndpoint

 -  RpcEndpointRef

    val systemName = if (isDriver) driverSystemName else executorSystemName
    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
      securityManager, clientMode = !isDriver)

 Stepping into `RpcEnv.create()`, which `SparkEnv.create()` calls above, we find that it actually calls `new NettyRpcEnvFactory().create(config)`:

  def create(
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      clientMode: Boolean): RpcEnv = {
    val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
      clientMode)
    // Delegate to the Netty-based RpcEnv factory
    new NettyRpcEnvFactory().create(config)
  }
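
The delegation above can be sketched without Spark on the classpath. This is a minimal illustration, not Spark's code: `SketchConfig`, `SketchEnv`, and `SketchNettyFactory` are hypothetical stand-ins for `RpcEnvConfig`, `RpcEnv`, and `NettyRpcEnvFactory`, the `SparkConf` and `SecurityManager` parameters are left out, and the argument values in `main` are made up. It shows the loose arguments being bundled into one config object that is then handed to the factory.

```scala
object RpcEnvCreateSketch {
  // Hypothetical stand-in for RpcEnvConfig; SparkConf and SecurityManager are omitted.
  final case class SketchConfig(
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      clientMode: Boolean)

  // Hypothetical stand-in for RpcEnv: it only records the config it was built from.
  final case class SketchEnv(config: SketchConfig)

  // Hypothetical stand-in for NettyRpcEnvFactory.
  class SketchNettyFactory {
    def create(config: SketchConfig): SketchEnv = SketchEnv(config)
  }

  // Mirrors the shape of RpcEnv.create: bundle the arguments, then delegate.
  def create(
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      clientMode: Boolean): SketchEnv = {
    val config = SketchConfig(name, bindAddress, advertiseAddress, port, clientMode)
    new SketchNettyFactory().create(config)
  }

  def main(args: Array[String]): Unit = {
    val isDriver = true
    // Same rule as the SparkEnv call: clientMode is the negation of isDriver.
    val env = create("sparkDriver", "0.0.0.0", "driver-host", 0, clientMode = !isDriver)
    println(env.config.clientMode) // false for the driver
  }
}
```

Keeping the config in a single case class means the factory signature stays stable even if more settings are added later.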

Let's see what `NettyRpcEnvFactory.create` actually does.

2.1.2.1 Create the Java serializer

    val javaSerializerInstance =
      new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]

2.1.2.2 Create a NettyRpcEnv; in clientMode, this NettyRpcEnv is returned as-is

    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager)

2.1.2.3 When not in clientMode, call `Utils.startServiceOnPort`, passing in startNettyRpcEnv, which is an anonymous function

    if (!config.clientMode) {
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }

 `Utils.startServiceOnPort` starts a service on the given host and port through its startService argument, which in practice is just a callback into the startNettyRpcEnv function above.
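
The callback contract can be illustrated with a small stand-alone sketch. This is not Spark's `Utils.startServiceOnPort`, which takes its retry limit from the configuration and handles port ranges more carefully; the helper below, its `maxRetries` parameter, and the fake service are assumptions made for illustration. It shows why `startNettyRpcEnv` has the type `Int => (NettyRpcEnv, Int)`: the helper picks a candidate port, invokes the callback, and moves on to the next port if the callback fails to bind.

```scala
import java.net.BindException
import scala.util.{Failure, Success, Try}

object PortRetrySketch {
  // Simplified, hypothetical retry helper: try startPort, startPort + 1, ...
  // and return whatever the callback produced on the first success.
  def startServiceOnPort[T](
      startPort: Int,
      startService: Int => (T, Int),
      maxRetries: Int): (T, Int) = {
    def attempt(offset: Int): (T, Int) = {
      // Port 0 asks the OS for any free port, so it is never incremented.
      val tryPort = if (startPort == 0) 0 else startPort + offset
      Try(startService(tryPort)) match {
        case Success(result) => result
        case Failure(_: BindException) if offset < maxRetries => attempt(offset + 1)
        case Failure(e) => throw e
      }
    }
    attempt(0)
  }

  def main(args: Array[String]): Unit = {
    // Fake service: pretend ports 5000 and 5001 are already taken.
    val busy = Set(5000, 5001)
    val startFake: Int => (String, Int) = { actualPort =>
      if (busy(actualPort)) {
        throw new BindException(s"port $actualPort in use")
      } else {
        ("fake-server", actualPort)
      }
    }
    val (service, boundPort) = startServiceOnPort(5000, startFake, maxRetries = 3)
    println(s"$service bound to $boundPort") // fake-server bound to 5002
  }
}
```

Returning the bound port alongside the service matters because the port actually used can differ from the one requested.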

So let's go back and look at what `nettyEnv.startServer(config.bindAddress, actualPort)` does.

  /**
   * Creates a TransportServer.
   * @param bindAddress the address to bind the server to
   * @param port the port to listen on
   */
  def startServer(bindAddress: String, port: Int): Unit = {
    val bootstraps: java.util.List[TransportServerBootstrap] =
      if (securityManager.isAuthenticationEnabled()) {
        java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
      } else {
        java.util.Collections.emptyList()
      }
    // Create the server
    server = transportContext.createServer(bindAddress, port, bootstraps)
    // Register an RpcEndpoint with the dispatcher
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
  }
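
The endpoint registered at the end of `startServer` is what lets a remote peer ask whether a named endpoint exists before talking to it. The sketch below illustrates that idea only: `SketchDispatcher`, `SketchEndpoint`, `SketchVerifier`, and the message handling are hypothetical simplifications, not Spark's `Dispatcher` or `RpcEndpointVerifier`, and the string `"endpoint-verifier"` is assumed here as a stand-in for `RpcEndpointVerifier.NAME`.

```scala
import scala.collection.concurrent.TrieMap

object EndpointVerifierSketch {
  // Hypothetical minimal endpoint: handles one message and returns a reply.
  trait SketchEndpoint {
    def receive(message: Any): Any
  }

  // Hypothetical query message asking whether an endpoint name is registered.
  final case class CheckExistence(name: String)

  // Hypothetical dispatcher: a thread-safe registry from name to endpoint.
  class SketchDispatcher {
    private val endpoints = TrieMap.empty[String, SketchEndpoint]

    def register(name: String, endpoint: SketchEndpoint): Unit = {
      if (endpoints.putIfAbsent(name, endpoint).isDefined) {
        throw new IllegalArgumentException(s"Endpoint $name is already registered")
      }
    }

    def verify(name: String): Boolean = endpoints.contains(name)

    def send(name: String, message: Any): Any = endpoints(name).receive(message)
  }

  // Answers existence queries by consulting the dispatcher it is registered with.
  class SketchVerifier(dispatcher: SketchDispatcher) extends SketchEndpoint {
    def receive(message: Any): Any = message match {
      case CheckExistence(name) => dispatcher.verify(name)
      case other => throw new IllegalArgumentException(s"Unexpected message: $other")
    }
  }

  val VerifierName = "endpoint-verifier" // assumed name, for illustration only

  def main(args: Array[String]): Unit = {
    val dispatcher = new SketchDispatcher
    dispatcher.register(VerifierName, new SketchVerifier(dispatcher))
    println(dispatcher.send(VerifierName, CheckExistence(VerifierName))) // true
    println(dispatcher.send(VerifierName, CheckExistence("missing")))    // false
  }
}
```

Registering the verifier as an ordinary endpoint means existence checks travel over the same message path as every other request.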

 To be continued...

 

Original article: https://www.cnblogs.com/chengbao/p/10611285.html