Spark Streaming HA

Driver HA:

1、在提交application的时候,添加 --supervise 选项,如果Driver挂掉,会自动启动一个Driver

2、代码层面恢复Driver

3、在恢复checkpoint中数据的时候,把旧的逻辑也一起给恢复了

主要的作用就是当SparkStreaming 停机之后,下次启动的时候,让代码知道上一次停机的数据处理节点在什么地方,避免从头开始执行

代码逻辑如下:

object SparkStreamingDriverHA {

  /**
    * Driver HA :
    * 1、在提交application的时候, 添加 --supervise选项, 如果Driver挂掉,会自动启动一个Driver
    * 2、代码层面恢复Driver
    * 3、在恢复的同时,如果新添加了处理逻辑,会将旧的处理逻辑恢复
    * @param args
    */

  //设置checkpoint目录:
  val ckDir = "./data/streamingCheckpoint"




  def main(args: Array[String]): Unit = {

    /**
      *  StreamingContext.getorCreate(ckDir,CreateStreamingContext)
      * 这个方法首先会从CKDir目录中获取StreamingContext【因为StreamingContext是序列化存储在checkpoint中,回复时会尝试反序列化这些object】
      * 如果用修改过的class可能会导致错误,此时需要跟换checkpoibt目录或者删除checkpoint目录中的数据,程序才能跑起来
      *
      */

    val ssc = StreamingContext.getOrCreate(ckDir, CreateStreamingContext)

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }


   def CreateStreamingContext(): StreamingContext = {

    println("=============create new StreamingContext===============")
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("DriverHA")

    val ssc = new StreamingContext(conf, Durations.seconds(5))

    ssc.sparkContext.setLogLevel("Error")

     /**
       * 默认checkpoint存储
       * 1、配置信息
       * 2、DStream操作逻辑
       * 3、job的执行进度
       * 4、offset
       */

     ssc.checkpoint(ckDir)
     val lines = ssc.textFileStream("./data/streamingCopyFile")
     val words = lines.flatMap(line=> {line.split(" ")})
     val pairWords = words.map(word=>{(word,1)})
     val result = pairWords.reduceByKey((v1:Int,v2:Int)=>{v1+v2})

   /*  result.print()*/

     /**
       * 更改逻辑
       *
       *
       */

      result.foreachRDD(pairRdd=>{
        pairRdd.filter(one=>{
          println("===========filter============")
          true
        }).foreach(println)


      })


     ssc

   }

当重启之后,如果checkpoint记录了上一次的sparkContext,就会按照上一次的执行逻辑执行,否则就按照最新的执行逻辑去执行

val ssc = StreamingContext.getOrCreate(ckDir, CreateStreamingContext)

CKDir: 可以找到上一次停止时的执行位置

CreateStreamingContext: 在checkpoint文件夹中没有数据的情况下,会按照最新的代码逻辑进行数据执行 ,CreateStreamingContext方法就代表最新的执行逻辑

原文地址:https://www.cnblogs.com/wcgstudy/p/11090193.html