Spark1.0.0 属性配置

1:Spark1.0.0属性配置方式
      Spark属性提供了大部分应用程序的控制项,而且能够单独为每一个应用程序进行配置。
      在Spark1.0.0提供了3种方式的属性配置:
  • SparkConf方式
    • SparkConf方式能够直接将属性值传递到SparkContext;
    • SparkConf能够对某些通用属性直接配置,如master使用setMaster,appname使用setAppName;
    • 也能够使用set()方法对属性进行键-值对配置,如set("spark.executor.memory", "1g") 。
  • 命令行參数方式
    • 这样的方式是在使用spark-submit或spark-shell提交应用程序的时候。用命令行參数提交;
    • 这样的方式能够比較灵活的配置各个应用程序的执行环境;
    • 能够通过spark-submit --help 或 spark-shell –help显示出属性的完整列表。
  • 文件配置方式
    • 该方式是将属性配置项以键值对方式写入文本文件里,一个配置项占一行;
    • 该文件默觉得conf/spark-defaults.conf。spark-submit在提交应用程序的时候会检查是否存在该文件,有的话就将相关的属性配置加载;
    • 该文件能够在 spark-submit的命令參数--properties-file定义不同的位置。

  • 优先权
    • SparkConf方式 > 命令行參数方式 >文件配置方式
  • 查看Spark属性配置
    • 通过应用程序的webUI(地址http://<driver>:4040)能够查看Spark属性配置,从而检查属性配置是否正确。
    • 仅仅是显示通过上面三种方式显式指定的属性配置。对于其它属性能够假定使用默认配置;
    • 对于大多数内部控制属性,系统已经提供了合理的默认配置。


2:Spark1.0.0中通用属性
A:应用程序属性
 属性名称  默认  含义
 spark.app.name   应用程序名称
 spark.master   要连接的群集管理器
 spark.executor.memory  512 m 每一个executor使用的内存总量
 spark.serializer  org.apache.spark.serializer.
JavaSerializer
在网络数据传送或缓存时使用的序化,默认的序是Java序化,尽管这样的序对不论什么Java对象能够使用,兼容性好,可是处理速度相当的慢。假设要追求处理速度的话,建议使用org.apache.spark.serializer.KryoSerializer序化。当然也能够随意是定义为org.apache.spark.Serializer 子类的序化

 spark.kryo.registrator   假设要使用 Kryo序化。须要创建一个继承KryoRegistrator的类并设置系统属性spark.kryo.registrator指向该类。

 spark.local.dir  /tmp 用于暂存空间的文件夹。该文件夹用于保存map输出文件或者转储RDD。

该文件夹位于快速的本地磁盘上,或者位于使用逗号分隔的多个不同磁盘上的文件夹。注意: 在Spark 1.0 及更高版本号这属性将被群集管理器配置的环境变量 SPARK_LOCAL_DIRS (Standalone、Mesos) 或 LOCAL_DIRS (YARN) 取代。

 spark.logConf  false SparkContext 启动时记录有效 SparkConf信息。
   
B:执行时环境
 属性名称  默认  含义
 spark.executor.memory  512 m 分配给每一个executor进程总内存(使用类似512m、2g格式) 
 spark.executor.extraJavaOptions   要传递给executor的额外 JVM 选项,注意不能使用它来设置Spark属性或堆大小设置。
 spark.executor.extraClassPath   追加到executor类路径中的附加类路径,主要为了兼容旧版本号的Spark。通常不须要用户设置。
 spark.executor.extraLibraryPath    启动executor JVM 时要用到的特殊库路径。

 spark.files.userClassPathFirst  false executor在载入类的时候是否优先使用用户自己定义的JAR包,而不是Spark带有的JAR包。此功能能够用于解决Spark依赖包和用户依赖包之间的冲突。

眼下,该属性仅仅是一项试验功能。


C:Shuffle 操作
 属性名称  默认  含义
 spark.shuffle.consolidateFiles  false 假设设置为true,在shuffle时就合并中间文件,对于有大量Reduce任务的shuffle来说。合并文件能够提高文件系统性能。假设使用的是ext4 或 xfs 文件系统。建议设置为true;对于ext3。因为文件系统的限制,设置为true反而会使内核>8的机器减少性能。
 spark.shuffle.spill  true 假设设置为true,在shuffle期间通过溢出数据到磁盘来减少内存使用总量。溢出阈值是由spark.shuffle.memoryFraction指定的。
 spark.shuffle.spill.compress  true 是否压缩在shuffle期间溢出的数据,假设压缩将使用spark.io.compression.codec。
     
 spark.shuffle.compress  true 是否压缩map输出文件,压缩将使用spark.io.compression.codec。
 spark.shuffle.file.buffer.kb  100 每一个shuffle的文件输出流内存缓冲区大小,以KB为单位。

这些缓冲区能够降低磁盘寻道的次数,也降低创建shuffle中间文件时的系统调用。

 spark.reducer.maxMbInFlight  48 每一个reduce任务同一时候获取map输出的最大大小 (以兆字节为单位)。因为每一个map输出都须要一个缓冲区来接收它。这代表着每一个 reduce 任务有固定的内存开销。所以要设置小点,除非有非常大内存。
   
D:Spark UI
 属性名称  默认  含义
 spark.ui.port  4040 应用程序webUI的port
 spark.ui.retainedStages  1000 在GC之前webUI保留的stage数量
 spark.ui.killEnabled  true 同意在webUI将stage和对应的job杀死
 spark.eventLog.enabled  false 是否记录Spark事件,用于应用程序在完毕后重构webUI。
 spark.eventLog.compress  false 是否压缩记录Spark事件,前提spark.eventLog.enabled为true。
 spark.eventLog.dir  file:///tmp/spark-events 假设spark.eventLog.enabled为 true,该属性为记录spark事件的根文件夹。在此根文件夹中,Spark为每一个应用程序创建分文件夹。并将应用程序的事件记录到在此文件夹中。

用户能够将此属性设置为HDFS文件夹,以便history server读取历史记录文件。

   
E:压缩和序化
 属性名称  默认  含义
 spark.broadcast.compress  true 是否在发送之前压缩广播变量。
 spark.rdd.compress  false 是否压缩序化的RDD分区 ,能够节省大量空间。但会消耗一些额外的CPU时间。
 spark.io.compression.codec  org.apache.spark.io.
LZFCompressionCodec
用于压缩内部数据如 RDD 分区和shuffle输出的解码器Spark提供两个编解码器: org.apache.spark.io.LZFCompressionCodec和org.apache.spark.io.SnappyCompressionCodec。当中,Snappy提供更高速的压缩和解压缩,而LZF提供了更好的压缩比。
 spark.io.compression.snappy
.block.size
 32768 使用Snappy解码器时,解码器使用的块大小 (以字节为单位) 。

 spark.closure.serializer  org.apache.spark.serializer.
JavaSerializer
用于闭包的序化器,眼下仅仅有支持Java序化器。
 spark.serializer.
objectStreamReset
 10000 使用 org.apache.spark.serializer.JavaSerializer序化时。序化器缓存对象以防止写入冗余数据,这时停止这些对象的垃圾收集。通过调用重置序化器,刷新该信息就能够收集旧对象。若要关闭这重定期重置功能将其设置为< = 0 。默认情况下每10000个对象将重置序化器。

 spark.kryo.referenceTracking  true 当使用Kryo序化数据时。是否跟踪对同一对象的引用。假设你的对象图有回路或者同一对象有多个副本。有必要设置为true。其它情况下能够禁用以提高性能。

 spark.kryoserializer.buffer.mb  2 在Kryo 里同意的最大对象大小Kryo会创建一个缓冲区,至少和序化的最大单个对象一样大) 。

假设Kryo 出现缓冲区限制超出异常报错,添加这个值。注意。每一个worker的每一个core仅仅有一个缓冲区。

   
F:运行操作
 属性名称  默认  含义
 spark.default.parallelism 本地模式: 本地机器内核数
Mesos精细模式: 8
其它: 全部executor的core总数
或者2,以较大者为准
假设用户没设置,系统使用集群中执行shuffle操作的默认任务数(groupByKey、 reduceByKey等)。
 spark.broadcast.factory org.apache.spark.broadcast.
HttpBroadcastFactory
广播的实现类
 spark.broadcast.blockSize  4096 TorrentBroadcastFactory块大小(以kb为单位)。太大值在广播时减少并行性 (使速度变慢)。太小值, BlockManager性能可能会受到冲击。
 spark.files.overwrite  false 通过 SparkContext.addFile() 加入的文件在目标中已经存在而且内容不匹配时,是否覆盖目标文件。
 spark.files.fetchTimeout  false 获取由driver通过SparkContext.addFile() 加入的文件时,是否使用通信时间超时。
 spark.storage.memoryFraction  0.6 Java堆用于cache的比例
 spark.tachyonStore.baseDir  System.getProperty("java.io.tmpdir") 用于存储RDD的techyon文件夹,tachyon文件系统的URL由spark.tachyonStore.url设置。

也能够是逗号分隔的多个techyon文件夹。

 spark.storage.
memoryMapThreshold
 8192 以字节为单位的块大小,用于磁盘读取一个块大小进行内存映射。这能够防止Spark在内存映射时使用非常小块,普通情况下,对块进行内存映射的开销接近或低于操作系统的页大小。
 spark.tachyonStore.url  tachyon://localhost:19998 基于techyon文件的URL。
 spark.cleaner.ttl  无限 spark记录不论什么元数据(stages生成、task生成等)的持续时间。定期清理能够确保将超期的元数据遗忘。这在执行长时间任务是非常实用的,如执行24/7的sparkstreaming任务。注意RDD持久化在内存中的超期数据也会被清理。

G:网络通信
 属性名称  默认  含义
 spark.driver.host  本地主机名 执行driver的主机名或 IP 地址。

 spark.driver.port  随机 driver侦听的port。

 spark.akka.frameSize  10 以MB为单位的driver和executor之间通信信息的大小,设置值越大。driver能够接受更大的计算结果。
 spark.akka.threads  4 用于通信的actor线程数,在大型集群中拥有很多其它CPU内核的driver能够添加actor线程数。
 spark.akka.timeout  100 以秒为单位的Spark节点之间通信超时时间。

 spark.akka.heartbeat.pauses  600 以下3个參数是用于设置akka自带的故障探測器,设置非常大值的话。能够停用故障探測器。假设想启用故障探測器。以秒为单位设置这3个參数。

一般是在特殊须要的情况下开启故障探測器。一个敏感的故障探測器有助于恶意的executor的定位,而对于因为GC暂停或网络滞后引起的情况下,不须要开启故障探測器;另外故障探測器的开启会导致因为心跳信息的频繁交换而引起的网络泛滥。
本參数是设置可接受的心跳停顿时间。

 spark.akka.failure-detector.threshold  300.0 相应AKKA的akka.remote.transport-failure-detector.threshold
 spark.akka.heartbeat.interval   1000 心跳间隔时间
  
H:调度
 属性名称  默认  含义
 spark.task.cpus  1 为每一个任务分配的内核数。

 spark.task.maxFailures  4 job放弃task前该task的失败次数,该值>=1
 spark.scheduler.mode  FIFO SparkContext对job进行调度所採用的模式。
对于多用户可採用FAIR模式。

 spark.cores.max  未设置 当应用程序执行在Standalone集群或者粗粒度共享模式Mesos集群时,应用程序向集群请求的最大CPU内核总数(不是指每台机器,而是整个集群)。

假设不设置,对于Standalone集群将使用spark.deploy.defaultCores中数值,而Mesos将使用集群中可用的内核。

 spark.mesos.coarse  false 假设设置为true。在Mesos集群中执行时使用粗粒度共享模式。
 spark.speculation  false 下面几个參数是关于Spark猜測运行机制的相关參数。此參数设定是否使用猜測运行机制,假设设置为true则spark使用猜測运行机制,对于Stage中拖后腿的Task在其它节点中又一次启动,并将最先完毕的Task的计算结果最为终于结果。
 spark.speculation.interval   100 Spark多长时间进行检查task执行状态用以猜測。以毫秒为单位。

 spark.speculation.quantile  0.75 猜測启动前。Stage必需要完毕总Task的百分比。
 spark.speculation.multiplier  1.5 比已完毕Task的执行速度中位数慢多少倍才启用猜測
 spark.locality.wait  3000 下面几个參数是关于Spark数据本地性的相关參数。
本參数是以毫秒为单位启动本地数据task的等待时间,假设超出就启动下一本地优先级别的task。

该设置相同能够应用到各优先级别的本地性之间(本地进程 -> 本地节点 -> 本地机架 -> 随意节点 ),当然,也能够通过spark.locality.wait.node等參数设置不同优先级别的本地性。

 spark.locality.wait.process  spark.locality.wait  本地进程级别的本地等待时间
 spark.locality.wait.node  spark.locality.wait 本地节点级别的本地等待时间
 spark.locality.wait.rack  spark.locality.wait 本地机架级别的本地等待时间
 spark.scheduler.revive.interval   1000 复活又一次获取资源的Task的最长时间间隔(毫秒),发生在Task由于本地资源不足而将资源分配给其它Task执行后进入等待时间,假设这个等待时间内又一次获取足够的资源就继续计算。


I:安全
 属性名称  默认  含义
 spark.authenticate  false Spark是否启用内部身份验证。
 spark.authenticate.secret   设置Spark用于组件之间进行身份验证的密钥。假设不是YARN上执行而且spark.authenticate为true时,须要设置密钥。

 spark.core.connection.
auth.wait.timeout
 30 Spark用于组件时间进行身份认证的超时时间。

 spark.ui.filters   Spark web UI 要使用的以逗号分隔的筛选器名称列表。

筛选器要符合javax servlet Filter标准,每一个筛选器的參数能够通过设置java系统属性来指定:
spark.<class name of filter>.params='param1=value1,param2=value2'
比如:
-Dspark.ui.filters=com.test.filter1
-Dspark.com.test.filter1.params='param1=foo,param2=testing'

 spark.ui.acls.enable  false Spark webUI存取权限是否启用。

假设启用。在用户浏览web界面的时候会检查用户是否有訪问权限。

 spark.ui.view.acls   以逗号分隔Spark webUI訪问用户的列表。默认情况下仅仅有启动Spark job的用户才有訪问权限。

   
J:Spark Streaming
 属性名称  默认  含义
 spark.streaming.blockInterval   200 在时间间隔内(毫秒)Spark Streaming接收器将接收数据合并成数据块并存储在Spark。
 spark.streaming.unpersist  true 假设设置为true,强迫将SparkStreaming持久化的RDD数据从Spark内存中清理,相同的,SparkStreaming接收的原始输入数据也会自己主动被清理;假设设置为false,则同意原始输入数据和持久化的RDD数据可被外部的Streaming应用程序訪问,由于这些数据不会自己主动清理。
   
3:集群特有的属性
A:Standalone特有属性
Standalone还能够通过环境变量文件conf/spark-env.sh来设置属性,相关的配置项是:
  • SPARK_MASTER_OPTS  配置master使用的属性
  • SPARK_WORKER_OPTS  配置worker使用的属性
  • SPARK_DAEMON_JAVA_OPTS  配置master和work都使用的属性
配置的时候,使用类似的语句:

export SPARK_MASTER_OPTS="-Dx1=y1 -Dx2=y2"

# - 当中x代表属性,y代表属性值

当中SPARK_MASTER_OPTS所支持的属性有:
 属性名称  默认  含义
 spark.deploy.spreadOut   true Standalone集群管理器是否自由选择节点还是固定到尽可能少的节点,前者会有更好的数据本地性。后者对于计算密集型工作负载更有效
 spark.deploy.defaultCores   无限 假设没有设置spark.cores.max,该參数设置Standalone集群分配给应用程序的最大内核数,假设不设置,应用程序获取全部的有效内核。

注意在一个共享的集群中。设置一个低值防止攫取了全部的内核,影响他人的使用。

 spark.worker.timeout  60 master由于没有收到心跳信息而觉得worker丢失的时间(秒)
   
当中SPARK_WORKER_OPTS所支持的属性有:
 属性名称  默认  含义
 spark.worker.cleanup.enabled   false 是否定期清理worker的应用程序工作文件夹。仅仅适用于Standalone模式,不适用于YARN模式。清理的时候将无视应用程序是否在执行。
 spark.worker.cleanup.interval  1800  清理worker本地过期的应用程序工作文件夹的时间间隔(秒)
 spark.worker.cleanup.appDataTtl   7*24*3600  worker保留应用程序工作文件夹的有效时间。

该时间由磁盘空间、应用程序日志、应用程序的jar包以及应用程序的提交频率来设定。

   
当中SPARK_DAEMON_JAVA_OPTS所支持的属性有:
 属性名称  含义
 spark.deploy.recoveryMode 以下3个參数是用于配置zookeeper模式的master HA。


设置为ZOOKEEPER表示启用master备用恢复模式,默觉得NONE。

 spark.deploy.zookeeper.url zookeeper集群URL
 spark.deploy.zookeeper.dir zooKeeper保存恢复状态的文件夹。缺省为/spark
 spark.deploy.recoveryMode 设成FILESYSTEM启用master单节点恢复模式,缺省值为NONE
 spark.deploy.recoveryDirectory Spark保存恢复状态的文件夹

B:YARN特有属性
YARN特有属性的配置,应该是支持SparkConf方式和conf/spark-defaults.conf文件配置方式,

 属性名称  默认  含义
 spark.yarn.applicationMaster.waitTries  10 RM等待Spark AppMaster启动次数,也就是SparkContext初始化次数。超过这个数值,启动失败。
 spark.yarn.submit.file.replication  3 应用程序上载到HDFS的文件的复制因子
 spark.yarn.preserve.staging.files  false 设置为true,在job结束后,将stage相关的文件保留而不是删除。

 spark.yarn.scheduler.heartbeat.interval-ms  5000 Spark AppMaster发送心跳信息给YARN RM的时间间隔
 spark.yarn.max.executor.failures  2倍于executor数 导致应用程序宣告失败的最大executor失败数
 spark.yarn.historyServer.address  无 Spark history server的地址(不要加http://)。这个地址会在Spark应用程序完毕后提交给YARN RM,然后RM将信息从RM UI写到history server UI上。

   
原文地址:https://www.cnblogs.com/mengfanrong/p/5153196.html