Spark- Spark普通Shuffle操作的原理剖析

在spark中,什么情况下会发生shuffle?

reduceByKey,groupByKey,sortByKey,countByKey,join,cogroup等操作。

默认的shuffle操作的原理剖析

  假设有一个节点上面运行了4个 ShuffleMapTask,然后这个节点上只有2个 cpu core。假如有另外一台节点,上面也运行了4个ResultTask,现在呢,正等着要去 ShuffleMapTask 的输出数据来完成比如 reduceByKey 等操作。

  每个 ShuffleMapTask 都会为 ReduceTask 创建一份 bucket 缓存,以及对应的 ShuffleBlockFile 磁盘文件。

  ShuffleMapTask 的输出会作为 MapStatus,发送到 DAGScheduler 的 MapOutputTrackerMaster 中。MapStatus 包含了每个 ResultTask 要拉取的数据的大小。

  每个 ResultTask 会用 BlockStoreShuffleFetcher 去 MapOutputTrackerMaster 获取自己要拉取数据的信息,然后底层通过 BlockManager 将数据拉取过来。

  每个 ResultTask 拉取过来的数据,其实就会组成一个内部的RDD,叫ShuffleRDD;优先放入内存,其次内存不够,那么写入磁盘。

  然后每个ResultTask针对数据进行聚合,最后生成MapPartitionsRDD,也就是我们执行reduceByKey等操作希望获得的那个RDD。map端的数据,可以理解为Shuffle的第一个RDD,MapPartitionsRDD。所以假设如果有100个map task ,100个 reduce task,本地磁盘要产生10000个文件,磁盘IO过多,影响性能。

Spark Shuffle操作的两个特点

第一个特点,就是说,在 Spark 早期版本中,那个 bucket 缓存是非常重要的;因为需要将一个 ShuffleMapTask 所有的数据都写入内存缓存之后,才会刷新到磁盘。但是这就有一个问题,如果map side 数据过多,那么很容易造成内存溢出。所以spark在新版本中。优化了默认那个内存缓存是100kb,然后呢,写入一点数据达到刷新的阈值之后,就会将数据一点一点地刷新到磁盘。

  这种操作的优点是不容易发生内存溢出。缺点在于,如果内存缓存过小的话,那么可能发生过多的磁盘 io 操作。所以,这里的内存缓存大小,是可以根据实际的业务情况进行优化的。

第二个特点,与MapReduce完全不一样的是,MapReduce 它必须将所有的数据都写入本地磁盘文件以后,才能启动reduce 操作,来拉取数据。为什么?因为mapreduce 要实现默认的根据key 排序!所以要排序,肯定得写完所有数据,才能排序,然后reduce来拉取。

  但spark不需要,spark默认的情况下,是不会对数据进行排序的。因此ShuffleMapTask 每写入一点数据,ResultTask 就可以拉取一点数据,然后在本地执行我们定义的聚合函数和算子,进行计算。

  spark这种机制的好处在于,速度比mapreduce 快多了。但是也有一个问题,mapreduce 提供的reduce,是可以处理每个key 对应的 value上的,很方便。但是spark 中,由于这种实时拉取的机制,因此提供不了直接处理 key 对应的 value 的算子, 只能通过 groupByKey,先shuffle,有一个MapPartitionsRDD,然后用map 算子来处理每个 key 对应的 values。就没有maprece 的计算模型那么方便。

原文地址:https://www.cnblogs.com/RzCong/p/7719019.html