Shuffle 相关参数配置

概述

Shuffle是Spark Core比较复杂的模块,它也是非常影响性能的操作之一。因此,在这里整理了会影响Shuffle性能的各项配置。

   

1)spark.shuffle.manager

Spark 1.2.0官方版本支持两种方式的Shuffle,即Hash Based Shuffle和Sort Based Shuffle。其中在Spark 1.0之前仅支持Hash Based Shuffle。Spark 1.1引入了Sort Based Shuffle。Spark 1.2的默认Shuffle机制从Hash变成了Sort。如果需要Hash Based Shuffle,只需将spark.shuffle.manager设置成"hash"即可。

配置方式:

①进入spark安装目录的conf目录

②cp spark-defaults.conf.template spark-defaults.conf

③spark.shuffle.manager=hash

   

应用场景:当产生的临时文件不是很多时,性能可能会比sort shuffle要好。因为没有任何的排序操作

如果对性能有比较苛刻的要求,那么就要理解这两种不同的Shuffle机制的原理,结合具体的应用场景进行选择。

   

对于不需要进行排序且Shuffle产生的文件数量不是特别多时,Hash Based Shuffle可能是更好的选择;因为Sort Based Shuffle会按照Reducer的Partition进行排序。

   

而Sort Based Shuffle的优势就在于可扩展性,它的出现实际上很大程度上是解决Hash Based Shuffle的可扩展性的问题。由于Sort Based Shuffle还在不断地演进中,因此它的性能会得到不断改善。

   

对于选择哪种Shuffle,如果性能要求苛刻,最好还是通过实际测试后再做决定。不过选择默认的Sort,可以满足大部分的场景需要。

   

2)spark.shuffle.spill

这个参数的默认值是true,用于指定Shuffle过程中如果内存中的数据超过阈值(参考spark.shuffle.memoryFraction的设置)时是否需要将部分数据临时写入外部存储。如果设置为false,那么这个过程就会一直使用内存,会有内存溢出的风险。因此只有在确定内存足够使用时,才可以将这个选项设置为false。

   

   

3)spark.shuffle.memoryFraction

在启用spark.shuffle.spill的情况下,spark.shuffle.memoryFraction决定了当Shuffle过程中使用的内存达到总内存多少比例的时候开始spill。在Spark 1.2.0里,这个值是0.2。

此参数可以适当调大,可以控制在0.4~0.6。

通过这个参数可以设置Shuffle过程占用内存的大小,它直接影响了写入到外部存储的频率和垃圾回收的频率。可以适当调大此值,可以减少磁盘I/O次数

   

4)spark.shuffle.blockTransferService

在Spark 1.2.0中这个配置的默认值是netty,而在之前的版本中是nio。它主要是用于在各个Executor之间传输Shuffle数据。netty的实现更加简洁,但实际上用户不用太关心这个选项。除非有特殊需求,否则采用默认配置即可。

   

5)spark.shuffle.consolidateFiles

这个配置的默认值是false。主要是为了解决在Hash Based Shuffle过程中产生过多文件的问题。如果配置选项为true,那么对于同一个Core上运行的Shuffle Map Task不会产生一个新的Shuffle文件而是重用原来的。

   

6)spark.shuffle.compress和spark.shuffle.spill.compress

这两个参数的默认配置都是true。spark.shuffle.compress和spark.shuffle.spill.compress都是用来设置Shuffle过程中是否对Shuffle数据进行压缩。其中,前者针对最终写入本地文件系统的输出文件;后者针对在处理过程需要写入到外部存储的中间数据,即针对最终的shuffle输出文件。

1. 设置spark.shuffle.compress

需要评估压缩解压时间带来的时间消耗和因为数据压缩带来的时间节省。如果网络成为瓶颈,比如集群普遍使用的是千兆网络,那么将这个选项设置为true可能更合理;如果计算是CPU密集型的,那么将这个选项设置为false可能更好。

2. 设置spark.shuffle.spill.compress

   

如果设置为true,代表处理的中间结果在spill到本地硬盘时都会进行压缩,在将中间结果取回进行merge的时候,要进行解压。因此要综合考虑CPU由于引入压缩、解压的消耗时间和Disk IO因为压缩带来的节省时间的比较。在Disk IO成为瓶颈的场景下,设置为true可能比较合适;如果本地硬盘是SSD,那么设置为false可能比较合适。

   

7)spark.reducer.maxMbInFlight

这个参数用于限制一个Result Task向其他的Executor请求Shuffle数据时所占用的最大内存数,默认是64MB。尤其是如果网卡是千兆和千兆以下的网卡时。默认值是设置这个值需要综合考虑网卡带宽和内存。

   

原文地址:https://www.cnblogs.com/shuzhiwei/p/11077349.html