sort-based shuffle的核心:org.apache.spark.util.collection.ExternalSorter

依据Spark 1.4版

在哪里会用到它

ExternalSorter是Spark的sort形式的shuffle实现的关键。SortShuffleWriter使用它,把RDD分区中的数据写入文件。

  override def write(records: Iterator[Product2[K, V]]): Unit = {
    if (dep.mapSideCombine) {//根据是否需要mqp-side combine创建不同的sorter
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      sorter = new ExternalSorter[K, V, C](dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
      sorter.insertAll(records)
    } else {
      sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer) //如果不需要map-side combine 就不再需要Aggregator和Ordering
      sorter.insertAll(records)
    }
    val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)//写数据文件
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
    shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)//写索引文件

    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  }

ExternalSorter的注释

这个类的注释提供了关于它的设计的很多信息,先翻译一下。

这个类用于对一些(K, V)类型的key-value对进行排序,如果需要就进行merge,生的结果是一些(K, C)类型的key-combiner对。combiner就是对同样keyvalue进行合并的结果。它首先使用一个Partitioner来把key分到不同的partition,然后,如果有必要的话,就把每个partition内部的key按照一个特定的Comparator来进行排序。它可以输出只一个分区了的文件,其中不同的partition位于这个文件的不同区域(在字节层面上每个分区是连续的),这样就适用于shuffle时对数据的抓取。

 

如果combining没有启用,C和V的类型必须相同 -- 在最后我们会对对象进行强制类型转换。

 

注意:仅管ExternalSorte是一个比较通用的sorter,但是它的一些配置是和它在基于sortshuffle的用处紧密相连的(比如,它的block压缩是通过'spark.shuffle.compress'控制的) 如果在非shuffle情况下使用ExternalSorter时我们想要另外的配置,可能就需要重新审视一下它的实现。

 

构造函数参数:

 1. aggregator, 类型为Option[Aggregator], 提供combine函数,用于merge

 2. partitioner, 类型为Optinon[Partitioner], 如果提供了partitioner,就先按partitionID排序,然后再按key排序

 3. ordering, 类型为Option[Ordering], 用来对每个partition内部的key进行排序;必须是一个          4. total ordering(即,所有key必须可以互相比较大小,与partitial ordering不同)

 4. serializer, 类型为Option[Serializer], 用于spill数据到磁盘。

 

注意,如果提供了Ordering 那么我们就总会使用它进行排序(是指对partition内部的key排序),因此,只有在真正需要输出的数据按照key排列时才提供ordering。例如,在一个没有map-side combinemap任务中,你应该会需要传递None作为ordering,这样会避免额外的排序。另一方面,如果你的确需要combining 提供一个Ordering会更好。

 

用户应该这么和这个类型进行交互:

  1. 初始化一个ExternalSorter
  2. 调用insertAll, 提供要排序的数据
  3. 请求一个iterator()来遍历排序/聚合后的数据。或者,调用writePartitionedFiles来创建一个包含了排序/聚合后数据的文件,这个文件可以用于Sparksort shuffle

 

这个类在内部是这么工作的:

 

  • 我们重复地将数据填满内存中的buffer,如果我们想要combine,就使用PartitionedAppendOnlyMap作为buffer, 如果不想要combine,就使用PartitionedSerializedPairBuffer或者PartitionedPariBuffer。在这里buffer内部,我们使用partition Id对元素排序,如果需要,就也按key排序(对同样partition Id的元素)。为了避免重复调用partitioner,我们会把recordpartition ID存储在一起。
  • buffer达到了容量上限以后,我们把它spill到文件。这个文件首先按partition ID排序,然后如果需要进行聚合,就用key或者key的hashcode作为第二顺序。对于每个文件,我们会追踪在内存时,每个partition里包括多少个对象,所以我们在写文件 时候就不必要为每个元素记录partition ID了。
  • 当用户请求获取迭代器或者文件时,spill出来的文件就会和内存中的数据一起被merge,并且使用上边定义的排列顺序(除非排序和聚合都没有开启)。如果我们需要按照key聚合,我们要不使用Ordering参数进行全排序,要不就读取有相同hash codekey,并且对它们进行比较来确定相等性,以进行merge
  • 用户最后应该使用stop()来删除中间文件。

 

作为一种特殊情况,如果OrderingAggregator都没有提供,并且partition的数目少于spark.shuffle.sort.bypassMergeThreshold, 我们会绕过merge-sort,每次spill时会为每个partition单独写一个文件,就像HashShuffleWriter一样。我们然后把这些文件连接起来产生一个单独的排序后的文件,这时就没有必要为每个元素进行两次序列化和两次反序列化(merge中就需要这么做)。这会加快groupBy, sort等没有部分聚合的操作的map端的效率。

它的功能 

根据注释所述,这个类的功能包括:

1. 把kv对按partitioner分到不同的分区

2. 如果需要,就对相同key对应的value进行聚合

3. 把输出的kv对写到一个文件,在文件内部,kv对按照partition ID排序,如果需要的话,就对每个partition内部的kv排序。

前两个功能,是hash-based shuffle也会做的,而第3个功能,是sort-based shuffle特有的。

为了实现这些功能,它要解决以下的问题:

  1. 考虑到内存的限制,需要进行外部排序,需要spill到磁盘文件,  需要对这些文件进行merge。那么如何追踪内存中数据结构的大小,spill到磁盘后的文件应该如何组织其结构?如果进行merge?
  2. 如何实现aggregation?在填充数据到内存里的buffer时,需要进行aggregate, spill出来的文件在merge时,位于不同文件里的相同key对应的value也需要aggregate。
  3. 如何确定最终文件里每个partition以byte为单位的大小。由于压缩流和序列化流对文件输出流的包装,以及中间的buffer的影响,这个大小只能在关闭这些流之后才能获得。这样的话,最终写成的文件会是很多输出流的输出追加在一起的结果。

它的实现


它的整个实现比较繁杂,但按照通常的使用方式,大体包括写入buffer、spill、merge三个部分。

buffer

首先,充分利用内存作为buffer,直接对内存中的对象进行操作可以提高效率,减少序列化、反序列化和IO的开销。比如在内存中先对部分value进行聚合,会减少要序列化和写磁盘的数据量;在内存中对kv先按照partition组合在一起,也有利于以后的merge,而且越大的buffer写到磁盘中的文件越大,这意味着要合并的文件就越少。

所以,就像注释中提到的,ExternalSorter可能会用到三种类型的buffer,以应对不同的情况,提高效率。这三种buffer是

  • PartitionedAppendOnlyMap
  • PartitionedSerializedPairBuffer
  • PartitionedPairBuffer

下面看一下这三种数据结构的特性以及适用的情境

PartitionedAppendOnlyMap

它的继承结构是这样的

下面分别看一下它的父类 

SizeTracker

这是一个trait,把它混入到集合类中用来追踪这个集合的估计大小。之所以PartitionedAppendOnlyMap需要继承SizeTracker,是为了确定spill的时机。


调用SizeEstimator的时机

它有一个afterUpdate方法,当被混入的集合的每次update操作以后,需要执行SizeTracker的afterUpdate方法,afterUpdate会判断这是第几次更新,需要的话就会使用SizeEstimator的estimate方法来估计下集合的大小。由于SizeEstimator的调用开销比较大,注释上说会是数毫秒,所以不能频繁调用。所以SizeTracker会记录更新的次数,发生estimate的次数是指数级增长的,基数是1.1,所以调用estimate时更新的次数会是1.1, 1.1 * 1.1, 1.1 * 1.1 *1.1, ....

这是指数的初始增长是很慢的, 1.1的96次方会是1w, 1.1 ^ 144次方是100w,即对于1w次update,它会执行96次estimate,对10w次update执行120次estimate, 对100w次update执行144次estimate,对1000w次update执行169次。

估计集合大小的方法

  1. 每到需要estimate的更新后,就调用SizeEstimator估计一下当前集合的大小。用集合的大小和更新次数组装成一个Sample对象(一个只有这两个field的case class),把这个Sample放个一个存放Sample history和队列。然后取这个队列里最后两个Sample,算出来这两个Sample之间每次update这个集合size增长了多少,记为bytesPerUpdate。方法是这两个Sample里大小的差值除以它们update次数的差值。

(latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)

    2. SizeTracker的estimateSize被调用时,以bytePerUpdate作为最近平均每次更新时的bytePerUpdate,用当前的update次数减去最后一个Sample的update次数,然后乘以bytePerUpdate,结果加上最后一个Sample记录的大小。

 val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
    (samples.last.size + extrapolatedDelta).toLong

  estimateSize方法之所以这么设计,是为了尽量减少对SizeEstimator的调用。因为集合会在每次update之后调用estimateSize来决定是否需要spill。

感觉SizeTracker有两个地方不太好

  1. 对update的定义有些宽泛。以SizeTrackingAppendOnlyMap为例, 它会在update和changeValue两个方法中都调用afterUpdate。其中changeValue在使用中既被当作insert插入新的kv对,也会用于对已有的kv对进行update。对有些update方式来说,明确区分insert和对已有值的更新会使得估计更准确,比如word count中的reduceByKey,它执行对已有值的更新时,不会改变集合的大小,而只有新加入的kv会。
  2. 调用SizeEstimator时的update次数简单地以指数增长,这种策略过于宽泛。对于一批update,保证它引发的对SizeEstimate的estimate的调用耗费的时间在一定可接受的值即可。SizeTrackingAppendOnly在shuffle中被使用,做为buffer,它的元素不会太多,所以update的次数有限,使得estimate的调用不会间隔太多update。但是如果update的次数太多,后期的estimate次数会特别少,比如在100w和1000w更新次数之间,平均每37.5w次才会调用一次estimate。调用SizeEstimator的时机应考虑到当前集合的大小、集合元素大复杂程度,在这种大小的集合上调用一次SizeEstimator的开销,当前与上一次调用隔了多少次update等因素。或许应该提供接口或配置项,让用户有机会提供关于集合内数据的较准确的信息。或者在SizeTracker的estimateSize调用后,让用户可以根据情况强制SizeTracker给出一个更准确的值,比如如果得到的size显示需要进行spill了。

  希望Spark能在以后以它进行改进。如果对集合的大小估计不准,就不能充分内存,这对于shuffle的效率影响非常大。

AppendOnlyMap

当需要对Value进行聚合时,会使用AppendOnlyMap作为buffer。它是一个只支持追加的map,可以修改某个key对应的value, 但是不能删除已经存在的key。使用它是因为在shuffle的map端,删除key不是必须的。那么append only能带来什么好处呢?

1. 省内存

AppendOnlyMap也是一个hash map, 但它不是像java.util.collection的HashMap一样在Hash冲突时采用链接法,而是采用二次探测法。这样,它就不需要采用entry这种对kv对的包装,而是把kv对写同一个object数组里,减少了entry的对象头带来的内存开销。但是二次探测法有个缺点,就是删除元素时比较复杂,不能只是简单地把数组中相应位置的kv都置成null,这样查找元素时就没办法了,通常会把被删除的元素标记为已被删除,这就又需要额外的内存。而当这个hash map只支持insert和update时,情况就简单了,不仅可以减少链接法时构造链表需要的内存,而且不需要另外的内存做删除标记。在相同的load factor时,会比HashMap更省内存。

  // Holds keys and values in the same array for memory locality; specifically, the order of
  // elements is key0, value0, key1, value1, key2, value2, etc.
  private var data = new Array[AnyRef](2 * capacity)

2. 省内存,可以用数组排序算法,排序效率高

由于所有元素都在一个数组里,所以在对这个map里的kv对进行排序时,可以直接用数组排序的算法在数组内做,节省了内存,效率也比较高。ExternalSorter的destructiveSortedIterator就是这么做的。它把所有的kv对移动数组的前端,然后进行排序

new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)

3. 支持函数式地update操作,适合进行aggregate。

AppendOnlyMap一个changeValue方法,它的签名是这样的

def changeValue(key: K, updateFunc: (Boolean, V) => V): V = { ... }

在启用aggregate时,会把aggregate的逻辑和kv里的value组装成updateFunc, 来对每个key调用changeValue。要明白这个逻辑首先得看下Aggregator这个类的定义

case class Aggregator[K, V, C] (
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) { ... }

shuffle中的aggregate操作实际是把一个KV对的集合,变成一个KC对的map, C是指combiner,是V聚合成的结果。Aggregator的三个类型参数K, V, C即代表Key的类型, Value的类型和Combiner的类型。

  • createCombiner描述了对于原KV对里由一个Value生成Combiner,以作为聚合的起始点。
  • mergeValue描述了如何把一个新的Value(类型为V)合并到之前聚合的结果(类型为C)里
  • mergeCombiner描述了如何把两个分别聚合好了的Combiner再聚合

这三个函数就描述了aggregate所遇到的各种情况。

先看下reduceByKey

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKey[V]((v: V) => v, func, func, partitioner)
  }

它接收的函数参数的类型为 (V, V) => V, 也就是说Value和Combiner的类型是一样的,所以它会生成一个Aggregator[K, V, V],它的三个构造器参数分别为

(v: V) => v, func, func。 这符合reduceByKey的意义。

与此不同的是aggregateByKey,由于它指定了一个初始值zeroValue,所以初始的Combiner应该是把这个初始值和Value聚合的结果。为此,它是这么做的

  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)

    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

    // We will clean the combiner closure later in `combineByKey`
    val cleanedSeqOp = self.context.clean(seqOp)
    combineByKey[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner)
  }

首先,createCombiner每次调用时,需要一个属于自己的zeroValue的拷贝,否则变成共享的就麻烦了,比如当zeroValue是一个集合时。所以aggregateByKey的createCombiner方法每次运行会反序列化一个zeroValue,然后调用mergeValue函数(也就是seqOp函数)创建初始的Combiner。

与此类似的是groupByKey,它的createCombiner函数是构造一个只有Value一个元素的集合,mergeValue函数即是把Value添加到这个集合,而mergeCombiner函数是对集合进行合并。

可见Aggregator的确能描述各种不同的聚合策略。那么Aggregator的这三个函数是如何被用于AppendOnlyMap的呢?

首先,只有在需要对Value进行聚合时,才会使用AppendOnlyMap作为buffer。而此时,在ExternalSorter的insertAll函数中,是这么使用它的

    if (shouldCombine) {
      // Combine values in-memory first using our AppendOnlyMap
      val mergeValue = aggregator.get.mergeValue
      val createCombiner = aggregator.get.createCombiner
      var kv: Product2[K, V] = null
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (records.hasNext) {
//        Runtime.getRuntime.maxMemory()
        addElementsRead()
        kv = records.next()
        map.changeValue((getPartition(kv._1), kv._1), update)
        maybeSpillCollection(usingMap = true)
      }
    }

注意它是如何用createCombiner和mergeValue两个函数组装成AppendOnlyMap的changeValue函数所需要的update函数。这种直观地对函数地组合的确是函数式编程的一种优势。

SizeTrackingAppendOnlyMap

它继承自AppendOnlyMap和SizeTracker,覆盖了AppendOnlyMap的三个方法

  • update和changeValue。 在调用AppendOnlyMap的相应方法后,调用SizeTracker的afterUpdate方法
  • growTable。在调用AppendOnlyMap的growTable方法后,调用SizeTracker的resetSamples方法。

实际上SizeTrackingAppendOnlyMap对于SizeTracker的使用有些简单粗暴。比如在growTable之后,AppendOnlyMap的data数组会增长,所以之前的bytesPerUpdate就不准确了,但这时候直接调用resetSamples会清空之前的采样,重置update次数。而AppendOnlyMap的data数组额外占据的空间可以根据它的capacity的变化算出来,这使得之前的bytesPerUpdate的值可以继续使用。对于一个很多的集合调用resetSamples,会使得对它的采样更密集,并不是一个特别好的做法。

WritablePartitionedPairCollection

这个trait的大部分方法都是未实现的,它描述了一个分区的kv集合应具有的性质。这个集合的特点在于,它的destructiveSortedWritablePartitionedIterator应该返回一个WritablePartitionedIterator对象。WritablePartitionedIterator可以使用BlockObjectWriter来写入它的元素。

private[spark] trait WritablePartitionedIterator {
  def writeNext(writer: BlockObjectWriter): Unit

  def hasNext(): Boolean

  def nextPartition(): Int
}

此外它的伴生对象会提供两种Comparator

  • PartitionComparator  按照partition ID排序
  • PartitionKeyComparator 它先按partition ID排序,再按key排序。按key排序时使用的Comparator是作为参数提供的。

此外, WriteablePartitionedIterator的伴生对象有一个fromIterator方法,它接受一个Iterator[((Int, _), _)]类型的迭代器,返回一个特殊的WritablePartitionedIterator对象,此对象的特点在于它的writeNext方法只写入Key和Value,并不写入Partition ID。ExternalSorter的三种buffer都是使用这个fromIterator方法,从自身的iterator生成WritablePartitionIterator。所以,它们三个的iterator方法返回的迭代器的KV对中,K的类型就是(Int, Key的类型)。

PartitionedAppendOnlyMap

它继承自WritablePartitionedPairCollection和SizeTrackingAppendOnlyMap, ExternalSorter在需要进行aggregate元素的情况下,用它做为buffer。

需要注意一下它的类型参数

private[spark] class PartitionedAppendOnlyMap[K, V]
  extends SizeTrackingAppendOnlyMap[(Int, K), V] with WritablePartitionedPairCollection[K, V] 

它本身是一个KV集合,Key的类型是K, Value的类型是V。但是它继承了SizeTrackingAppendOnlyMap[(Int, K), V],这意味着SizeTrackingAppendOnlyMap继承的AppendOnlyMap的类型是AppendOnlyMap[(Int, K), V]。也就是说PartitionedAppendOnlyMap继承的AppendOnlyMap的Key的类型为(Int, K), Value的类型为V。

这个Int就是Partition ID。所以PartitionedAppendOnlyMap定义了一个partitioneDestructiveSortedIterator方法,返回一个Iterator[(Int, K), V]。

  def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)] = {
    val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
    destructiveSortedIterator(comparator)
  }

这个迭代器的排序方式由keyComparator决定,如果keyComparator是None,就用WritablePartitionedPairCollection的partitionComparator只按partition排序, 如果是Some,就按照WritablePartitionedPairCollection的partitionKeyComparator排序,也就是先按partition ID排序,再使用keyComparator按key排序。

PartitionedPairBuffer和PartitionedSerializedPairBuffer

下面看一下另两种buffer: PartitionedPairBuffer和PartitionedSerializedPairBuffer。它们都不支持aggregation,但是ExternalSorter是如何在二者间选择的呢?

  private val useSerializedPairBuffer =
    !ordering.isDefined && conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
    ser.supportsRelocationOfSerializedObjects

如果useSerializedPairBuffer为true,就会使用PartitionedSerailizedPairBuffer。而它为true必须有三个条件同时满足:

  • 没有提供Ordering。即不需要对partition内部的kv再排序。
  • spark.shuffle.sort.searlizedMapOutputs为true。它默认即为true
  • serializer支持relocate序列化以后的对象。即在序列化输出流写了两个对象以后,把这两个对象对应的字节块交换位置,序列化输出流仍然能读出这两个对象。一般而言,如果序列化流是无状态的,并且在序列化流的开始和结束时不记特殊的元数据,就会支持这个性质。这个性质JavaSerializer是不支持的,而KryoSerializer有条件支持
     private[spark] override lazy val supportsRelocationOfSerializedObjects: Boolean = {
        newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()
      }

 PartitionedPairBuffer

private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
  extends WritablePartitionedPairCollection[K, V] with SizeTracker

它继承自WritablePartitionedPairCollection以及SizeTracker。底层存储用一个object数组。它所存储的pair,即KV对,Key的类型为(Int, K),即Partition ID和key。

同一个kv对的key和value被放在这个数组相邻的位置,和AppendOnlyMap相同。

PartitionedSerializedPairBuffer

它也是存储了partitionId, key, value这三种数据。

这个buffer的特点是用字节数组来存储数据,而不像其它两种用object数组。这就要求它存储的数据是序列化以后的。它把key和value依次序列化以后依次写入同一个字节数组(实际上是写入一个ChainedBuffer,ChainedBuffer再写入到它里边的字节数组),这就要求有另外的元数据来区分key和value的边界。所以PartitionedSerializedBuffer会另外使用一个meta buffer存储元数据,这个meta buffer是一个IntBuffer,即一个integer buffer。

这样它存储的数据就在两个buffer里:kvBuffer, 存储的是序列化后的key和value; metaBuffer存储的是关于kvBuffer的元数据。

其中metaBuffer里的每个元素是关于一个kv对的元数据,有4个int,依次是

  1. keyStart,这是一个long, 用两个int存储。指这个kv对在kvBuffer中的起始位置。
  2. keyValLen, 用一个int存储。即key和value序列化后,总的长度。
  3. partitionId, 用一个int存储。存储partitionId在metaBuffer里,使得在kv排序时,直接对metaBuffer按partitionId排序就行了,而kvBuffer不需要变化。

这个buffer只支持按照partition id排序,因此要选它做buffer, ExternalSorter的Ordering参数必须是None。

这个数组的排序时直接移动底层的字节,所以要求Serializer必须supportRelcationSerializedObjects。

使用这种buffer的好处是

1. 省内存,这由两原因引起。首先,最主要的因素,它把对象序列化以后存储,通常会占用更少的内存。其次,它存储所使用的byte buffer是ChainedBuffer这个类。ChainedBuffer的底层存储用的是ArrayBuffer[Array[Byte]],这使得它比直接用ArrayBuffer[Byte]更省内存,但是ChainedBuffer的实现其表现的像一个字节数组。ChainedBuffer中的ArrayBuffer里的字节数组是等长的,称为一个chunk, ExternalSorter使用spark.shuffle.sort.kvChunkSize来做为chunk的大小,默认为4M。这使得它在ArrayBuffer中存储的引用占的大小与整个集合的大小相比,不会太大,也算是比起AppendOnlyMap和PartitionedPairBuffer用object数组做存储的一点优势。

2. 就像PartitionedSerializedPairBuffer所说的。对这个集合排序意味着只需要交换metaBuffer里的元素,而kvBuffer不需要修改。而metaBuffer排序时是按照partitionId排序,partitionId就保存在metaBuffer使用的int buffer里,这意味着获取partitionId不需要通过引用(而AppendOnlyMap和PartitionedPairBuffer就需要获得对(partitionId, key)组成的tuple的引用,然后再访问partitionId),这就最小化了访问缓存时的未命中。所以,对这个buffer内元素的排序的效率会较高。

3. 它的内存占用可以更准确地估计。

实现PartitionedSerailizedPairBuffer还是挺复杂的。PartitionedSerializedPairBuffer虽然继承了SizeTracker,但是却没有使用SizeTracker的estimateSize方法,相反,由于它是使用的基本类型的数组,因此可以直接计算出自己较准确的大小,所以它覆盖了SizeTracker的estimateSize方法。

override def estimateSize: Long = metaBuffer.capacity * 4L + kvBuffer.capacity

这明显比其它两种buffer对内存占用的估计准确得多。


spill

 ExternalSorter继承了Spillable[WriteablePartitionedPairCollection[K, C]],实现了其spill方法,用来对buffer进行spill。

Spill的时机

为了合理地地在同一个executor的task线程间分配用于shuffle的内存,shuffle时内存buffer的大小向ShuffleMemoryManager申请,以避免过度占用内存,但这个MemoryManager并不实际地控制虚拟机的内存,只是起到限制作用。当buffer扩张需要的内存过多,ShuffleMemoryManager分配不了这么多内存时,buffer就会被spill。

Spill的策略

Spill的的策略必须考虑到以后对spill出来文件的merge。ExternalSorter会写出唯一一个文件,因此merge是一定会的。但

是如果需要进行aggregate,那么spill出来的文件一定需要按照partition以及key排序,才能用merge sort来对combiner做聚合。但是这样做的开销是很大的,首先需要对集合先进行排序,才能写入文件(这也是为啥WritablePartitionedPairCollection会定义partitionedDestructiveSortedIterator这个方法),其次在merge文件时需要先反序列化,然后再把merge完成后的combiner序列化写入文件;然后,merge sort本身也会耗费时间。因此,ExternalSorter在某些情况下会按照类似于hash shuffle的方法为每个partition写一个文件,每次spill,就把buffer里的数据按照partition追加到对应的文件。在需要输出一整个文件时,把这些文件直接连接在一起,这样就避免了一次反序列化和一次序列化。而之所以能直接把每个partition的文件相连,而不影响读取,是因为shuffle的reader一方请求获得的就是每个partition对应的那部分字节串,所以reader和writer都是在同样的位置开启的输入流和输出流,因此外层的压缩流和序列化流也不会因此而混乱。

但是这样不好的地方在于如果reducer很多,那么中间文件就会非常多,可能会遇到hash shuffle类似的问题(俺并不清楚具体会对性能有多大影响)。所以,有时merge sort的方式还是必须的。这时候,buffer每次spill都写出一个包括各个partition数据的文件。然后在merge时,对这些文件进行merge,采用merge sort的方式。那么,这两种spill的方式如何选择呢?

ExternalSorter用bypassMergeSort这个bool值来做出选择,如果此值为true,就用第一种方式,否则用第二种方式

  private val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
  private val bypassMergeSort =
    (numPartitions <= bypassMergeThreshold && aggregator.isEmpty && ordering.isEmpty)
  override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
    if (bypassMergeSort) {
      spillToPartitionFiles(collection)
    } else {
      spillToMergeableFile(collection)
    }
  }

可见只有在reducer数目不会太多,不需要aggregation 并且不需要进行排序的情况下才会用spillToPartitionFiles。

 spillToPartitionFiles

  private def spillToPartitionFiles(collection: WritablePartitionedPairCollection[K, C]): Unit = {
    spillToPartitionFiles(collection.writablePartitionedIterator())
  }

  private def spillToPartitionFiles(iterator: WritablePartitionedIterator): Unit = {
    assert(bypassMergeSort)

    // Create our file writers if we haven't done so yet
    if (partitionWriters == null) {
       ...  
    }

    // No need to sort stuff, just write each element out
    while (iterator.hasNext) {
      val partitionId = iterator.nextPartition()
      iterator.writeNext(partitionWriters(partitionId))
    }
  }

所有三种buffer都实现了WritablePartitionedPairCollection接口,因此都可以从它们获取一个WritablePartitionedIterator,这个迭代器前边提到过,特点在于可以知道下一个元素的partitiionId, 也可以直接调用writeNext把下一个元素写到BlockObjectWriter里。而spillToPartitionFiles就是这么使用它的,它取出迭代器中下一个元素的partitionId, 就能找到对应于这个partition的writer,然后用它来写入下一个元素。所以,多次spill出来的结果中同样的partition里的kv都会被用同样的writer写入同一个文件。

spillToMergeableFile

方法名里的是File而不像spillToPartitionFiles中是Files,它只会spill到一个文件。所以这个文件的内容是排序后的。那么写入同一个文件的问题是需要记录每个KV属于哪个partition,否则就需要再用partitioner算一下。由于写入文件时,每个partition的kv对记在一起,所以实际只需要记录下每个partition有多少个KV对就行了。spillToMergableFiles把这个信息记录在elementsPerPartition这个数据结构里

// How many elements we have in each partition
    val elementsPerPartition = new Array[Long](numPartitions)

另外一个问题与序列化流有关。当通过一个序列化流写入了大量的对象,它内部的数据结构可能会很多,而且在这个内部数据结构增长时,它可能会进行的拷贝,很大的内部数据结构意味着占用过多内存,对大量数据拷贝意味着时间开销的增长。因此ExternalSorter通过serializerBatchSize这个参数来控制每次序列化流最多写入的元素个数。在写入serializerBatchSize这个元素后,这个序列化流会被关闭,确切地说是writer被关闭,然后重新开启新的writer继续往同一个文件写。这样带来的问题是,整个文件是多个输出流的输出追加在一起的结果,因此需要记录每个输出流开始的位置,也就是写完一个batch的对象后,文件增长的大小。spillToMergableFiles用batchSizes这个数组来记录每个batch的字节数,在此次spill结束后,这些簿记的信息被组装成SpilledFile,它被作为元数据使用,记在spills这个ArrayBuffer[SpilledFiles]里。

    // List of batch sizes (bytes) in the order they are written to disk
    val batchSizes = new ArrayBuffer[Long]
  private[this] case class SpilledFile(
    file: File,
    blockId: BlockId,
    serializerBatchSizes: Array[Long],
    elementsPerPartition: Array[Long])

由于这种复杂的写入方式,对于写出来的文件,需要一个特殊的reader,即SpillReader。这个reader的特殊之处在于它可以产生一个特殊的迭代器,这个迭代器的每个元素都是某个partition中kv的迭代器。

def readNextPartition(): Iterator[Product2[K, C]] = new Iterator[Product2[K, C]] {
  ...
}

也就是调用readNextPartition返回的迭代器可以迭代这个partition内的所有元素。这样spillToMergeableFiles的主要逻辑就很清楚了,在源码中是这样的

      val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
      while (it.hasNext) {
        val partitionId = it.nextPartition()
        it.writeNext(writer)
        elementsPerPartition(partitionId) += 1
        objectsWritten += 1

        if (objectsWritten == serializerBatchSize) {
          flush()
          curWriteMetrics = new ShuffleWriteMetrics()
          writer = blockManager.getDiskWriter(
            blockId, file, serInstance, fileBufferSize, curWriteMetrics)
        }
      }

首先,从buffer中构造一个WritablePartitionedIterator,排序方式使用comparator,

  • 如果在ExternalSorter的构造函数中提供了Ordering,就会按Ordering排序
  • 如果没有提供Ordering,但是提供了Aggregator,就会按hashCode排序
  • 如果即没有Ordering,也没有aggregator,就不排序。

然后把迭代器中的每个元素调用it.writeNext写入writer,在此过程中根据此元素的partitionId,增长elementsPerPartition中对应的partition中的元素数,如果一个writer写入的元素数到了serializerBatchSize,就调用flush,关闭writer,记录这个batch对应的byte总量到batchSizes,然后建立新的writer。

最后,调用

spills.append(SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition))

来记录此次spill出来的mergable file的元数据。


 Merge

SortShuffleWriter在调用sorter.insertAll(records)把数据写入sorter之后,会调用

val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)

生成最后的输出文件(这是一整个文件)。ExternalSorter的writePartitionedFile会spill出来的数据进行merge。这分为三种情况

没有发生spill

由于内存中buffer里的数据已经进行了aggregate(如果需要的话),所以这种情况的处理逻辑比较简单。

  1. 调用buffer的destructiveSortedWritablePartitionedIteartor,获取一个按partition排序的SortedWritablePartitionIterator
  2. 按partitionId的顺序,把同一个partition的内容用同一个writer写到最终的输出文件里,写一个partition按一个writer。
  3. 记录每个partition的字节数,簿记到lengths里
else if (spills.isEmpty && partitionWriters == null) {
      //说明只有内存中的数据,并没有发生spill
      // Case where we only have in-memory data
      val collection = if (aggregator.isDefined) map else buffer
      val it = collection.destructiveSortedWritablePartitionedIterator(comparator)//获取SortedWritablePartitionIterator
      while (it.hasNext) {
        val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
          context.taskMetrics.shuffleWriteMetrics.get)
        val partitionId = it.nextPartition()//获取此次while循环开始时的partition id
        while (it.hasNext && it.nextPartition() == partitionId) {
          it.writeNext(writer) //把与这个partition id相同的数据全写入
        }
        writer.commitAndClose()//这个writer只用于写入这个partition的数据,因此当此partition数据写完后,需要commitAndClose,以使得reader可读这个文件段。
        val segment = writer.fileSegment()
        lengths(partitionId) = segment.length//把这个partition对应的文件里数据的长度添加到lengths里
      }
    }

发生了spill, 且使用bypassMergeSort

这也意味着Aggregator和Ordering都没有。所以不需要聚合,也不需要对partition内部的元素排序。所以直接把每个partition内容依次写入最终的输出文件就行了。

    if (bypassMergeSort && partitionWriters != null) {
      //byPassMergeSort了,所以会用到partitionWriters。如果partitionWriters不为空,就代表着的确写了些东西。就需要把这些文件合并。
      spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
      partitionWriters.foreach(_.commitAndClose())//把已有的writer commitAndClose了
      val out = new FileOutputStream(outputFile, true)//把所有文件合并到使用这个文件输出流输出的文件
      val writeStartTime = System.nanoTime
      util.Utils.tryWithSafeFinally {
        for (i <- 0 until numPartitions) {
          val in = new FileInputStream(partitionWriters(i).fileSegment().file)//对于每个writer的输出文件,建立一个文件输出流
          util.Utils.tryWithSafeFinally {
            //把writer的输出文件里的数据拷贝到最终的文件里
            lengths(i) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
          } {
            in.close()
          }
        }
      } {
        out.close()
        context.taskMetrics.shuffleWriteMetrics.foreach(
          _.incShuffleWriteTime(System.nanoTime - writeStartTime))
      }
    }

发生了spill,并且没有bypassMergeSort

这时候就需要对spill出来的mergable files以及内存中的数据进行merge。ExternalSorter使用partitionedIterator来完成merge,得到一个按partition组合出来的迭代器,它的每个元素都是(partitionId, 这个partition内容的迭代器)这样的二元组。然后把这个partitionedIterator按partition依次写到输出文件里就行了。

  for ((id, elements) <- this.partitionedIterator) {//merge过程在partitionedIterator方法中
        if (elements.hasNext) {//对于这个partition的所有数据,用一个writer写入
          val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
            context.taskMetrics.shuffleWriteMetrics.get)
          for (elem <- elements) {
            writer.write(elem._1, elem._2)
          }
          writer.commitAndClose()//写完了
          val segment = writer.fileSegment()
          lengths(id) = segment.length//簿记
        }
      }

实际的merge过程发生在this.partitionedIterator这一步,partitionedIterator是ExternalSorter的一个方法

def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])]

partitionedIterator的实现和writePartitionedFile一样,需要考虑同样的三种情况,所以只有第三种情况才会被调用。partitionedIterator处理的三种情况为:

没有发生spill

由于buffer中的数据是已经aggregate以后的(如果需要的话),所以直接把buffer里的数据按排序,同样partition的数据就会到一起,此时简单地按partition组合一下就行了。只是排序的时候需要考虑是否需要按Ordering来排序。

    if (spills.isEmpty && partitionWriters == null) {
      //只有内存中的数据,就按是否有ordering采用不同的排序方式得到迭代器,然后按partition对迭代器中的数据进行组合。
      if (!ordering.isDefined) {
        // The user hasn't requested sorted keys, so only sort by partition ID, not key
        groupByPartition(collection.partitionedDestructiveSortedIterator(None))
      } else {
        groupByPartition(collection.partitionedDestructiveSortedIterator(Some(keyComparator)))
      }
    }

发生了spill, 并且bypassMergeSort

这种情况也很简单,因为每个分区都对应不同的文件,就直接把这些分区在文件里的内容和在内存中的内容组合起来就行了。

else if (bypassMergeSort) {
      //否则就代表spill出来文件了,如果bypassMergeSort就代表着写出来了一些文件,每个partition对应一个
      // Read data from each partition file and merge it together with the data in memory;
      // note that there's no ordering or aggregator in this case -- we just partition objects
      val collIter = groupByPartition(collection.partitionedDestructiveSortedIterator(None))//获得内存中数据的迭代器
        //取得spill出来的那些文件里为这个partition所写的文件,然后和内存里的这个partition的迭代器组合在一起。
      collIter.map { case (partitionId, values) =>
        (partitionId, values ++ readPartitionFile(partitionWriters(partitionId)))
      }
    }

发生了spill,并且没有bypassMergeSort

这是最复杂的一种情况,因为此时spill出来的每个文件里都有各个分区的内容,所以需要进行merge sort,而在merge sort的过程中,可能需要进行aggregation。

ExternalSorter专门有一个merge方法来完成这个工作,所以第三种情况会直接调用merge方法。

else {
      //此时没有bypassMergeSort,并且spill出来一些文件。因此需要把它们和内存中数据merge在一起,这是最复杂的一种情况。
      merge(spills, collection.partitionedDestructiveSortedIterator(comparator))
    }

这个merge方法首先为每个spill出来的文件创建一个reader,然后按partition id的顺序,依次从各个reader和内存中的迭代器中获取这个partition对应的那部分迭代器。这样对于每个partition,都获得了一组迭代器。merge方法对每个partition对应的那些迭代器进行merge。

又根据ExternalSorter是否有Aggregator和Ordering的情况,分成三种处理逻辑

1. 需要聚合,此时会调用mergeWithAggregation方法来边merge边做aggregate

2. 不需要聚合,并且提供了Ordering。这时候直接mergeSort就行了

3. 不需要聚合,并且没有提供Ordering,这时候就直接把每个partition对应的那组迭代器里的元素组合在一起就行了,会直接用Iterator的flatten方法

  private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
      : Iterator[(Int, Iterator[Product2[K, C]])] = {
    val readers = spills.map(new SpillReader(_))//为每个spill出来的文件生成一个reader
    val inMemBuffered = inMemory.buffered//内存中的迭代器进行buffered,以方便查看其head的信息
    (0 until numPartitions).iterator.map { p => //对每一个partition
      val inMemIterator = new IteratorForPartition(p, inMemBuffered)//对内存中的数据获取这个partition对应的iterator
      val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)//把文件数据的迭代器和内存数据的迭代器都放在一个seq里
      if (aggregator.isDefined) {//如果需要聚合的话
        // Perform partial aggregation across partitions 对这个partition对应的那些iterator进行merge,并且聚合数据
        (p, mergeWithAggregation(
          iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
      } else if (ordering.isDefined) {
        // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
        // sort the elements without trying to merge them
        (p, mergeSort(iterators, ordering.get))
      } else {
        (p, iterators.iterator.flatten)
      }
    }
  }

merge方法 

mergeWithAggregation

这会边merge,边做aggregation。根据传进去的iterators是否是按照Ordering排序的,分为两种:

1. 非totalOrder

这是最复杂的一种情况。非totalOrder,说明了这些迭代器中一个partition内部的元素实际是按照hash code排序的。所以即使key1==key2,但是key1和key2之间可能有key3, 它只是与key1和key2有相同的哈希码,但==号并不成立。它所要处理的问题和hash shuffle的ExternalAppendOnlyMap是类似的,但算法并不相同。ExternalSorter里的算法复杂度更低一些,但实际运行时的情况跟互不相等的key的hash code的冲突程度有关。ExternalAppendOnlyMap是基于PriorityQueue做的,而ExternalSorter里的算法是使用两个buffer完成的,后者充分利用了“==不成立的元素不可能有相同的hash code”这个条件,把相同hash code的元素都取出来,对这些元素用两个buffer做聚合。

    if (!totalOrder) {
      // We only have a partial ordering, e.g. comparing the keys by hash code, which means that
      // multiple distinct keys might be treated as equal by the ordering. To deal with this, we
      // need to read all keys considered equal by the ordering at once and compare them.
      new Iterator[Iterator[Product2[K, C]]] {
        val sorted = mergeSort(iterators, comparator).buffered//先按comparator进行merge sort,不aggregate

        // Buffers reused across elements to decrease memory allocation
        val keys = new ArrayBuffer[K] //存放compare为0,但又相到不==的所有key
        val combiners = new ArrayBuffer[C]//存放keys中对应位置的key对应的所有combiner聚合后的结果

        override def hasNext: Boolean = sorted.hasNext

        override def next(): Iterator[Product2[K, C]] = {
          if (!hasNext) {
            throw new NoSuchElementException
          }
          keys.clear()
          combiners.clear()
          val firstPair = sorted.next()//获取排序后iterator的第一个pair
          keys += firstPair._1//第一个pair的key放在keys里
          combiners += firstPair._2 //第一个pair的combiner放在combiners里
          val key = firstPair._1//第一个pair的key
          while (sorted.hasNext && comparator.compare(sorted.head._1, key) == 0) {
            //获取sorted中跟前key compare以后为0的下一个kv。注意,compare为0不一定 ==号成立
            val pair = sorted.next()
            var i = 0
            var foundKey = false
            while (i < keys.size && !foundKey) {
              if (keys(i) == pair._1) {//用当前取出的这个kc的key与keys中key依次比较,找到一个==的,就对combiner进行aggregate,然后结果放在combiners
              // 里,并且结束循环
                combiners(i) = mergeCombiners(combiners(i), pair._2)
                foundKey = true
              }
              i += 1
            }
            //如果这个kc里的key与keys里所有key都不==,意味着它与它当前缓存的所有keycompare为0但不==,所以它是一个新的key,就放在keys里,它的combiner放在combiners里
            if (!foundKey) {
              keys += pair._1
              combiners += pair._2
            }
          }

          // Note that we return an iterator of elements since we could've had many keys marked
          // equal by the partial order; we flatten this below to get a flat iterator of (K, C).
          keys.iterator.zip(combiners.iterator) //把keys和combiners 进行zip,得到iterator of (K, C)
        }
      }.flatMap(i => i) //flatMap之前是Iteator[compare为0的所有kc聚合而成的Iteator[K, C]], 所以直接flatMap(i => i)就成了
    }

 2.totalOrder

此时对这些迭代器先用comparator进行merge sort, 得到的merge后的迭代器里所有==号成立的key就都挨在一起了。所以接下来只需要直接按==号把迭代器划分,然后进行aggregate就行了。

else {
      //因为是total ordering的,意味着用Ordering排序,所以==的key是挨在一起的
      // We have a total ordering, so the objects with the same key are sequential.
      new Iterator[Product2[K, C]] {
        val sorted = mergeSort(iterators, comparator).buffered

        override def hasNext: Boolean = sorted.hasNext

        override def next(): Product2[K, C] = {
          if (!hasNext) {
            throw new NoSuchElementException
          }
          val elem = sorted.next()
          val k = elem._1
          var c = elem._2
          while (sorted.hasNext && sorted.head._1 == k) { //取出所有==的kc,进行merge
            val pair = sorted.next()
            c = mergeCombiners(c, pair._2)
          }
          (k, c)
        }
      }
    }

总结

ExternalSorter就Spark的sort-based shuffle的核心,它整个文件有800多行,虽然其算法不太复杂,还是由于要处理各种情况,以及进行相关的优化,其实现还是很繁琐的。它的复杂性主要来源于以下几个方面:

  1. 需要控制内存消耗,所以需要spill以及merge
  2. 在spill和merge过程中需要考虑到Aggregator和Ordering的不同情况
  3. 需要为每个partition使用一个输出流,因此有一些输出流的切换和簿记工作。

此外,为了提高效率,它根据特殊情况使用了PartitonedSerializedPairBuffer、byPassMergeSort等优化手段。

在它的实现中,大量使用了迭代器。

  1. 使用了大量迭代器的基本操作,如map、flatmap、flatten、filter、zip。

  2. 为各种集合生成了特殊的迭代器。主要是WritableParitionedPariCollection中定义的三种获取特殊迭代器的方法:partitionedDestructiveSortedIterator,     destructiveSortedWritablePartitionedIterator, writablePartitionedIterator

  3. 大量使用了迭代器的包装。比如Scala的BufferedIterator, mergeSort和mergeWithAggregation中的包装了另一个迭代器的匿名迭代器(new Iterator{...}),     IteratorForPartition。

而且它所使用的三种buffer的设计,以及merge的算法也是值得看一下的。

不过shuffle绝对是Spark程序的性能杀手。每个元素都要经过如此复杂的处理,所以shuffle的总的性能开销还是挺大的,但这也意味着对shuffle的过程进行优化可以对性能有较大的提升。俺认为,一方面,可以优化自己的程序,包括尽量避免shuffle、减少需要shuffle中需要IO的数据量(各种使kv、kc序列化后变得更小的方法,使用map-side combiner)、选择合适的shuffle配置参数等;另一方面,Spark框架本身的shuffle的实现也还有优化的空间,比如对内存占用更准确地估计,根据被shuffle的数据的特点区分不同情况以采用更细致的策略(比如实现shuffle专用的各种特殊集合, 考虑到shuffle特点的序列化方法等)。

原文地址:https://www.cnblogs.com/devos/p/4805526.html