Spark的Shuffle机制

什么是Shuffle

在RDD中,将每个相同key的value聚合起来。相同key的value可能在不同partition,也可能在不同节点。因此shuffle操作会影响多个节点。

常见的shuffle操作有:groupByKey(),reduceBykey()等。

Shuffle Write和Read

Shuffle Write:上一个stage的每个map task会将处理好的相同key的数据写入一个分区文件。

Shuffle Read:reduce task就会从上一个stage的节点上寻找属于自己的分区文件,将相同key对应的value汇聚到同一个节点。

Shuffle的类型

Spark的Shuffle共有两类:HashShuffle和SortShuffle。之前默认是HashShuffle,现在默认是SortShuffle。

HashShuffle

执行流程

  1. 每个map task将不同key的数据写到不同的buffer中
  2. 每个buffer对应一个分区文件,即磁盘小文件
  3. reduce task来拉取对应的磁盘小文件

其中每个map task会有reduce task数量的分区文件,因此产生的磁盘小文件个数为:M*R(M为map task个数,R为reduce task个数)

存在的问题

主要问题是磁盘小文件过多,磁盘小文件过多会衍生出很多问题:

  • Write过程中会产生很多写入文件对象,要写入数据
  • Read过程中会产生很多读取文件对象,要读取数据
  • 对象过多,会造成JVM堆内存频繁的GC,而且如果GC还提供不了相应的内存,最终会OOM
  • 小文件数量很多,网络通信消耗也大

改进

前面是每个map task产生相应的reduce task个数的小文件。

合并机制:合并后,每个core产生对应的reduce task个数的小文件,即每个Executor产生R个,产生磁盘小文件总数:C*R(C为core的个数,R为reduce task个数)

减少了小文件数量。

SortShuffle

执行流程

  1. map task将数据结果写入内存
  2. 对内存中数据进行排序分区
  3. 溢写到磁盘,形成多个小文件
  4. 将小文件合并为一个大文件,同时生成一个索引文件
  5. reduce task去每个map task靠索引文件去数据文件拉去数据

可以发现SortShuffle的执行过程和MapReduce的Shuffle很相似,其最终只生成一个数据文件和索引文件。生成文件个数:2*M

改进,bypass机制

bupassSortShuffle少了排序的步骤。

触发条件为shuffle reduce task要小于spark.shuffle.sort.bypassMergeThreshold的参数值。产生的文件个数:2*M。

排序机制

在内存中,对数据进行排序,然后将数据写入磁盘。假设数据有100w,每次读取10w数据排序写入,就有10个文件。这时候读取10个文件的头部数据,然后采取堆排序写入最终有序的文件,由此可以形成全局有序。

Shuffle文件寻址

MapOutputTrack

负责管理磁盘小文件的地址。有MapOutputTrackerMaster存在于Driver节点,和MapOutputTrackerWorker存在于Executor节点。

BlockManager

负责管理块。有BlockManagerMaster存在Driver节点,和BlockManagerWorker存在于Executor.

寻址流程

map task执行完后,map task会将磁盘小文件的地址通过MapOutputTrackerWorker发送给Driver的MapOutputTrackerMaster,MapOutputTrackerMaster再将磁盘地址发送给reduce task端的MapOutputTrackerWorker。最后reduce task端的BlockManagerWorker和map task端的BlockManagerWorker通信,拉取数据。

原文地址:https://www.cnblogs.com/chenshaowei/p/13320712.html