SparkStreaming之checkpoint检查点

一.简介

  流应用程序必须保证7*24全天候运行,因此必须能够适应与程序逻辑无关的故障【例如:系统故障、JVM崩溃等】。为了实现这一点,SparkStreaming需要将足够的信息保存到容错存储系统中,以便它可以从故障中恢复。

  检查点有两种类型。

    1.元数据检查点

      将定义流式计算的信息保存到容错存储系统【如HDFS等】。这用于从运行流应用程序所在的节点的故障中恢复。

      元数据包括:

        1.配置

          用于创建流应用程序的配置。

        2.DStream操作

          定义流应用程序的DStream操作集。

        3.不完整的批次

          在任务队列中而尚未完成的批次。

    2.数据检查点

      将生成的RDD保存到可靠的存储系统。在一些跨多个批次组合数据的有状态转换中,这是必须的。在这种转换中,生成的RDD依赖于先前批次的RDD,这导致依赖关系链的长度随着时间而增加。为了避免恢复时间的这种无限增加【与依赖链成正比】,有状态变换的中间RDD周期性地检查以存储到可靠的存储系统中,以切断依赖链。

  总而言之,元数据检查点主要用于从节点故障中恢复,而如果使用状态转换,即使对于基本功能也需要数据或RDD检查点。

二.需要设置检查点的情况

  1.有状态转换的使用,如果在应用程序中使用了updateStateByKey或reduceByKeyAndWindow,则必须提供检查点以缓存之前批次的中间结果。

  2.从运行应用程序的节点故障中恢复,元数据检查点用于使用进度信息进行恢复。

  备注:在没有上述状态转换的简单流应用程序中可以不使用检查点。在这种情况下,节点故障的恢复将是部分性的【某些以接收但未处理的数据可能会丢失】。

三.配置检查点

  可以通过在容错,可靠的文件系统【例如:HDFS、S3或Windows文件系统】中设置目录来启用检查点,检查点信息将保存到该文件系统中。使用:streamingContext.checkpoint(checkpointDirectory)来设置的。这将允许使用上述状态转换。此外,如果要使应用程序从节点故障中恢复,则应重写流应用程序以使其具有以下行为。

  1.当程序首次启动时,它将创建一个新的StreamingContext,设置所有流后调用start()。

  2.当程序在失败后重新启动时,它将从检查点目录中的检查点数据重新创建StreamingContext。

四.代码实现

 1 package big.data.analyse.streaming
 2 
 3 import org.apache.log4j.{Level, Logger}
 4 import org.apache.spark.SparkConf
 5 import org.apache.spark.streaming.{Seconds, StreamingContext}
 6 
 7 /**
 8   * Created by zhen on 2019/8/15.
 9   */
10 object Checkpoint {
11   def functionToCreateContext():StreamingContext = {
12     val conf = new SparkConf().setMaster("local[2]").setAppName("StreaingTest")
13     val ssc = new StreamingContext(conf, Seconds(10))
14     val lines = ssc.socketTextStream("192.168.245.137", 9999)
15 
16     val words = lines.flatMap(_.split(" "))
17     val pairs = words.map(word=>(word,1)).reduceByKey(_+_)
18     pairs.foreachRDD(row => row.foreach(println))
19     ssc.checkpoint("D:\checkpoint")
20     ssc
21   }
22   def main(args: Array[String]) {
23     /**
24       * 设置日志级别
25       */
26     Logger.getLogger("org").setLevel(Level.WARN) // 设置日志级别
27 
28     /**
29       * 获取入口及设置checkpoint检查点
30       */
31     val ssc = StreamingContext.getOrCreate("D:\checkpoint", functionToCreateContext _)
32 
33     ssc.start()
34     ssc.awaitTermination()
35     ssc.stop()
36   }
37 }

五.结果

  入参:

    

  结果:

    

六.总结

  1.需要确保节点进程在失败时会自动重启,这只能通过部署基础结构来完成。

  2.检查点的默认间隔是批处理间隔的倍数,且至少为10秒。通常DStream的5~10个滑动间隔为检查点间隔是一个很好的设置。

原文地址:https://www.cnblogs.com/yszd/p/11358535.html