Flink:


Flink源码解析-从API到JobGraph - 李银辉的文章 - 知乎 https://zhuanlan.zhihu.com/p/22736103

用户通过api构造transformation存储到StreamExecutionEnvironment

  1. StreamExecutionEnvironment不存储SourceTransformation, 因为flink不允许提交只有Source的job,而根据其他类型的Transformation的input引用可以回溯到SourceTransformation。
  2. Stream可以分为两种类型,一种是继承DataStream类,另一种不继承;功能上的区别在于,前者产生transformation(flink会根据transformation的组织情况构建DAG),后者不产生transformation,但是会赋予Stream一些特殊的功能,例如:window, iterate, union等。下图反映的flink所有的stream类。 

    从类名字上也能推断每个类型Stream的功能。
  3. 一个job可以没有sink,其他operator也能良好工作。

 
env.execute("word count on flink");

做了两件事情:

  1. 把Transformation转化成StreamGraph。
  2. 把StreamGraph按照一定的原则切分成JobGraph。

从JobGraph到ExecutionGraph

上文提到,在客户的完成JobGraph的构建之后,将其通过akka提交给JobManager,接下来我们介绍下JobManager怎样按照JobGraph的规划进行任务调度。

JobManager收到客户端提交的JobGraph之后,会构建ExecutionGraph;ExecutionGraph的拓扑结构和JobGraph保持一致,只是把JobGraph重构成ExecutionJobGraph,其中按照JobVertex将顶点分装成ExecutionJobVertex,按照JobEdge将边封装成ExecutionAdge,还构建IntermediateResult(中间数据)用来描述节点之间的Data shuffle 

ExecutionJobGraph有下面几个特点:

  1. Partition的数量和上游节点的并行度保持一致。
  2. 下游节点在和上游节点建立连接时,只有POINTWISE和ALL_TO_ALL两种模式,事实上只有RescalePartitioner和ForwardPartitioner是POINTWISE模式,其他的都是ALL_TO_ALL。默认情况下如果不指定partitioner,如果上游节点和下游节点并行度一样为ForwardPartitioner,否则为RebalancePartioner ,前者POINTWISE,后者ALL_TO_ALL。

作者:cuteximi
链接:https://zhuanlan.zhihu.com/p/79409039
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
 

【重要资讯】

《史上超强阵容!大数据及人工智能领域顶级盛会,Flink Forward Asia 2019 不容错过!》《首届!Apache Flink 极客挑战赛强势来袭,重磅奖项等你拿,快来组队报名啦》

【入门教程】

《Apache Flink 零基础入门(一):基础概念解析》陈守元、戴资力 

《Apache Flink 零基础入门(二):开发环境搭建和应用的配置、部署及运行》沙晟阳(成阳) 

《Apache Flink 零基础入门(三):DataStream API 编程》崔星灿、高赟 

《Apache Flink 零基础入门(四):客户端操作的 5 种模式》周凯波(宝牛) 

《Apache Flink 零基础入门(五):流处理核心组件 Time&Window 深度解析》邱从贤(山智) 

《Apache Flink 零基础入门(六):状态管理及容错机制》孙梦瑶(美团点评) 

《Apache Flink 零基础入门(八): SQL 编程实践》伍翀(云邪)

【应用案例】

《OPPO数据中台之基石:基于Flink SQL构建实数据仓库》张俊(OPPO) 

《日均处理万亿数据!Flink在快手的应用实践与技术演进之路》董亭亭(快手)

《小红书如何实现高效推荐?解密背后的大数据计算平台架构》

《用Flink取代Spark Streaming!知乎实时数仓架构演进》知乎数据工程团队 

《58 集团大规模 Storm 任务平滑迁移至 Flink 的秘密》万石康、冯海涛

【开发者实践】

Apache Flink 1.9.0 为什么将支持 Python API ?孙金城(金竹)


玉兆的博客

Flink-分布式快照的设计-流程

 发表于 2018-01-29 |  分类于 Flink源码 |  |  阅读次数: 1151

Flink 通过 barrier 来协调 checkpoint 的时机,我们在介绍【Flink 网络栈】的时候已经有介绍:对于一个拓扑结构,只有上游算子 checkpoint 完成,下游算子的 checkpoint 才能开始并有意义,又因为下游算子的消费速率并不统一【有的 channel 快,有的 channel 慢】,barrier 就是这样一种协调上下游算子的机制。

stream_barriers

阅读全文 »

Flink 分布式快照的设计-存储

 发表于 2017-12-24 |  分类于 Flink源码 |  |  阅读次数: 1124

分布式快照是整个 Flink 计算框架中非常核心的模块,Flink 的 checkpoint、状态存储都依赖于其分布式快照;不仅框架自身借助于 chandy-lamda 算法,实现了算子状态的快照和恢复,也对用户暴露了一套简洁的状态存储的 API,用户无需关心快照自身的容错/扩展/一致性,这些 Flink 都已对用户透明;由于分布式快照这块的设计比较复杂,因此将拆成两篇文章来介绍,本篇文章主要介绍分布式快照存储部分的设计,下一篇会介绍快照的流程和细节。

代码参考版本: 1.5-SNAPSHOT。

核心模块

Flink 的分布式快照存储部分设计抽象出了大致 5 个层次:

  • 最底层是快照的物理存储,包括内存和文件系统两种形式
  • 再上层是 CheckpointStreamFactory:封装了具体的存储交互,也就是内存/文件系统读写
  • 再上层是 StateBackend:封装了工作状态的存储逻辑,包括内存和 RocksDB 两种形式
  • 再上层是 KeyedStateBackend:封装了快照的读写细节,快照分区策略等
  • 再上层是 State:封装了与 KeyedStateBackend 交互时状态的 val 序列化/反序列化 等逻辑

还有一个 StateContext 比较特殊,它不提供快照功能,只提供临时的状态读写,下面会讲到
核心模块的总体交互图:

checkpoint-sys-interact

阅读全文 »

Flink WindowOperator 的设计

 发表于 2017-10-18 |  分类于 Flink源码 |  |  阅读次数: 1075

本文将介绍 flink 1.3.1 版本 WindowOperator 的设计,主要涉及核心的抽象,角色功能以及之间的交互,因为 WindowOperator 的状态存储用到了 flink 的状态存储,所以这里会略带涉及状态存储的知识,具体细节会单独开一篇文章介绍。

阅读全文 »

Flink Kafka Connector 设计与实现

 发表于 2017-03-29 |  分类于 Flink源码 |  |  阅读次数: 1114

本文将介绍 flink 1.2 的 0.9.0 版本 kafka connector,主要讲解消费模型,快照策略/恢复等,尤其是为什么 flink kafka connector 借助于 flink 内核的分布式快照算法做到了 exact-once 语义

阅读全文 »

Flink AsyncIO 剖析

 发表于 2017-03-29 |  分类于 Flink源码 |  |  阅读次数: 765

AsyncIO 是由 flink 1.2 引入的新特性,其目的是优化流处理过程中外部服务访问的耗时造成的瓶颈问题

阅读全文 »

Flink 对用户代码异常处理

 发表于 2017-02-09 |  分类于 Flink源码 |  |  阅读次数: 707

flink 的架构在 flink 基本组件一节已经介绍过,其中的 TaskManager 负责监护 task 的执行,对于每个 task,flink 都会启动一个线程去执行,那么当用户的代码抛出异常时,flink 的处理逻辑是什么呢?

阅读全文 »

Flink Scheduler

 发表于 2017-02-09 |  分类于 Flink源码 |  |  阅读次数: 1552

前面已经介绍了一系列的 flink 任务抽象、网络传输、可靠性机制等细节,有了这些铺垫,终于可以开心的介绍 flink 的任务调度机制了

因为没有这些铺垫,就无法明白 flink 为什么要设计这样的一套调度机制!所以本章节讲解时会多穿插一些为什么

资源组

资源组模型

flink 的一个 Instance 可以被划分出多个 Slot,通过初始参数可以指定,他们既可以是 SimpleSlot,也可以是同时跑多个 task 的 SharedSlot,为了约束 task 之间的运行时的绑定关系,flink 抽象出了 SlotSharingGroup 和 CoLocationGroup 的概念。

一个 SlotSharingGroup 规定了一个 Job 的 DAG 图中的哪些 JobVertex 的 sub task 可以部署到一个 SharedSlot 上,这是一个软限制,并不是一定会满足,只是调度的时候有位置偏好,而 CoLocationGroup 是在 SlotSharingGroup 的基础上的硬限制,它限定了 CoLocationGroup 中的 JobVertex 中的 sub task 运行必须是一一对应的:假如 CoLocationGrou 限定了 JobVertex A 和 B ,那么 A 的编号为 i 的 sub task 必须和 B 的编号为 i 的 sub task 跑在一起。假如一个 job 的运算逻辑包括 source -> head -> tail -> sink,那么它的 task 运行时限制关系见下图:

flink-slot-group

阅读全文 »

Flink Watermark & Checkpoint

 发表于 2017-02-09 |  分类于 Flink源码 |  |  阅读次数: 1276

在前面一章 flink 网络栈的讲解中,我们介绍了 Barrier 的改变以及 Barrier 在 InputGate 消费数据的过程中扮演的时间对齐作用,同时,我们介绍了 InputProcessor 负责数据读取,同时会追踪 watermark 时间并分发到下游。这里我们从 InputProcessor 开始讲起,接着会介绍 checkpoint

阅读全文 »

Flink 网络栈

 发表于 2017-02-09 |  分类于 Flink源码 |  |  阅读次数: 1356

本章节主要介绍 flink 的网络交互,包括每个 task 的输入输出管理,内存分配和释放等,因为涉及到内存申请,这里会介绍 flink 的内存管理

阅读全文 »

TaskManager 基本组件

 发表于 2017-02-09 |  分类于 Flink源码 |  |  阅读次数: 793

TaskManager 在 Flink 中也被叫做一个 Instance,统一管理该物理节点上的所有 Flink job 的 tasks 运行,它的功能包括了 task 的启动销毁、内存管理、磁盘IO、网络传输管理等,本章将一一介绍这些功能,方面后续章节的开展

阅读全文 »

12

© Mon Oct 23 2017 08:00:00 GMT+0800 (CST) — 2018  chenyuzhao

由 Hexo 强力驱动

 | 

主题 — NexT.Mist



 


flink基本组件和逻辑计划生成

Flink是一个被誉为 the 4th G 的计算框架,不同的框架特性及其代表项目列表如下:

第一代 第二代 第三代 第四代
Batch BatchInteractive Batch Interactive Near-Real-TimeInterative-processing Hybrid Interactive Real-Time-StreamingNative-Iterative-processing
  DAG Dataflows RDD Cyclic Dataflows
Hadoop MapReduce TEZ Spark Flink

本文主要介绍flink核心组件以及物理计划的生成过程

将所有的 TS 起动起来,这样一个 flink 集群便构建出来了。下面附图解释下这个流程:

  1. flink cli 解析本地环境配置,启动 ApplicationMaster
  2. 在 ApplicationMaster 中启动 JobManager
  3. 在 ApplicationMaster 中启动YarnFlinkResourceManager
  4. YarnFlinkResourceManagerJobManager发送注册信息
  5. YarnFlinkResourceManager注册成功后,JobManagerYarnFlinkResourceManager发送注册成功信息
  6. YarnFlinkResourceManage知道自己注册成功后像ResourceManager申请和TaskManager数量对等的 container
  7. 在container中启动TaskManager
  8. TaskManager将自己注册到JobManager

接下来便是程序的提交和运行

DataStreamSource 是一个 DataStream 数据流抽象,StreamSource 是一个 StreamOperator 算子抽象,在 flink 中一个 DataStream 封装了一次数据流转换,一个 StreamOperator 封装了一个函数接口,比如 map、reduce、keyBy等。关于算子的介绍会另起一节:flink算子的声明周期

StreamTransformation是 flink关于数据流转换的核心抽象,只有需要 transform 的流才会生成新的DataStream 算子

这里会将 StreamTransformation 转换为 StreamNode,StreamNode 保存了算子的信

https://github.com/danny0405/flink-source-code-analysis/blob/master/flink/flink基本组件和JobGraph的生成/flink基本组件和逻辑计划生成.md


Flink执行计划生成

客户端的JobGraph生成之后,通过上面的LeaderRetrivalService获取JobManager的地址,接下来就是将JobGraph提交给JobManager去执行。flink 的核心进程通信是通过 Akka 来完成的JobManagerTaskManager都是一个 Akka system,所以这里的提交首先需要生成一个客户端actor与JobManager交互,然后执行rpc命令,

Graph

Edge

Vertex

  • 获取 JobEdge 的数据分发策略:如果非 shuffle 操作就是 DistributionPattern.POINTWISE 否则是 DistributionPattern.ALL_TO_ALL具体见代码:

JobManager执行计划生成

JobManager负责接收 flink 的作业,调度 task,收集 job 的状态、管理 TaskManagers。被实现为一个 akka actor。

客户端上传完 jar 包和JobGraph,flink 会进一步解析封装成运行时的执行计划ExecutionGraphJobManager的构造器在初始化的时候传入了很多组件,这里简单列举下功能方便后面的逻辑展开,具体的细节将会在下一节讲解。

  • BlobServer:实现了 BOLB server,其会监听收到的 requests,并会创建 目录结构存储 BLOBS 【持久化】或者临时性的缓存他们
  • InstanceManager:TaskManager在flink框架内部被叫做Instance,flink通过InstanceManager管理 flink 集群中当前所有活跃的 TaskManager,包括接收心跳,通知 InstanceListener Instance 的生成与死亡,一个典型的 InstanceListener 为 flink 的 Scheduler
  • BlobLibraryCacheManager:flink job 的 jar 包存储服务,使用上面的 BlobServer 完成。
  • MemoryArchivist备案已提交的flink作业,包括JobGraphExecutionGraph
  • ZooKeeperCompletedCheckpointStore:负责持久化 job 的 checkpoint 信息,一个 job 可以持久化多个 checkpoint,但只有最新的会被使用,具体方式为先在文件系统中持久化一份,再将文件句柄更新到 zk,并在 zk上依次递增节点路径号,zk 上保存了最近的 10 次 checkpoint
  • SavepointStore:flink 的状态存储,负责存储算子内部定义的状态,与 checkpoint 稍有区别,后者由 flink 框架来维护

JobManager中的基本组件

 

JobManager 是 flink 集群的中控节点,类似于 Apache Storm 的 Nimbus 以及 Apache Spark 的 Driver 的角色,它负责作业的调度、jar 包管理、checkpoint 的协调和发起等,为了后续章节的开展,本文将介绍 flink JobManager 中所部署的一些服务。

 

BlobServer

BlobStore

BlobCache

InstanceManager

ZooKeeperCompletedCheckpointStore


TaskManager基本组件

TaskManager 在 Flink 中也被叫做一个 Instance,统一管理该物理节点上的所有 Flink job 的 task 的运行,它的功能包括了 task 的启动销毁、内存管理、磁盘IO、网络传输管理等

MemoryManager 统一管理了 flink 的内存使用,内存被划分为相同大小的 segment,通过申请不同数量的 segment 来分配不同大小的内存

这里支持两种内存:on-heap 内存和 off-heap 内存,通过参数可以控制分配内存的种类

MemoryManager 管理内存也分两种模式:预分配和按需分配。预分配模式下,内存在启动时就会分好,这就会意味着不会发生 OOM 异常,释放的内存会重新归还 MemoryManager 的内存池;按需模式下,MemoryManager 仅仅追踪内存的使用【做记录】,释放内存不会归还 MemoryManager 的内存池,而是通过托管给 JVM 的垃圾回收来最终释放,这样便可能会发生 OOM

MemoryPool

MemoryPool 是 MemoryManager 用来统一管理资源的组件,具体又分为 HeapMemoryPool 和 HybridOffHeapMemoryPool,前者管理堆内存,后者管理非堆内存。

  • allocateNewSegment 走的是 on demand 模式,通过 new byte[] 从堆上分配内存
  • requestSegmentFromPool 走的是 pre allocate 模式,通过复用已有的堆对象

HybridOffHeapMemoryPool 的接口与其类似,不过分配内存走的是 ByteBuffer.allocateDirect(segmentSize); 直接分配了物理内存,也就是非堆内存

IOManager

flink 通过 IOManager 来控制磁盘 IO 的过程,提供同步和异步两种写模式【其实只有异步】,具体的读写方式又分为 block、buffer、bulk 三种方式;用户可以指定 IO 的文件目录集合,IOManager 会以 round-robin 的方式写不同目录的不同文件。

IOManager 的唯一实现类:IOManagerAsync 为每个人临时文件加【用户初始化的时候指定】维护了一个读线程和写线程,并且每个读写线程内部会维护一个请求队列: RequestQueue,上面的 FileIOChannel 通过将 读写请求加入到对应的 RequestQueue 中来实现文件读写

NetworkEnvironment

NetworkEnvironment 是每个 Instance 的网络 IO 组件,包含了追踪中间结果和数据交换的数据结构。它的构造器会统一将配置的内存先分配出来,抽象成 NetworkBufferPool 统一管理内存的申请和释放。

BufferPool

从 MemoryManager 的介绍中我们讲到 flink 是以 MemorySegment 为单位来管理内存的,而一个 MemorySegment 又被叫做一个 Buffer。BufferPool 是管理 Buffer 的工具。Buffer 的申请统一交给 NetworkBufferPool,具体的管理交给 LocalBufferPool。


Flink网络栈

本章节主要介绍 flink 的网络交互,包括每个 task 的输入输出管理,内存分配和释放等,因为涉及到内存申请,这里会介绍 flink 的内存管理

我们会从 flink 网络相关的核心抽象开始介绍

IntermediateResult

代表一个 Job Vertex 的中间执行结果,由于同一个 Job vertex 可能有多个线程并发执行,这里的 IntermediateResult 对应一个 Job Edge 的输出下游结果集,一个 IntermediateResult 包含多个 IntermediateResultPartition,一个 IntermediateResultPartition 对应一个并行任务 ExecutionVertex 的输出结果

InputGate

InputGate 是 flink 关于 task 的一个输入源的抽象,一个 InputGate 代表一个上游数据源,对应一个上游中间结果的一个或多个 partition

flink 依据中间结果的 producer 【生产task】的并发度来生成相应个数的 partition,每个 producer task 生产一个 partition,为了优化并发读写,flink 依据中间结果的消费者 task 的并发度进一步将每个 partition 划分为多个 subPartition

SingleInputGate 有是如何去建立和上游 ResultSubPartition 的连接关系的呢?SingleInputGate 在初始化的时候利用了关键的的参数 InputGateDeploymentDescriptor,其中包含了 partition 的一些关键信息,具体见 SingleInputGate line 502

注册 BufferPool

在 NetworkEnvironment 注册 Task 的时候会为每个 task 的每个 InputGate 申请一个 BufferPool:

InputProcessor

对于每个 task,flink 都会生成一个 InputProcessor,具体就是 StreamIputProcessor。StreamIputProcessor 干了两件事情:

  • 消息解序列化并交由算子处理
  • 追踪 Watermark Event 并分发时间对齐事件

时间 对齐会专门起一章讲解,这里只介绍 消息序列化 ^_^

Flink 将消息的序列化抽象为两个模块:

  • 内存申请和流数据读取:由 SpillingAdaptiveSpanningRecordDeserializer 负责,从名字就可以大致了解其会动态吐磁盘【当内存不够的时候】
  • 解序列化:由 NonReusingDeserializationDelegate 负责

NonReusingDeserializationDelegate 包含一个 serializer,依据是否需要发射水印区分为 MultiplexingStreamRecordSerializer 和 StreamRecordSerializer,这两个 serializer 只是在 TypeSerializer 上做了一层封装,这里就不做介绍了【TypeSerializer 是类型系统的重要组成,如果有时间的话我会单独开一章节介绍 flink 的类型系统】

这里重点讲一下流数据的读取部分,也就是 SpillingAdaptiveSpanningRecordDeserializer ###SpillingAdaptiveSpanningRecordDeserializer 每个 channel 都有一个 SpillingAdaptiveSpanningRecordDeserializer

前面提到,SpillingAdaptiveSpanningRecordDeserializer 主要负责流数据的读取,同时通过 NonReusingDeserializationDelegate 来解序列化,从而获得可以发送给算子处理的 StreamElement。SpillingAdaptiveSpanningRecordDeserializer 内部有两个实现类:NonSpanningWrapper 和 SpanningWrapper,前者将数据存储进内存,后者存储前者存不下的部分内存数据以及将超量数据吐到磁盘我们来解释下其核心方法:

####添加 Buffer

添加 Buffer 会首先 check spanningWrapper 中是否已有数据,如果有,说明 nonSpanningWrapper 中的数据已满,会继续走 spanningWrapper 添加数据,否则走 nonSpanningWrapper 添加数据。

 nonSpanningWrapper 或 spanningWrapper 都是从 buffer 中读取数据的,那么这个 buffer 是如何写入的呢?

答案是通过 BarrierHandler 写入

处理 barrier 的核心逻辑在 processBarrier 方法中:

在一轮 barrier 过程中,flink 接收每个 channel 的 barrier event,获取其 barrier id 与此轮之前最大的 id:checkpoint id 作比较:如果相等,则 block 对应的 channel,被 block 的 channel 在此轮中的数据会通过 BufferSpiller 吐到磁盘【大部分情况是 page cache】;如果大于 checkpoint id,会提升 checkpoint id 为此 id,并取消 block 所有的 channel,直接进入下一轮 barrier;如果小于,直接丢弃此事件;如果此轮所有的 channel 都发送了一致的 id,则以此 id 进行 checkpoint,并取消所有的 channel block

值得注意的是,每次取消所有的 channel block 都会将 BufferSpiller 中的数据暴露成 buffer sequence 并加入队列中,下次获取记录时会优先取 sequence 中的数据

列举几种典型的数据流向:

  • input-gate -> operator:消费顺畅,一轮无任何阻塞
  • input-gate -> BufferSpiller -> current-queue -> operator:相对于其它 channel,消费过快,此轮被吐到磁盘,下一轮再消费
  • current-queue -> operator:相对于其它 channel,消费过慢,被吐到磁盘,下一轮再消费

下面介绍数据的生产

在 flink 算子的声明周期一节我们介绍过:算子通过 RecordWriterOutput 将处理过的 record 写出去,下面我们来分析其行为

RecordWriterOutput 实现了接口 Output,具备以下功能接口:

  • collect:发射 record
  • emitWatermark:广播 watermark 到每个输出 channel
  • broadcastEvent:广播 Barrier Event 到每个输出 channel

它的内部维护了一个序列化器和真正的 RecordWriter【这里是 StreamRecordWriter】

这里补一张图,说明 flink 的消费与生产之间的关系:

https://github.com/danny0405/flink-source-code-analysis/blob/master/flink/flink网络栈/flink网络栈.md

感觉网络栈这篇挺核心的


在前面一章 flink 网络栈的讲解中,我们介绍了 Barrier 的改变以及 Barrier 在 InputGate 消费数据的过程中扮演的时间对齐作用,同时,我们介绍了 InputProcessor 负责数据读取,同时会追踪 watermark 时间并分发到下游。这里我们从 InputProcessor 开始讲起

为什么将 Watermark 和 Checkpoint 放在一起将,是因为它们在原理上有相似之处:上游节点逐级广播消息给下游节点来完成一次行为

WaterMark是什么

Watermark 是协调窗口计算的一种方式,它告诉了算子时间不大于 WaterMark 的消息不应该再被接收【如果出现意味着延迟到达】。WaterMark 从源算子开始 emit,并逐级向下游算子传递,算子需要依据自己的缓存策略在适当的时机将 WaterMark 传递到下游。当源算子关闭时,会发射一个携带 Long.MAX_VALUE 值时间戳的 WaterMark,下游算子接收到之后便知道不会再有消息到达。

Flink 提供三种消息时间特性:EventTime【消息产生的时间】、ProcessingTime【消息处理时间】 和 IngestionTime【消息流入 flink 框架的时间】,WaterMark 只在时间特性 EventTime 和 IngestionTime 起作用,并且 IngestionTime 的时间等同于消息的 ingestion 时间。

上面我们提到 WaterMark 最初由源算子负责发射到下游,那么它的生成规则是什么呢?又是如何协调的?

我们来看一个源算子的实现便知

在第一章 flink 逻辑计划生成,我们了解了 flink 所有的源算子都继承自 SourceFunction 接口,SourceFuntion 定义了管理消息发射环境的接口 SourceContext,SourceContext 的具体实现在 StreamSource 中,一共有三种:NonTimestampContext、AutomaticWatermarkContext、ManualWatermarkContext,我们来逐一分析。

CheckPoint 是 flink 保证消息不丢的机制,通过 Barrier 的方式来协调时机,那么什么是 Barrier 呢?

其实前一章介绍 flink 网络栈 的时候已经有介绍在消费端 flink 对于不同的 Barrier 处理,实际上,Barrier 是用来校准 checkpint 的方式。由于对于一个拓扑结构,只有上游算子 checkpoint 完,下游算子的 cehckpoint 才能开始并有意义,同时下游算子的消费速率并不统一【有的 channel 快,有的 channel 慢】,而 Barrier 就是这样一种协调上下游算子的机制。

JobManager 统一通知源算子发射 Barrier 事件,并向下游广播,当下游算子收到这样的事件后,它就知道自己处于两次 checkpoint 之间【一次新的 checkpoint 将被发起】

当下游算子收到了它所有的 InputChannel 的 Barrier 事件后,它便知道上游算子的一次 checkpoint 已完成,自己也可以做 checkpoint 了,完成之后继续将 checkpoint 事件广播到下游算子

在 Exact-once 语义下,消费端会延迟消费并校准不同 channel 的消费速率,这在 flink 网络栈一章有详细介绍!

附一张图描述交互过程:

Checkpoint 的存储和恢复

Checkpoint 的存储和恢复均是通过 AbstractStateBackend 来完成,AbstractStateBackend 有三个实现类,FsStateBackend 是通过 HDFS 来存储 checkpoint 状态,继承关系如下:

我们来看最常见的一种 FsStateBackend,AbstractStateBackend 内部通过 State 来管理状态数据,依据状态数据的不同特性,状态分为三种:

  • ValueState :最简单的状态,一个 key 一个单值 value,可以跟更新和删除
  • ListState:一个 key 对应一个 value list
  • ReducingState:一个 key 对应的 value 可以进行 reduce 操作
  • FoldingState:一个key,后续添加的值都会通过 folding 函数附加到第一个值上

AbstractStateBackend 内部通过 KvState 接口来管理用户自定义的 kv 数据,我们来看 FsValueState 的继承关系:

那么如何获取这些 State 呢?flink 抽象了另一套接口:StateDescriptor 来获取 State,通过绑定特定的 StateBackend 来获取。这样一层抽象,解耦了 State 的类型和底层的具体的存储实现。我们来看 StateDescriptor 的继承关系:

那么这些抽象是如何协调工作的呢?

 

上面的快照之行结束后,用户会获取 KvStateSnapshot 抽象,对于 FsState 来说,起内部封装了文件句柄以及序列化元数据等信息,同时提供了恢复快照的接口,其抽象关系如下:

flink 进一步将每个 task 的每个 operator 快照后获取的 KvStateSnapshot 封装成 StreamTaskState,并最终获取一个 StreamTaskState List【对应一个 task 的一组 operators】,分装成 StreamTaskStateList,随后通知 JobManager 的 CheckpointCoordinator:

JobManager 再将这些句柄的数据再快照到本地和zk,具体见 JobManager 基本组件。恢复的过程是逆向的,暂时就不分析了,有耐心的用户可以自行查看源码!


danny0405/flink-source-code-analysis

Flink算子的生命周期

前言

前面已经介绍了 flink 的逻辑计划、物理计划等相关信息,本文将重点介绍 flink 的 operator 以及运行时的 task,后续会介绍 flink task 的调度算法

算子

什么是一个算子

flink 中的一个 operator 代表一个最顶级的 api 接口,拿 streaming 来说就是,在 DataStream 上做诸如 map/reduce/keyBy 等操作均会生成一个算子

算子的生成

先来看 operator 的继承关系:

对于 Streaming 来说所有的算子都继承自 StreamOperator,StreamOperator 中定义了一系列的生命周期方法,同时也定义了 snapshot 的接口,AbstractStreamOperator 定义了基本的设置和声明周期方法,AbstractUdfStreamOperator 定义了用户自定义函数的生命周期和快照策略,这些接口的调用时机会在下面一一阐述?。

算子的生成触发于对 DataStream 的操作上,比如 map addSink等。

算子 chain

在 flink 基本组件和逻辑计划生成一节 我们介绍了 JobGraph 的生成过程,其中 JobGraph 的生成最大的意义在于做了一些算子的 chain 优化,那么什么样的节点可以被 chain 呢?如下图:

一些必须要经过 shuffle 的节点是 chain 或者 节点可达 的边界,非常类似于 Spark Streaming 中对于 Stage 的划分,上图中 keyBy 这样的 groupBy 操作就是划分是否可被 chain 的边界

在 StreamingJobGraphGenerator 的 createChain 方法中为每个 StreamNode 生成了一个 StreamConfig,并且对于可以生成 JobVertex 的节点[ chain 的起始节点 ]设置了如下属性:

//StreamingJobGraphGenerator line212
if (currentNodeId.equals(startNodeId)) {

   config.setChainStart();
   config.setChainIndex(0);
   config.setOutEdgesInOrder(transitiveOutEdges);
   config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

   for (StreamEdge edge : transitiveOutEdges) {
      connect(startNodeId, edge);
   }

   config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

}

上面的逻辑概括如下:

  • 标志本节点为 chain 的起始位置
  • 设置 chain 的索引
  • 设置可达输出边,就是与下游 JobVertex 直接连接的 StreamEdge
  • 设置自身的直接输出边 StreamEdge
  • 将本 JobVertex 与下游的 JobVertex 连接起来
  • 将被 chained 的可达的下游 StreamNode 的配置一同设置进本 JobVertex 的配置中,后面 task 运行时会用到

连接的逻辑如下:

//StreamingJobGraphGenerator line357
private void connect(Integer headOfChain, StreamEdge edge) {

   physicalEdgesInOrder.add(edge);

   Integer downStreamvertexID = edge.getTargetId();

   JobVertex headVertex = jobVertices.get(headOfChain);
   JobVertex downStreamVertex = jobVertices.get(downStreamvertexID);

   StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());

   downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);

   StreamPartitioner<?> partitioner = edge.getPartitioner();
   if (partitioner instanceof ForwardPartitioner) {
      downStreamVertex.connectNewDataSetAsInput(
         headVertex,
         DistributionPattern.POINTWISE,
         ResultPartitionType.PIPELINED,
         true);
   } else if (partitioner instanceof RescalePartitioner){
      downStreamVertex.connectNewDataSetAsInput(
         headVertex,
         DistributionPattern.POINTWISE,
         ResultPartitionType.PIPELINED,
         true);
   } else {
      downStreamVertex.connectNewDataSetAsInput(
            headVertex,
            DistributionPattern.ALL_TO_ALL,
            ResultPartitionType.PIPELINED,
            true);
   }

   if (LOG.isDebugEnabled()) {
      LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
            headOfChain, downStreamvertexID);
   }
}

概括下逻辑:

  • 获取要连接的两个 JobVertex 对象
  • 设置下游 JobVertex 的输入 partition 算法,如果是 forward 或 rescale 的话为 POINTWISE,否则为全连接,也就是 shuffle,POINTWISE 的连接算法在 flink 物理计划生成 一节已经介绍,这里不再赘述

以上只是客户端生成逻辑计划时的算子 chain,在运行时算子的的 chain 被封装成了一个单独的对象 OperatorChain,里面在原有的基础上将 operators 的操作封装起来并且确定了下游的的输出入口

来看 OperatorChain 的核心实现

首先总结下构造器的功能:

  • 获取可达的 chain 的 StreamNode 配置
  • 为直接可达的输出 StreamEdge 分别创建一个 Output,这里为 RecordWriterOutput
  • 创建chain的入口
  • 如果创建有任何失败,释放掉 RecordWriterOutput 占用的资源,主要是内存 buffer,后面章节会介绍

这里的关键是算子 chain 的创建过程,见下图创建过程:

上图中 S 节点的下游 A/B/C 是可以与 S Chain 在一起的,D/E 是必须经过网络传输的节点,一个 OperatorChain 封装了图中的节点 S/A/B/C,也就是说上图可以被看做如下所示:

OperatorChain 中有两个关键的方法:createOutputCollector 和 createChainedOperator,前者负责获取一个 StreamNode 的输出Output,后者负责创建 StreamNode 对应的 chain 算子,两者相互调用形成递归,如上面的创建过程图,具体的流程如下:

  • 创建 S 的所有网络输出 RecordWriterOutput,这里会为 D 和 E 分别创建一个
  • 由于从 A 开始对于 S 是可被 chain 的,会递归创建从 C 开始
  • 先获取 C 的输出,这里为对应 D 的 RecordWriterOutput
  • 拿到 C 对应的 StreamOperator 并将 运行时的 StreamTask 和 Output 设置进去
  • 将 StreamOperator 封装成 ChainingOutput 并作为 Output 传给 B
  • B 将重复 C 的过程,直到 S/A/B/C 全部被创建

那么 S 发射一条消息后的处理流程是如何呢?

S 在调用 processElement 方法时会调用 output.collect,这里的 output 为 A 对应的 ChainingOutput,ChainingOutput 的 collect 调用了对应的算子 StreamOperator A 的 processElement 方法,这里又会调用 B 的 ChainingOutput 的 collect 方法,以此类推。这样便实现了可 chain 算子的本地处理,最终经由网络输出 RecordWriterOutput 发送到下游节点

算子的运行

flink 算子的运行牵涉到两个关键类 Task.java 和 StreamTask.java,Task 是直接受 TaskManager 管理和调度的,而 Task 又会调用 StreamTask,StreamTask 中封装了算子的处理逻辑

我们先来看 StreamTask

StreamTask 的 JavaDoc 上描述了其生命周期:

*  -- restoreState() -> restores state of all operators in the chain
*  
*  -- invoke()
*        |
*        +----> Create basic utils (config, etc) and load the chain of operators
*        +----> operators.setup()
*        +----> task specific init()
*        +----> open-operators()
*        +----> run()
*        +----> close-operators()
*        +----> dispose-operators()
*        +----> common cleanup
*        +----> task specific cleanup()

StreamTask 运行之初会尝试恢复算子的 State 快照,然后由 Task 调用其 invoke 方法

下面重点分析一下其 invoke 方法的实现

  • 获取 headOperator,这里的 headOperator 在 StreamingJobGraphGenerator line 210 setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);设置,对应上面算子 chain 中的 S 节点
  • 创建 operatorChain 并设置为 headOperator 的 Output
  • init()
  • restoreState
  • 执行 operatorChain 中所有 operator 的 open 方法
  • run()
  • 执行 operatorChain 中所有 operator 的 close 方法
  • 执行资源回收及 cleanup(),最主要的目的是回收内存 buffer

StreamTask 中还有关于 Checkpoint 和 StateBackup 的核心逻辑,这里先不介绍,会另开一篇?

我们来看 StreamTask 的实现类之一 OneInputStreamTask ,便可以知道 init() 和 run() 分别都做了什么:

init方法

  • 获取算子对应的输入序列化器 TypeSerializer
  • 获取输入数据 InputGate[],InputGate 是 flink 网络传输的核心抽象之一,其在内部封装了消息的接收和内存的管理,后面介绍 flink 网络栈的时候会详细介绍,这里只要了解从 InputGate 可以拿到上游传送过来的数据就可以了
  • 初始化 StreamInputProcessor
  • 设置一些 metrics 及 累加器

StreamInputProcessor 是 StreamTask 内部用来处理 Record 的组件,里面封装了外部 IO 逻辑【内存不够时将 buffer 吐到磁盘上】以及 时间对齐逻辑【Watermark】,这两个将会合并一节在下一章介绍^_^

run方法:

  • 从 StreamInputProcessor 中处理一条记录
  • check 是否有异常

真正的运行时类 Task

这里我们会详细的介绍下 Task 的核心逻辑

Task 代表一个 TaskManager 中所起的并行 子任务,执行封装的 flink 算子并运行,提供以下服务:消费输入data、生产 IntermediateResultPartition [ flink关于中间结果的抽象 ]、与 JobManager 交互

JobManager 分发 Task 时最初是抽象成了一个描述类 TaskDeploymentDescriptor,TaskManager 在抽到对应的 RPC 请求后会将 Task 初始化后将 线程 拉起,TaskDeploymentDescriptor 是提供 task 信息的核心抽象:

  • ResultPartitions:task 输出的 partition 数[ 通常和 JobVertex 的下游节点数对应 ]
  • InputGates:task 的输入中间结果 partition
  • operator-state:算子的状态句柄,由 TaskManager 上报给 JobManager,并统一维护
  • jar-files
  • class-paths

构造器的一些组件我们会在介绍 TaskManager 的时候再详述

其核心的运行方法 run()逻辑总结如下:

line408: run

  • 核心的运行逻辑
  • line429: 遇到错误后通知 TaskManager
  • line469: 从 NetworkEnvironment 中申请 BufferPool,包括 InputGate 的接收 pool 以及 task 的每个 ResultPartition 的输出 pool,申请的资源数[ num of Buffer ] 由 input channels 和 ResultSubPartition 数决定

关于网络管理[ 输入和输出 ] NetworkEnvironment,内存管理 MemoryManager 会分别开章节介绍

那么 StreamTask 是如何在 Task 中被实例化,又是如何被调用的呢?

//line 418
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
//一系列初始化操作 ...
//line 584
invokable.invoke();

上面的 invokable 就是 StreamTask,StreamTask 的继承关系:

那么具体是什么时候被 set 进去作为属性的呢?

在 StreamNode 生成的时候有这样一段逻辑:

public <IN, OUT> void addOperator(
      Integer vertexID,
      String slotSharingGroup,
      StreamOperator<OUT> operatorObject,
      TypeInformation<IN> inTypeInfo,
      TypeInformation<OUT> outTypeInfo,
      String operatorName) {

   if (operatorObject instanceof StoppableStreamSource) {
      addNode(vertexID, slotSharingGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
   } else if (operatorObject instanceof StreamSource) {
      addNode(vertexID, slotSharingGroup, SourceStreamTask.class, operatorObject, operatorName);
   } else {
      addNode(vertexID, slotSharingGroup, OneInputStreamTask.class, operatorObject, operatorName);
   }

将 OneInputStreamTask 等 StreamTask 设置到 StreamNode 的节点属性中,同时在 JobVertex 的节点构造时也会做一次初始化:

jobVertex.setInvokableClass(streamNode.getJobVertexClass());

在 TaskDeploymentDescriptor 实例化的时候会获取 jobVertex 中的属性,见ExecutionVertex line673

算子初始化

那么算子是什么时候被初始化的呢?这就需要梳理下 StreamTask 的 init() 方法的处理时机,上面已经分析过 init() 方法会在 StreamTask 的 invoke() 方法中被调用,那么 invoke() 方法又是何时被调用的呢?这就涉及到另外一个重要的类 Task.java,Task 才是运行时真正直接被 TaskManager 实例化和调用的类,上面已经分析过 Task 的 run 方法,是 TaskManager 收到 rpc 命令后起起来的 具体的细节会另起一章 flink 任务分发

算子销毁

StreamTask 下执行完 invoke 方法之后[意味着流程正常结束或者有异常打断],会执行下面这段逻辑:

/**
 * Execute the operator-specific {@link StreamOperator#dispose()} method in each
 * of the operators in the chain of this {@link StreamTask}. </b> Disposing happens
 * from <b>tail to head</b> operator in the chain.
 */
private void tryDisposeAllOperators() throws Exception {
   for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
      if (operator != null) {
         operator.dispose();
      }
   }
}

所以,算子中有任何 hook 函数或者必须执行的销毁工作可以写在 dispose 方法里,这段逻辑是 flink 保证一定可以执行到的


flink 对用户代码异常的处理

前言

flink 的架构在 flink 基本组件一节已经介绍过,其中的 TaskManager 负责监护 task 的执行,对于每个 task,flink 都会启动一个线程去执行,那么当用户的代码抛出异常时,flink 的处理逻辑是什么呢?

异常后的组件通信

flink 的 task 的 Runnable 类是 Task.java,我们观察到它的 run() 方法真个被一个大的 try catch 包住,我们重点关注 catch 用户异常之后的部分:


 

简单总结其逻辑:

  • 如果当前的执行状态是 ExecutionState.RUNNING 或者 ExecutionState.DEPLOYING,表明是从正常运行到异常状态的过度,这时候判断是主动 Cancel 执行,如果是,执行 StreamTask 的 cancel 方法, 并通知观察者它的状态已变成:ExecutionState.CANCELED;如果不是主动 Cancel,表明是用户异常触发,这时候同样执行 StreamTask 的 cancel 方法,然后通知观察者它的状态变成:ExecutionState.FAILED,这里的 cancel 方法留给 flink 内部的算子来实现,对于普通 task ,会停止消费上游数据,对于 source task,会停止发送源数据

  • 对于用户异常来说,通知观察者的状态应该为 ExecutionState.FAILED我们下面详细分析

  • finally 的部分会释放掉这个 task 占有的所有资源,包括线程池、输入 InputGate 及 写出 ResultPartition 占用的全部 BufferPool、缓存的 jar 包等,最后通知 TaskManager 这个 Job 的 这个 task 已经执行结束:

    notifyFinalState()

  • 如果异常逻辑发生了任何其它异常,说明是 TaskManager 相关环境发生问题,这个时候会杀死 TaskManager

通知TaskManager

上面提到,finally 的最后阶段会通知 TaskManager,我们来梳理逻辑:


 

总结其逻辑:

  • 在一些合法性 check 之后,TaskManager 会给自己发送一条路由消息:UpdateTaskExecutionState,TaskManager 继而将这条消息转发给 JobManager
  • JobManager 会标志 Job 状态为 FAILING 并通知 JobCli,并且立即停止所有 task 的执行,这时候 CheckpointCoordinator 在执行 checkpoint 的时候感知到 task 失败状态会立即返回,停止 checkpoint

异常后的资源释放

主要包括以下资源:

  • 网络资源:InputGate 和 ResultPartiton 的内存占用
  • 其他内存:通过 MemoryManager 申请的资源
  • 缓存资源:lib 包和其他缓存
  • 线程池:Task 内部持有

前面已经介绍了一系列的 flink 任务抽象、网络传输、可靠性机制等细节,有了这些铺垫,终于可以开心的介绍 flink 的任务调度机制了,也是不易^_^

因为没有这些铺垫,就无法明白 flink 为什么要设计这样的一套调度机制!所以本章节讲解时会多穿插一些为什么

资源组

资源组模型

flink 的一个 Instance 可以被划分出多个 Slot,通过初始参数可以指定,他们既可以是 SimpleSlot,也可以是同时跑多个 task 的 SharedSlot,为了约束 task 之间的运行时的绑定关系,flink 抽象出了 SlotSharingGroup 和 CoLocationGroup 的概念。

一个 SlotSharingGroup 规定了一个 Job 的 DAG 图中的哪些 JobVertex 的 sub task 可以部署到一个 SharedSlot 上,这是一个软限制,并不是一定会满足,只是调度的时候有位置偏好,而 CoLocationGroup 是在 SlotSharingGroup 的基础上的硬限制,它限定了 CoLocationGroup 中的 JobVertex 中的 sub task 运行必须是一一对应的












原文地址:https://www.cnblogs.com/cx2016/p/12926169.html