Spark调优


下面调优主要基于2.0以后。

代码优化

1.语言选择

如果是ETL并进行单节点机器学习,SparkR或Python。优点:语言相对简单;缺点:使用语言自身的数据结构时,效率低,因为这些数据需要转换。

如果用到自定义transformations或自定义类,Scala或Java。优点:性能好;缺点:语言相对复杂。

2.API选择

  • DataFrames
    • 大多数情况下的最佳选择
    • 更有效率的储存(Tungsten,减少GC开销)和处理(Catalyst优化查询)
    • 全阶段代码生成
    • 直接内存访问
    • 没有提供域对象编程和编译时检查
    • 部分算子在特定情况可优化,如order后take、window function等
    • 少数功能没有相应的算子
  • DataSets
    • 适用于性能影响可接受的复杂ETL管道
    • 通过Catalyst提供查询优化
    • 提供域对象编程和编译时检查
    • 较高GC开销
    • 打破整个阶段的代码生成
  • RDDs
    • 在Spark 2.x中,基本不需要使用,除非某些功能上面两种API没有
    • 增加序列化/反序列化开销。上面两种结构不必对整个对象进行反序列化也可访问对象属性。
    • 没有通过Catalyst进行查询优化
    • 没有全阶段代码生成
    • 高GC开销

3.内存

  • 数据类型

尽量用DF,默认通常比自定义有更多优化。

另外,优先用原始数据结构和数组,次之为String而非其他集合和自定义对象。可以把自定义类改写成String的形式来表示,即用逗号,竖线等隔开每个成员变量,又或者用json字符串存储。

数字或枚举key优于string

估计内存消耗的方法:1.cache并在UI的Storage页查看。2.org.apache.spark.util.SizeEstimator可估计object的大小

  • GC(1.6以后)

    • spark.memory.fraction默认0.6,即堆内存减去reserved的300MB后的60%,它用来存储计算和cache所需的数据(storageFraction设置这两类数据的边界。例如0.4代表当execution需要空间时会赶走超过40%的那部分storage空间),剩下的40%存储Spark的元数据。

    • GC调优:在UI或者配置一些选项后能在worker节点的日志查看GC信息。

    • 过多full GC,则要增加executor内存。

    • 过多minor GC,则调大伊甸区。

    • 如果老年代空间不足,减少用于cache的内存,即减少fraction,同时减少storageFraction。或者直接调小伊甸区。

    • 调用G1回收器,如果heap size比较大,要调大-XX:G1HeapRegionSize。

    • 关于伊甸区大小的设置,可以根据每个executor同时处理的任务数估计。比如一个executor同时处理4个task,而每份HDFS压缩数据为128M(解压大概为2到3倍),可以把伊甸区设置为4 x 3 x 128MB

  • OffHeap(2.0以后)

    尝试使用Tungsten内存管理,设置spark.memory.ofHeap.enabled开启,spark.memory.ofHeap.size控制大小。

4.Caching

cache不同操作逻辑都需要用到的RDD或DF。会占用storage空间。

目前Spark自带的cache适合小量数据,如果数据量大,建议用Alluxio,配合in-memory and ssd caching,或者直接存到HDFS。

cache()是persist(MEMORY_ONLY),首选。内存不够部分(整个partition)在下次计算时会重新计算。
如下所示,persist可以使用不同的存储级别进行持久化。能不存disk尽量不存,有时比重新算还慢
MEMORY_AND_DISK
MEMORY_ONLY_SER(序列化存,节省内存(小2到5倍),但要反序列化效率稍低,第二种选择),MEMORY_AND_DISK_SER
DISK_ONLY
MEMORY_ONLY_2, MEMORY_AND_DISK_2 备2份

对已经不需要的RDD或DF调用unpersist

当persist数据而storage不足时,默认会执行LRU算法,可通过persistencePriority控制,来淘汰之前缓存的数据。不同persist选项有不同操作,比如memory_only时,重用溢出的persisted RDD要重新计算,而memory and disk会将溢出部分写到磁盘。

4.filter、map、join、partition、UDFs等

  • filter:尽早filter,过滤不必要的数据,有时还能避免读取部分数据(Catalyst的PushdownPredicate)。

  • mapPartitions替代map(foreachPartitions同理):

    该算子比map更高效,但注意算子内运用iterator-to-iterator转换,而非一次性将iterator转换为一个集合(容易OOM)。详情看high performance spark的“Iterator-to-Iterator Transformations with mapPartitions”

  • broadcast join:实际上是设置spark.sql.autoBroadcastJoinThreshold,是否主动用broadcast join交给Spark DF判断,因为我们只能知道数据源的大小而不一定知道经过处理后join前数据的大小

  • partition:减少partition用coalesce不会产生shuffle(把同节点的partition合并),例如filter后数据减少了不少时可以考虑减少分块

    repartition能尽量均匀分布data,在join或cache前用比较合适。

    自定义partitioner(很少用)

  • UDFs:在确定没有合适的内置sql算子才考虑UDFs

  • groupByKey/ reduceByKey

    对于RDD,少用前者,因为它不会在map-side进行聚合。但注意不要与DF的groupBy + agg和DS的groupByKey + reduceGroups混淆,它们的行为会经过Catalyst优化,会有map-side聚合。

  • flatMap:用来替代map后filter

5.I/O

将数据写到数据库中时,开线程池,并使用foreachPartition,分batch提交等。

开启speculation(有些storage system会产生重复写入),HDFS系统和Spark在同一个节点

6.广播变量

比如一个函数要调用一个大的闭包变量时,比如用于查询的map、机器学习模型等

配置优化

1.并行度

num-executors * executor-cores的2~3倍spark.default.parallelismspark.sql.shuffle.partitions

2.数据序列化Kryo

Spark涉及序列化的场景:闭包变量、广播变量、自定义类、持久化。

Kryo不是所有可序列化类型都支持,2.0之后,默认情况下,simple types, arrays of simple types, or string type的shuffle都是Kryo序列化。

//通过下面设置,不单单shuffle,涉及序列化的都会用Kryo。开启之后,scala的map,set,list,tuple,枚举类等都会启用。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//对于自定义类,要登记。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

优化缓存大小spark.kryoserializer.buffr.mb(默认2M)

3.数据本地化

调节任务等待时间 ,通过spark.locality.xxx,可以根据不同级别设置等待时间。

4.规划

资源均分:spark.scheduler.modeFAIR

限制:--max-executor-cores或修改默认spark.cores.max减少core的情况:

  1. Reduce heap size below 32 GB to keep GC overhead < 10%.
  2. Reduce the number of cores to keep GC overhead < 10%.

5.数据储存

存储格式:Parquet首选

压缩格式:选择splittable文件如 zip(在文件范围内可分), bzip2(压缩慢,其他都很好), LZO(压缩速度最快)。上传数据分开几个文件,每个最好不超几百MB,用maxRecordsPerFile控制。文件不算太大,用gzip(各方面都好,但不能分割)。在conf中通过spark.sql.partuet.compression.codec设置。

6.shuffle

spark.reducer.maxSizeInFlight(48m): reduce端拉取数据的buffer大小,如果内存够大,且数据量大时,可尝试96m,反之调低。

spark.reducer.maxReqsInFlight(Int.MaxValue):当reduce端的节点比较多时,过多的请求对发送端不利,为了稳定性,有时可能需要减少。

spark.reducer.maxBlocksInFlightPerAddress(Int.MaxValue):和上面对应,控制每次向多少个节点拉取数据。

spark.maxRemoteBlockSizeFetchToMem(Long.MaxValue):当一个block的数据超过这个值就把这个block拉取到到磁盘。

spark.shuffle.compress(true)

spark.shuffle.file.buffer:所产生准备shuffle的文件的大小,调大可减少溢出磁盘的次数,默认32k,可尝试64k。

spark.shuffle.sort.bypassMergeThreshold(200):小于这个值时用BypassMergeSortShuffleWriter。

Netty only:

maxRetries * retryWait两个参数:有可能CG时间长导致拉取不到数据

spark.shuffle.io.preferDirectBufs(true):如果对外内存紧缺,可以考虑关掉。

spark.shuffle.io.numConnectionsPerPeer(1):对于具有许多硬盘和少数主机的群集,这可能导致并发性不足以使所有磁盘饱和,因此用户可能会考虑增加此值。

7.executor内存压力和Garbage Collection

收集统计数据:在Spark-submit添加--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps"。这之后能在worker节点的logs里看到信息。也可以在UI查看。

如果full GC被触发多次,则表明老年代空间不够用,可以增加executor的内存或减少spark.memory.fraction的值(会影响性能)

如果很多minor GC,分配更多内存给Eden区(-Xmn=4/3*E)。关于伊甸区大小的设置,可以根据每个executor同时处理的任务数估计。比如一个executor同时处理4个task,而每份HDFS压缩数据为128M(解压大概为2到3倍),可以把伊甸区设置为4 x 3 x 128MB。

启用G1收集器也有可能提高效率。

spark.executor.memory是不包含overhead的,所以实际上占用的内存比申请的多。在executor内存中默认40%存一些数据和Spark的元数据,剩下的60%中,默认50% execution(计算) and 50% storage(用于caching) memory,但这是动态变动的,storage可借用execution的空间,当execution需要空间时又会赶走超过50%的那部分storage空间。这里数据量都是按block计算的。

storage过小会cache丢失,代价取决于persist等级,可能直接删除,需要时重计算淘汰数据或溢出淘汰数据到磁盘,execution过小会有很多I/O。

在1.6之前,内存空间划分为

  • Executionspark.shuffle.memoryFraction(default0.2):buffering intermediate data when performing shuffles, joins, sorts and aggregations
  • Storagespark.storage.memoryFraction(default0.6): caching, broadcasts and large task results
  • Other default 0.2: data structures allocated by user code and internal Spark metadata.

在1.6及之后,内存空间划分为

  • Execution and Storage spark.memory.fraction (1.6时0.75,2.0后0.6)
    • spark.memory.storageFraction(default0.5): 当上面的内存中,storage超过50%,cached data may be evicted
  • Other

8.集群配置

Resource and Job Scheduling

每个spark application运行一系列独立的executor进程。在application中,多个job可能同时运行,只要他们被分配到不同的线程。

executor调度

Spark’s standalone, YARN modes 和 coarse-grained Mesos mode都是静态划分,即app被分配最大量的executor,并拥有这些executor,直到app完成。Mesos可以实现动态划分。

Spark可以设置动态调整executor,根据workload决定是否回收executor,这更适合多服务。这机制对上面提到的三种集群部署模式都适用。在动态调整下,如果有task等待executor且有空余资源,就会启动executor。这里有两个参数,一个规定task的等待时间,一个规定每次提供executor的间隔时间(开始提供1、然后2、4等)。移除则是根据idletimeout参数设定时间。这种动态调整会带来一个问题,即reducer端未得到所需数据,mapper端进程就因空闲而退出,导致数据无法获取。为解决这个问题,在动态调整阶段,executor的退出并非马上退出,而是会持续一段时间。这个做法通过外部shuffle服务完成(所以启动动态调整时也要启动外部shuffle服务),reducer端会从service处拉取数据而不是executor。

cache数据也有类似的参数设置spark.dynamicAllocation.cachedExecutorIdleTimeout。

参数调整经验

  • spark.dynamicAllocation.initialExecutors决定了Executor初始数目。这个值的选取可以根据历史任务的Executor数目的统计,按照二八原则来设置,例如80%的历史业务的Executor数目都不大于参数值。同时,也要考虑集群的资源紧张度,当资源比较紧张时,这个值需要设置得小一点。
  • spark.dynamicAllocation.maxExecutors决定了业务最大可以拥有的Executor数目。默认无穷大,要注意过大会使大业务独占大部分资源,造成小任务没有资源的情况;过小会导致大任务执行时间超出业务要求。
  • spark.dynamicAllocation.executorIdleTimeout决定了Executor空闲多长时间后会被动态删除。当这个值比较小时,集群资源会比较充分地共享,但会影响业务的执行时间(在Executor被删除后,可能需要重新申请新的Executor来执行task);当这个值比较大时,不利于资源的共享,若一些比较大的任务占用资源,迟迟不释放,就会造成其他任务得不到资源。这个值的选取需要在用户业务的执行时间和等待时间上做一个权衡。需要注意的是,当spark.dynamicAllocation.maxExecutors为有限值时,spark.dynamicAllocation.executorIdleTimeout过小会导致某些任务不能申请新的资源。例如maxExecutors=10,而某个业务所需的资源大于或等于10个Executor,业务在申请到10个Executor之后,申请到Executor由于空闲(有可能因为task还没来得及分配到其上)而被删除,目前社区Spark SQL的逻辑是不会再申请新的Executor的,这样就会导致任务执行速度变慢。

job调度

FIFO:如果前面的job不需要所有资源,那么第二个job也可启动。适合长作业、不适合短作业;适合CPU繁忙型。

FAIR:轮询分配,大概平均的资源,适合多服务。该模式还可以设置pool,对job进行分组,并对各组设置优先级,从而实现job的优先级。可设置的参数有schedulingMode(FIFO和FAIR)、weight(池的优先级,如某个为2,其他为1,则2的那个会获得比其他多一倍的资源)、minShare(至少资源数)

小文件合并

当Reducer数目比较多时,可能会导致小文件过多。最好在输出前将文件进行合并。另外,后台应定期对数据进行压缩和合并。

冷热数据分离

  • 冷:性能低、数据块大、备份数少、压缩率高
  • 热:反之(压缩速度快)

参考

书籍:

Spark: The Definitive Guide

High Performance Spark

Spark SQL 内核剖析

文章:

官网Tunning Guide

官网Spark Configuration

Microsoft Azure's Optimize Spark jobs

原文地址:https://www.cnblogs.com/code2one/p/10159719.html