Checkpoint & cache & persist

checkpoint

checkpoint(检查点)是Spark为了避免长链路,大计算量的Rdd不可用时,需要长时间恢复而引入的。主要就是将通过大量计算而获得的这类Rdd的数据直接持久化到外部可靠的存储体系中(一般为hdfs文件)。在以后再需要从这个Rdd获取数据时,直接从检查点获取数据从而避免了从头重新计算Rdd的数据。

生成checkpoint

checkpoint是在job执行结束后再启动专门的checkpoint job生成的(完成job的action方法之后),也就是说需要checkpoint的rdd会被提交到集群上计算两次。所以在对rdd调用checkpoint时,建议也加上rdd.cache。如果对多个rdd进行了checkpoint(这些rdd不是父子关系),则需要将这些rdd都提交到集群执行一次。

Spark当前实现的checkpoint策略,如果父子rdd都调用了checkpoint方法,则默认只会对子rdd进行checkpoint。可以通过系统参数spark.checkpoint.checkpointAllMarkedAncestors来保存所有调用了checkpoint方法的rdd。

RDDCheckpointData与RDD是一一对应的,包含了一个RDD的checkpoint的所有信息,RDD可以通过其RDDCheckpointData.isCheckpointed方法来判断RDD是否有对应的checkpoint。RDD会调用RDDCheckpointData.checkpoint方法来实际checkpoint当前RDD。RDDCheckpointData有两个子类LocalRDDCheckpointData和ReliableRDDCheckpointData,分别对应本地模式的checkpoint方法和集群模式的checkpoint方法。RDDCheckpointData会调用子类的doCheckpoint方法来实际保存rdd的数据。

rdd生成checkpoint文件的流程如下(这里以集群模式为例):

SparkContext.runJob -> RDD.doCheckpoint -> RDDCheckpointData.checkpoint -> ReliableRDDCheckpointData.doCheckpoint -> ReliableRDDCheckpoint.writeRDDToCheckpointDirectory -> SparkContext.runJob

  1. 在SparkContext.runJob方法中,在DAGScheduler.runJob结束后,调用finalRdd的doCheckpoint方法
  2. 在RDD.doCheckpoint方法中,如果当前rdd没有调用checkpoint方法(checkpointData.isDefined == false),则遍历当前rdd依赖的所有rdd,递归调用这些rdd的checkpoint方法
  3. 如果当前rdd调用checkpoint方法(checkpointData.isDefined == true),则如果设置保存所有调用了checkpoint方法的rdd的数据(checkpointAllMarkedAncestors == true),则首先遍历当前rdd的所有依赖的rdd的doCheckpoint方法。
  4. 然后再对当前rdd调用RDDCheckpointData.checkpoint方法来实际将当前rdd的数据保存到checkpoint文件中
  5. 在RDDCheckpointData.checkpoint方法中,首先调用子类的doCheckpoint方法保存rdd数据到checkpoint文件并返回checkpoint文件对应的rdd,然后将当前rdd的所有依赖清空
  6. 在子类的doCheckpoint方法中(这里以ReliableRDDCheckpointData为例),会调用ReliableCheckpointRDD.writeRDDToCheckpointDirectory方法来实际保存rdd数据到checkpoint中,并返回checkpoint对应的rdd
  7. 在ReliableCheckpointRDD.writeRDDToCheckpointDirectory方法中,会调用SparkContext.runJob方法将当前rdd提交到集群执行,并调用rdd.iterator方法遍历rdd中数据并保存到SparkContext设置的checkpoint目录中,最后返回ReliableCheckpointRDD包装的对checkpoint文件引用的rdd

SparkContext.runJob

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    // 省略部分代码
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    // 在job结束后再checkpoint
    rdd.doCheckpoint()
  }

RDD.doCheckpoint

  private[spark] def doCheckpoint(): Unit = {
    RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
      if (!doCheckpointCalled) {
        doCheckpointCalled = true
        if (checkpointData.isDefined) {
          if (checkpointAllMarkedAncestors) {
            // 对父rdd进行checkpoint(也会调用runJob方法),当前这个是个串行操作
            dependencies.foreach(_.rdd.doCheckpoint())
          }
          checkpointData.get.checkpoint()
        } else {
          // 如果checkpointData为null,说明当前rdd没有checkpoint,则遍历其父类rdd
          // 如果都没有调用过checkpoint方法,则返回
          dependencies.foreach(_.rdd.doCheckpoint())
        }
      }
    }
  }

RDDCheckpointData.checkpoint

  final def checkpoint(): Unit = {
    // Guard against multiple threads checkpointing the same RDD by
    // atomically flipping the state of this RDDCheckpointData
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) {
        cpState = CheckpointingInProgress
      } else {
        return
      }
    }

    val newRDD = doCheckpoint()

    // Update our state and truncate the RDD lineage
    RDDCheckpointData.synchronized {
      cpRDD = Some(newRDD)
      cpState = Checkpointed
      rdd.markCheckpointed()
    }
  }

ReliableRDDCheckpointData.checkpoint

  protected override def doCheckpoint(): CheckpointRDD[T] = {
    val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

    // Optionally clean our checkpoint files if the reference is out of scope
    if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
      rdd.context.cleaner.foreach { cleaner =>
        cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
      }
    }

    logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
    newRDD
  }

ReliableCheckpointRDD.writeRDDToCheckpointDirectory

  def writeRDDToCheckpointDirectory[T: ClassTag](
      originalRDD: RDD[T],
      checkpointDir: String,
      blockSize: Int = -1): ReliableCheckpointRDD[T] = {

    val sc = originalRDD.sparkContext

    // Create the output path for the checkpoint
    val checkpointDirPath = new Path(checkpointDir)
    val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
    if (!fs.mkdirs(checkpointDirPath)) {
      throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
    }

    // Save to file, and reload it as an RDD
    val broadcastedConf = sc.broadcast(
      new SerializableConfiguration(sc.hadoopConfiguration))
    // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
    // 再次提交job,来生成checkpoint
    sc.runJob(originalRDD,
      writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

    if (originalRDD.partitioner.nonEmpty) {
      writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
    }

    val newRDD = new ReliableCheckpointRDD[T](
      sc, checkpointDirPath.toString, originalRDD.partitioner)
    if (newRDD.partitions.length != originalRDD.partitions.length) {
      throw new SparkException(
        s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
          s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")
    }
    newRDD
  }

读取checkpoint

Spark的rdd的读取都是通过rdd.iterator方法来读取数据的。rdd.iterator都会调用computeOrReadCheckpoint方法来读取数据。如果当前rdd是被checkpoint的(isCheckpointedAndMaterialized == true),则直接调用rdd父类(被checkpoint的rdd的父类就是CheckpointRDD类型的rdd,也就是在写入checkpoint时,最后返回的rdd)的iterator方法。在父类的rdd中由于isCheckpointedAndMaterialized == false,会调用CheckpointRDD.compute方法来获取数据。这里以CheckpointRDD的一个子类ReliableCheckpointRDD为例,在compute方法中直接读取对应的checkpoint文件来获取数据。

RDD.computeOrReadCheckpoint

  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
  {
    // 如果当前rdd被checkpoint了,则直接从rdd的parent取
    if (isCheckpointedAndMaterialized) {
      firstParent[T].iterator(split, context)
    } else {
      compute(split, context)
    }
  }

ReliableCheckpointRDD.compute

override def compute(split: Partition, context: TaskContext): Iterator[T] = {
  val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
  ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
}

cache & persist

cache和persist都是Spark用来缓存数据的方法,cache和persist的唯一区别就是cache的存储级别是MEMORY_ONLY,而persist可以指定存储级别,也就是说cache的数据在保存数据的节点内存不足的情况下会被丢弃,而persist在设置了存储级别可为DISK时,在节点内存不足时,缓存的数据可以被刷新到磁盘上保存。cache&persist和checkpoint的区别在于cache&persist只是为了加快数据的计算,RDD的依赖链路是完整保存的,在cache&persist的数据不可用时,Spark还可以根据RDD的依赖关系重新计算得到数据。而checkpoint检查点对应的RDD的依赖链路是不保存的,对应的RDD只有一个指向检查点文件的父RDD,所以在检查点数据丢失的情况下,Spark是无法根据RDD的依赖链路恢复数据的。而且由于cache&persist的数据由BlockManager管理,所以在driver程序执行结束时,被cache&persist的数据也会被清空。而checkpoint的数据是写入诸如HDFS文件系统中的,是独立存在的,所以可以被下一个driver程序执行使用。

cache和persist方法只是简单的设置了一下RDD的存储级别(RDD默认的存储级别是NONE)。在调用cache或persist时是不会触发缓存数据,只有在调用了action操作,Spark在调用到设置了cache或persist的RDD的iterator方法时,才会检查RDD是否是缓存了的。执行流程如下:

  1. 首先在action操作前调用RDD.cache或RDD.persist操作,设置RDD的存储级别
  2. 在job真正运行后,最终任务会调用ResultTask.runTask或ShuffleMapTask.runTask方法执行。在runTask方法中会调用RDD.iterator方法
  3. 在iterator中会判断当前RDD是否设置了存储级别(RDD默认的存储级别是NONE,只有cache和persist才可以改变RDD的存储级别,所以只要RDD的存储级别不是NONE则一定是设置了缓存),如果不是NONE,则调用getOrCompute方法,否则直接调用computeOrReadCheckpoint方法
  4. 在getOrCompute方法中,以RDD的id和partitionId作为查询的key,首先尝试从BlockManager中直接获取缓存的RDD数据(先尝试从本地获取,如果本地获取不到再根据driver提供的节点从其他节点获取),如果不能从BlockManager中获取,则再调用computeOrReadCheckpoint方法调用RDD.compute方法直接计算或从检查点获取数据,然后将获取到的数据缓存到BlockManager中,以便下次可以直接从BlockManager中直接获取(BlockManager原理请查看Spark存储体系)

RDD.cache

  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def cache(): this.type = persist()

RDD.persist

  private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
    // 当前在设置了RDD的存储级别后不允许修改
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    // 如果当前的RDD是第一次被缓存,则将RDD注册到SparkContext中以便之后的清理和统计,这个操作只会做一次
    if (storageLevel == StorageLevel.NONE) {
      sc.cleaner.foreach(_.registerRDDForCleanup(this))
      sc.persistRDD(this)
    }
    storageLevel = newLevel
    this
  }

RDD.iterator

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    // 只有rdd设置了cache或persist时,storageLevel才不是NONE,可以调用getOrCompute方法,首先从BlockManager中
    // 直接获取,如果没有再通过compute方法计算获取并保存到BlockManager中
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

RDD.getOrCompute

  private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
    // 这里的一个block就是一个partition
    val blockId = RDDBlockId(id, partition.index)
    var readCachedBlock = true
    // 如果是shuffle操作,则通过computeOrReadCheckpoint方法从map获取到iterator,然后调用getOrElseUpdate
    // 将iterator保存到BlockManager中(map阶段只是写到map文件,并没有保存到blockManager,所以这里调用
    // get是获取不到数据的,所以会update),如果是非shuffle操作,首先从BlockManager中获取,获取不到则
    // 调用computeOrReadCheckpoint方法,直接从checkpoint获取或通过compute方法获取。如果是数据源的RDD的
    // compute方法,则直接从数据源获取数据,如果是转换后的RDD的compute方法,则递归调用父类的iterator
    SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
      readCachedBlock = false
      computeOrReadCheckpoint(partition, context)
    }) match {
      case Left(blockResult) =>
        if (readCachedBlock) {
          val existingMetrics = context.taskMetrics().inputMetrics
          existingMetrics.incBytesRead(blockResult.bytes)
          new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
            override def next(): T = {
              existingMetrics.incRecordsRead(1)
              delegate.next()
            }
          }
        } else {
          new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
        }
      case Right(iter) =>
        new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
    }
  }
原文地址:https://www.cnblogs.com/cenglinjinran/p/8476250.html