Spark的动态资源分配

跑spark程序的时候,公司服务器需要排队等资源,参考一些设置,之前不知道,跑的很慢,懂得设置之后简直直接起飞。

简单粗暴上设置代码:

 1 def conf(self):
 2      conf = super(TbtestStatisBase, self).conf
 3      conf.update({
 4             'spark.shuffle.service.enabled': 'true',
 5             'spark.dynamicAllocation.enabled': 'false',
 6             'spark.dynamicAllocation.initialExecutors': 50,
 7             'spark.dynamicAllocation.minExecutors': 1,
 8             'spark.dynamicAllocation.maxExecutors': 125,
 9             'spark.sql.parquet.compression.codec': 'snappy',
10             'spark.yarn.executor.memoryOverhead': 4096,
11             "spark.speculation": 'true',
12             'spark.kryoserializer.buffer.max': '512m',
13       })

一小部分设置。简单解析一下:

1、spark.shuffle.service.enabled。用来设置是否开启动态分配。开启了动态分配的Application在申请资源的时候默认会拥有更高的优先级

2、spark.dynamicAllocation.initialExecutors (默认下是3)

   spark.dynamicAllocation.minExecutors (默认下是0)

     spark.dynamicAllocation.maxExecutors (默认下是30)

Executor应该是所谓资源单位,自己理解为越多执行越快嘛,如果是Yarn的话,就是Containers,一个道理  

3、spark.yarn.executor.memoryOverhead 是设置堆外内存大小,和 executor_memory 做个对比:

  ExecutorMemoryJVM进程的JAVA堆区域

  MemoryOverheadJVM进程中除Java以外占用的空间大小,包括方法区(永久代)、Java虚拟机栈、本地方法栈、JVM进程本身所用的内存、直接内存(Direct Memory等。

  两者关系:如果用于存储RDD的空间不足,先存储的RDD的分区会被后存储的覆盖。当需要使用丢失分区的数据时,丢失的数据会被重新计算。ExecutorMemory + MemoryOverhead之和(JVM进程总内存)                             

     我只是简单理解堆外内存为一个备用区域吧,还不知道具体什么作用。有遇到内存不够报错的情况,然后调大了MemoryOverhead。

4、理论上:非动态分配情况下,我们必须要等到有100个Executor才能运行Application,并且这100个会一直被占用到程序结束,即便只有一个任务运行了很长时间。动态分配情况下,当有10个Executor的时候,我们的Application就开始运行了,并且我们后续可以继续申请资源,最多申请到100个Executor,当我们有空闲资源的时候,我们可以被释放资源到最少只保留10个Executor,当需要的时候我们有更高的优先级从YARN那儿拿到资源。

但是!

5、用了之后简直起飞。。公司服务器好像根本不存在动态这回事,总是只给几个executor,虽然开始是很快,但执行过程很慢,所以我放弃了,不动态调整了,直接设置死 num_executors = 90。。。然后:

发现一般资源还是挺富裕的嘛,写了90个 也不用等很久。为什么动态一直不肯分给我。。。好多资源,瞬间2个小时缩短为20分钟。

 

原文地址:https://www.cnblogs.com/qingjiaowoyc/p/7118554.html