BlockTransferService Implementation

Spark's block management relies on the methods defined by BlockTransferService to fetch blocks from remote nodes and to store blocks on remote nodes. The blockTransferService is brought in when the ShuffleClient is created.

The class is defined as follows:

It exposes the host name and port the service listens on, and defines methods for fetching blocks in batches, uploading blocks, synchronously fetching and uploading a single block, and initializing and shutting down the service.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network

import java.io.Closeable
import java.nio.ByteBuffer

import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag

import org.apache.spark.internal.Logging
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.util.ThreadUtils

private[spark]
abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {

  /**
    * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
    * local blocks or put local blocks.
    *
    * The service is initialized with a BlockDataManager, through which blocks can be fetched and stored.
    */
  def init(blockDataManager: BlockDataManager): Unit

  /**
    * Tear down the transfer service.
    * Shut down the service.
    */
  def close(): Unit

  /**
    * Port number the service is listening on, available only after [[init]] is invoked.
    */
  def port: Int

  /**
    * Host name the service is listening on, available only after [[init]] is invoked.
    */
  def hostName: String

  /**
    * Fetch blocks from a remote node.
    * Fetch a sequence of blocks from a remote node asynchronously,
    * available only after [[init]] is invoked.
    *
    * Blocks can be fetched in batches; the listener is invoked as soon as each block arrives, without waiting for all blocks to be fetched.
    *
    * Note that this API takes a sequence so the implementation can batch requests, and does not
    * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
    * the data of a block is fetched, rather than waiting for all blocks to be fetched.
    */
  override def fetchBlocks(
                            host: String,
                            port: Int,
                            execId: String,
                            blockIds: Array[String],
                            listener: BlockFetchingListener): Unit

  /**
    * Upload a single block to a remote node, available only after [[init]] is invoked.
    * Upload a single block to a remote node.
    */
  def uploadBlock(
                   hostname: String,
                   port: Int,
                   execId: String,
                   blockId: BlockId,
                   blockData: ManagedBuffer,
                   level: StorageLevel,
                   classTag: ClassTag[_]): Future[Unit]

  /**
    * A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
    * Fetch a single block and block until the fetch completes.
    * It is also only available after [[init]] is invoked.
    * The block is identified by host, port, execId, and blockId.
    */
  def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
    // A monitor for the thread to wait on.
    val result = Promise[ManagedBuffer]()
    fetchBlocks(host, port, execId, Array(blockId),
      new BlockFetchingListener {
        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
          result.failure(exception)
        }

        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
          val ret = ByteBuffer.allocate(data.size.toInt)
          ret.put(data.nioByteBuffer())
          ret.flip()
          result.success(new NioManagedBuffer(ret))
        }
      })
    ThreadUtils.awaitResult(result.future, Duration.Inf)
  }

  /**
    * Upload a single block to a remote node, available only after [[init]] is invoked.
    * Upload a single block and block until the upload finishes.
    * This method is similar to [[uploadBlock]], except this one blocks the thread
    * until the upload finishes.
    */
  def uploadBlockSync(
                       hostname: String,
                       port: Int,
                       execId: String,
                       blockId: BlockId,
                       blockData: ManagedBuffer,
                       level: StorageLevel,
                       classTag: ClassTag[_]): Unit = {
    val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag)
    ThreadUtils.awaitResult(future, Duration.Inf)
  }
}
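
To make the synchronous helpers above concrete, here is a minimal usage sketch. The transferService parameter, the host, port, executor id, and block names are illustrative assumptions; only the fetchBlockSync and uploadBlockSync signatures come from the class itself, and since BlockTransferService is private[spark] such code would have to live inside the org.apache.spark package.

import scala.reflect.classTag
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.{RDDBlockId, StorageLevel}

// Blocking round trip against a hypothetical remote executor: fetch one block,
// then upload the same bytes under a different block id.
def copyBlock(transferService: BlockTransferService): Unit = {
  val data: ManagedBuffer =
    transferService.fetchBlockSync("remote-host", 50000, "exec-1", "rdd_0_0")

  transferService.uploadBlockSync(
    "remote-host", 50000, "exec-1",
    RDDBlockId(0, 1), data,
    StorageLevel.MEMORY_AND_DISK, classTag[Array[Byte]])
}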

The default BlockTransferService is NettyBlockTransferService, which is built on the Netty network application framework and provides the network connections.

It has two important methods, fetchBlocks and uploadBlock, for fetching and uploading block data respectively.
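
Before either method can be used, the service must be initialized with a BlockDataManager. A minimal wiring sketch, relying only on what the abstract class guarantees (in Spark the BlockManager plays the BlockDataManager role, but the names below are illustrative):

import org.apache.spark.network.{BlockDataManager, BlockTransferService}

// Bind the service to a BlockDataManager; hostName and port are only
// meaningful after init() has returned.
def startTransferService(service: BlockTransferService,
                         dataManager: BlockDataManager): (String, Int) = {
  service.init(dataManager)
  (service.hostName, service.port)
}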

fetchBlocks:

//Fetch block data given the host name, port, executor id, and block ids
  override def fetchBlocks(
                            host: String,
                            port: Int,
                            execId: String,
                            blockIds: Array[String],
                            listener: BlockFetchingListener): Unit = {
    logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
    try {
      val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
        override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
          //clientFactory maintains a pool of clients per remote address; it reuses an existing connection to the target host and port, or creates a new one
          val client = clientFactory.createClient(host, port)
          new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start()
        }
      }

      val maxRetries = transportConf.maxIORetries()
      if (maxRetries > 0) {
        // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
        // a bug in this code. We should remove the if statement once we're sure of the stability.
        new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
      } else {
        blockFetchStarter.createAndStart(blockIds, listener)
      }
    } catch {
      case e: Exception =>
        logError("Exception while beginning fetchBlocks", e)
        blockIds.foreach(listener.onBlockFetchFailure(_, e))
    }
  }
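
fetchBlocks reports results through the listener one block at a time rather than returning a future. A small sketch of a listener that simply waits for every requested block to be reported (the class name and counting behaviour are illustrative, not part of Spark):

import java.util.concurrent.CountDownLatch
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.shuffle.BlockFetchingListener

// Counts callbacks; await() blocks until every requested block has been
// reported once, mirroring how fetchBlockSync bridges the callback API into a
// blocking call via a Promise.
class CountingListener(expected: Int) extends BlockFetchingListener {
  private val latch = new CountDownLatch(expected)

  override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
    // The buffer must be retained or copied if it is used after this callback returns.
    latch.countDown()
  }

  override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
    latch.countDown()
  }

  def await(): Unit = latch.await()
}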

A TransportClient is created through TransportClientFactory; the relevant part of its createClient implementation is shown below:

  public TransportClient createClient(String remoteHost, int remotePort)
      throws IOException, InterruptedException {
    // Get connection from the connection pool first.
    // If it is not found or not active, create a new one.
    // Use unresolved address here to avoid DNS resolution each time we creates a client.
    final InetSocketAddress unresolvedAddress =
      InetSocketAddress.createUnresolved(remoteHost, remotePort);

    // Create the ClientPool if we don't have it yet.
    ClientPool clientPool = connectionPool.get(unresolvedAddress);
    if (clientPool == null) {
      connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer));
      clientPool = connectionPool.get(unresolvedAddress);
    }

    int clientIndex = rand.nextInt(numConnectionsPerPeer);
    TransportClient cachedClient = clientPool.clients[clientIndex];

    if (cachedClient != null && cachedClient.isActive()) {
      // Make sure that the channel will not timeout by updating the last use time of the
      // handler. Then check that the client is still alive, in case it timed out before
      // this code was able to update things.
      TransportChannelHandler handler = cachedClient.getChannel().pipeline()
        .get(TransportChannelHandler.class);
      synchronized (handler) {
        handler.getResponseHandler().updateTimeOfLastRequest();
      }

      if (cachedClient.isActive()) {
        logger.trace("Returning cached connection to {}: {}",
          cachedClient.getSocketAddress(), cachedClient);
        return cachedClient;
      }
    }
    // ... remainder of createClient omitted (it creates a new TransportClient
    // when no active cached connection exists)

In other words, the factory maintains an array of clients per remote address; when no active client exists for the chosen slot, a new network connection is created and stored back into the client array (a simplified sketch of this pooling pattern follows the fragment below). Later in the same method the address is resolved, and a warning is logged if DNS resolution is slow:

final long preResolveHost = System.nanoTime();
    final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort);
    final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
    if (hostResolveTimeMs > 2000) {
      logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
    } else {
      logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
    }
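
The pooling pattern itself is independent of Netty. Below is a simplified sketch of the same idea, not the real TransportClientFactory; the SimpleClientPool type and its parameters are invented for illustration:

import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentHashMap
import scala.util.Random

// A fixed number of slots per remote address; a random slot is reused when its
// client is still active, otherwise a new connection is created and cached.
class SimpleClientPool[C](numConnectionsPerPeer: Int,
                          connect: InetSocketAddress => C,
                          isActive: C => Boolean) {
  private val pool = new ConcurrentHashMap[InetSocketAddress, Array[Option[C]]]()

  def getOrCreate(host: String, port: Int): C = {
    // Key on the unresolved address so DNS is not hit on every lookup.
    val addr = InetSocketAddress.createUnresolved(host, port)
    var slots = pool.get(addr)
    if (slots == null) {
      pool.putIfAbsent(addr, Array.fill[Option[C]](numConnectionsPerPeer)(None))
      slots = pool.get(addr)
    }
    val i = Random.nextInt(numConnectionsPerPeer)
    slots.synchronized {
      slots(i) match {
        case Some(c) if isActive(c) => c
        case _ =>
          // A real implementation resolves the address only here, when it connects.
          val c = connect(addr)
          slots(i) = Some(c)
          c
      }
    }
  }
}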

uploadBlock:

override def uploadBlock(
                            hostname: String,
                            port: Int,
                            execId: String,
                            blockId: BlockId,
                            blockData: ManagedBuffer,
                            level: StorageLevel,
                            classTag: ClassTag[_]): Future[Unit] = {
    val result = Promise[Unit]()
    val client = clientFactory.createClient(hostname, port)

    // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
    // Everything else is encoded using our binary protocol.
    val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))

    // Convert or copy nio buffer into array in order to serialize it.
    val array = JavaUtils.bufferToArray(blockData.nioByteBuffer())
    //Send the RPC over the Netty channel; new UploadBlock(appId, execId, blockId.toString, metadata, array).toByteBuffer is the message body
    client.sendRpc(new UploadBlock(appId, execId, blockId.toString, metadata, array).toByteBuffer,
      new RpcResponseCallback {
        override def onSuccess(response: ByteBuffer): Unit = {
          logTrace(s"Successfully uploaded block $blockId")
          result.success((): Unit)
        }

        override def onFailure(e: Throwable): Unit = {
          logError(s"Error while uploading block $blockId", e)
          result.failure(e)
        }
      })

    result.future
  }

As with fetchBlocks, a client is created through TransportClientFactory, and client.sendRpc uploads the data to the remote node. The message body is new UploadBlock(appId, execId, blockId.toString, metadata, array).toByteBuffer, and the RpcResponseCallback passed as the second argument completes the returned Promise: onSuccess fulfils it with Unit, while onFailure fails it with the exception.
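
Because uploadBlock returns a Future, a caller can also react asynchronously instead of blocking the way uploadBlockSync does. A short sketch, with transferService and buffer as illustrative values:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.reflect.classTag
import scala.util.{Failure, Success}
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.{RDDBlockId, StorageLevel}

def replicate(transferService: BlockTransferService, buffer: ManagedBuffer): Unit = {
  val upload = transferService.uploadBlock(
    "remote-host", 50000, "exec-1",
    RDDBlockId(0, 1), buffer,
    StorageLevel.MEMORY_AND_DISK, classTag[Array[Byte]])

  // The Future completes when the RpcResponseCallback shown above fires.
  upload.onComplete {
    case Success(_) => println("block uploaded to the remote node")
    case Failure(e) => println(s"upload failed: $e")
  }
}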
Original source: https://www.cnblogs.com/fantiantian/p/9510104.html