067 Combining HA with updateStateByKey

  This is a program that combines HA (checkpoint-based recovery) with updateStateByKey.

  There is one odd issue: after restarting the project it runs fine, but by the third start it no longer prints any data.
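
  How the job behaves across restarts depends on how the previous run was stopped and on what StreamingContext.getOrCreate recovers from the checkpoint directory. The program below stops the context gracefully from a helper thread; the comment next to ssc.start() also mentions spark.streaming.stopGracefullyOnShutdown, which the code never actually sets. A minimal sketch of enabling that property on the SparkConf, shown only as an illustration and not part of the original program:

import org.apache.spark.SparkConf

// Sketch only (not in the original code): with this property set, Spark Streaming
// stops the context gracefully when the JVM exits, instead of relying solely on
// the helper shutdown thread.
val conf = new SparkConf()
  .setAppName("StreamingHAOfKafkaDirect")
  .setMaster("local[*]")
  .set("spark.streaming.stopGracefullyOnShutdown", "true")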

1. Program

package com.stream.it

import kafka.serializer.StringDecoder

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Manage Kafka offsets through HA (checkpoint-based recovery).
  * Created by ibf on 03/04.
  */
object HAAndUpdateStateByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingHAOfKafkaDirect")
      .setMaster("local[*]")
    val sc = SparkContext.getOrCreate(conf)
    val checkpointDir = "hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/012"

    // ========================================
    /**
      * Create a StreamingContext object.
      *
      * @return
      */
    def createingStreamingContextFunc(): StreamingContext = {
      val kafkaParams = Map(
        "metadata.broker.list" -> "linux-hadoop01.ibeifeng.com:9092,linux-hadoop01.ibeifeng.com:9093,linux-hadoop01.ibeifeng.com:9094",
        "auto.offset.reset" -> "smallest")
      val topics = Set("beifeng")
      val ssc = new StreamingContext(sc, Seconds(1))
      val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

      val resultDStream = dstream
        .filter(_._2.nonEmpty)
        .mapPartitions(iter => {
          // split each message value into words and pair every word with 1
          iter.flatMap(_._2.split(" ").map((_, 1)))
        })
        .updateStateByKey(
          (values: Seq[Int], state: Option[Long]) => {
            // sum for this key in the current batch
            val currentSum = values.sum
            // accumulated value from the previous batches
            val preSum = state.getOrElse(0L)

            // return the new state
            Some(currentSum + preSum)
          }
        )

      // print the result
      resultDStream.print()

      // set the checkpoint directory on the ssc
      ssc.checkpoint(checkpointDir)
      // return the new context
      ssc
    }

    val ssc = StreamingContext.getOrCreate(
      checkpointPath = checkpointDir,
      creatingFunc = createingStreamingContextFunc
    )

    // Hand the ssc to another object that manages shutting the streaming job down
    // ==> another thread (the current thread keeps receiving and processing data)
    shutdownStreaming(ssc)

    // start === when spark.streaming.stopGracefullyOnShutdown is true, the context
    // is stopped automatically when the JVM exits
    ssc.start()
    ssc.awaitTermination() // blocking call
  }

  /**
    * Perform the shutdown operation.
    *
    * @param ssc
    */
  def shutdownStreaming(ssc: StreamingContext): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = {
        println("hello......")
        // When some condition is triggered, stop the StreamingContext.
        // Here we simply wait for 10 seconds as a stand-in for that condition.
        Thread.sleep(10000)
        // stop the StreamingContext (and the SparkContext) gracefully
        ssc.stop(stopSparkContext = true, stopGracefully = true)
        println("hello")
      }
    }).start()
  }

}
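
  To see any output, something has to write whitespace-separated words into the beifeng topic. The original post does not show how the test data was produced; below is a minimal producer sketch, assuming the org.apache.kafka kafka-clients dependency is on the classpath (the object name and sample lines are made up for illustration):

package com.stream.it

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical helper (not part of the original post): writes a few test lines
// into the "beifeng" topic so the streaming job has something to count.
object BeifengTestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "linux-hadoop01.ibeifeng.com:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try {
      // Each record value is a whitespace-separated line, matching split(" ") in the job.
      Seq("hadoop spark", "spark streaming", "hadoop hdfs").foreach { line =>
        producer.send(new ProducerRecord[String, String]("beifeng", line))
      }
      producer.flush()
    } finally {
      producer.close()
    }
  }
}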

2. Result
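
  The original post shows a screenshot of the console output here. With resultDStream.print(), each batch prints a block roughly like the following (the timestamp and counts are illustrative, not taken from the original run):

-------------------------------------------
Time: 1533800000000 ms
-------------------------------------------
(hadoop,2)
(spark,2)
(streaming,1)
(hdfs,1)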

  

Original article: https://www.cnblogs.com/juncaoit/p/9484060.html