Flink 非对齐Unaligned的checkpoint(源码分析)

本文源码基于flink1.14

在帮助用户排查任务的时候,经常会发现部分task处理的慢,在Exactly once语义时需要等待快照的对齐而白白柱塞的情况

在flink1.11版本引入了非对齐的checkpoint,来解决这种柱塞问题,所以来看看这个新特性的源码是如何实现的

先看下官网的图来总的说下实现原理,再来看看源码

flink是基于Chandy-Lamport算法来实现全局快照的,其核心就是在数据中间穿插barrier

当一个task上游同一批次所有的barrier到齐时,就可以触发快照状态的保存了,问题就是出在这里,等待对齐

来看下上面官网的图,是官网优化的一个具体思路

当某task的第一个barrier到时,那内部当前批次状态必然是不完整的,那多久才算完整呢?等到这批checkpoint的数据全部都到齐都处理完,状态就完整了

那当第一个barrier到的时候,剩下没到的数据在哪呢,答案就是,上游task的outBuffer(ResultSubpartition)和自己inBuffer(inputGates)里面

ok  分析到这里就可以来看flink的思路了,如果当第一个barrier来的时候我不能触发checkpoint, 是因为还有部分数据没有处理到

那干脆就直接把这部分还没处理的数据(在buffer里面的数据),连同状态数据一起保存到checkpoint里面不就行了吗   ???

在从checkpoint恢复的时候就先把这部分buffer数据, 先恢复到当前task的buffer里面,继续计算就可以了,其实弱化了每个checkpoint批次的概念

这样一来当收到第一个barrier的时候,就可以直接触发checkpoint了

下面就是来看看Flink 源码的实现了

 看下熟悉的StreamTask因为barrier在flink里面依然被当做数据的一种,在主循环里面看看接收到以后做了什么

1、先看输入inputBuffer的保存

 在AbstractStreamTaskNetworkInput中接收数据的时候从inputGate拉取数据的时候

可以看到会根据数据的类型,如果是barrier类型会走到processBarrier方法

注意这里的这个barrierHandler是SingleCheckpointBarrierHandler实现类,因为非对齐模式的话收到第一个barrier就触发checkpoint了,所以也等同于sigle了

这里的state是实现类AlternatingWaitingForFirstBarrierUnaligned是非对齐模式特有的

来看看怎么处理的

可以看到在 SubtaskCheckpointCoordinatorImpl 中会准备inputFlight数据的快照,目的肯定就是为了保持到checkpoint中

这个方法prepareInflightDataSnapshot方法看下

会调用 BiFunctionWithException prepareInputSnapshot这个action对象,这个对象从哪里传进来的呢?

原来在StreamTask构造函数的时候就通过自己的prepareInputSnapshot方法来创建这个Function了

来看下这个方法的逻辑

会遍历所有的inputProcess然后调用它的准备快照方法

这个方法里面具体

就将具体的input的数据保存到state里面去了

input的保存就说完了

2、接着来看output缓存数据的保存

回到最开始的AlternatingWaitingForFirstBarrierUnaligned类当保存完input buffer的数据

调用initInputsCheckpoint方法之后,来看下具体的逻辑

当触发完input数据的保存以后,就是触发全局的checkpoint了,这里会一直走到streamTask的triggerCheckpointOnBarrier在里面会走到performCheckpoint

最后在SubtaskCheckpointCoordinatorImpl类中

 最后在 BufferWritingResultPartition 类里面

PipelinedSubpartition调用addbuffer然后将channelStateWriter.addOutputData把output buffer的数据保存到状态里面去了

3、讲完触发checkpoint保存缓存中的数据,接下来就是从chekpoint恢复的时候怎么恢复这些未处理的数据了

来看下StreamTask如果从chekpoint恢复的是否是如何处理的

restore方法调用了restoreGates

这里就是将数据in buffer状态的保存到InputGate, 然后out buffer 的状态数据保存到ResultPartitionWriter里面去,继续处理了

note: 可以看到这种非对齐checkpoint,虽然第一个barrier接收到就能马上触发checkpoint不用柱塞,但是会增加很多的IO

因为要保存很多额外的数据,相比柱塞的方式,所以慎用!!!!

over

原文地址:https://www.cnblogs.com/ljygz/p/15793693.html