Flink之ProcessFunction案例

知识点:

1、Flink 提供了 8 个 Process Function: 
    ProcessFunction  
    KeyedProcessFunction 
    CoProcessFunction 
    ProcessJoinFunction 
    BroadcastProcessFunction 
    KeyedBroadcastProcessFunction
    ProcessWindowFunction 
    ProcessAllWindowFunction

2、
KeyedProcessFunction重要方法:

  a)processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每一个元素 都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流(side outputs)。 

  b)onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是一个回 调函数。当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定 的触发的时间戳。Collector 为输出结果的集合。OnTimerContext 和 processElement 的 Context 参数一样,提供了上下文的一些信息,例如定时器 触发的时间信息(事件时间或者处理时间)。

 3、Context 和 OnTimerContext 所持有的 TimerService 对象拥有以下方法

    currentProcessingTime(): Long 返回当前处理时间 

    currentWatermark(): Long 返回当前 watermark 的时间戳 
    registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前 key 的 processing time 的定时器。当 processing time 到达定时时间时,触发 timer。 

    registerEventTimeTimer(timestamp: Long): Unit 会注册当前 key 的 event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
    deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定 时器。如果没有这个时间戳的定时器,则不执行
    deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时 器,如果没有此时间戳的定时器,则不执行。 当定时器 timer 触发时,会执行回调函数 onTimer()。注意定时器 timer 只能在 keyed streams 上面使用。

场景:

10s钟温度都是上升,则报警

1、处理代码案例1

package processFunction

import com.yangwj.api.SensorReading
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
 * @author yangwj
 * @date 2021/1/10 21:25
 * @version 1.0
 */
object ProcessFunctionTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val inputFile:String = "G:\Java\Flink\guigu\flink\src\main\resources\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    val dataStream = input.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })
//      .keyBy(_.id)
//      .process(new MykeyedProcessFunction())

    val warningStream: DataStream[String] = dataStream.keyBy(_.id).process(new TempIncreWarning(10000L))
    warningStream.print()
    env.execute("KeyedProcessFunction Test")

  }
}


//10s钟温度都是上升,则报警
class TempIncreWarning(inerval:Long) extends KeyedProcessFunction[String,SensorReading,String]{

  //定义状态,保存上一个温度值进行比较,保存注册定时器的时间戳用于删除
  lazy val lastTempState:ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double] ("last-temp",classOf[Double]))
  lazy val timerTsState:ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long] ("timer-ts",classOf[Long]))

  override def processElement(i: SensorReading, context: KeyedProcessFunction[String, SensorReading, String]#Context, collector: Collector[String]): Unit = {
    //先取状态
    val lastTemp: Double = lastTempState.value()
    val timerTs: Long = timerTsState.value()
    //更新温度值
    lastTempState.update(i.temperature)

    //当前温度值和上次温度进行比较
    if(i.temperature > lastTemp && timerTs == 0){//初始化
      //如果温度上升,且没有定时器,那么注册当前时间10s之后的定时器
      val ts: Long = context.timerService().currentProcessingTime() + inerval
      context.timerService().registerProcessingTimeTimer(ts)
      timerTsState.update(ts)

    } else if(i.temperature < lastTemp){
      //如果温度下降,那么删除定时器
      context.timerService().deleteProcessingTimeTimer(timerTs)
      timerTsState.clear()
    }

  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
    out.collect("传感器" + ctx.getCurrentKey+"的温度连续"+inerval/1000 + "秒连续上升")
    timerTsState.clear()
  }
}


//KeyedProcessFunction 工能测试
class MykeyedProcessFunction extends KeyedProcessFunction[String,SensorReading,String]{

  //定时器
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {

  }

  override def open(parameters: Configuration): Unit = {
    val valueState: ValueState[Int] = getRuntimeContext.getState(new ValueStateDescriptor[Int]("mystate", classOf[Int]))

  }

  //context上下文
  override def processElement(i: SensorReading,
       context: KeyedProcessFunction[String, SensorReading, String]#Context, collector: Collector[String]): Unit = {
    //注册触发,onTimer()
    context.timerService().registerEventTimeTimer(context.timestamp()+60000L)

    //删除定时器
    //    context.timerService().deleteEventTimeTimer()
  }
}

原文地址:https://www.cnblogs.com/ywjfx/p/14264923.html