SparkSubmit参数及参数性能调优

首先摆出常用的参数设定

bin/spark-submit 
--class com.xyz.bigdata.calendar.PeriodCalculator 
--master yarn 
--deploy-mode cluster 
--queue default_queue 
--num-executors 50 
--executor-cores 2 
--executor-memory 4G 
--driver-memory 2G 
--conf "spark.default.parallelism=250" 
--conf "spark.shuffle.memoryFraction=0.3" 
--conf "spark.storage.memoryFraction=0.5" 
--conf "spark.driver.extraJavaOptions=-XX:+UseG1GC" 
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" 
--verbose 
${PROJECT_DIR}/bigdata-xyz-0.1.jar

关于spark-submit的执行过程,读Spark Core的源码能够获得一个大致的印象。今天事情比较多,所以之后会另写文章专门叙述关于Spark on YARN的事情(又挖了一个坑,上一个坑是关于Java String和JVM的,需要尽快填上了)。

num-executors

  • 含义:设定Spark作业要用多少个Executor进程来执行。
  • 设定方法:根据我们的实践,设定在30~100个之间为最佳。如果不设定,默认只会启动非常少的Executor。如果设得太小,无法充分利用计算资源。设得太大的话,又会抢占集群或队列的资源,导致其他作业无法顺利执行。

executor-cores

  • 含义:设定每个Executor能够利用的CPU核心数(这里核心指的是vCore)。核心数越多,并行执行Task的效率也就越高。
  • 设定方法:根据我们的实践,设定在2~6之间都是可以的,主要是根据业务类型和数据处理逻辑的复杂程度来定,一般来讲设2或者3就够用了。需要注意的是,num-executors * executor-cores不能将队列中的CPU资源耗尽,最好不要超过总vCore数的1/3,以给其他作业留下剩余资源。

executor-memory

  • 含义:设定每个Executor的内存量(堆内内存)。这个参数比executor-cores更为重要,因为Spark作业的本质就是内存计算,内存的大小直接影响性能,并且与磁盘溢写、OOM等都相关。
  • 设定方法:一般设定在2G~8G之间,需要根据数据量慎重做决定。如果作业执行非常慢,出现频繁GC或者OOM,就得适当调大内存。并且与上面相同,num-executors * executor-memory也不能过大,最好不要超过队列总内存量的一半。
    另外,还有一个配置项spark.executor.memoryOverhead,用来设定每个Executor可使用的堆外内存大小,默认值是executor-memory的0.1倍,最小值384M。一般来讲都够用,不用特意设置。

driver-memory

  • 含义:设定Driver进程的内存量(堆内内存)。
  • 设定方法:由于我们几乎不会使用collect()之类的算子把大量RDD数据都拉到Driver上来处理,所以它的内存可以不用设得过大,2G可以应付绝大多数情况。但是,如果Spark作业处理完后数据膨胀比较多,那么还是应该酌情加大这个值。
    与上面一项相同,spark.driver.memoryOverhead用来设定Driver可使用的堆外内存大小。

spark.default.parallelism

  • 含义:对于shuffle算子,如reduceByKey()或者join(),这个参数用来指定父RDD中最大分区数。由于分区与Task有一一对应关系,因此也可以理解为Task数。其名称的字面意义是“并行度”,不能直接表达出这种含义。
  • 设定方法:Spark官方文档中推荐每个CPU core执行2~3个Task比较合适,因此这个值要设定为(num-executors * executor-cores)的2~3倍。这个参数同样非常重要,因为如果不设定的话,分区数就会由RDD本身的分区来决定,这样往往会使得计算效率低下。

spark.shuffle.memoryFraction

  • 含义:shuffle操作(聚合、连接、分组等等)能够使用的可用堆内存(堆大小减去300MB保留空间)的比例,默认值是0.2。如果shuffle阶段使用的内存比例超过这个值,就会溢写到磁盘。
  • 设定方法:取决于计算逻辑中shuffle逻辑的复杂度,如果会产生大量数据,那么一定要调高。在我们的实践中,一般都设定在0.3左右。但是,如果调太高之后发现频繁GC,那么就是执行用户代码的execution内存不够用了,适当降低即可。

spark.storage.memoryFraction

  • 含义:缓存操作(persist/cache)能够使用的可用堆内存的比例,默认值是0.6。
  • 设定方法:如果经常需要缓存非常大的RDD,那么就需要调高。否则,如果shuffle操作更为重量级,适当调低也无妨。我们一般设定在0.5左右。

其实,spark.shuffle/storage.memoryFraction是旧版的静态内存管理(StaticMemoryManager)的遗产。在Spark 1.6版本之后的文档中已经标记成了deprecated。目前取代它们的是spark.memory.fraction和spark.memory.storageFraction这两项,参考新的统一内存管理(UnifiedMemoryManager)机制可以得到更多细节。
前者的含义是总内存占堆的比例,即execution+storage+shuffle内存的总量。后者则是storage内存占前者的比例。默认值分别为0.75(最新版变成了0.6)和0.5。

spark.driver/executor.extraJavaOptions

  • 含义:Driver或Executor进程的其他JVM参数。
  • 设定方法:一般可以不设置。如果设置,常见的情景是使用-Xmn加大年轻代内存的大小,或者手动指定垃圾收集器(最上面的例子中使用了G1,也有用CMS的时候)及其相关参数。

举例

举例,以我上一个老东家的参数命令,假设数据量400G,要求1个半小时内跑完
 
离线:

spark-submit --master yarn
--class com.sddt.spark.web.WebETL
--driver-memory 10G
--executor-memory 24G
--num-executors 20
--executor-cores 4
--conf spark.web.etl.inputBaseDir=hdfs://master:9999/user/hive/warehouse/rawdata.db/web
--conf spark.web.etl.outputBaseDir=hdfs://master:9999/user/hadoop-twq/traffic-analysis/web
--conf spark.web.etl.startDate=20180617
--conf spark.driver.extraJavaOptions="-Dweb.metadata.mongodbAddr=192.168.1.102 -Dweb.etl.hbase.zk.quorums=master"
--conf spark.executor.extraJavaOptions="-Dweb.metadata.mongodbAddr=192.168.1.102 -Dweb.etl.hbase.zk.quorums=master -Dcom.sun.management.jmxremote.port=1119 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
/home/hadoop-twq/traffice-analysis/jars/spark-sessionization-etl-1.0-SNAPSHOT-jar-with-dependencies.jar prod

实时流:

#!/usr/bin/env bash

# the two most important settings:
# 取决于如下因素:
# 1:每一秒接收到的events,尤其是在高峰时间
# 2:数据源的缓冲能力
# 3:可以忍受的最大的滞后时间
# 我们通过将我们的Streaming程序在准生产环境中跑几天来确定以上的因素
# 进而确定我们的executors的个数
num_executors=6
# 取决于如下因素:
# 1:每一个batch需要处理的数据的大小
# 2:transformations API的种类,如果使用的transformations需要shuffle的话,则需要的内存更大一点
# 3:使用状态Api的话,需要的内存更加大一点,因为需要内存缓存每一个key的状态
executor_memory=6g

# 每一个executor配置3到5个cores是比较好的,因为3到5个并发写HDFS是最优的
# see http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
executor_cores=3

# backpressure
receiver_max_rate=100
receiver_initial_rate=30

spark-submit
--master yarn -
-deploy-mode cluster
--name "Real_Time_SessionizeData"
--class com.sddt.sessionize.SessionizeData
--driver-memory 2g
--num-executors ${num_executors} --executor-cores ${executor_cores} --executor-memory ${executor_memory}
--queue "realtime_queue"
--files "hdfs:master:9999/user/hadoop-master/real-time-analysis/log4j-yarn.properties"
--conf spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties
--conf spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer `# Kryo Serializer is much faster than the default Java Serializer`
--conf spark.locality.wait=10 `# 减少Spark Delay Scheduling从而提高数据处理的并行度, 默认是3000ms`
--conf spark.task.maxFailures=8 `# 增加job失败前最大的尝试次数, 默认是4`
--conf spark.ui.killEnabled=false `# 禁止从Spark UI上杀死Spark Streaming的程序`
--conf spark.logConf=true `# 在driver端打印Spark Configuration的日志`
`# SPARK STREAMING CONFIGURATION`
--conf spark.streaming.blockInterval=200 `# 生成数据块的时间, 默认是200ms`
--conf spark.streaming.backpressure.enabled=true `# 打开backpressure的功能`
--conf spark.streaming.kafka.maxRatePerPartition=${receiver_max_rate} `# direct模式读取kafka每一个分区数据的速度`
`# YARN CONFIGURATION`
--conf spark.yarn.driver.memoryOverhead=512 `# Set if --driver-memory < 5GB`
--conf spark.yarn.executor.memoryOverhead=1024 `# Set if --executor-memory < 10GB`
--conf spark.yarn.maxAppAttempts=4 `# Increase max application master attempts`
--conf spark.yarn.am.attemptFailuresValidityInterval=1h `# Attempt counter considers only the last hour`
--conf spark.yarn.max.executor.failures=$((8 * ${num_executors})) `# Increase max executor failures`
--conf spark.yarn.executor.failuresValidityInterval=1h `# Executor failure counter considers only the last hour`
--driver-java-options hdfs://master:9999/user/hadoop-master/real-time-analysis/output real_time_session s hdfs://master:9999/user/hadoop-master/real-time-analysis/checkpoint session master:9092
/home/hadoop-master/real-time-analysis/realtime-streaming-sessionization-1.0-SNAPSHOT-jar-with-dependencies.jar
hdfs://master:9999/user/hadoop-master/real-time-analysis/output real_time_session s hdfs://master:9999/user/hadoop-master/real-time-analysis/checkpoint session master:9092

  

性能调优

Spark性能调优的第一步,就是为任务分配更多的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。

资源的分配在使用脚本提交Spark任务时进行指定,标准的Spark任务提交脚本如代码清单2-1所示:

/usr/opt/modules/spark/bin/spark-submit 
--class com.sddt.spark.Analysis 
--num-executors 80 
--driver-memory 6g 
--executor-memory 6g 
--executor-cores 3 
/usr/opt/modules/spark/jar/spark.jar 

 

可以进行分配的资源如表2-1所示:

名称

说明

--num-executors

配置Executor的数量

--driver-memory

配置Driver内存(影响不大)

--executor-memory

配置每个Executor的内存大小

--executor-cores

配置每个Executor的CPU core数量

 

调节原则:尽量将任务分配的资源调节到可以使用的资源的最大限度。

对于具体资源的分配,我们分别讨论Spark的两种Cluster运行模式:

第一种是Spark Standalone模式,你在提交任务前,一定知道或者可以从运维部门获取到你可以使用的资源情况,在编写submit脚本的时候,就根据可用的资源情况进行资源的分配,比如说集群有15台机器,每台机器为8G内存,2个CPU core,那么就指定15个Executor,每个Executor分配8G内存,2个CPU core。

第二种是Spark Yarn模式,由于Yarn使用资源队列进行资源的分配和调度,在表写submit脚本的时候,就根据Spark作业要提交到的资源队列,进行资源的分配,比如资源队列有400G内存,100个CPU core,那么指定50个Executor,每个Executor分配8G内存,2个CPU core。

对表2-1中的各项资源进行了调节后,得到的性能提升如表2-2所示:

名称

解析

 

 

增加Executor·个数

在资源允许的情况下,增加Executor的个数可以提高执行task的并行度。比如有4个Executor,每个Executor有2个CPU core,那么可以并行执行8个task,如果将Executor的个数增加到8个(资源允许的情况下),那么可以并行执行16个task,此时的并行能力提升了一倍。

 

 

 

增加每个Executor的CPU core个数

  在资源允许的情况下,增加每个Executor的Cpu core个数,可以提高执行task的并行度。比如有4个Executor,每个Executor有2个CPU core,那么可以并行执行8个task,如果将每个Executor的CPU core个数增加到4个(资源允许的情况下),那么可以并行执行16个task,此时的并行能力提升了一倍。

 

 

 

 

 

 

 

增加每个Executor的内存量

  在资源允许的情况下,增加每个Executor的内存量以后,对性能的提升有三点:

  1. 可以缓存更多的数据(即对RDD进行cache),写入磁盘的数据相应减少,甚至可以不写入磁盘,减少了可能的磁盘IO;
  2. 可以为shuffle操作提供更多内存,即有更多空间来存放reduce端拉取的数据,写入磁盘的数据相应减少,甚至可以不写入磁盘,减少了可能的磁盘IO;
  3. 可以为task的执行提供更多内存,在task的执行过程中可能创建很多对象,内存较小时会引发频繁的GC,增加内存后,可以避免频繁的GC,提升整体性能。

 

 

 

原文地址:https://www.cnblogs.com/tesla-turing/p/13550613.html