Spark Structured Streaming 的 Stateful 操作

Structured Streaming 内部使用 StateStore 模块实现增量持续查询和故障恢复。
StateStore 模块提供了分片的、分版本的、可迁移的、高可用的 key-value store。

而在应用层面主要是使用 mapGroupsWithState 和 flatMapGroupsWithState 实现状态操作

参考这篇文章的例子 https://blog.csdn.net/wangpei1949/article/details/105028892

/**
 * Structured Streaming example that keeps a running page-view count per
 * (event-time minute, userID) using `mapGroupsWithState`.
 *
 * JSON events are read from Kafka, their `eventTime` string is converted to a
 * timestamp, rows are grouped by "minute,userID", and the per-group count is kept
 * in a `GroupState[(String, Long)]` holding (group key, running total).
 * Results are written to the console in update mode.
 */
object MapGroupsWithStateExample {

  /** Length of one grouping bucket (one minute) in milliseconds. */
  private val MinuteMs = 60000L

  /** Entry point: builds the streaming query, starts it and blocks until it terminates. */
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder.appName("MapGroupsWithStateExample").getOrCreate()

    // Provides the `$"..."` column syntax and the implicit encoders used by
    // groupByKey / mapGroupsWithState below; it can only be imported once `spark` exists.
    import spark.implicits._

    spark.udf.register("timezoneToTimestamp", timezoneToTimestamp _)

    val jsonSchema =
      """{
        "type":"struct",
        "fields":[
          {
            "name":"eventTime",
            "type":"string",
            "nullable":true
          },
          {
            "name":"eventType",
            "type":"string",
            "nullable":true
          },
          {
            "name":"userID",
            "type":"string",
            "nullable":true
          }
        ]
      }"""

    val inputTable = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka01:9092,kafka02:9092,kafka03:9092")
      .option("subscribe", "test_1")
      .load()

    val resultTable = inputTable
      .select(from_json(col("value").cast("string"), DataType.fromJson(jsonSchema)).as("value"))
      .select($"value.*")
      .withColumn("timestamp",
                  functions.callUDF("timezoneToTimestamp",
                                    functions.col("eventTime"),
                                    lit("yyyy-MM-dd HH:mm:ss"),
                                    lit("GMT+8")))
      // Rows whose eventTime could not be parsed carry a null timestamp and are dropped here.
      .filter($"timestamp".isNotNull && $"eventType".isNotNull && $"userID".isNotNull)
      .withWatermark("timestamp", "2 minutes")
      .groupByKey((row: Row) => {
        // The key of each group is "<event-time minute>,<userID>".
        val timestamp = row.getAs[Timestamp]("timestamp")
        val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
        val currentEventTimeMinute = sdf.format(new Date(timestamp.getTime))
        currentEventTimeMinute + "," + row.getAs[String]("userID")
      })
      // Type arguments: (String, Long) is the state type, (String, String, Long) the output type.
      // GroupStateTimeout.EventTimeTimeout() selects event-time (watermark-based) timeouts.
      .mapGroupsWithState[(String, Long), (String, String, Long)](
        GroupStateTimeout.EventTimeTimeout())(updatePageViews _)
      // updatePageViews returns null when a group has nothing to report.
      .filter(_ != null)
      .toDF("minute", "userID", "pv")

    // Query Start
    val query = resultTable
      .writeStream
      .format("console")
      .option("truncate", "false")
      .outputMode("update")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start()

    query.awaitTermination()
  }

  /**
   * State-update function handed to `mapGroupsWithState`.
   *
   * One state is maintained per "minute,userID" key. When the state has timed out it is
   * removed; otherwise the number of rows in the current batch is added to the stored
   * total (0 when no state exists yet) and the state is updated.
   *
   * @param groupKey         key of the group, formatted as "minute,userID"
   * @param currentBatchRows rows of this group in the current trigger (consumed here)
   * @param groupState       state of this group: (group key, running total)
   * @return (minute, userID, running total), or null when there is nothing to emit
   *         (the caller filters nulls out)
   */
  private def updatePageViews(
      groupKey: String,
      currentBatchRows: Iterator[Row],
      groupState: GroupState[(String, Long)]): (String, String, Long) = {
    println("当前组对应的 Key: " + groupKey)
    println("当前 Watermark: " + groupState.getCurrentWatermarkMs())
    println("当前组的状态是否存在: " + groupState.exists)
    println("当前组的状态是否过期: " + groupState.hasTimedOut)

    if (groupState.hasTimedOut) {
      // The state of this group has expired: drop it and emit nothing.
      println("清除状态...")
      groupState.remove()
      null
    } else {
      // Single pass over the batch: count the rows and remember the latest event time.
      val (batchCount, latestEventMs) = currentBatchRows.foldLeft((0L, Long.MinValue)) {
        case ((count, latest), row) =>
          (count + 1L, math.max(latest, row.getAs[Timestamp]("timestamp").getTime))
      }

      // Historical value: taken from the state when it exists, otherwise start from zero.
      val historyValue =
        if (groupState.exists) {
          println("增量聚合....")
          groupState.get._2
        } else {
          println("初始化状态...")
          0L
        }

      // Total = history + current batch.
      val totalValue = historyValue + batchCount
      groupState.update((groupKey, totalValue))

      // With EventTimeTimeout a group only times out if a timeout timestamp has been set,
      // and it must be set again on every invocation. Use the end of this group's minute
      // (assumes the grouping minute is aligned with epoch minutes, i.e. a whole-minute
      // zone offset), clamped so it is never earlier than the current watermark, which
      // setTimeoutTimestamp would reject.
      if (batchCount > 0) {
        val minuteEndMs = (Math.floorDiv(latestEventMs, MinuteMs) + 1L) * MinuteMs
        groupState.setTimeoutTimestamp(math.max(minuteEndMs, groupState.getCurrentWatermarkMs()))
      }

      if (totalValue != 0) {
        // Limit 2: the minute part never contains a comma, so everything after the first
        // comma is the userID, even when it is empty or itself contains commas.
        val groupKeyArray = groupKey.split(",", 2)
        (groupKeyArray(0), groupKeyArray(1), totalValue)
      } else {
        null
      }
    }
  }

  /**
   * Parses a local date-time string in the given zone into a SQL timestamp.
   *
   * Registered as a Spark UDF, so failure is signalled with null rather than an exception.
   *
   * @param dateTime       the date-time text to parse; null yields null
   * @param dataTimeFormat `DateTimeFormatter` pattern describing `dateTime`
   * @param dataTimeZone   zone id in which `dateTime` is expressed, e.g. "GMT+8"
   * @return the corresponding timestamp (millisecond precision), or null when
   *         `dateTime` is null or cannot be parsed
   */
  def timezoneToTimestamp(dateTime: String, dataTimeFormat: String, dataTimeZone: String): Timestamp = {
    if (dateTime == null) {
      null
    } else {
      try {
        val format = DateTimeFormatter.ofPattern(dataTimeFormat)
        val eventTime = LocalDateTime.parse(dateTime, format).atZone(ZoneId.of(dataTimeZone))
        new Timestamp(eventTime.toInstant.toEpochMilli)
      } catch {
        // Bad input must not fail the whole query; report what could not be parsed.
        case scala.util.control.NonFatal(ex) =>
          println(s"error: cannot parse '$dateTime' with format '$dataTimeFormat' " +
            s"in zone '$dataTimeZone': ${ex.getMessage}")
          null
      }
    }
  }
}


原文地址:https://www.cnblogs.com/moonlight-lin/p/14165467.html