Spark-1.6.0中的Sort Based Shuffle源码解读

  从Spark-1.2.0开始,Spark的Shuffle由Hash Based Shuffle升级成了Sort Based Shuffle。即Spark.shuffle.manager从Hash换成了Sort。不同形式的Shuffle逻辑主要是ShuffleManager的实现类不同。
  在org.apache.spark.SparkEnv类中:

// Let the user specify short names for shuffle managers
 val shortShuffleMgrNames = Map(
  "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
  "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
  "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get( "spark.shuffle.manager" , "sort")
 val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

 val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode" , false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    new StaticMemoryManager(conf , numUsableCores)
  } else {
    UnifiedMemoryManager(conf , numUsableCores)
  }

  可以看出,在Spark-1.6.0中可以支持三种模式的Shuffle,分别是hash shuffle,sort shuffle以及tungsten-sort shuffle。默认的是Sort Based Shuffle。
  如果需要更改Shuffle类型,需要设置的参数是spark.shuffle.manager,可选的参数有hash,sort(default),tungsten-sort,如果自定义了ShuffleManager类型,比如com.xx.yy.AbcShuffleManager,也可以将该参数设置为com.xx.yy.AbcShuffleManager。SparkEnv会根据配置的参数去查找该类。
同时,从Spark-1.6.0版本开始,引入了一个动态内存分配的功能,该功能默认是开启的,用户可以自己选择是否使用1.6.0之前版本Spark中的内存分配策略,通过配置参数spark.memory.useLegacyMode来决定,该参数默认为false。具体可以参考Spark内存管理-UnifiedMemoryManager和StaticMemoryManager

一、Sort Based Shuffle原理

  简单来说,Shuffle过程类似于MR程序中的Map-Reduce过程。可以分为Write和Read两个阶段。
在org.apache.spark.shuffle.sort.SortShuffleManager类的描述中写道:

在sort-based shuffle中,输入的records会根据它们key对应的partition ids进行排序,属于同一partition的记录不排序。然后将这些记录输出到一个map output文件中。Reducers从该输出文件的一个连续文件片段中读取属于它的分区的记录。当map输出文件太大内存无法装下时,这些排好序的文件块会spill到磁盘上,磁盘上的文件会最终会合并成一个按分区排好序的最终输出文件。
Sort-based shuffle的map输出文件有两种输出方式:
当同时满足如下三个条件时,以序列化的方式进行排序
  1、shuffle过程不需要进行aggregation或者输出不需要排序
  2、shuffle的序列化支持序列化值重新排序(比如KryoSerializer和Spark SQL常用的序列化器)
  3、shuffle产生的分区数小于16777216个
在上面三个条件之外的情况,都以非序列化的方式进行排序
序列化的排序方式:
在shuffle过程中,当records进入到shuffle writer同时就会被序列化,在整个排序过程中以序列化的形式缓存,这种方式的好处是:
  1、它的sort操作是在序列化的二进制数据上完成,而不是Java对象,这样减少了reduce时的内存消耗和GC压力。但是它会要求序列化器能够允许序列化的数据在不进行反序列化的操作情况下移动数据位置。
  2、它使用了一个ShuffleExternalSorter来对partition id和record pointer进行排序,在ShuffleExternalSorter中对排好序的每一个记录仅仅只用到8个字节,所以可以在内存中缓存更多的记录。
  3、spill过程中会对同一分区的序列化好了的记录在不进行反序列化的情况下进行合并。
  4、如果spill过程中的压缩器支持压缩数据的合并操作时,spill操作的最后将压缩好的spill文件进行合并生成最终spill文件时就仅仅只需要将每个输出文件进行简单的合并即可。可以避免在merge过程中进行解压缩和copy操作。

  Sort-based Shuffle是在Shuffle过程中有排序的操作,但是这个排序是部分排序。即只根据partition id对每个partition进行排序,但是同一个partition中的记录并不会被排序。但是如果是sortByKey操作需要对每条记录进行排序的话的话,各个partition中Record间的排序则在Reducer中完成。也就是说,假如有100条记录需要进行处理,并且处理后这100条记录会输出到10个partition中,假设编号为1~10,那么只会对1~10这10个输出分区进行排序,同属于分区1的记录并不会排序。对应下图中的FileSegment之间会进行排序,但是FileSegment中的记录不排序。
Sort Based Shuffle
  上图简单描述了Sort Based Shuffle的过程。每个Shuffle Map Task不会为每个Reducer单独产生一个文件,而是一个Map Task只生产一个最终文件,这个文件中根据不同partition id进行排序,然后有一个Index引导文件使得每个Reducer能很快的定位到其需要处理的FileSegment。
  

二、Sort Based Shuffle Write

  先找到Shuffle过程的入口。

1、ShuffleMapTask类

  在Scheduler模块中,一个DAG中除了最后一个Stage是FinalStage外,中间依赖的Stage都是ShuffleMapStage,在这个Stage中对应的Task类型都是ShuffleMapTask。ShuffleMapTask在Executor上运行时,最终调用的方法是ShuffleMapTask#runTask。
 其中主要的代码如下

val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get

  从SparkEnv中获得manager,即上文中提到的org.apache.spark.shuffle.SortShuffleManager。然后由该manager为当前ShuffleMapTask所对应的分区生成一个writer对象,这个writer是SortShuffleWriter类型。最后调用writer.write方法,将该分区循环写出。

2、SortShuffleWriter

  SortShuffleWriter#write方法的源码如下:

override def write(records: Iterator[Product2[K, V]]): Unit = {
    sorter = if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      new ExternalSorter[K, V, C](
        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    } else {
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      new ExternalSorter[K, V, V](
        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
    }
    sorter.insertAll(records)

    // Don't bother including the time to open the merged output file in the shuffle write time,
    // because it just opens a single file, so is typically too fast to measure accurately
    // (see SPARK-3570).
    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val tmp = Utils.tempFileWith(output)
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  }

  这里面前半段主要是生成一个sorter对象,根据是否有mapSideCombine来生成不同的ExternalSorter对象。不做mapSideCombine的话,在构造ExternalSorter时不会传入聚合函数,也不对这个partition中的记录进行排序。如果该map task是由sortByKey操作触发的,那么根据key的排序会在reduce端进行。
  得到sorter对象后,调用insertAll方法对records做进一步处理。
  

Map端Combine
在PairRDDFunctions#combineByKey中我们可以看到:
def combineByKey [C](
createCombiner: V => C,
mergeValue: ( C, V ) => C,
mergeCombiners: ( C, C ) => C,
partitioner: Partitioner ,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K , C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners ,
partitioner , mapSideCombine, serializer)(null)
}

该方法有一个Boolean的传入值mapSideCombine,并且默认为true。也就是说,在默认情况下调用该方法时就会执行mapSideCombine操作了。
mapSideCombine会使一个maptask输出的值在进行reduce操作之前先进行一定的合并。相当于先对一个分区的数据根据传入参数进行一次reduce操作,这样数据量会缩减,提高后续shuffle操作从性能。

3、ExternalSorter

(1)map和buffer对象
  在ExternalSorter中有两个比较重要的属性,map和buffer,这两个属性在后面有很重要的作用。定义如下:

  private var map = new PartitionedAppendOnlyMap[K, C]
  private var buffer = new PartitionedPairBuffer[K, C]

虽然分别实现的类是PartitionedAppendOnlyMap和PartitionedPairBuffre,但是在这两个类的源码中还是能够看出,这两个类的特性在类名中得到了很好的体现。
  对map来说,只会存储key不同的值,如果遇到相同的key,会把key对应的value进行更新。其底层保存数据的结构还是一个Array类型的data变量,偶数位存的是key的值,基数位存的是value。当需要更新时,可以看下面这段逻辑,根据k找到其在Array中的位置,然后更新key和value的值。从0位开始,偶数位保存的是key值,紧随其后的那位上保存的是value值。

data(2 * pos) = k
data(2 * pos + 1) = value.asInstanceOf[AnyRef]

  对buffer来说,遇到的每一个值都会写入其中。其底层也是维持了一个Array类型的数据结构,但是其插入逻辑如下,在当前Array最后保存新增的key和value:

data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])
data(2 * curSize + 1) = value.asInstanceOf[AnyRef]

(2)insertAll(records: Iterator[Product2[K, V]])方法
  在该方法中,对有mapSideCombine和没有mapSideCombine采取了不同的处理方法。
- 有mapSideCombine

// Combine values in-memory first using our AppendOnlyMap
val mergeValue = aggregator.get.mergeValue
val createCombiner = aggregator.get.createCombiner
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
  if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
while (records.hasNext) {
  addElementsRead()
  kv = records.next()
  map.changeValue((getPartition(kv._1), kv._1), update)
  maybeSpillCollection(usingMap = true)
}

  循环处理records中的每一条记录,处理一条记录就在addElementsRead()中将_elementsRead加1,记录处理的记录数,然后更新map中的值。这里传入的update是一个方法,进行map端的combine操作,如果遇到记录过的相同key,就将value使用传入的aggregator进行聚合,如果遇到一个新key,就将该key对应的value计入一个新的combiner中。
  PartitionedAppendOnlyMap的类继承关系,及如下图changeValue的调用如下图:
  这里写图片描述
  该方法最终进入AppendOnlyMap#changeValue方法中,按照在3.(1)提到的data对象进行更新。每次更新完一条记录后,会对该记录进行判断,满足抽样条件的话就会进行一次抽样。这里的抽样过程主要是为了后续判断该data使用的内存大小所用,在后面会有详细介绍。
- 没有mapSideCombine
  这里的主要逻辑是:

while (records.hasNext) {
  addElementsRead()
  val kv = records.next()
  buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
  maybeSpillCollection(usingMap = false)
}

  仍然是循环取出records中的每一条记录。取出一条便将_elementsRead加1,然后将数据存入上面的buffer变量中。buffer.insert方法处理过程如下:

  /** Add an element into the buffer */
  def insert(partition: Int, key: K, value: V): Unit = {
    if (curSize == capacity) {
      growArray()
    }
    data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])
    data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
    curSize += 1
    afterUpdate()
  }

curSize记录当前data对象中存储记录的个数,插入一条记录就会加1。capacity是当前data中能够存储的记录总个数。data的初始长度是2 * 64,即能存储64个record的key-value对。data中存储record的上限是2^30 -1个,当不超过该上限时,growArray方法会以当前capacity两倍(但是最多达到上限)容量创建一个新的data数组将原来data中的数据copy到新数组中,同时会对新的data进行采样。有关采样的过程及用途,在后面内存分析时会讲到。
然后直接在data最后新增两位保存新的key和value值,更新curSize。每插入一条记录最后调用afterUpdate方法,对当前data中的记录进行一次判断是否需要进行一次采样。
(3)maybeSpillCollection判断是否需要spill
  在(2)中insertAll方法对map和buffer进行更新后,接下来就会调用maybeSpillCollection方法决定map和buffer是否需要进行spill。

  /**
 * Spill the current in-memory collection to disk if needed.
 *  * @param usingMap whether we're using a map or buffer as our current in-memory collection
   */
  private def maybeSpillCollection(usingMap: Boolean): Unit = {
    var estimatedSize = 0L
    if (usingMap) {
      estimatedSize = map.estimateSize()
      if (maybeSpill(map, estimatedSize)) {
        map = new PartitionedAppendOnlyMap[K, C]
      }
    } else {
      estimatedSize = buffer.estimateSize()
      if (maybeSpill(buffer, estimatedSize)) {
        buffer = new PartitionedPairBuffer[K, C]
      }
    }

    if (estimatedSize > _peakMemoryUsedBytes) {
      _peakMemoryUsedBytes = estimatedSize
    }
  }

  传入参数usingMap是用来标识使用的是map还是buffer对象,对前面有印象的话应该知道使用map对象还是buffer对象是由有没有mapSideCombine来决定的,由于map和buffer对象底层还是一个Array类型的data对象,只是对数据更新和插入的处理方式有些不同。所以接下来都以buffer来作进一步的分析。
  这里的主要逻辑是,首先调用buffer的estimateSize方法,计算当前buffer对象的内存大小estimatedSize,然后根据该大小调用方法maybeSpill判断是否需要进行spill操作,spill后会把buffer清空,重新进行下一轮的操作。那么接下来就有两个重点:计算buffer的大小和判定是否需要spill

- buffer.estimateSize方法计算内存
  PartitionedPairBuffer和PartitionedAppendOnlyMap都继承了trait SizeTracker。所以,不管是map还是buffer调用的estimateSize都是相同的。
  应该还记得前面提到过对buffer插入数据时,会有一个采样的操作。有关采样的相关过程也在SizeTracker中。
SizeTracker中的属性:

private val samples = new mutable.Queue[Sample]//一个队列,用于存储对数据的采样样本
private val SAMPLE_GROWTH_RATE = 1.1//采样间隔次数增长率
private var bytesPerUpdate: Double = _//根据samples中最后两个样本计算出的记录内存平均增长率
private var numUpdates: Long = _//buffer的更新次数
private var nextSampleNum: Long = _//下一次采样操作的次数
......
case class Sample(size: Long, numUpdates: Long)

  在上面的代码片段中有一些属性和Sample类结构。这些在estimateSize方法中都有用到。在Spark中,对shuffle过程数据的内存大小,是根据采样样本的大小来估算的。
  从前面我们知道,buffer中每插入一条记录都会判断是否需要采样,在SizeTracker#afterUpdate方法中采样的依据是当前buffer的更新次数numUpdates是否等于下一次进行采样操作的次数nextSampleNum,如果等于,则调用SizeTracker#takeSample方法进行一次采样,然后nextSampleNum变成ceil(numUpdates * SAMPLE_GROWTH_RATE)。然后根据samples中最后两个样本计算出buffer每次更新的内存平均增长率bytesPerUpdate = (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)。从这里可以看出,当buffer中记录比较少时,采样非常频繁,但是如果该buffer中容纳的记录越多,到后面进行一次采样的间隔次数就会越多。
  估算的内存大小为:

def estimateSize(): Long = {
    assert(samples.nonEmpty)
    val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
    (samples.last.size + extrapolatedDelta).toLong
  }

由samples中最后一个样本的大小,加上buffer记录距离上次采样的次数numUpdates - samples.last.numUpdates,乘以buffer每次新增一个record时的内存平均增长率bytesPerUpdate。这样可以在最短的时间内对存储了大量record的buffer内存占用大小进行计算,但是由于是基于采样的方法估算的内存大小,有时候会由于数据本身的问题导致计算不准确等问题。有可能偶尔出现OOM的情况。
- maybeSpill方法判断是否进行spill
  在Spark-1.6之前,可以由参数spark.shuffle.spill设置为true或者false来选择打开或者关闭内存数据spill到磁盘的功能,但是在1.6版本中,该参数默认为true,并且即使设置为false,也不会起作用了,spark在需要时会把内存数据spill到磁盘。
  根据上一步估算到的当前map或buffer的内存大小estimatedSize,如果达到spill的条件,将该map或者buffer中的数据spill到磁盘,然后重新初始化一个新的map或buffer,在下面maybeSpill方法中传入参数c就是当前的buffer对象,currentMemory是上一步估算的buffer占用的内存大小estimateSize。

protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    var shouldSpill = false
    //buffer中每插入32条记录,并且当前估算的buffer内存达到了spill的内存阈值
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // Claim up to double our current memory from the shuffle memory pool
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      val granted =
        taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)
      myMemoryThreshold += granted
      // If we were granted too little memory to grow further (either tryToAcquire returned 0,
      // or we already had more memory than myMemoryThreshold), spill the current collection
      shouldSpill = currentMemory >= myMemoryThreshold
    }
    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
    // Actually spill
    if (shouldSpill) {
      _spillCount += 1
      logSpillage(currentMemory)
      //ExternalSorter#spill方法
      spill(collection)
      _elementsRead = 0
      _memoryBytesSpilled += currentMemory
      releaseMemory()
    }
    shouldSpill
  }

虽然前面内存估算是基于采样来进行的,但是如果对buffer中每新增一条记录就判断一次是否需要spill肯定是一个很耗费时间与资源的过程。从上面的代码可以看出,根据buffer中插入记录的数目elementRead,每32次并且如果estimateSize达到了spill的内存阈值myMemoryThreshold才会判断是否spill。myMemoryThreshold由参数spark.shuffle.spill.initialMemoryThreshold(默认值5 * 1024 * 1024,5MB)来确定。
如果同时满足上面两个条件,就会向taskMemoryManager申请,申请的是Execution部分的内存。这里的taskMemoryManager,以及内存申请的过程,可以参考文章Spark内存管理-UnifiedMemoryManager和StaticMemoryManager,如果内存不足,则分配到的内存granted为0。申请的内存总数是amountToRequest,计算公式是amountToRequest = 2 * currentMemory - myMemoryThreshold。这里可以把myMemoryThreshold理解为当前Executor可提供给当前shuffle操作的最少内存数,每隔32次插入数据,如果当前buffer使用的内存数仍然比myMemoryThreshold小,那么myMemoryThreshold就可以继续写入记录,直到内存使用量超过myMemoryThreshold时才会尝试向taskMemoryManagre申请内存,申请到内存后,myMemoryThreshold就会增大,增大后,如果仍然不足以存储新插入数据的buffer,那么就会触发spill操作。对buffer来说,每次spill都会重新初始化一次,那么上一次spill时buffer中的所有数据,可以是32次,64次,96次…插入后的结果,最坏的情况是32次插入就会导致currentMemory > myMemoryThreshold如果此次不spill,Spark会认为下次再经过32次插入后,buffer的currentMemory会翻番,所以向Execution内存池申请能够存储2 * currentMemory内存的空间。申请到内存后,将granted累加到myMemoryThreshold上,如果分配到的内存太少,即使加上新分配的内存,myMemoryThreshold仍然不足currentMemory,就会触发spill操作。
触发spill操作的另一个条件是_elementsRead > numElementsForceSpillThreshold,,当前buffer中的记录数超过参数spark.shuffle.spill.numElementsForceSpillThreshold(默认值是Long.MaxValue)。
在触发spill操作后,spill次数_spillCount累加,并记录此次spill出去的数据大小。调用spill方法进行spill操作,然后通过releaseMemory方法把Execution内存池的ON_HEAP内存释放,充值myMemoryThreshold为参数设定值。继续进行下一轮。

4、Sort Based Shuffle Write内存分析

  这个过程中最耗内存的对象是上面的map或者buffer,这部分内存再加上spill操作时的缓存内存基本上就构成了Shuffle Wtire过程中整个内存的使用情况。
  buffer中的内存大小是上面提到的PartitionedAppendOnlyBuffer占用的实际内存大小,如果一个Executor有C个Core,则C个Core共享整个Executor的内存,并且同时处理Task,所以buffer部分所有内存大小为C * PartitionedAppendOnlyBuffer
  在spill过程中,调用的是ExternalSorter#spill方法,我们来看一下spill的过程,
首先由diskBlockManager创建一个shuffle临时文件,生成blockId和file,

val (blockId, file) = diskBlockManager.createTempShuffleBlock()

blockId和file如下图所示:
blockId和file
然后获取一个往磁盘写临时文件的DiskBlockObjectWriter类型的diskWriter对象,

writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)

writer并不是接收到一条记录就往磁盘写一条记录,在里面有一个fileBuffer来缓存,每装满一次才会真正往磁盘spill一次,这个fileBufferSize的大小可以由参数spark.shuffle.fill.buffer(默认值为32K)来确定。
同时,在spill的过程中,每接收一条记录写入fileBuffer中的同时,也会记录fileBuffer中的记录数objectsWritten。每一个batch的数据写入磁盘时需要进行序列化,为了避免序列化过程中出现内存不足的情况,对每一个batch中的记录数也作了一个限制spark.shuffle.spill.batchSize(默认为10000),这个参数不宜设置为过小,太小的话频繁的序列化反序列化也是很耗费时间的。可以随着上面的fill.buffer一起增大或减小。每写满10000条记录,上面代码中的writer对象会调用flush方法往磁盘写入一次,然后重新生成一个writer对象。
  最后在spills变量中记录每次spill的相关记录

private val spills = new ArrayBuffer[SpilledFile]

在本次调试过程中,spills中的内容如下:
spills变量
对应spill到磁盘上的文件:
spill磁盘文件

三、Sort Based Shuffle Read

  Sort Based Shuffle Read类似于Write过程,从获取Reader对象开始。

1、ShuffledRDD

  read过程从ShuffledRDD#compute方法开始,首先获取Reader对象。

  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }

从前面提到的SortShuffleManager类的getReader获取到Reader对象后调用read方法开始读取shuffle write过程中spill到磁盘的临时文件。
Reader对象是BlockStoreShuffleReader类型。

2、BlockStoreShuffleReader

  接下来进入BlockStoreShuffleReader#read方法中。

override def read(): Iterator[Product2[K, C]] = {...}

  该方法最终返回一个Iterator对象,这个Iterator对象会经过下面一系列的转换:

var blockFetcherItr = new ShuffleBlockFetcherIterator(...)
val wrappedStreams = blockFetcherItr.map { ... }
val recordIter = wrappedStreams.flatMap { ... }
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]] (...)
val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)

  经过上面代码中的一系列变换后,最后得到一个aggregatedIter变量。接下来重点分析该变量中的过程。

val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) { //如果需要进行map端combine
        // 获得的是已经聚合了的记录
        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
        dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
      } else { //不需要map端combine
        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
        dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
      }
    }

在上面的过程中,对于有map端combine的读入记录,由于在map时已经对相同key的值进行了一定的处理,所以得到的combinedKeyValuesIterator对象是一个Value不为空的iterator,接下来调用Aggregator#combineCombinersByKey,而对于没有map端combine的记录,combinedKeyValuesIterator对象的Value为Nothing,接下来调用Aggregator#combineValuesByKey方法。

(1)Aggregator#combineCombinersByKey和Aggregator#combineValuesByKey
  在combineCombinersByKey方法中

  def combineCombinersByKey(
      iter: Iterator[_ <: Product2[K, C]],
      context: TaskContext): Iterator[(K, C)] = {
    val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
    combiners.insertAll(iter)
    updateMetrics(context, combiners)
    combiners.iterator
  }

  在combineValuesByKey方法中

  def combineValuesByKey(
      iter: Iterator[_ <: Product2[K, V]],
      context: TaskContext): Iterator[(K, C)] = {
    val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
    combiners.insertAll(iter)
    updateMetrics(context, combiners)
    combiners.iterator
  }

上面两个方法,首先获得一个ExternalAppendOnlyMap类型的combiners变量,然后调用combiners.insertAll方法处理读入记录。上面两个方法中的不同之处在于,构造ExternalAppendOnlyMap时的传入参数不同。

(2)ExternalAppendOnlyMap
  在ExternalAppendOnlyMap#insertAll方法中会将Shuffle Read读取的记录,插入一个currentMap对象中

private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
private val spilledMaps = new ArrayBuffer[DiskMapIterator]
......
spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))

这个SizeTrackingAppendOnlyMap在前面shuffle write中也提到过,是PartitionedAppendOnlyMap的父类。接下来的过程也与shuffle write过程类似,当currentMap申请不到足够的内存时,就会触发spill操作,伴随着每一次的spill,会有一个ArrayBuffer类型的spilledMaps记录每次spill到磁盘上的文件的详细信息。和writer过程的spill不同的是,shuffle read的每次spill都会将内存中的记录排好序,具体代码可以参考ExternalSorter#spill方法。
  combine完成后,返回一个iterator给前面代码中的aggregatedIter变量。aggregatedIter的类型由当前shuffle read过程是否发生过spill行为来决定。

if (spilledMaps.isEmpty) {
  CompletionIterator[(K, C), Iterator[(K, C)]](currentMap.iterator, freeCurrentMap())
} else {
  new ExternalIterator()
}

如果发生过spill,那么aggregatedIter的类型是ExternalIterator的,这个类型的iterator会将spill到磁盘上的数据以及内存中的数据进行合并。如果没有发生过spill,由于数据都在内存中,所以只需要一个CompletionIterator读取内存中的数据就行。

(3)ExternalIterator类型
  ExternalIterator类型是ExternalAppendOnlyMap的内部类。由于涉及到处理spill到磁盘上的数据,以及内存中的数据,过程比较复杂,所以接下来会对这个类型进行梳理。
  在这个类中,唯一一个耗费内存资源的是mergeHeap优先队列,

private val mergeHeap = new mutable.PriorityQueue[StreamBuffer]

这个StreamBuffer是ExternalIterator的内部类,每个StreamBuffer对象中存储了on-disk或者in-memory数据流中的所有数据。StreamBuffer的构造函数中iterator是用来构造该StreamBuffer对象的数据流中数据的引用,pairs保存的是这个iterator中key的hash值最小的那一组key-value,如果存在hash冲突的话,pairs中保存的则是hash值相同的多组key-value对。

private class StreamBuffer(
        val iterator: BufferedIterator[(K, C)],
        val pairs: ArrayBuffer[(K, C)])
      extends Comparable[StreamBuffer]

StreamBuffer实现了Comparable#compareTo方法,两个StreamBuffer对象的大小通过比较pairs中第一个值(即某个key值)的hash值大小来确定。
  在ExternalIterator中,会将currentMap中的记录在内存中进行排序,然后将spill到磁盘上的文件spilledMaps加载进来。
  对in-memory的SizeTrackingAppendOnlyMap对象currentMap的排序,实现方法在AppendOnlyMap#destructiveSortedIterator中。将currentMap的底层实现data数组中的记录,null值全部后移之后,调用TimSort#sort方法进行排序。
  
(4)CompletionIterator类型
  是一个抽象类,其中有一个未实现的completion方法。

abstract class CompletionIterator[ +A, +I <: Iterator[A]](sub: I) extends Iterator[A] {
// scalastyle:on

  private[this] var completed = false
  def next(): A = sub.next()
  def hasNext: Boolean = {
    val r = sub.hasNext
    if (!r && !completed) {
      completed = true
      completion()
    }
    r
  }

  def completion(): Unit
}

private[spark] object CompletionIterator {
  def apply[A, I <: Iterator[A]](sub: I, completionFunction: => Unit) : CompletionIterator[A, I] = {
    new CompletionIterator[A, I](sub) {
      def completion(): Unit = completionFunction
    }
  }
}

  在构造CompletionIterator类型的对象时,会同时传入一个completionFunction方法,如前面代码所示,传入的是currentMap.iterator。

原文地址:https://www.cnblogs.com/wuyida/p/6300246.html