Flink 动态窗口统计面试题-实现

之前分享了一个 Flink 的面试题,这里简单回顾下内容:

有两个输入源,一个是命令流,一个是数据流
需要将命令流进行广播,然后和数据流进行connect,根据命令流指定的命令进行统计
实现一个输出到终端的 sink,将统计结果打印出来,每一条记录包括 taskId, targetAttr, periodStartTime(周期开始时间), value (统计后的值,double类型)

面试题原文链接: https://mp.weixin.qq.com/s/iKx0EE-xvnOyncCIhN6MeA

实现流程

1、命令流使用从 kafka 输入,方便手动发送命令,map 解析成对象,广播
2、数据流实现 SourceFunction 自动生成数据,map 解析成对象
3、使用数据流关联 命令流,输出数据与命令组合的 tuple
4、生成 timestamp 和 周期性的 watermark(flink 自带)
5、数据通过 DynamicTumblingEventTimeWindows.assignWindows 指定动态窗口
6、使用窗口函数根据命令的 methed 计算对应的结果

具体实现请移步 github: https://github.com/springMoon/flink-rookie/tree/master/src/main/scala/com/venn/questing/dynamicWindow

动态窗口实现

这个问题的难点就在: 根据命令流的规则进行窗口统计(而命令的规则中,指定了统计的目标,也指定了 窗口的长度和开始时间)

Flink 原生的翻滚、滑动、session 窗口都是固定时间长度的窗口(session 窗口特殊的是,指定的长度不是窗口的长度而是 session timeout 的时间)

看下 翻滚窗口(TumblingEventTimeWindows)的源码

public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;
    // 指定窗口的长度
    private final long size;
    // 指定窗口的开始时间的偏移长度,如: Flink 的窗口都是整点的,按天的窗口都是从 0 点开始(UTC0),指定 offset = 8 小时,就称为北京时间的 0 点了
    private final long offset;

    protected TumblingEventTimeWindows(long size, long offset) {
        if (Math.abs(offset) >= size) {
            throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
        }

        this.size = size;
        this.offset = offset;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            // Long.MIN_VALUE is currently assigned when no timestamp is present
            // 每条数据进来会根据 当前的 timestam 使用 offset 和 size 计算窗口对于的开始时间,结束时间就是 开始时间 + size 
            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

}

TimeWindow.java
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
}

Flink 在使用 TumblingEventTimeWindows 功能的时候,每条数据都会进入 TumblingEventTimeWindows.assignWindows 方法,计算数据属于的窗口(知道窗口的长度,基于 0 的偏移值,任何一个 正常的 timestam 都可以通过上面的 getWindowStartWithOffset 函数计算出该 timestamp 对应窗口的 开始时间和结束时间)。

动态窗口的实现也是基于 TumblingEventTimeWindows 修改的,主要是"根据每条输入数据的命令,修改 窗口的 size 和 offset" 使窗口称为动态窗口

核心代码如下:

/**
 * flink dynamic tumbling event window
 */
@PublicEvolving
public class DynamicTumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    // not final, dynamic modify
    private long size;
    private long offset;

    protected DynamicTumblingEventTimeWindows() {
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            Tuple2<DataEntity, Command> element1 = (Tuple2<DataEntity, Command>) element;
            Command command = element1._2;
            // cal new window size
            // 大于当前时间的情况又怎么处理呢: 窗口开始时间大于 timestamp,下一窗口命令还未开始,数据属于上一窗口命令,所以不修改 size 与 offset
            if (command.startTime() < timestamp) {
                long millis = command.startTime() % 999;
                if ("minute".equalsIgnoreCase(command.periodUnit())) {
                    this.size = command.periodLength() * 60 * 1000;
                    // offset 等于 命令开始时间的 秒值 + 毫秒值
                    long second = command.startTime() / 1000 % 60;
                    offset = second * 1000 + millis;
                } else {
                    this.size = command.periodLength() * 1000;
                    // offset 等于 命令开始时间的 毫秒值
                    offset = millis;
                }
            }
            // todo 窗口开始时间大于或者小于 当前 timestamp 的时候,需要处理
            // 小于当前时间,可以计算出当前timestamp 对应的窗口
            long start = getWindowStartWithOffset(timestamp, offset, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

    /**
     * cal window start time
     */
    public long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp + offset - windowSize) % windowSize;
    }

    public static DynamicTumblingEventTimeWindows of() {
        return new DynamicTumblingEventTimeWindows();
    }

}

DynamicTumblingEventTimeWindows.of 生成窗口的时候,不再指定固定的 size 和 offset(动态窗口的规则中,有指定 对于的 属于进行统计,所以就不指定默认窗口 size 和 offset)

根据输入数据的命令部分,所以命令大于当前 timestamp 的数据(小于 timestamp 的命令说明该命令尚未开始,数据还是属于上一窗口,前面 数据流与命令流 关联的时候,已经做了处理,这里只是多加一层判断),根据命令中的 startTime 计算命令对应窗口基于 0 毫秒的偏移值(如果是分钟的窗口还有加上 秒 的偏移值),窗口的长度是 periodLength 属性对应的值,这里就得到了命令对应的窗口的 size 和 offset,后面的流程就和 Flink 原生窗口(TumblingEventTimeWindows)一样了,计算下 窗口的开始时间,结束时间

命令开始时间处理

对于命令的开始时间,其实也是一个处理的难点

命令的开始时间可能是小于、等于、大于当前时间的,其中小于和等于的命令,意味着窗口马上就要开始,使用对应的属性计算窗口的 size 和 offset 即可

对于命令的开始大于当前时间的命令,需要做下特殊处理,大于当前时间,意味着命令还不能生效,不能替换当前命令,当前的数据,是属于上一个正在执行的命令

在实现的时候,我使用了两个 map 的对应,一个存储当前正在执行命令,一个存储最新的命令(为了简单,假设基于每个属性的命令一次只会有一个在执行)

核心代码如下:

new BroadcastProcessFunction[DataEntity, Command, (DataEntity, Command)]() {

    // 存放当前命令的 map(非 keyBy 的不能使用 keyState,用 hashmap 将就了)
    var currentCommand: util.HashMap[String, Command] = _
    // 存放新命令的 map
    var commandState: MapStateDescriptor[String, Command] = _

    override def open(parameters: Configuration): Unit = {

      currentCommand = new util.HashMap[String, Command]()
      commandState = new MapStateDescriptor[String, Command]("commandState", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint[Command]() {}))
    }

    override def processElement(element: DataEntity, ctx: BroadcastProcessFunction[DataEntity, Command, (DataEntity, Command)]#ReadOnlyContext, out: Collector[(DataEntity, Command)]): Unit = {
      // 命令可以是大于/小于当前时间
      // 小于当前时间的,直接添加即可,之前命令的窗口不会收到新数据,新数据直接进新命令的窗口
      // 大于当前时间的命令,不能直接与流一起往下游输出,等时间小于当前的 processTime 时间后,才会开始新窗口
      val command = ctx.getBroadcastState(commandState).get(element.attr)
      val current = currentCommand.get(element.attr)
      if (command != null && command.startTime <= ctx.currentProcessingTime()) {
        // 当新命令的时间小于当前的处理时间,替换旧命令
        currentCommand.put(element.attr, command)
      }
      // 如果当前命令为空,数据就不往下发送了
      if (current != null) {
        out.collect((element, current))
      }
      // command not exists, ignore it
    }

    override def processBroadcastElement(element: Command, ctx: BroadcastProcessFunction[DataEntity, Command, (DataEntity, Command)]#Context, out: Collector[(DataEntity, Command)]): Unit = {
      // only one command are new accepted, cover old command
      logger.info("receive command : " + element)
      ctx.getBroadcastState(commandState).put(element.targetAttr, element)
    }
  }

代码都有注释,不再赘述

全部代码,请移步 github: https://github.com/springMoon/flink-rookie/tree/master/src/main/scala/com/venn/questing/dynamicWindow

测试

从 kafka 数据 readme.md 中对应的命令,查看 输出结果

2020-09-09 20:12:08,812 INFO  - receive command : Command(task1,attr2,sum,SECOND,20,1598596980000)
2020-09-09 20:12:08,812 INFO  - receive command : Command(task2,attr1,sum,MINUTE,1,1598596980000)
2020-09-09 20:12:08,812 INFO  - receive command : Command(task3,attr2,max,SECOND,30,1598596980000)
2020-09-09 20:12:09,816 INFO  - receive command : Command(task4,attr3,min,MINUTE,1,1599640669628)
sum> {"method":"min","periodStartTime":"20:11:10","targetAttr":"attr3","periodEndTime":"20:12:10","value":"18.0","taskId":"task4"}
sum> {"method":"max","periodStartTime":"20:11:59","targetAttr":"attr2","periodEndTime":"20:12:29","value":"981.0","taskId":"task3"}
sum> {"method":"max","periodStartTime":"20:12:29","targetAttr":"attr2","periodEndTime":"20:12:59","value":"937.0","taskId":"task3"}
sum> {"method":"sum","periodStartTime":"20:11:59","targetAttr":"attr1","periodEndTime":"20:12:59","value":"26876.0","taskId":"task2"}
sum> {"method":"min","periodStartTime":"20:12:10","targetAttr":"attr3","periodEndTime":"20:13:10","value":"32.0","taskId":"task4"}
sum> {"method":"max","periodStartTime":"20:12:59","targetAttr":"attr2","periodEndTime":"20:13:29","value":"998.0","taskId":"task3"}
2020-09-09 20:13:43,712 INFO  - receive command : Command(task5,attr2,sum,SECOND,20,1598596980000)
2020-09-09 20:13:43,712 INFO  - receive command : Command(task6,attr1,sum,MINUTE,1,1598596980000)
2020-09-09 20:13:43,712 INFO  - receive command : Command(task7,attr2,max,SECOND,30,1598596980000)
2020-09-09 20:13:43,712 INFO  - receive command : Command(task8,attr3,min,MINUTE,1,1599640669628)
sum> {"method":"max","periodStartTime":"20:13:29","targetAttr":"attr2","periodEndTime":"20:13:59","value":"995.0","taskId":"task3"}
sum> {"method":"sum","periodStartTime":"20:12:59","targetAttr":"attr1","periodEndTime":"20:13:59","value":"31627.0","taskId":"task2"}
sum> {"method":"min","periodStartTime":"20:13:10","targetAttr":"attr3","periodEndTime":"20:14:10","value":"90.0","taskId":"task4"}
sum> {"method":"max","periodStartTime":"20:13:59","targetAttr":"attr2","periodEndTime":"20:14:29","value":"945.0","taskId":"task7"}
欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文
原文地址:https://www.cnblogs.com/Springmoon-venn/p/13641481.html