Flink之Window的使用(2):时间窗口

相关文章链接

Flink之Window的使用(1):计数窗口

Flink之Window的使用(2):时间窗口

Flink之Window的使用(3):WindowFunction的使用

具体实现代码如下所示:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val sensorStream: DataStream[SensorReading] = env
    .socketTextStream("localhost", 9999)
    .map(new MyMapToSensorReading)

// 1、使用window方法进行开窗设置
// 1.1、滚动窗口
/**
 * 知识点:
 * 1、在该方法中,可以使用 TumblingProcessingTimeWindows 和 TumblingEventTimeWindows 类,分别是创建处理时间窗口 和 事件时间窗口(事件时间窗口需要设置时间特性)
 * 2、滚动窗口中,of方法可以设置2个参数,第一个是窗口的大小,第二个是时间偏移量(不设置时默认使用伦敦时间,当设置为-8时,为使用北京时间),偏移量设置时需要小于窗口大小
 */
val windowStream_1: DataStream[SensorReading] = sensorStream
    .keyBy(_.id)
    //            .window(TumblingProcessingTimeWindows.of(Time.days(5), Time.hours(-8)))   // 偏移量设置时需要小于窗口大小
    //            .window(TumblingEventTimeWindows.of(Time.seconds(5)))                     // 事件时间窗口
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))

// 1.2、滑动窗口
/**
 * 知识点:
 * 1、在该方法中,可以使用 TumblingProcessingTimeWindows 和 TumblingEventTimeWindows 类
 * 2、滑动窗口中,of方法可以设置3个参数,第一个是窗口大小,第二个是滑动步长,第三个是偏移量
 */
val windowStream_2: DataStream[SensorReading] = sensorStream
    .keyBy(_.id)
    //            .window(SlidingProcessingTimeWindows.of(Time.days(7), Time.days(1), Time.hours(-8)))  // 偏移量设置时需要小于窗口大小
    //            .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(5)))                // 事件时间窗口
    .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(5)))
    .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))

// 1.3、会话窗口
val windowStream_3: DataStream[SensorReading] = sensorStream
    .keyBy(_.id)
    //            .window(EventTimeSessionWindows.withGap(Time.minutes(10)))            // 事件时间会话窗口
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))

// 2、使用timeWindow方法进行开窗
// 2.1、滚动窗口
val timeWindowStream_1: DataStream[SensorReading] = sensorStream
    .keyBy(_.id)
    .timeWindow(Time.seconds(5))
    .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
// 2.2、滑动窗口
val timeWindowStream_2: DataStream[SensorReading] = sensorStream
    .keyBy(_.id)
    .timeWindow(Time.seconds(15), Time.seconds(5))
    .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))

windowStream_1.print()

env.execute("TimeWindowDemo")
原文地址:https://www.cnblogs.com/yangshibiao/p/14133628.html