Flink之Watermark的设置和使用

具体实现代码如下所示:

main函数中代码如下:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 设置生成watermark的时间间隔,系统默认为200毫秒,一般使用系统默认即可
env.getConfig.setAutoWatermarkInterval(5000)

val sensorStream: DataStream[SensorReading] = env
    .readTextFile("D:\Project\IDEA\bigdata-study\flink-demo\src\main\resources\source.txt")
    .map(new MyMapToSensorReading)

// 1、引入Watermark(使用已有的类)
// 1.1、给一个没有乱序,时间为升序的流设置一个EventTime
val ascendingStream: DataStream[SensorReading] = sensorStream.assignAscendingTimestamps(_.timestamp)
// 1.2、当流中存在时间乱序问题,引入watermark,并设置延迟时间
/**
 * 知识点:
 * 1、BoundedOutOfOrdernessTimestampExtractor中的泛型为流中数据的类型
 * 2、传入的参数为 watermark 的最大延迟时间(即允许数据迟到的时间)
 * 3、重写的extractTimestamp方法返回的是设置数据中EventTime的字段,单位为毫秒,需要将时间转换成Long(最近时间为13位的长整形)才能返回
 * 4、当我们能大约估计到流中的最大乱序时,建议使用此中方式,比较方便
 */
val watermarkStream: DataStream[SensorReading] = sensorStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
    override def extractTimestamp(element: SensorReading): Long = {
        element.timestamp * 1000
    }
})

// 2、使用 TimestampAssigner 引入 Watermark
// 2.1、Assigner with periodic watermarks(周期性引入watermark)
/**
 * 知识点:
 * 1、系统会周期性的将watermark插入到流中,默认周期是200毫秒,可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置,单位为毫秒
 * 2、产生watermark的逻辑:每隔5秒钟,Flink会调用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法,如果大于流中最大watermark就插入,小于就不插入
 * 3、如下,可以自定义一个周期性的时间戳抽取(需要实现 AssignerWithPeriodicWatermarks 接口)
 */
env.getConfig.setAutoWatermarkInterval(5000)
val periodicWatermarkStream: DataStream[SensorReading] = sensorStream.assignTimestampsAndWatermarks(new MyPeriodicAssigner(10))

env.execute("WatermarkDemo")

自定义类实现ProcessFunction接口:

/**
 * 自定义一个周期生成watermark的类
 * @param bound watermark的延时时间(毫秒)
 */
class MyPeriodicAssigner(bound: Long) extends AssignerWithPeriodicWatermarks[SensorReading] {

    // 当前为止的最大时间戳(毫秒)
    var maxTs: Long = Long.MinValue

    /**
     * 获取当前的watermark(默认200毫秒获取一次,可以通过 env.getConfig.setAutoWatermarkInterval(5000) 来设置)
     * @return 当前watermark,当前最大时间戳 - 延时时间
     */
    override def getCurrentWatermark: Watermark = {
        new Watermark(maxTs - bound)
    }

    /**
     * 指定eventTime对应的字段(流中每条数据都会调用一次此方法)
     * @param element 流中的每条数据
     * @param previousElementTimestamp 无
     * @return 当前流的eventTime(单位:毫秒)
     */
    override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
        // 每条数据都获取其中的时间戳,跟最大时间戳取大,并重新赋值给最大时间戳
        maxTs = maxTs.max(element.timestamp * 1000)
        element.timestamp * 1000
    }
}
原文地址:https://www.cnblogs.com/yangshibiao/p/14133587.html