Spark Analysis: MemoryStore

import java.nio.ByteBuffer
import java.util.LinkedHashMap
import scala.collection.mutable.ArrayBuffer

private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)

class MemoryStore(blockManager: BlockManager, maxMemory: Long) extends BlockStore(blockManager) {
    // accessOrder = true: iteration follows least-recently-used order, which drives eviction
    private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)

    // Put: first ensure there is enough free space; on failure, hand the block to the BlockManager
    private def tryToPut(blockId: BlockId, value: Any, size: Long, deserialized: Boolean): ResultWithDroppedBlocks = {
        var putSuccess = false
        val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
        val freeSpaceResult = ensureFreeSpace(blockId, size) // may first evict LRU blocks
        droppedBlocks ++= freeSpaceResult.droppedBlocks
        if (freeSpaceResult.success) { // free memory is enough to hold the block
            val entry = new MemoryEntry(value, size, deserialized)
            entries.synchronized {
                entries.put(blockId, entry) // store the block in the internally maintained map
            }
            // deserialized blocks are handled as object arrays, serialized ones as byte arrays
            val valuesOrBytes = if (deserialized) "values" else "bytes"
            putSuccess = true
        } else {
            // memory cannot hold the block: tell the BlockManager, which may drop it to disk (if the block's storage level allows disk storage)
            val data = if (deserialized) {
                Left(value.asInstanceOf[ArrayBuffer[Any]])
            } else {
                Right(value.asInstanceOf[ByteBuffer].duplicate())
            }
            val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
            droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
        }
        ResultWithDroppedBlocks(putSuccess, droppedBlocks)
    }
    
    // Get: simply look the block up in the map by its blockId
    override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
        val entry = entries.synchronized {
            entries.get(blockId)
        }
        if (entry == null) {
            None
        } else if (entry.deserialized) { // deserialized: the value is already an object array
            Some(entry.value.asInstanceOf[Array[Any]].iterator)
        } else { // serialized: the value is a byte buffer that must be deserialized on read
            val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // doesn't actually copy data
            Some(blockManager.dataDeserialize(blockId, buffer))
        }
    }
}
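
tryToPut above delegates the "is there room?" decision to ensureFreeSpace, which this post does not show. Below is a minimal, self-contained sketch of the eviction idea, not Spark's actual code: the names maxMemory, currentMemory, and entries mirror the class above, and the actual dropping and accounting are omitted. The key point is that the access-ordered LinkedHashMap iterates least-recently-used entries first, so eviction is LRU.

import java.util.LinkedHashMap
import scala.collection.mutable.ArrayBuffer

object EvictionSketch {
    case class Entry(size: Long)

    val maxMemory = 100L
    var currentMemory = 0L
    // accessOrder = true: iteration visits least-recently-used entries first
    val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)

    // Returns the blockIds that must be dropped so that `space` bytes fit,
    // or None if the block cannot fit even after evicting everything.
    def ensureFreeSpace(space: Long): Option[Seq[String]] = {
        if (space > maxMemory) return None // larger than the whole store
        val selected = ArrayBuffer.empty[String]
        var freed = 0L
        val it = entries.entrySet().iterator()
        while (maxMemory - (currentMemory - freed) < space && it.hasNext) {
            val pair = it.next()
            selected += pair.getKey     // LRU entry: first candidate for eviction
            freed += pair.getValue.size
        }
        if (maxMemory - (currentMemory - freed) >= space) Some(selected.toSeq) else None
    }
}

One detail the sketch leaves out: Spark's real ensureFreeSpace skips blocks that belong to the same RDD as the incoming block, to avoid evicting partitions that would immediately have to be recomputed.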


Summary:

1) MemoryStore internally maintains a LinkedHashMap to manage all blocks, storing each block with its blockId as the key;

2) When putting a block into the MemoryStore (tryToPut), it first calls ensureFreeSpace() to check whether free memory can hold the block (see the eviction sketch after the class above):

  Enough: the block is added directly to the LinkedHashMap;

  Not enough: the block is handed to BlockManager.dropFromMemory, which writes it to disk (if the block's storage level allows disk storage);

3) MemoryStore keeps blocks in memory either as serialized byte arrays or as deserialized arrays of Java objects (a sketch of the two representations follows this list).
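
To make point 3 concrete, here is a small, self-contained sketch (not Spark code) of the two in-memory representations that getValues has to handle; plain Java serialization stands in for blockManager.dataDeserialize:

import java.io._
import java.nio.ByteBuffer

object EntryFormats extends App {
    // Deserialized entry: the values live as an object array; reading them back
    // just wraps the array in an iterator (fast, but memory-hungry).
    val asValues: Array[Any] = Array(1, 2, 3)
    println(asValues.iterator.mkString(","))

    // Serialized entry: the values live as bytes; every read pays the cost of
    // deserialization (compact, but CPU-intensive).
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(Array[Any](1, 2, 3))
    oos.close()
    val asBytes = ByteBuffer.wrap(bos.toByteArray)

    val ois = new ObjectInputStream(new ByteArrayInputStream(asBytes.array()))
    println(ois.readObject().asInstanceOf[Array[Any]].iterator.mkString(","))
}

This is the classic space/CPU trade-off: MEMORY_ONLY-style (deserialized) storage reads fast but uses more memory, while serialized storage packs more blocks into the same budget at a per-read deserialization cost.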

Original post: https://www.cnblogs.com/luogankun/p/3924936.html