Spark Job 内存的分配管理

可以申请的最大内存

启动 Spark Job 之前要先检查需要的内存是否太大,这部分代码在

// resource-managersyarnsrcmainscalaorgapachesparkdeployyarnClient.scala

  private def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = {
    val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
    logInfo("Verifying our application has not requested more than the maximum " +
      s"memory capability of the cluster ($maxMem MB per container)")
    val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory
    if (executorMem > maxMem) {
      throw new IllegalArgumentException(s"Required executor memory ($executorMemory), overhead " +
        s"($executorMemoryOverhead MB), and PySpark memory ($pysparkWorkerMemory MB) is above " +
        s"the max threshold ($maxMem MB) of this cluster! Please check the values of " +
        s"'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.")
    }
    val amMem = amMemory + amMemoryOverhead
    if (amMem > maxMem) {
      throw new IllegalArgumentException(s"Required AM memory ($amMemory" +
        s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " +
        "Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " +
        "'yarn.nodemanager.resource.memory-mb'.")
    }
    logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format(
      amMem,
      amMemoryOverhead))

    // We could add checks to make sure the entire cluster has enough resources but that involves
    // getting all the node reports and computing ourselves.
  }

executorMemory 就是 spark-submit 指定的 executor memory 大小,就是 java 的 -Xmx 参数
executorMemoryOverhead 是堆外内存,可以通过 spark-submit 指定,默认 executorMemory * 0.1 或是 384M
pysparkWorkerMemory 是启动 JVM 前运行的 Python 代码需要的内存,一般不设置,默认是 0

  val MEMORY_OVERHEAD_FACTOR = 0.10
  val MEMORY_OVERHEAD_MIN = 384L

  private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)

  private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt

  private val pysparkWorkerMemory: Int = if (isPython) {
    sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
  } else {
    0
  }

amMemory 就是 spark-submit 指定的 Driver 的 memory 大小,就是 java 的 -Xmx 参数
amMemoryOverhead 是 Driver 的堆外内存,默认 amMemory * 0.1 或是 384M

而 maxMem 就是 Yarn 可以分配的最大内存,主要取决于两个配置

yarn.scheduler.maximum-allocation-mb     # yarn 可以分配给 container 的最大内存
yarn.nodemanager.resource.memory-mb      # yarn 可以使用的总内存

申请的内存必须满足

(executor 堆内内存 + executor 堆外内存 + executor python 内存) < (Yarn 可分配的 container 内存)
(driver 堆内内存 + driver 堆外内存) < (Yarn 可分配的 container 内存)

这就是 Spark Job 可以申请的最大内存

默认堆外内存是不开启的,可以通过 spark.memory.offHeap.enabled 参数开启

可以申请的最小内存

coresrcmainscalaorgapachesparkmemoryUnifiedMemoryManager.scala

  private def getMaxMemory(conf: SparkConf): Long = {
    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
    val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
    if (systemMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"System memory $systemMemory must " +
        s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
        s"option or spark.driver.memory in Spark configuration.")
    }
    // SPARK-12759 Check executor memory to fail fast if memory is insufficient
    if (conf.contains("spark.executor.memory")) {
      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
      if (executorMemory < minSystemMemory) {
        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
          s"$minSystemMemory. Please increase executor memory using the " +
          s"--executor-memory option or spark.executor.memory in Spark configuration.")
      }
    }
    val usableMemory = systemMemory - reservedMemory
    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
    (usableMemory * memoryFraction).toLong
  }
private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024

systemMemory 等于指定的 driver memory
executorMemory 等于指定的 executor memory

这两个值不得小于 reservedMemory * 1.5 = 450M

堆内内存的划分

指定的堆内内存可以划分为几个部分

  • 预留内存(Reserved):用于存储 Spark 内部对象,大小固定为 300M
  • 用户内存(User):存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息
  • 存储内存(Storage):主要用于存储 spark 的 cache 数据,例如 RDD 的缓存、unroll 数据
  • 执行内存(Execution):主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据

大小分别为

  • 预留内存 = 300M
  • 用户内存 = (堆内内存 - 预留内存) * (1 - spark.memory.fraction)
  • 存储内存 = (堆内内存 - 预留内存) * spark.memory.fraction * spark.memory.storageFraction
  • 执行内存 = (堆内内存 - 预留内存) * spark.memory.fraction * (1 - spark.memory.storageFraction)

其中
spark.memory.fraction 默认是 0.6
spark.memory.storageFraction 默认是 0.5

其中,存储内存和执行内存,在自己空间不足而对方还有空间时,可以占用对方空间

可以看到 spark-submit 指定的内存,必须比数据大,并不是所有内存都用于存储数据



原文地址:https://www.cnblogs.com/moonlight-lin/p/13894508.html