spark的内存分配管理

SPARK的内存管理器

StaticMemoryManager,UnifiedMemoryManager

1.6以后默认是UnifiedMemoryManager.

这个内存管理器在sparkContext中通过SparnEnv.create函数来创建SparkEnv的实例时,会生成.

通过spark.memory.useLegacyMode配置,能够控制选择的内存管理器实例.

假设设置为true时,选择的实例为StaticMemoryManager实例,否则选择UnifiedMemoryManager实例.默认情况下这个值为false.

val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode"false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    new StaticMemoryManager(confnumUsableCores)
  } else {
    UnifiedMemoryManager(confnumUsableCores)
  }

 

UnifiedMemoryManager

这个实例生成时,最大内存的得到方法:

1,依据当前JVM的启动内存,减去300MB,

   这个300MB能够通过spark.testing.reservedMemory配置得到.

2,最大内存值,通过1计算出来的内存值,与spark.memory.fraction配置的系数进行相乘.默认是0.75.

演示样例:假设JVM配置的内存为1GB,那么可使用的最大内存为(1GB-300MB)*0.75

须要的配置项:

配置项spark.memory.fraction,默认值0.75,这个配置用于配置当前的内存管理器的最大内存使用比例.

配置项spark.memory.storageFraction,默认值0.5,这个配置用于配置rdd的storage与cache的默认分配的内存池大小.

配置项spark.memory.offHeap.size,默认值0,这个配置用于配置非堆内存的大小,默认不启用.这个不做分析.

 

在实例生成后,默认会依据storage的内存权重,总内存减去storage的内存权重,生成两个内存池storageMemoryPoolonHeapExecutionMemoryPool.

 

onHeapExecutionMemoryPool用于在运行executor的shuffle操作时,使用的内存,

storageMemoryPool用于在运行rdd的cache操作时,使用的内存.

 

Executor运行时的内存分配

这个操作一般是在task运行shuffle操作时,计算spill时,在内存中的CACHE时使用的内存.通过调用实例中的acquireExecutionMemory函数来申请内存.

override private[memory] def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long = synchronized {

这个函数传入的memoryMode可选择是使用堆内存还是直接使用本地内存,默认是使用堆内存.
  assert(onHeapExecutionMemoryPool.poolSize + 

       storageMemoryPool.poolSize == maxMemory)
  assert(numBytes >= 0)
  memoryMode match {
    case MemoryMode.ON_HEAP =>

这里定义的这个函数,用于推断numBytes(须要申请的内存大小)减去当前内存池中可用的内存大小是否够用,假设不够用,这个函数的传入值是一个正数
      def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
        if (extraMemoryNeeded > 0) {

这里依据当前的rdd的cache中的内存池的大小,减去配置的storage的存储大小,与当前storage的内存池中的可用大小,取最大值出来,这个值表示是一个可用于回收的内存资源.
          val memoryReclaimableFromStorage =
            math.max(storageMemoryPool.memoryFree

            storageMemoryPool.poolSize - storageRegionSize)
          if (memoryReclaimableFromStorage > 0) {

首先依据计算出来的storage中能够进行回收的资源,通过StorageMemoryPool进行资源的释放.得到一个完毕释放的资源大小.这里依据executor中task须要的内存与storage可回收的资源取最小值进行资源的回收.把得到的可用资源加入到executor的内存池中.
            // Only reclaim as much space as is necessary and available:
            val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
              math.min(extraMemoryNeededmemoryReclaimableFromStorage))
            onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
          }
        }
      }
这个函数用于计算executor的内存池能够使用的最大内存大小.最小能够使用总内存减去storage的配置权重,也就是默认情况下,shuffle的executor的内存最小能够使用0.5的权重的内存.
      def computeMaxExecutionPoolSize(): Long = {
        maxMemory - math.min(storageMemoryUsedstorageRegionSize)
      }
运行内存的分配操作.
      onHeapExecutionMemoryPool.acquireMemory(
        numBytestaskAttemptIdmaybeGrowExecutionPool

        computeMaxExecutionPoolSize)

    case MemoryMode.OFF_HEAP =>
      offHeapExecutionMemoryPool.acquireMemory(numBytestaskAttemptId)
  }
}

 

给executor中的task分配须要的内存:

private[memory] def acquireMemory(
    numBytes: Long,
    taskAttemptId: Long,
    maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
    computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
  assert(numBytes > 0s"invalid number of bytes requested: $numBytes")

  // TODO: clean up this clunky method signature

  if (!memoryForTask.contains(taskAttemptId)) {

这里首先检查这个task是否在memoryForTask中存在,假设不存在时,表示这个task是第一次申请内存,在这个集合中设置此task的当前使用内存为0,并唤醒全部的当前的executor的等待的task.
    memoryForTask(taskAttemptId) = 0L
    // This will later cause waiting tasks to wake up and check numTasks again
    lock.notifyAll()
  }

  // TODO: simplify this to limit each task to its own slot
  while (true) {

运行内存的分配操作,这个操作会一直进行迭代,直到满足一定的条件.

首先得到当前的executor中有申请内存的task的个数,并得到当前的task的使用内存量.
    val numActiveTasks = memoryForTask.keys.size
    val curMem = memoryForTask(taskAttemptId)

计算出须要申请的内存与当前内存池中的内存,是否须要对storage中的内存进行回收.假设须要申请的内存大于了当前内存池的内存,这个參数传入为一个大于0的数,这个时候会对storage的内存进行回收.
    maybeGrowPool(numBytes - memoryFree)
这里计算出executor的内存值能够使用的最大内存,默认情况下,最小可使用内存为总内存减去storage的配置内存.也就是默认可使用50%的内存.
    val maxPoolSize = computeMaxPoolSize()

这里计算出每一个task平均可使用的最大内存大小,与最小内存大小.

如:有5个task,可使用100MB的内存,那么最大可使用的内存为20MB,最小可使用的内存为10MB.
    val maxMemoryPerTask = maxPoolSize / numActiveTasks
    val minMemoryPerTask = poolSize / (* numActiveTasks)
这里计算出当前可申请的内存.可以申请的内存总量不能超过平均每一个task使用内存的平均大小.
    // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
    val maxToGrant = math.min(numBytesmath.max(0maxMemoryPerTask - curMem))
   
    val toGrant = math.min(maxToGrantmemoryFree)

这里控制迭代能否够跳出的条件.假设可申请的内存小于须要申请的内存,同一时候当前task使用的内存加上可申请的内存小于每一个task平均使用的内存时,这个申请操作会wait住.等待其他的task资源回收时进行唤醒.否则跳出迭代,返回可申请的内存.
    if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
      logInfo(s"TID $taskAttemptId waiting for at least 1/2N of 

            $poolName pool to be free")
      lock.wait()
    } else {
      memoryForTask(taskAttemptId) += toGrant
      return toGrant
    }
  }
  0L  // Never reached
}

 

BLOCKCACHE时的内存分配

在运行rdd的iterator操作时,假设对rdd运行过cache或者persist的操作时,也就是storage的级别不是none时,会对数据进行cache操作.在cache后的block中有一个超时时间,这个超时时间在blockManager中通过一个定时器,会定时去删除cache的block与的Broadcast数据.

假设是BLOCK的CACHE的超时,可通过spark.cleaner.ttl.BLOCK_MANAGER配置.

在对RDD运行cache操作时,终于会调用内存管理器中的acquireStorageMemory函数来进行操作.

override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    evictedBlocks: mutable.Buffer[(BlockIdBlockStatus)])

Boolean = synchronized {

这个传入的參数中,evictedBlocks是一个用于返回的传入參数,这个集合中表示进行这次申请后,被淘汰掉的block的信息.
  assert(onHeapExecutionMemoryPool.poolSize 

      + storageMemoryPool.poolSize == maxMemory)
  assert(numBytes >= 0)

 

假设当前的BLOCK的的CACHE的大小已经超过了当前可用的内存总量(总内存减去executor的使用内存)时,直接返回false,表示不做内存分配.申请的内存太多,不处理了.
  if (numBytes > maxStorageMemory) {
    // Fail fast if the block simply won't fit
    logInfo(s"Will not store $blockId as the required space 

      ($numBytes bytes) exceeds our " +
      s"memory limit ($maxStorageMemory bytes)")
    return false
  }

假设当前申请的block须要的内存大小超过了当前storage的内存池可用的内存大小时,在executor的内存池中回收部分资源,原则是假设申请的内存小于executor内存池可用的内存,回收申请的大小,否则回收executor全部的可用内存.并运行内存的分配操作.
  if (numBytes > storageMemoryPool.memoryFree) {
    // There is not enough free memory in the storage pool, 

//    so try to borrow free memory from
    // the execution pool.
    val memoryBorrowedFromExecution = 

     Math.min(onHeapExecutionMemoryPool.memoryFreenumBytes)
    onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
    storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
  }
  storageMemoryPool.acquireMemory(blockIdnumBytesevictedBlocks)
}

 

在StorageMemoryPool中运行block的内存分配:

def acquireMemory(
    blockId: BlockId,
    numBytesToAcquire: Long,
    numBytesToFree: Long,
    evictedBlocks: mutable.Buffer[(BlockIdBlockStatus)])

Boolean = lock.synchronized {

第二个參数是须要申请的内存,第三个參数假设是0表示可用内存大于申请内存,大于0表示可用内存不够用.
  assert(numBytesToAcquire >= 0)
  assert(numBytesToFree >= 0)
  assert(memoryUsed <= poolSize)

 

这里,假设可用的内存不够用时,通过MemoryStore中的evictBlocksToFreeSpace函数来对当前的cache进行淘汰.
  if (numBytesToFree > 0) {
    memoryStore.evictBlocksToFreeSpace(Some(blockId)numBytesToFree

         evictedBlocks)
    // Register evicted blocks, if any, with the active task metrics
    Option(TaskContext.get()).foreach { tc =>
      val metrics = tc.taskMetrics()
      val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId,

           BlockStatus)]())
      metrics.updatedBlocks Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
    }
  }

 

假设完毕资源的回收后,当前可用的内存大于要申请的内存,表示申请成功,返回的值为true,否则为false.
  // NOTE: If the memory store evicts blocks,

  //   then those evictions will synchronously call
  // back into this StorageMemoryPool in order to free memory. Therefore,

  //     these variables
  // should have been updated.
  val enoughMemory = numBytesToAcquire <= memoryFree
  if (enoughMemory) {
    _memoryUsed += numBytesToAcquire
  }
  enoughMemory
}

 

Blockcache的淘汰

在executor的内存池不够使用时,或者总的内存不够使用时,会运行storage的内存池的资源回收操作.由shrinkPoolToFreeSpace函数,这个函数通过调用MemoryStorage中的evictBlocksToFreeSpace函数来进行block的淘汰(假设是对block的cache时,申请内存不够时会直接调用这个函数来淘汰老的block)

 

shrinkPoolToFreeSpace函数用于在executor的内存不够时,须要storage的内存池释放资源给executor使用时调用.

这个过程中,可给executor提供的内存分为五种可能:

1,storage默认的内存空间还没有使用完毕,同一时候executor须要的空间小于等于storage的内存池的可用空间,直接在storage的内存池中释放须要的大小.

2,storage默认的内存空间还没有使用完毕,同一时候executor须要的空间大于storage的内存池的可用空间,这个时候storage的可用空间所有进行释放,但这个时候不会做block的淘汰操作.

3,storage的默认的内存空间使用完毕,这个时候storage的内存池比默认的storage的配置权重要多,同一时候executor须要申请的内存小于多出的部分,对storage内存池中的block进行淘汰直到够executor的申请内存结束,这个时候storage的使用内存还是大于storage的默认配置权重大小.

4,storage的默认的内存空间使用完毕,这个时候storage的内存池比默认的storage的配置权重要多,同一时候executor须要申请的内存大于或等于多出的部分,对storage内存池中的block进行淘汰直到但最多仅仅淘汰到storage的配置权重大小就结束淘汰.

5,storage刚好使用到了配置的权重,无法进行分配.

 

def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {

首先依据须要释放的内存,1=executor申请的内存,2-1=storage内存池可用的内存,2-2=storage中占用的内存大于默认给storage分配的权重.

这里依据这个要释放的资源与内存池可用的资源取最小值进行释放,假设申请的小于可用的,不会对block进行淘汰操作,否则对block进行淘汰操作,直接淘汰到可用的内存空间结束.
  // First, shrink the pool by reclaiming free memory:
  val spaceFreedByReleasingUnusedMemory = math.min(spaceToFreememoryFree)
  decrementPoolSize(spaceFreedByReleasingUnusedMemory)
  val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
  if (remainingSpaceToFree > 0) {
    // If reclaiming free memory did not adequately shrink the pool,

    //      begin evicting blocks:
    val evictedBlocks = new ArrayBuffer[(BlockIdBlockStatus)]
    memoryStore.evictBlocksToFreeSpace(NoneremainingSpaceToFreeevictedBlocks)
    val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
    // When a block is released, BlockManager.dropFromMemory() 

//      calls releaseMemory(), so we do
    // not need to decrement _memoryUsed here. However, we do need to decrement the 

//      pool size.
    decrementPoolSize(spaceFreedByEviction)
    spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
  } else {
    spaceFreedByReleasingUnusedMemory
  }
}

 

evictBlocksToFreeSpace函数这个函数用于对storage的内存空间中释放掉部分block的存储空间的函数,由MemoryStorage进行实现.

这个函数的三个传入參数中:

第一个在block的cache时,会传入blockid,假设是executor要求释放时,传入为None,这个參数用于控制释放的资源,假设传入了blockid,那么这个block相应的rdd的全部的全部的CACHE都会被保留,仅仅释放其他的RDD相应的BLOCK的CACHE,假设传入为None时,不区分BLOCK,从头開始迭代,直接释放到须要的内存大小结束.

第二个是须要释放的内存大小.

第三个參数是释放后的block的集合,这个集合内容就是从内存中淘汰出去的block.

private[spark] def evictBlocksToFreeSpace(
    blockId: Option[BlockId],
    space: Long,
    droppedBlocks: mutable.Buffer[(BlockIdBlockStatus)]): Boolean = {
  assert(space > 0)
  memoryManager.synchronized {
    var freedMemory = 0L

这里得到传入的blockid相应的rdd.假设传入的blockId是None时,这个rdd也就不存在.
    val rddToAdd = blockId.flatMap(getRddId)
    val selectedBlocks = new ArrayBuffer[BlockId]
    // This is synchronized to ensure that the set of entries is not changed
    // (because of getValue or getBytes) while traversing the iterator, as that
    // can lead to exceptions.
    entries.synchronized {
      val iterator = entries.entrySet().iterator()

这里从全部的cache的block中进行迭代,假设迭代的block的rdd不是如今须要cache的block相应的rdd时(传入的blockId相应的RDD),就选择这个block.并释放内存大小.
      while (freedMemory < space && iterator.hasNext) {
        val pair = iterator.next()
        val blockId = pair.getKey
        if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
          selectedBlocks += blockId
          freedMemory += pair.getValue.size
        }
      }
    }

    if (freedMemory >= space) {
      logInfo(s"${selectedBlocks.size} blocks selected for dropping")
      for (blockId <- selectedBlocks) {

这里对选择的block,通过blockManager释放掉block的cache占用的内存.假设这个block的cache的级别中包括有disk的级别时,释放掉内存的同一时候会把这个cache的数据写入到磁盘中.

把运行释放后的block的集合加入到传入參数的droppedBlocks的集合參数中,用于数据的返回.


        val entry = entries.synchronized { entries.get(blockId) }
        // This should never be null as only one task should be dropping
        // blocks and removing entries. However the check is still here for
        // future safety.
        if (entry != null) {
          val data = if (entry.deserialized) {
            Left(entry.value.asInstanceOf[Array[Any]])
          } else {
            Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
          }
          val droppedBlockStatus = blockManager.dropFromMemory(blockIddata)
          droppedBlockStatus.foreach { status => droppedBlocks += 

                   ((blockIdstatus)) }
        }
      }
      true
    else {
      blockId.foreach { id =>
        logInfo(s"Will not store $id as it would require dropping another block " +
          "from the same RDD")
      }
      false
    }
  }
}

StaticMemoryManager

这个实例在1.6的环境下,须要把配置项spark.memory.useLegacyMode设置为true,才会被启用.以下首先先看看这个实例生成时的处理:

须要的配置项:

1,配置项spark.shuffle.memoryFraction,用于设置executor的shuffle操作可使用的内存,默认占总内存的0.2.

2,配置项spark.shuffle.safetyFraction,用于设置executor的shuffle的安全操作内存,默认占1配置内存的0.8.

3,配置项spark.storage.memoryFraction,用于设置block cache的使用内存,默认占总内存的0.6;

4,配置项spark.storage.safetyFraction,用于设置block cache的安全使用内存,默认占3配置内存的0.9;

5,配置项spark.storage.unrollFraction,默认值是storage内存总大于的0.2;这个有于在storage中cache的block的数据的反序列化时数据的展开使用空间.

 

def this(conf: SparkConfnumCores: Int) {
  this(
    conf,
    StaticMemoryManager.getMaxExecutionMemory(conf),
    StaticMemoryManager.getMaxStorageMemory(conf),
    numCores)
}

Executor运行时的内存分配

StaticMemoryManager,executor中的shuffle的内存运行分配这块事实上并没有统一内存管理中那么麻烦,仅仅是在分配的固定大小的存储空间中进行分配,假设无法再进行分配时,这个分配函数返回的分配量就是0.

private[memory] override def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long = synchronized {

 

OFF_HEAP的模式这里就不分析了,我在代码里没发现有地方去调用,好像申请内存时,是直接写死的ON_HEAP的模式.在这个地方,不会考虑executor的内存池中的内存是否够用,直接通过ExecutionMemoryPool内存池实例中的分配内存函数进行内存的分配 .


  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes,

         taskAttemptId)
    case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes

         taskAttemptId)
  }
}

内存分配部分的代码实现:这个部分与统一内存管理部分是一样的,

private[memory] def acquireMemory(
    numBytes: Long,
    taskAttemptId: Long,
    maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
    computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
  assert(numBytes > 0s"invalid number of bytes requested: $numBytes")

  // TODO: clean up this clunky method signature

首先假设说task是第一次申请内存,加入这个task到内存池的集合属性中,并把这个task的使用内存设置为0.  
  if (!memoryForTask.contains(taskAttemptId)) {
    memoryForTask(taskAttemptId) = 0L
    // This will later cause waiting tasks to wake up and check numTasks again
    lock.notifyAll()
  }

  以下開始迭代进行内存的分配.加上while的目的是为了保持假设task的分配内存达不到指定的大小时,就一直等待分配,直到达到指定的大小.
  // TODO: simplify this to limit each task to its own slot
  while (true) {
    val numActiveTasks = memoryForTask.keys.size
    val curMem = memoryForTask(taskAttemptId)

    

在这里,这个函数是一个空的实现,什么都不会做.
    maybeGrowPool(numBytes - memoryFree)

    

这个函数得到的值就是当前的executor的内存池的poolsize的大小.
    val maxPoolSize = computeMaxPoolSize()

依据当前的活动的task的个数计算出每一个task可使用的最大内存,每一个task使用的最小内存为最大内存除以2(假设申请的内存本身小于这个最小内存除外).
    val maxMemoryPerTask = maxPoolSize / numActiveTasks
    val minMemoryPerTask = poolSize / (* numActiveTasks)

计算出这次须要分配的内存,假设申请的内存小于可用的内存时,取申请内存,否则取这个task可申请的最大内存
    // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
    val maxToGrant = math.min(numBytesmath.max(0maxMemoryPerTask - curMem))
  

这里计算出来的值依据当前原则上能够申请的内存与当前内存池中的可用内存取最小值.
    val toGrant = math.min(maxToGrantmemoryFree)

这里有一个线程wait的条件,假设这一次申请的内存小于须要申请的内存,同一时候当前的task的使用内存小于最小的使用内存时,线程wait,等待其他的task释放内存或者有新的task增加来唤醒此wait.
    // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
    // if we can't give it this much now, wait for other tasks to free up memory
    // (this happens if older tasks allocated lots of memory before N grew)
    if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
      logInfo(s"TID $taskAttemptId waiting for at least 1/2N of 

           $poolName pool to be free")
      lock.wait()
    } else {
      memoryForTask(taskAttemptId) += toGrant
      return toGrant
    }
  }
  0L  // Never reached
}

 

Storage的展开内存分配

这里说明下,在UnifiedMemoryManager中展开内存的分配与stroage中block cache的内存分配共用同样的内存空间,因此申请方法与storage的block cache的内存分配同样,而在static的分配中,不同的区块,所使用的内存空间都是固定的,因此这里须要独立说明一下.

在对MemoryStorage中运行block的cache操作时,会运行pubInterator等操作,会先依据block中的数据申请相应数据大小的展开内存空间,把数据进行提取,然后才会运行storage的cache操作的内存分配.

运行流程,在MemoryStorage中:

1,putIterator函数运行,对block进行cache

2,unrollSafely函数运行,申请展开内存,依据block的内容大小.

3,释放申请的展开内存,并申请block cache内存,运行putArray->tryToPut函数.

 

看看unrollSafely函数怎样处理展开内存的申请:

这里先配置项spark.storage.unrollMemoryThreshold,默认值是1MB,先申请固定大小的展开内存,这个函数返回的值是一个true/false的值,true表示申请成功.这个函数调用内存管理器中的acquireUnrollMemory函数.这里申请到的内存大小会先记录到unrollMemoryMap集合中依据相应的taskid.

keepUnrolling = reserveUnrollMemoryForThisTask(blockIdinitialMemoryThreshold,

      droppedBlocks)

 

接下来,迭代block中的数据,把数据加入到vector的展开暂时变量中.

while (values.hasNext && keepUnrolling) {
  vector += values.next()
  if (elementsUnrolled % memoryCheckPeriod == 0) {
    // If our vector's size has exceeded the threshold, request more memory
    val currentSize = vector.estimateSize()
    if (currentSize >= memoryThreshold) {

这里每次申请当前的使用内存的一半做为展开内存,这个展开内存伴随着block的数据越多,申请的量也会越大.
      val amountToRequest = (currentSize * memoryGrowthFactor - 

            memoryThreshold).toLong
      keepUnrolling = reserveUnrollMemoryForThisTask(
        blockIdamountToRequestdroppedBlocks)
      // New threshold is currentSize * memoryGrowthFactor
      memoryThreshold += amountToRequest
    }
  }
  elementsUnrolled += 1
}

这里的推断比較关键,假设keepUnrolling的值为true,表示内存可以安全展开这个block的数据,否则表示不能展开这个block的内容.
if (keepUnrolling) {
  // We successfully unrolled the entirety of this block
  Left(vector.toArray)
else {
  // We ran out of space while unrolling the values for this block
  logUnrollFailureMessage(blockIdvector.estimateSize())
  Right(vector.iterator ++ values)
}

 

if (keepUnrolling) {
  val taskAttemptId = currentTaskAttemptId()
  memoryManager.synchronized {

这里假设内存可以安全展开当前的block,把这个block的展开内存存储到pendingUnrollMemoryMap的集合中相应此task的位置.
    // Since we continue to hold onto the array until we actually cache it, we cannot
    // release the unroll memory yet. Instead, we transfer it to pending unroll memory
    // so `tryToPut` can further transfer it to normal storage memory later.
    // TODO: we can probably express this without pending unroll memory (SPARK-10907)
    val amountToTransferToPending = currentUnrollMemoryForThisTask - 

       previousMemoryReserved
    unrollMemoryMap(taskAttemptId) -= amountToTransferToPending
    pendingUnrollMemoryMap(taskAttemptId) =
      pendingUnrollMemoryMap.getOrElse(taskAttemptId0L) + 

      amountToTransferToPending
  }
}

 

提示:关于展开内存的释放部分,假设block的内容可以被安全展开存储到内存中时,这个时候,在做block的storage的操作时,会释放掉展开内存的空间(在pendingUnrollMemoryMap集合中),假设内存不可以安全展开block的内容时,这个时候无法进行block的cache操作(可能会写磁盘),这时申请的内容大小存储在unrollMemoryMap合中,这时因为不会运行block的memory的cache操作,因此这个集合中占用的内存大小临时不会被回收,仅仅有等到这个task结束时,占用的unrollMemoryMap合中的内存才会被回收.

...

 

接下来看看在StaticMemoryManager中怎样处理展开内存的分配:

override def acquireUnrollMemory(
    blockId: BlockId,
    numBytes: Long,
    evictedBlocks: mutable.Buffer[(BlockIdBlockStatus)])

Boolean = synchronized {
  val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
  val freeMemory = storageMemoryPool.memoryFree

这里依据可用的最大展开内存与当前正在使用中的展开内存,计算出能够申请的最大展开内存,假设这里得到的值是一个0时,表示不须要释放block cache的内存,假设是一个大于0的值,就表示须要释放BLOCK CACHE的内存.
  // When unrolling, we will use all of the existing free memory, and, if necessary,
  // some extra space freed from evicting cached blocks. We must place a cap on the
  // amount of memory to be evicted by unrolling, however, otherwise unrolling one
  // big block can blow away the entire cache.
  val maxNumBytesToFree = math.max(0maxUnrollMemory - currentUnrollMemory - 

     freeMemory)

这里计算出须要释放的内存,取申请的资源与能够使用的unroll内存资源的最小值,假设这个一个大于0的值,表示须要从storage的内存池中释放这么多的内存出来.
  // Keep it within the range 0 <= X <= maxNumBytesToFree
  val numBytesToFree = math.max(0math.min(maxNumBytesToFree

     numBytes - freeMemory))
  storageMemoryPool.acquireMemory(blockIdnumBytesnumBytesToFree

     evictedBlocks)
}

 

Storageblock cache的内存分配

在block使用了memory的storage时,同一时候block的内容可以被展开内存存储起来时,会通过MemoryStorage中相应的函数来向StaticMemoryManager中的acquireStorageMemory函数申请内存资源.

override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    evictedBlocks: mutable.Buffer[(BlockIdBlockStatus)])

Boolean = synchronized {
  if (numBytes > maxStorageMemory) {

假设一个block的内容太大,已经超过了配置的storage的存储空间大小,这个block不做cache.
    // Fail fast if the block simply won't fit
    logInfo(s"Will not store $blockId as the required space 

      ($numBytes bytes) exceeds our " +
      s"memory limit ($maxStorageMemory bytes)")
    false
  else {

否则通过storage的内存池运行block的cache的内存申请,这个过程中假设内存不够用时,会释放老的block的cache相应的内存空间,也就是会淘汰掉老的block cache,
    storageMemoryPool.acquireMemory(blockIdnumBytesevictedBlocks)
  }
}

 

在storage的内存池中处理block cache的内存申请:

def acquireMemory(
    blockId: BlockId,
    numBytesToAcquire: Long,
    numBytesToFree: Long,
    evictedBlocks: mutable.Buffer[(BlockIdBlockStatus)])

Boolean = lock.synchronized {

这个函数的传入參数中,numBytesToAcquire表示须要申请的内存大小,numBytesToFree假设是一个大于0的值,表示如今内存池中的内存空间不够,须要淘汰现有的block的cache.
  assert(numBytesToAcquire >= 0)
  assert(numBytesToFree >= 0)
  assert(memoryUsed <= poolSize)

 

这里先推断,假设申请的内存大于还在可用的内存,须要先淘汰掉部分block cache来释放空间.
  if (numBytesToFree > 0) {
    memoryStore.evictBlocksToFreeSpace(Some(blockId)numBytesToFree

             evictedBlocks)
    // Register evicted blocks, if any, with the active task metrics
    Option(TaskContext.get()).foreach { tc =>
      val metrics = tc.taskMetrics()
      val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId

             BlockStatus)]())
      metrics.updatedBlocks Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
    }
  }
 

分配内存是否成功,也就是要申请的内存小于或等于可用的内存空间,最后把分配的内存加入到使用的内存空间中.表示这部分内存已经被look住.
  val enoughMemory = numBytesToAcquire <= memoryFree
  if (enoughMemory) {
    _memoryUsed += numBytesToAcquire
  }
  enoughMemory
}

 

Storage处理block cache的淘汰

storage中内存不够使用时,通过memoryStorage去运行block的淘汰,并把淘汰后的block返回通知上层的调用端.

if (numBytesToFree > 0) {
  memoryStore.evictBlocksToFreeSpace(Some(blockId)numBytesToFreeevictedBlocks)
  // Register evicted blocks, if any, with the active task metrics
  Option(TaskContext.get()).foreach { tc =>
    val metrics = tc.taskMetrics()
    val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId

       BlockStatus)]())
    metrics.updatedBlocks Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
  }
}

 

MemoryStore中处理对cache的淘汰:

对block的cache进行淘汰的处理函数,传入參数中,第二个參数是须要释放的空间,第三个參数是被淘汰后的block的集合用于返回.

private[spark] def evictBlocksToFreeSpace(
    blockId: Option[BlockId],
    space: Long,
    droppedBlocks: mutable.Buffer[(BlockIdBlockStatus)]): Boolean = {
  assert(space > 0)
  memoryManager.synchronized {
    var freedMemory = 0L

这里首先先得到传入的block相应的rdd的id.得到这个rdd_id的目的是淘汰block时,假设发现是这个rdd的block时,不进行淘汰.
    val rddToAdd = blockId.flatMap(getRddId)
    val selectedBlocks = new ArrayBuffer[BlockId]
    // This is synchronized to ensure that the set of entries is not changed
    // (because of getValue or getBytes) while traversing the iterator, as that
    // can lead to exceptions.
    entries.synchronized {

这里開始对storage的内存中全部的cache进行迭代,这个迭代从最先进行cache的block開始,假设迭代到的block相应的RDD不是传入的BLOCK相应的RDD时,把这个BLOCK加入到选择的BLOCK的集合中,并计算当前内存池中的内存是否达到须要的内存空间,假设达到,停止选择BLOCK的操作.
      val iterator = entries.entrySet().iterator()
      while (freedMemory < space && iterator.hasNext) {
        val pair = iterator.next()
        val blockId = pair.getKey
        if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
          selectedBlocks += blockId
          freedMemory += pair.getValue.size
        }
      }
    }

    if (freedMemory >= space) {

假设流程运行到这里,说明内存空间释放成功,如今可用的内存空间已经达到须要的内存空间的大小,把选择的BLOCK相应的CACHE通过BLOCKMANAGER从内存中进行释放.并把释放后的BLOCK加入到droppedBlocks的集合中,这个集合用于返回结果,表示这次空间的释放时,这些BLOCK已经从CACHE中称出.
      logInfo(s"${selectedBlocks.size} blocks selected for dropping")
      for (blockId <- selectedBlocks) {
        val entry = entries.synchronized { entries.get(blockId) }
        // This should never be null as only one task should be dropping
        // blocks and removing entries. However the check is still here for
        // future safety.
        if (entry != null) {
          val data = if (entry.deserialized) {
            Left(entry.value.asInstanceOf[Array[Any]])
          } else {
            Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
          }
          val droppedBlockStatus = blockManager.dropFromMemory(blockIddata)
          droppedBlockStatus.foreach { status => droppedBlocks += ((blockId

                status)) }
        }
      }
      true
    else {

流程运行到这里,表示STORAGE的内存空间中无法释放出很多其它的内存,也就相当于是释放空间失败求.
      blockId.foreach { id =>
        logInfo(s"Will not store $id as it would require dropping another block " +
          "from the same RDD")
      }
      false
    }
  }
}

原文地址:https://www.cnblogs.com/gccbuaa/p/7055664.html