【源码】flink 窗口数据触发清理流程

前言: Flink 窗口会将当前窗口的数据存储在状态中,等待窗口结束的时候触发计算,那窗口状态什么时候清理?

(前提: 窗口的主要逻辑是在 WindowOperator 中完成的)

翻一下 WindowOperator 的代码,可以看到下面这个方法,“Drops all state for the given window and calls” 这个注释,还是可以比较明确的说明这个方法的作用的。

那 WindowOperator.clearAllState 是什么时候调用的

/**
 * Drops all state for the given window and calls
 * {@link Trigger#clear(Window, Trigger.TriggerContext)}.
 *
 * <p>The caller must ensure that the
 * correct key is set in the state backend and the triggerContext object.
 */
private void clearAllState(
    W window,
    AppendingState<IN, ACC> windowState,
    MergingWindowSet<W> mergingWindows) throws Exception {
  // 清理 窗口状态
  windowState.clear();
  // 清理 触发器
  triggerContext.clear();
  processContext.window = window;
  // 清理 窗口上下文,调用 userFunction 的 clear 方法
  processContext.clear();
  if (mergingWindows != null) {
    mergingWindows.retireWindow(window);
    mergingWindows.persist();
  }

clearAllState 是在 WindowOperator.onEventTime/onProcessingTime 方法中调用的,计算是否达到窗口的 isCleanupTime

if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
    clearAllState(triggerContext.window, windowState, mergingWindows);
}

protected final boolean isCleanupTime(W window, long time) {
  return time == cleanupTime(window);
}

private long cleanupTime(W window) {
  if (windowAssigner.isEventTime()) {
    // 加上 允许的延迟(窗口的延迟,不是 watermark 减去的那个值)
    long cleanupTime = window.maxTimestamp() + allowedLateness;
    return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
  } else {
    return window.maxTimestamp();
  }
}

窗口 timer.getTimestamp 时间 等于窗口的最大时间(如果允许延迟: 最大时间 + allowedLateness 时间),就调用 WindowOperator.clearAllState

如果是滑动窗口,数据属于多个窗口的,timer 或者说 onEventTime/onProcessingTime 方法是属于所有窗口的,每次触发的窗口是,窗口队列中最早结束的一个

(滑动窗口的数据流进来的时候,会自动把数据放到多个窗口的状态中去)

以下以事件时间为例说明

WindowOperator.onEventTime 上游调用的地方是 InternalTimerServiceImpl.advanceWatermark (在代码 WindowOperator.onEventTime/onProcessingTime/processElement 打个断点,

查看调用栈,可以看到窗口算子从网络 buffer 中读取 数据/watermark 的地方: StreamTaskNetworkInput.processElement )

public void advanceWatermark(long time) throws Exception {
  currentWatermark = time;

  InternalTimer<K, N> timer;

  while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
    eventTimeTimersQueue.poll();
    keyContext.setCurrentKey(timer.getKey());
    triggerTarget.onEventTime(timer);
  }
}

可以看到队列,peek 会取出第一条,timer 会对应到第一条对应的窗口

(翻滚窗口 queue 的多条数据,对应多个keyGroup 的多个 key 的窗口,每次只 peek 出第一条,timer 对象是带了keyContext,第一步就是选择当前的 keyContext : keyContext.setCurrentKey(timer.getKey()) )

eventTimeTimersQueue 是在 WindowOperator.processElement 中调用 EventTimeTrigger.onElement 方法,注册定时器添加的

@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
  if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
    // if the watermark is already past the window fire immediately
    return TriggerResult.FIRE;
  } else {
    ctx.registerEventTimeTimer(window.maxTimestamp());
    return TriggerResult.CONTINUE;
  }
}

@Override
public void registerEventTimeTimer(N namespace, long time) {
  eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}

队列的数据是在窗口处理输入数据的过程中,调用 EventTimeTrigger.onElement 方法,使用窗口结束时间注册了个定时(注册的时候设置 队列元素的 keyContext 和 namespace)

注: 使用的是 flink 实现的 KeyGroupedInternalPriorityQueue,根据 传入时间(window.masTimestamp)可能会插入到队列的不同位置(按 timestamp 排序, HeapPriorityQueue.siftUp/ siftDown)

@Override
// 添加到队列的末尾
protected void addInternal(@Nonnull T element) {
  final int newSize = increaseSizeByOne();
  moveElementToIdx(element, newSize);
  // 自排序
  siftUp(newSize);
}

// 当前元素位置向对头前移
private void siftUp(int idx) {
  final T[] heap = this.queue;
  // 拿出当前插入元素
  final T currentElement = heap[idx];
  // idx 的一半
  int parentIdx = idx >>> 1;
  // 折半插入: 比较当前插入元素与 队列数组中当前元素 index 一半的元素的优先级
  while (parentIdx > 0 && isElementPriorityLessThen(currentElement, heap[parentIdx])) {
    // 
    moveElementToIdx(heap[parentIdx], idx);
    idx = parentIdx;
    // 再一半
    parentIdx >>>= 1;
  }
  // 最后放入对应位置
  moveElementToIdx(currentElement, idx);
}

总结: 窗口数据输入时注册窗口最大时间的定时器用以触发窗口计算(同时会注册一个 registerCleanupTimer 用以清除窗口数据,

没有开启 allowedLateness 的窗口,两个定时器时间是相同的,只保留一个),watermark 前进的时候,会从定时器队列中取出对

头中的 timer,查看 timer.getTimestamp(即窗口的最大时间) 小于 当前 watermark 对应的 时间时,触发当前的 timer(从队列中弹出,

调用 triggerTarget.onEventTime(timer)),触发当前窗口的计算(窗口结束),同时查看当前窗口是否可以清除窗口状态(事件时间

的窗口如果允许延迟的,窗口结束是不能清理窗口状态的,)

清理状态就比较简单了,就是调用 AbstractHeapState.remove 方法,传入对应的 namestace,从 stateMap 中 remove 对应的 key

private void remove(K key, int keyGroupIndex, N namespace) {
  checkKeyNamespacePreconditions(key, namespace);

  StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroupIndex);
  stateMap.remove(key, namespace);
}

注: 窗口触发计算的时候,触发器返回 PURGE 也会删除窗口数据

欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

原文地址:https://www.cnblogs.com/Springmoon-venn/p/13667023.html