Flink:Window Api

基本概念

窗口 window

image-20210902165907439

一般真实的流都是无界的,怎样处理无界的数据?

可以把无限的数据流进行切分,得到有限的数据集进行处理 —— 也就是得到有界流

窗口(window)就是将无限流切割为有限流的一种方式,它会将流数据分发到有限大小的桶(bucket)中进行分析

window类型:

  • 时间窗口:按照时间生成 Window。
    • 滚动时间窗口
    • 滑动时间窗口
    • 会话窗口
  • 计数窗口:窗口(window)就是将无限流切割为有限流的一种方式,它会将流 数据分发到有限大小的桶(bucket)中进行分析
    • 滚动计数窗口
    • 滑动计数窗口

滚动窗口-Tumbling Windows

将数据依据固定的窗口长度对数据进行切片。

特点:时间对齐,窗口长度固定,没有重叠。

滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个 5 分钟大小的滚动窗 口,窗口的创建如下图所示:

image-20210902171055226

适用场景:适合做 BI 统计等(做每个时间段的聚合计算)。

滑动窗口-Sliding Windows

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。

特点:时间对齐,窗口长度固定,可以有重叠。

滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中

例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包 含着上个 10 分钟产生的数据,如下图所示:

image-20210902171934857

适用场景:对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)。

会话窗口-Session Windows

由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的 session,也就是一段时间没有接收到新数据就会生成新的窗口。

特点:时间无对齐。

session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗 口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去。

image-20210902172951558

窗口分配器

窗口分配器 —— window() 方法

我们可以用 .window() 来定义一个窗口,然后基于这个 window 去做一些聚合或者其它处理操作。注意 window () 方法必须在 keyBy 之后才能用。

Flink 提供了更加简单的 .timeWindow 和 .countWindow 方法,用于定义时间窗口和计数窗口。

  • window() 方法接收的输入参数是一个 WindowAssigner
  • WindowAssigner 负责将每条输入的数据分发到正确的 window 中
  • Flink 提供了通用的 WindowAssigner
    • 滚动窗口--tumbling window
    • 滑动窗口--sliding window
    • 会话窗口--session window
    • 全局窗口--global window

创建不同类型的窗口

  • 滚动时间窗口(tumbling time window)

image-20210903100111842

  • 滑动时间窗口(sliding time window)

image-20210903100138936

  • 会话窗口(session window)

image-20210903100219683

  • 滚动计数窗口(tumbling count window)

image-20210903100250516

  • 滑动计数窗口(sliding count window)

image-20210903100355230

窗口函数

window function 定义了要对窗口中收集的数据做的计算操作

可以分为两类

  • 增量聚合函数(incremental aggregation functions)
    • 增量聚合函数(incremental aggregation functions)
    • ReduceFunction, AggregateFunction
  • 全窗口函数(full window functions)
    • 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据
    • ProcessWindowFunction,WindowFunction

时间窗口增量聚合

下面计算每三秒中数据的个数:

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStream<String> inputStream = env.socketTextStream("192.168.1.77", 7777);

        DataStream<SensorReading> mapStream = inputStream.map((str) -> {
            String[] split = str.split(" ");
            return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });

        DataStream<Integer> resultStream = mapStream.keyBy("id")
            	//开一个时间窗口
                .timeWindow(Time.seconds(3))
            	//聚合
                .aggregate(new MyAggregateFun());

        resultStream.print();

        env.execute();
    }

    private static class MyAggregateFun implements AggregateFunction<SensorReading, Integer, Integer>{

        //创建一个累加器
        @Override
        public Integer createAccumulator() {
            return 0;
        }

        @Override
        public Integer add(SensorReading value, Integer accumulator) {
            //累加操作
            return accumulator + 1;
        }

        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            return  a + b;
        }
    }

测试效果:

动画

全窗口聚合

代码:

        //id 结束时间 个数
        DataStream<Tuple3<String, Long, Integer>> resultStream = mapStream.keyBy("id")
                .timeWindow(Time.seconds(3))
                .apply(new WindowFunction<SensorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String, Long, Integer>> out) {
                        String id = tuple.getField(0);
                        long windowEnd = window.getEnd();
                        int count = IteratorUtils.toList(input.iterator()).size();
                        out.collect(new Tuple3<>(id, windowEnd, count));
                    }
                });

效果:动画2

计数窗口测试

滑动计数窗口测试:

        DataStream<Double> resultStream = mapStream.keyBy("id")
                .countWindow(10, 2)
                .aggregate(new MyAvgFunc());
        resultStream.print();

MyAvgFunc.java

    /**
     * @author wen.jie
     * @date 2021/9/3 11:00
     * 求平均温度
     */
    public static class MyAvgFunc implements AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double>{

        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0, 0);
        }

        @Override
        public Tuple2<Double, Integer> add(SensorReading value, Tuple2<Double, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0 + value.getTemperature(), accumulator.f1 + 1);
        }

        @Override
        public Double getResult(Tuple2<Double, Integer> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }

        @Override
        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
            return new Tuple2<>(a.f0+b.f0, a.f1+b.f1);
        }
    }

效果:每两条数据滑动一次

动画

其他可选API

  • .trigger() —— 触发器
    • 定义 window 什么时候关闭,触发计算并输出结果
  • .evictor() —— 移除器
    • 定义移除某些数据的逻辑
  • .allowedLateness() —— 允许处理迟到的数据
  • .sideOutputLateData() —— 将迟到的数据放入侧输出流(旁路输出)
  • .getSideOutput() —— 获取侧输出流(旁路输出)

同过allowedLateness可以处理迟到数据。

在使用“事件时间”窗口时,可能会发生元素迟到的情况,具体表现是,Flink用于跟踪“事件时间”进度的水位线已经超过了元素所属窗口的结束时间戳。

在默认情况下,当水位线超过窗口末端时将删除迟到的元素。但是,Flink允许为窗口算子指定最大允许延迟--在删除指定元素之前可以延迟的时间,默认值为0。

Flink保持窗口的状态,直到允许的延迟过期为止。一旦发生这种情况,Flink将删除该窗口并删除其状态。

侧输出流(旁路输出流的使用方法):

//标记旁路输出
final OutputTag<T> tag = new OutputTag<>("later-data");
//创建源数据
DataStream<T> input = .......;
SingleOutputStreamOperator<T> sumStream = input
    //keyBy:键控流转换算子
    .keyBy("id")
    //窗口转换算子
    .timeWindow(Time.seconds(15))
    //运行延迟时间
    .allowedLateness(Time.minutes(1))
    //将迟到的数据发送到用OutputTag标识的旁路输出流中
    .sideOutputLateData(tag)
    .sum("temperature");

//加载旁路输出数据
sumStream.getSideOutput(tag).print();
原文地址:https://www.cnblogs.com/wwjj4811/p/15223023.html