Flink:时间和水位线

接着上一篇:https://www.cnblogs.com/wwjj4811/p/15223023.html

时间

时间是流处理应用程序的另一个重要概念。

事件总是在特定时间点发生,所以大多数的事件流都拥有事件本身所固有的时间语义,许多常见的流计算都是基于时间语义的。

Flink支持以下3种时间类型:

  • Event Time:事件创建的时间
  • Ingestion Time:数据进入Flink的时间
  • Processing Time:执行操作算子的本地系统时间,与机器相关

image-20210903144714699

水位线

乱序数据的影响

image-20210903161136064

当 Flink 以 Event Time 模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子。

由于网络、分布式等原因,会导致乱序数据的产生。

乱序数据会让窗口计算不准确。

水位线

遇到一个时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口

Watermark 是一种衡量 Event Time 进展的机制,可以设定延迟触发。

Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 window 来实现;

数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据, 都已经到达了,因此,window 的执行也是由 Watermark 触发的。

watermark 用来让程序自己平衡延迟和结果正确性

image-20210906092518580

watermark 是一条特殊的数据记录

watermark 必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退

watermark 与数据的时间戳相关

Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时时长 t,每次系统会校验已经到达的数据中最大的 maxEventTime,然后认定 eventTime 小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于 maxEventTime – t,那么这个窗口被触发执行。

有序流的 Watermarker 如下图所示:(Watermark 设置为 0)

image-20210906150302372

乱序流的 Watermarker 如下图所示:(Watermark 设置为 2)

image-20210906150318575

当 Flink 接收到数据时,会按照一定的规则去生成 Watermark,这条 Watermark 就等于当前所有到达数据中的 maxEventTime - 延迟时长,也就是说,Watermark 是 基于数据携带的时间戳生成的,一旦 Watermark 比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于 event time 是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。

上图中,我们设置的允许最大延迟到达时间为 2s,所以时间戳为 7s 的事件对应 的 Watermark 是 5s,时间戳为 12s 的事件的 Watermark 是 10s,如果我们的窗口 1 是 1s~5s,窗口 2 是 6s~10s,那么时间戳为 7s 的事件到达时的 Watermarker 恰好触发窗口 1,时间戳为 12s 的事件到达时的 Watermark 恰好触发窗口 2。

Watermark 就是触发前一窗口的“关窗时间”,一旦触发关门那么以当前时刻为准在窗口范围内的所有所有数据都会收入窗中。

只要没有达到水位那么不管现实中的时间推进了多久都不会触发关窗。

使用 Watermarks

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        //设置时间语义
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(100);

        DataStream<String> inputStream = env.socketTextStream("192.168.1.77", 7777);

        DataStream<SensorReading> mapStream = inputStream.map((str) -> {
            String[] split = str.split(" ");
            return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        })
        //乱序数据设置时间戳和watermark
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(1)) {
            @Override
            public long extractTimestamp(SensorReading element) {
                return element.getTimestamp() * 1000L;
            }
        });
        //基于事件时间的开创聚合,统计15s内温度最小值
        SingleOutputStreamOperator<SensorReading> minStream = mapStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .minBy("temperature");

        minStream.print("minStream");
        env.execute();

窗口的起始点和偏移量

点进去timeWindow方法的源码:

	public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			return window(TumblingProcessingTimeWindows.of(size));
		} else {
			return window(TumblingEventTimeWindows.of(size));
		}
	}

因为我们这里不是ProcessingTime时间语义,所以调用window(TumblingEventTimeWindows.of(size));

TumblingEventTimeWindows类中有一个分配窗口assignWindows方法:

	@Override
	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
		if (timestamp > Long.MIN_VALUE) {
			// Long.MIN_VALUE is currently assigned when no timestamp is present
			long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
			return Collections.singletonList(new TimeWindow(start, start + size));
		} else {
			throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
					"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
					"'DataStream.assignTimestampsAndWatermarks(...)'?");
		}
	}

TimeWindow.getWindowStartWithOffset方法:

	public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
		return timestamp - (timestamp - offset + windowSize) % windowSize;
	}

上面offset没有设置过,默认为0,所以公式简化为:timestamp - timestamp % windowSize,

所以最后结果是小于timestamp,且是windowSize的最大整数倍

当设置了offset后,如果offset为正数,则窗口会向右偏移offset位置。

原文地址:https://www.cnblogs.com/wwjj4811/p/15245885.html