Spark(七)【RDD的持久化Cache和CheckPoint】

RDD的持久化

1. RDD Cache缓存

​ RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

使用

1.rdd.cache() 等价于rdd.persist(MEMORY_ONLY)

2.rdd.persist() 可以设置级别

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
示例

应用中运行两个job任务,缓存rdd3

/**
 * @description: RDD的Cache缓存
 * @author: HaoWu
 * @create: 2020年08月04日
 */
object DependeciedTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDDTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
    val rdd2 = rdd1.map(x=>x)
    val rdd3 = rdd2.groupBy(x=>x)
    println("*******缓存前rdd3的血缘依赖**********")
    println(rdd3.toDebugString)
    //将rdd3缓存
    rdd3.cache()
    //任务1
    rdd3.collect()
    println("*******缓存后rdd3的血缘依赖**********")
    println(rdd3.toDebugString)
    //任务2
    rdd3.saveAsTextFile("output")
    Thread.sleep(10000000)
  }
}

小说明:Spark会将Shuffle的RDD缓存进cache,上面写的有些小问题,但不影响演示。

缓存前后RDD的血缘依赖:

任务1的执行WEB界面

任务2的执行WEB界面

总结

① cache操作会增加血缘关系,不改变原有的血缘关系

②RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行

③丢失的数据会被重算,RDD的各个Partition是相对独立,只需要计算丢失的部分即可

④Spark会自动对一些Shuffle操作的中间数据做cache操作

⑤实际使用的时候,如果想重用数据,建议调用persist或cache。

2. RDD CheckPoint检查点

​ 对cache的补充。 cache一般将数据缓存到内存,不可靠,checkpoint选择将数据写入到磁盘!

什么是checkpoint检查点:通过将RDD中间结果写入磁盘由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。

使用
    //设置checkpoint目录,集群环境checkpoint目录必须是HDFS的路径
    sc.setCheckpointDir("./checkpointDir1")
    //rdd3设置checkpoint
    rdd3.checkpoint()
示例
/**
 * @description: RDD的CheckPoint持久化
 * @author: HaoWu
 * @create: 2020年08月04日
 */
object DependeciedTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDDTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    //设置checkpoint目录,集群环境checkpoint目录必须是HDFS的路径
    sc.setCheckpointDir("./checkpointDir1")
    val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
    val rdd2 = rdd1.groupBy(x=>x)
    val rdd3 = rdd2.map(x=>x)
    //增加cache缓存
    rdd3.cache()
    println("********设置checkpoint前,rdd3的血缘依赖**********")
    println(rdd3.toDebugString)
    //增加数据检查点
    rdd3.checkpoint()
    //任务1
    rdd3.collect()
    println("********设置checkpoint后,rdd3的血缘依赖**********")
    println(rdd3.toDebugString)
    //任务2
    rdd3.saveAsTextFile("output12")
    Thread.sleep(10000000)
  }
}

查看检查点前后的血缘依赖

总结

①在集群环境运行时,checkpoint目录必须是HDFS的路径

②一旦RDD被checkpoint,当前RDD的血缘关系会被切断

③ checkpoint在第一个行动算子被调用时,触发,依然会生成一个Job,执行checkpoint任务

3. cache和check的区别

①Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。

②Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。

③建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。

原文地址:https://www.cnblogs.com/wh984763176/p/13435457.html