Stream Processing with Apache Flink中文版--第3章 Apache Flink的体系结构

前一章讨论了分布式流处理的重要概念,比如并行化、时间和状态。在本章中,我们将介绍Flink的体系结构,并描述Flink如何处理我们之前讨论过的流处理方面的问题。特别地,我们解释了Flink的流程架构及其网络堆栈的设计。我们将展示Flink如何处理流应用程序中的时间和状态,并讨论其容错机制。本章提供了使用Apache Flink成功实现和操作高级流处理应用程序的相关背景信息。它将帮助您理解Flink的内部原理,并深入理解流应用程序的性能和行为。

系统架构

Flink是一个用于有状态并行数据流处理的分布式系统。Flink集群由分布在多台机器上的多个进程组成。分布式系统需要解决的常见挑战是集群中计算资源的分配和管理、流程协调、持久可用的数据存储和故障恢复。

Flink本身不能实现所有需要的功能。相反,它侧重于其核心功能——分布式数据流处理——并利用现有的集群基础设施和服务。Flink与集群资源管理器(如Apache Mesos、YARN和Kubernetes)紧密集成,但也可以配置为作为独立集群运行。Flink不提供持久的分布式存储。相反,它支持分布式文件系统(如HDFS)或对象存储(如S3)。对于高可用性设置中的Leader选举,Flink依赖于Apache ZooKeeper。

在本节中,我们将描述组成Flink集群的不同组件,并讨论它们的职责以及它们如何相互交互以执行应用程序。我们将介绍两种不同的Flink应用程序部署方式,并讨论如何分配和执行任务。最后,我们解释了Flink的高可用模式是如何工作的。

Flink集群组件

Flink集群由四个不同的组件组成,它们共同执行流应用程序。这些组件是JobManager、ResourceManager、TaskManager和Dispatcher。因为Flink是用Java和Scala实现的,所以所有组件都运行在Java虚拟机(JVM)上。我们将讨论每个组件的职责以及它如何与其他组件交互。

  • JobManager是控制单个应用程序执行的master进程,即,每个应用程序由不同的JobManager控制。JobManager接收要执行的一个应用程序。该应用程序由一个JobGraph、一个逻辑数据流图(参见第2章)和一个JAR文件组成,JAR文件打包了所有必需的类、库和其他资源。JobManager将JobGraph转换为名为ExecutionGraph的物理数据流图,其中包含可以并行执行的任务。JobManager要求ResourceManager提供必要的资源(TaskManager slots)来执行任务, 一旦它接收到足够的TaskManager slots,就会将ExecutionGraph中的任务分配给执行它们的TaskManager。在执行期间,JobManager负责所有需要中心协调的操作,比如检查点的协调(请参阅“检查点、保存点和状态恢复”)。

  • Flink为不同的环境和资源提供者(如YARN、Mesos、Kubernetes和standalone部署)提供多种ResourceManager实现。ResourceManager负责管理TaskManager slots,即Flink的计算资源单元。当JobManager请求TaskManager slots时,ResourceManager请求具有空闲slots的TaskManager,将它们提供给JobManager。如果没有足够的slots来满足JobManager的请求,ResourceManager可以与资源提供者进行对话,让资源提供者提供容器启动新的TaskManager进程。ResourceManager还负责杀死空闲的TaskManagers来释放计算资源。

  • TaskManagers是Flink的工作(worker)进程。通常,在Flink 集群中运行多个TaskManagers。TaskManagers提供一定数量的slots。slots的数量限制了TaskManager可以执行的任务数量。TaskManagers启动之后,向ResourceManager注册它的slots。当收到ResourceManager指令时,TaskManager向JobManager提供一个或多个slots。然后JobManager可以将任务分配给slots来执行它们。在执行期间,TaskManager与其他运行相同应用程序任务的TaskManager交换数据。任务的执行和slots的概念将在后面的小节中详细讨论。

  • Dispatcher跨作业执行运行,并提供一个REST接口来提交执行应用程序。接收到应用程序后,启动JobManager并将应用程序交给它。REST接口使dispatcher能够作为防火墙集群的入口点。dispatcher还运行一个web仪表板,提供关于过去作业执行的信息。根据应用程序提交执行的方式("应用程序部署"章节中讨论),单独一个Dispatcher不是必须的。

当提交一个应用程序到Flink集群时,图3-1显示了这些组件是如何交互的。

图片

请注意,图3-1是可视化组件职责和交互的高层示意图。根据环境(yarn、Mesos、Kubernetes、独立集群)的不同,可以省略一些步骤,或者组件可以在相同的进程中运行。例如,在独立集群中,即在没有资源提供程序的情况下,ResourceManager只能分发手动启动的任务管理器的slots,而不能启动新的任务管理器。在第9章中,我们将讨论如何为不同的环境设置和配置Flink。

应用程序部署

Flink应用程序可以以两种不同的方式部署。

  • 框架(Framework)模式:在这种模式下,Flink应用程序被打包到一个JAR文件中,并由client提交给正在运行的服务。服务可以是Flink Dispatcher、Flink作业管理器或YARN的资源管理器。在任一情况下,都有一个服务在运行,它接受Flink应用程序并确保它被执行。如果将应用程序提交给JobManager,它将立即开始执行应用程序。如果应用程序被提交给Dispatcher或YARN ResourceManager,它将启动JobManager,提交应用程序,JobManager继续执行应用程序。

  • 库(Library)模式:在这种模式下,Flink应用程序被绑定到特定于应用程序的容器映像中,例如Docker映像。该映像还包括运行JobManager和ResourceManager的代码。当容器从映像启动时,它会自动启动ResourceManager和JobManager并移交绑定的作业。此外,job独立的映像(image)用于部署TaskManagers容器。当容器启动时,它会自动启动一个TaskManager,它连接到ResourceManager并注册它的slots。TaskManager映像可以是独立于应用程序的。通常,Kubernetes之类的外部资源管理器负责启动映像,确保在发生故障时重新启动容器。

framework模式遵循通过client向运行中的服务提交应用程序(或查询)的传统方法。在library模式中,没有Flink服务持续运行。相反,在容器映像中,Flink与应用程序绑定为一个库。这种部署模式在微服务体系结构中也很常见。我们将在第10章中更详细地讨论应用程序部署的主题。

任务执行

TaskManager可以同时执行多个任务。这些任务可以属于相同的操作符(数据并行性)、不同的操作符(任务并行性),甚至来自不同的应用程序(作业并行性)。TaskManager提供一定数量的slots来控制并发执行的任务数量。slots能够执行应用程序的一个部分,即,每个应用程序操作符的一个任务。图3-2显示了TaskManagers, slots, tasks, 和operators操作符之间的关系。

图片

在图3-2的左侧,可以看到一个jobgraph—应用程序的非并行表示,它由5个操作符组成。操作符A和C是源,操作符E是汇聚。算子C和E的并行度是2。其他操作符的并行度为4。因为最大的运算符并行度是4,所以应用程序需要至少4个可用的处理槽才能执行。给定两个taskmanager,每个taskmanager有两个处理槽,就满足了这个需求。JobManager将JobGraph并行化为一个ExecutionGraph,并将任务分配给四个可用的插槽。将并行度为4的操作符的任务分配给每个槽。操作符C和E的两个任务分别分配给槽1.1和2.1,槽1.2和2.2。将任务作为片分配到插槽的优点是许多任务都集中在TaskManager上,这意味着它们可以在相同的进程中高效地交换数据,而不需要访问网络。然而,过多的协同任务也会使任务管理器超载,导致性能下降。在“控制任务调度”中,我们讨论了如何控制任务的调度。

TaskManager在同一个JVM进程中用多线程执行它的任务。线程比单个进程更轻量,通信成本更低,但并不严格地将任务彼此隔离。因此,一个出现问题的任务可以杀死整个TaskManager进程,以及在TaskManager上运行的所有任务。因此,可以跨TaskManager隔离应用程序,即TaskManager只运行一个应用程序的任务。通过利用TaskManager内部的线程并行性和为每个主机部署多个TaskManager进程的选项,Flink提供了很多灵活性,可以在部署应用程序时平衡性能和资源隔离。我们将在第9章详细讨论Flink集群的配置和设置。

配置高可用

流处理应用程序通常设计为24/7运行。因此,即使所涉及的进程失败,它们的执行也不要停止,这一点很重要。从失败中恢复包括两个方面,首先重新启动失败的进程,然后重新启动应用程序并恢复其状态。在本节中,我们将解释Flink如何重新启动失败的进程。本章后面的部分将讨论如何恢复应用程序的状态。

TASKMANAGER运行失败

如前所述,Flink需要足够数量的处理槽(slots)来执行应用程序的所有任务。如果Flink设置有四个taskmanager,每个taskmanager提供两个插槽,那么流应用程序的最大并行度为8。如果其中一个任务管理器失败,可用插槽的数量将下降到6个。在这种情况下,JobManager将要求ResourceManager提供更多的处理插槽。如果这是不可能的—例如,因为应用程序在standalone集群中运行—JobManager在有足够的插槽可用之前无法重新启动应用程序。应用程序的重新启动策略决定了JobManager重新启动应用程序的频率以及两次重新启动尝试之间的等待时间。

JOBMANAGER 运行失败

比TaskManager失败更具挑战性的问题是JobManager失败。JobManager控制流应用程序的执行,并保存关于其执行的元数据,例如指向已完成检查点的指针。如果关联的JobManager进程宕机,则流应用程序无法继续处理,从而使JobManager成为Flink中的单点故障。为了克服这个问题,Flink提供了一个highavailability模式,该模式将作业的元数据迁移到另一个JobManager,以防原来的JobManager宕机。

Flink的高可用性模式基于Apache ZooKeeper,这是一个提供协调和一致性的分布式服务系统。Flink使用ZooKeeper进行leader选举,并作为一个高可用性和持久的数据存储。在高可用性模式下操作时,JobManager将JobGraph和所有需要的元数据(如应用程序的JAR文件)写入远程持久存储系统。此外,JobManager将指向存储位置的指针写入ZooKeeper的数据存储中。在应用程序执行期间,JobManager接收各个任务检查点的状态句柄(存储位置)。当完成一个检查点,即,所有任务都成功地将它们的状态写入远程存储时,JobManager将状态句柄写入远程存储,并将指向此位置的指针写入ZooKeeper。因此,从JobManager故障恢复所需的所有数据都存储在远程存储中,ZooKeeper持有指向存储位置的指针。图3-3说明了这种设计。

图片

当JobManager失败时,属于其应用程序的所有任务都会自动取消。一个新的JobManager接管失败master的工作,执行以下步骤。

  1. 向ZooKeeper请求存储位置,以便从远程存储获取应用程序最后一个检查点的JobGraph、JAR文件和状态句柄。

  2. 从ResourceManager请求处理slots来继续执行应用程序。

  3. 重新启动应用程序,并将所有任务的状态重置为最后一个完成的检查点。

在容器环境(如Kubernetes)中作为库部署运行应用程序时,可以自动重新启动失败的JobManager或TaskManager容器。当在YARN或Mesos上运行时,Flink的剩余进程将触发JobManager或TaskManager进程的重新启动。在独立集群中运行时,Flink不提供重启失败进程的工具。因此,运行备用的jobmanager和TaskManager很有用,它们可以接管失败进程的工作。我们将在后面的第9章中讨论高可用性的Flink配置。

Flink中的数据传输

正在运行的应用程序的任务不断地交换数据。taskmanager负责将数据从发送任务发送到接收任务。TaskManager的网络组件在发送之前收集数据到缓冲区中,即记录不是一个接一个地传送,而是成批地传送到缓冲区中。该技术是有效利用网络资源,实现高吞吐量的基础。该机制类似于网络或磁盘IO协议中使用的缓冲技术。注意:缓冲区中的装运记录确实暗示了Flink的处理模型是基于微批量的。

每个TaskManager都有一个网络缓冲区池(默认大小为32KB),用于发送和接收数据。如果发送方和接收方任务在单独的TaskManager进程中运行,它们通过操作系统的网络堆栈进行通信。流式应用程序需要以流水线方式交换数据,即,每对taskmanager维护一个永久的TCP连接来交换数据。在shuffle连接模式下,每个发送方任务需要能够向每个接收方任务发送数据。TaskManager需要为每个接收任务提供一个专用的网络缓冲区,任何任务都需要将数据发送给这个缓冲区。一旦缓冲区被填满,它就通过网络传送到接收任务。在接收端,每个接收任务对应的每个发送任务连接,都需要一个网络缓冲区。图3-4显示了这个架构。

图片

图中显示了四个发送方任务和四个接收方任务。每个发送方任务都有四个网络缓冲区来发送数据给每个接收方任务,每个接收方任务都有四个缓冲区来接收数据。需要发送给其他TaskManager的缓冲区通过相同的网络连接进行多路复用。为了实现流畅的流水线数据交换,TaskManager必须能够提供足够的缓冲区,以便同时为所有传出和传入的连接提供服务。在shuffle或广播连接的情况下,每个发送任务需要为每个接收任务提供一个缓冲区。即,所需要的缓冲区数量是所涉及操作符的并行度的平方。Flink对网络缓冲区的默认配置对于中小型的设置来说已经足够了。对于较大的设置,您需要按照“主内存和网络缓冲区”中的描述调整配置。

如果发送方和接收方任务在相同的TaskManager进程中运行,发送方任务将输出记录序列化到一个字节缓冲区中,并在缓冲区填满后将其放入队列中。接收任务从队列中取出缓冲区并反序列化传入的记录。因此,不涉及网络通信。在TaskManager-local任务之间序列化记录的优点是它可以解耦任务,并允许在任务中使用可变对象,这可以显著提高性能,因为它减少了对象实例化和垃圾收集。对象一旦序列化,就可以安全地修改它。

另一方面,序列化会导致大量的计算开销。因此,在某些条件下,Flink可以将多个DataStream操作符链接到一个任务中。同一任务中的操作符通过嵌套函数调用传递对象进行通信,从而避免了序列化。第10章详细讨论了算子链的概念。

Flink采用不同的技术来降低任务之间的通信成本。在下面的部分中,我们将简要讨论基于信约的流控制和任务链。

基于信约的流控制

通过网络连接发送单独的记录是低效的,并且会导致大量的开销。缓冲(Buffering)需要充分利用网络连接的带宽。在流处理上下文中,缓冲的一个缺点是增加了延迟,因为记录是在缓冲区中收集的,而不是立即发送。

Flink实现了一个基于信约的流控制机制,其工作原理如下。一个接收任务向发送任务授予一定的信任,即保留一定数量的网络缓冲区,用于接收发送任务发送的数据。一旦发送方收到信用通知,它就会发送与所授缓冲区一样多以及一样大小的缓冲区——即已填充并准备发送的网络缓冲区的数量。接收方使用保留的缓冲区处理已发送的数据,并使用发送方的积压大小为所有已连接的发送方,确定下一个信用授予的优先级。

基于信约的流控制减少了延迟,因为发送方可以在接收方有足够的资源接受数据时发送数据。此外,在数据分布不均匀的情况下,它是分配网络资源的一种有效机制,因为信约是根据发送方的backlog大小来授予的。因此,基于信约的流控制是Flink实现高吞吐量和低延迟的一个重要构件。

任务链

Flink提供了一种称为任务链的优化技术,它可以在特定条件下减少本地通信的开销。为了满足任务链的要求,必须使用相同的并行度配置两个或多个操作符,并通过本地转发通道进行连接。图3-5中显示的操作符管道满足了这些要求。它由三个操作符组成,所有操作符都配置为两个任务并行性,并使用本地转发通道进行连接。

图片

图3-6描述了如何使用任务链执行管道(pipeline)。操作符函数被融合到一个由单个线程执行的任务中。一个函数产生的记录通过一个简单的方法调用被单独传递给下一个函数。因此,在函数之间传递记录基本上不需要序列化和通信成本。

图片

任务链可以显著降低本地任务之间的通信成本,但是也有不使用任务链执行管道的情况。例如,可以将一个长管道的链式任务断开,或者将一个链分成两个任务,进而将一个昂贵的函数调度到不同的slots。图3-7显示了在没有任务链接的情况下执行的相同管道。所有函数都由运行在独立线程中的单个任务进行计算。

图片

任务链在Flink中是默认启用的。在“控制任务链”中,我们展示了如何禁用应用程序的任务链,以及如何控制单个操作符的任务链行为。


[^]    //以下是旧版书籍中的内容------------------------------开始 

高吞吐量和低延迟

通过网络连接发送单独的记录是低效的,并且会导致大量的开销。缓冲区是充分利用网络连接带宽的一种强制性技术。在流处理上下文中,缓冲的一个缺点是增加了延迟,因为记录是在缓冲区中收集的,而不是立即发送。如果发送方任务很少为特定的接收任务生成记录,则可能需要很长时间才能填充和发送相应的缓冲区。这将导致较高的处理延迟,所以Flink确保每个缓冲区在一段时间之后被发送,而不管它被填充了多少。这个超时可以解释为网络连接所增加的延迟的上限。但是,阈值并不能作为整个作业的严格延迟SLA,因为一个作业可能涉及多个网络连接,而且阈值也不能解释实际处理造成的延迟。

背压的流程控制

接收大数据量的流式应用程序很容易出现这样的情况,即任务无法按其到达的速度处理其输入数据。如果输入流的容量对于分配给某个操作符的资源量来说过高,或者操作符的输入速率显著变化并导致高负载峰值,则可能发生这种情况。不管操作符不能处理其输入的原因是什么,这种情况都不应该成为流处理器终止应用程序的原因。相反,流处理器应该优雅地调整流应用程序接收输入的速度,使其达到应用程序处理数据的最大速度。有了适当的监视基础设施,就可以很容易地检测到流瓶颈情况,通常可以通过添加更多的计算资源和增加瓶颈操作符的并行性来解决这个问题。上述的流程控制技术称为反压,是流处理引擎的一个重要特性。

由于其网络层的设计,Flink内置支持背压。图3-5说明了当接收任务无法按照发送方任务发出的速率处理其输入数据时,网络堆栈的行为。

图片

图中显示了在不同机器上运行的发送方任务和接收方任务。

  1. 当应用程序的输入速率增加时,发送方任务的处理速率可以跟上,但是接收方任务开始落后,并且不再能够按照记录到达的速率处理记录。现在,接收任务管理器开始将接收到的数据放入缓冲区进行排队。在某个时候,接收TaskManager的缓冲池已经用完,不能继续缓冲到达的数据。

  2. 发送任务管理器开始将输出记录放入缓冲区进行排队,直到它自己的缓冲池为空(数据发送完成)。

  3. 最后,发送方任务在新的缓冲区可用之前不能发出更多的数据块。由于接收速度慢而被阻塞的任务,表现得像一个slow receiver,从而使它的前一个任务变慢,并会影响到流应用程序的源。最终,整个应用程序的速度降低到最慢操作符的处理速度。

[^]    //以上是旧版书籍中的内容------------------------------结束 

事件时间处理

在“时间语义”中,我们强调了时间语义对于流处理应用程序的重要性,并解释了处理时间和事件时间之间的区别。虽然处理时间很容易理解,因为它基于处理机器的本地时间,但是它会产生一些任意的、不一致的和不可复制的结果。相反,事件时间语义产生可重复的和一致的结果,这是许多流处理用例的严格要求。然而,与具有处理时间语义的应用程序相比,事件时间应用程序需要额外的配置。此外,支持事件时间的流处理器的内部机制比纯粹在处理时间内操作的系统的内部机制更复杂。

Flink为常见的事件处理操作提供了直观且易于使用的原语,但也提供了表现力丰富的api,以便使用自定义操作符实现更高级的事件时间应用程序。对于这样的高级应用程序,很好地理解Flink的内部时间处理通常是有帮助的,有时也是必需的。前一章介绍了Flink利用两个概念来提供事件时间语义:记录时间戳和水印。下面,我们将描述Flink如何在内部实现和处理时间戳和水印,以支持具有事件时间语义的流应用程序。

时间戳

由Flink事件时间流应用程序处理的所有记录必须附带一个时间戳。时间戳将记录与特定的时间点关联起来,通常是记录表示的事件发生的时间点。但是,只要流记录的时间戳大致随着流的前进而上升,应用程序就可以自由地选择时间戳的含义。正如在“时间语义”中所看到的,基本上在所有实际用例中都给出了一定程度的时间戳的无序性。

当Flink以事件时间模式处理数据流时,它根据记录的时间戳计算基于时间的操作符。例如,时间窗口操作符根据相关的时间戳将记录分配给窗口。Flink将时间戳编码为16字节Long值,并将它们作为元数据附加到记录中。它的内置操作符将Long值解释为精度为毫秒的Unix时间戳(从1970-01-01-00:00:00.000开始的毫秒数)。但是,自定义操作符可以有自己的解释,例如,可以将精度调整为微秒。

水印

除了记录时间戳外,Flink事件时间应用程序还必须提供水印。在事件时间应用程序中,使用水印派生每个任务的当前事件时间。基于时间的操作符使用这个时间来触发计算并取得进展。例如,时间窗口任务完成窗口计算,并在任务事件时间通过窗口的结束边界时输出结果。

在Flink中,水印被实现为特殊的记录(带有一个Long值时间戳的记录)。带有注释的时间戳的常规记录流中的水印如图3-8所示。

图片

水印有两个基本属性:

  1. 它们必须是单调递增的,以确保任务的eventtime时钟是前进的,而不是后退的。

  2. 它们与记录的时间戳相关。带有时间戳T的水印表示所有后续记录都应该具有时间戳> T。

第二个属性用于处理具有无序记录时间戳的流,例如图3-8中具有时间戳3和5的记录。基于时间的操作符的task收集和处理可能具有无序时间戳的记录,并在其eventtime时钟指示:不再需要具有相关时间戳的记录时,完成计算。当一个任务接收到一条违反了水印属性的记录,并且该记录的时间戳比之前接收到的水印小,那么它所属的计算可能已经完成。这样的记录称为延迟记录。Flink提供了处理延迟记录的不同方法,这些方法将在“处理延迟数据”中讨论。

水印的一个有趣的特性是,它们允许应用程序控制结果的完整性和延迟。与记录的时间戳非常接近的水印会导致较低的处理延迟,因为任务在完成计算之前只会短暂地等待更多记录的到来。同时,结果的完整性可能会受到影响,因为相关的记录可能不会包含在结果中,并且会被认为是延迟记录。相反,非常保守的水印增加了处理延迟,但提高了结果的完整性。

水印传播和事件时间

在本节中,我们将讨论操作符如何处理水印。Flink将水印实现为特殊的记录,由操作符task接收和发出。任务有一个内部时间服务,它维护计时器,并在收到水印时激活。任务可以在定时器服务中注册定时器,以便在将来某个特定的时间点执行计算。例如,窗口操作符为每个活动窗口注册一个定时器,当事件时间超过窗口的结束时间时,定时器将清除窗口的状态。

当任务(task)收到水印时,会执行以下操作:

  1. 任务根据水印的时间戳更新其内部事件时间时钟。

  2. 任务的时间服务用小于更新事件时间的时间标识所有定时器。对于每个过期的定时器,该任务调用一个回调函数,该函数可以执行计算并发出记录。

  3. 任务发出带有更新事件时间的水印。

                                         请注意
Flink限制通过DataStream API访问时间戳或水印。函数不能读取或修改记录时间戳和水印,但处理函数可以读取当前处理的记录的时间戳,请求操作符的当前事件时间,并注册定时器。这些函数都没有公开API来设置输出记录的时间戳、操作任务的事件时间时钟或发出水印。相反,基于时间的DataStream操作符task配置发出记录的时间戳,以确保它们与发出的水印正确对齐。例如,timewindow操作符任务,在其使用触发窗口计算的时间戳输出水印之前,将窗口的结束时间作为时间戳附加到窗口计算输出的所有记录上。

现在,让我们更详细地解释任务在接收新水印时如何发出水印并更新其事件时间时钟。正如您在“数据并行性和任务并行性”中看到的,Flink将数据流分割为分区,并通过一个单独的操作符task并行处理每个分区。每个分区都是带有时间戳的记录和水印的流。根据操作符与前一个或后一个操作符的连接方式,其任务可以从一个或多个输入分区接收记录和水印,并将记录和水印发送到一个或多个输出分区。在下面,我们将详细描述一个任务如何向多个输出任务发出水印,以及如何使用从输入任务接收的水印推进其事件时间时钟。

一个task为每个输入分区维护一个分区水印。当它从一个分区接收一个水印时,它将相应的分区水印更新为接收值和当前值的最大值。随后,任务将其事件时间时钟更新为所有分区水印的最小值。如果事件时间时钟前进,任务将处理所有触发的定时器,最终,通过向所有连接的输出分区发出相应的水印的方式,将其新的事件时间广播给所有下游任务。

图3-9显示了具有四个输入分区和三个输出分区的任务如何接收水印、更新其分区水印和事件时间时钟并发出水印。

图片

具有两个或多个输入流(如Union或CoFlatMap,请参阅“多流转换”)的操作符的任务还将其事件时间时钟计算为所有分区水印的最小值——它们不区分不同输入流的分区水印。因此,两个输入流的记录都基于相同的事件时间时钟进行处理。如果应用程序的各个输入流的事件时间不一致,则此行为可能导致问题。

Flink的水印处理和传播算法确保操作符task输出正确对齐的时间戳记录和水印。然而,它依赖于这样一个事实,即所有的分区不断地提供递增的水印。一旦一个分区没有推进它的水印,或者变得完全空闲,没有发送任何记录或水印,任务的事件时间时钟将不会推进,任务的定时器将不会触发。对于依赖于前进时钟执行计算并清理其状态的基于时间的操作符来说,这种情况是有问题的。因此,如果任务没有定期从所有输入任务接收新水印,则基于时间的操作符的处理延迟和状态大小可能会显著增加。

类似的效果也出现在两个输入流的操作符上,它们的水印明显不同。具有两个输入流的任务的事件时间时钟将对应于较慢流的水印,而较快流的记录或中间结果通常处于缓冲状态,直到事件时间时钟允许处理它们。

时间戳分配和水印生成

到目前为止,我们已经解释了什么是时间戳和水印,以及它们是如何由Flink内部处理的。然而,我们还没有讨论它们的起源。时间戳和水印通常是在流应用程序接收流时分配和生成的。因为时间戳的选择是特定于应用程序的,而水印依赖于时间戳和流的特征,所以应用程序必须显式地分配时间戳并生成水印。Flink DataStream应用程序可以通过三种方式分配时间戳和生成流水印:

  1. 在源(source)端:时间戳和水印可以由SourceFunction在将流注入应用程序时分配和生成。源函数发出记录流。记录可以与相关的时间戳一起发出,而水印可以作为特殊记录在任何时间点发出。如果源函数(暂时)不再发出任何水印,它可以声明自己为空闲。Flink将从后续操作符的水印计算中排除空闲源函数产生的流分区。如前所述,源的空闲机制可用于解决不推进水印的问题。在“实现自定义源函数”一节中更详细地讨论了源函数。

  2. 周期性assigner: DataStream API提供了一个名为AssignerWithPeriodicWatermarks的用户定义函数,该函数从每个记录中提取时间戳,并定期查询当前水印。提取的时间戳被分配给相应的记录,被查询的水印被放入流中。这个函数将在“分配时间戳和生成水印”中讨论。

  3. Punctuated assigner:AssignerWithPunctuatedWatermarks是另一个用户定义的函数,它从每个记录中提取时间戳。它可用于生成特殊输入记录中编码的水印。与AssignerWithPeriodicWatermarks函数不同,这个函数可以(但不需要)从每个记录中提取水印。我们在“分配时间戳和生成水印”中也详细讨论了这个函数。

用户定义的时间戳分配函数通常应用于尽可能靠近源操作符的地方,因为在操作符处理记录及其时间戳之后,很难推断它们的顺序。这也是为什么在流应用程序中间重写现有的时间戳和水印不是一个好主意,尽管这可以通过用户定义的函数实现。

状态管理

在第2章中,我们指出大多数流应用程序是有状态的。许多操作符不断地读取和更新某种状态,如窗口中收集的记录、输入源的读取位置,或自定义的、特定于应用程序的操作符状态(如机器学习模型)。Flink处理所有状态—不管内置的还是用户定义的操作符—都是一样的。在本节中,我们将讨论Flink支持的不同类型的状态。我们将解释状态如何由状态后端存储和维护,以及如何通过重新分布状态来扩展有状态的应用程序。

通常,由任务维护并用于计算函数结果的所有数据都属于任务的状态。您可以将状态视为任务的业务逻辑访问的本地变量或实例变量。图3-10显示了任务及其状态之间的典型交互。

图片

任务接收一些输入数据。在处理数据时,任务可以读取和更新其状态,并根据其输入数据和状态计算其结果。一个简单的例子是一个任务,它连续地计算它接收了多少条记录。当任务接收到一条新记录时,它将访问该状态以获取当前计数、增加计数、更新状态并输出新的计数。

读写状态的应用程序逻辑通常很简单。然而,高效可靠的状态管理更具挑战性。这包括处理非常大的状态(可能超过内存),并确保在发生故障时不会丢失任何状态。与状态一致性、故障处理及有效存储和访问相关的所有问题都由Flink负责,这样开发人员就可以专注于他们的应用程序的逻辑。

在Flink中,状态总是与特定的操作符相关联。为了让Flink的运行时知道操作符的状态,操作符需要注册它的状态。有两种类型的状态,操作符状态和键控状态,它们可以从不同的范围中访问,并将在接下来的部分中进行讨论。

Operator State(操作符状态)

操作符状态的作用域是操作符任务。这意味着由同一并行任务处理的所有记录都可以访问相同的状态。操作符状态不能被相同或不同操作符的另一个任务访问。图3-11显示了任务如何访问操作符状态。

图片

Flink为操作符状态提供了三种基本类型:

List state

将状态表示为条目列表。

Union list state

也将状态表示为条目列表。但它与常规列表状态的不同之处在于,在发生故障或从保存点启动应用程序时,它是如何恢复的。我们将在本章后面讨论这种差异。

Broadcast state

针对操作符的每个任务的状态相同的特殊情况而设计。此属性可在检查点期间和扩展操作符时使用。这两个方面都将在本章后面的章节中讨论。

Keyed State(键控状态)

键控状态是根据操作符输入流记录中定义的键来维护和访问的。Flink为每个键值维护一个状态实例,并使用与维护该键状态的操作符任务相同的键对所有记录进行分区。当任务处理一条记录时,它会自动确定对当前记录的键的状态访问范围。因此,所有具有相同key的记录访问相同的状态。图3-12显示了任务如何与键控状态交互。

图片

您可以将键控状态看作是一个键值映射,它跨操作符的所有并行任务对键进行分区(或分片)。Flink为键控状态提供了不同的原语,这些原语决定了为这个分布式键值映射中的每个键存储的值的类型。我们将简要讨论最常见的键控状态原语。

Value state

每个键存储任意类型的单个值。复杂的数据结构也可以存储为值状态。

List state

存储每个键的值列表。列表项可以是任意类型的。

Map state

为每个键存储键值映射。映射的键和值可以是任意类型的。

状态原语将状态的结构暴露给Flink,并支持更有效的状态访问。它们将在“在RuntimeContext中声明键控状态”中进一步讨论。

状态后端

有状态操作符的任务通常读取并更新每个传入记录的状态。由于高效的状态访问对于处理低延迟的记录至关重要,因此每个并行任务都在本地维护其状态,以确保快速的状态访问。状态存储、访问和维护的确切方式由称为状态后端(state backend)的可插拔组件决定。状态后端负责两件事:本地状态管理和远程位置的检查点状态。

对于本地状态管理,状态后端存储所有键控状态,并确保所有访问都正确地限定到当前键。Flink提供状态后端,将键控状态作为对象存储在JVM堆的内存数据结构中。此外,状态后端序列化状态对象并将它们放入RocksDB,后者将它们写入本地硬盘。虽然第一个选项提供非常快速的状态访问,但它受到内存大小的限制。访问RocksDB状态后端存储的状态比较慢,但是它的状态可能会变得非常大。

状态检查点很重要,因为Flink是一个分布式系统,状态只在本地维护。TaskManager进程(以及在其上运行的所有任务)可能在任何时候失败。因此,必须将其存储视为易失性的。状态后端负责将任务的状态检查点指向远程和持久存储。检查点的远程存储可以是分布式文件系统或数据库系统。状态后端在如何生成状态检查点上是不同的。例如,RocksDB状态后端支持增量检查点,这可以显著减少非常大的状态检查点的开销。

我们将在“选择状态后端”中更详细地讨论不同的状态后端及其优缺点。

扩展状态操作符

流处理应用程序的一个常见需求是,由于输入速率的增加或减少而调整操作符的并行度。虽然扩展无状态操作符很简单,但是更改有状态操作符的并行度却很有挑战性,因为它们的状态需要重新分区并分配给更多或更少的并行任务。Flink支持四种模式来扩展不同类型的状态。

具有键控状态的操作符通过将键重新划分为更少或更多任务的方式来扩展。但是,为了提高任务之间必要的状态传输的效率,Flink不会重新分配单个键。相反,Flink在所谓的键组中组织键。key组是key的分区,是Flink将key分配给任务的方式。图3-13显示了如何在key组中重新划分键控状态。

图片

具有操作符列表状态的操作符通过重新分发列表项来缩放。从概念上讲,所有并行操作符任务的列表条目都被收集起来,并均匀地重新分配给更小或更大数量的任务。如果列表项比运算符的新并行度少,一些任务将以空状态开始。图3-14显示了操作符列表状态的重新分配。

图片

具有操作符联合列表状态(union list state)的操作符通过向每个任务广播状态项的完整列表来伸缩。然后,任务可以选择使用哪些条目和丢弃哪些条目。图3-15显示了如何重新分配操作符联合列表状态。

图片

具有操作符广播状态(broadcast state)的操作符通过将状态复制到新任务来扩展。这是因为广播状态确保所有任务具有相同的状态。在缩小规模的情况下,多余的任务被简单地取消,因为状态已经被复制,不会丢失。图3-16显示了操作符广播状态的重新分配。

图片

检查点、保存点和状态恢复

Flink是一个分布式数据处理系统,因此,它必须处理诸如进程终止、机器故障和网络连接中断等故障。因为任务在本地维护它们的状态,所以Flink必须确保这个状态不会丢失,并且在失败的情况下保持一致。

在本节中,我们将介绍Flink的检查点和恢复机制,以确保精确一次的状态一致性。我们还讨论了Flink独特的savepoint特性,这是一种类似“瑞士军刀”的工具,它解决了操作流处理应用程序的许多挑战。

一致检查点

Flink的恢复机制基于应用程序状态的一致检查点。有状态流应用程序的一致检查点是在所有任务都处理了完全相同的输入时,其每个任务的状态的副本。这可以通过查看采用应用程序一致检查点的简单算法的步骤来解释。这个朴素算法的步骤是:

  1. 暂停读取数据的所有输入流。

  2. 等待所有处理中的数据被完全处理,这意味着所有的任务已经处理了所有的输入数据。

  3. 通过将每个任务的状态复制到远程存储来生成检查点。当所有任务都完成其状态副本时,检查点就完成了。

  4. 恢复读取数据的所有输入流。

请注意,Flink并不实现这种简单的机制。我们将在本节的后面介绍Flink更复杂的检查点算法。

图3-17显示了一个简单应用程序的一致检查点。

图片

应用程序有一个单一的源任务,它使用一个递增的数字流——1、2、3,等等。数字流被分成偶数和奇数流。sum运算符的两个任务是计算所有偶数和奇数的和。源任务将其输入流的当前偏移量存储为状态。sum任务将当前sum值保存为状态。在图3-17中,当输入偏移量为5,和为6和9时,Flink采用检查点。

从一致检查点恢复

在流应用程序执行期间,Flink会周期性地接受应用程序状态的一致检查点。如果出现故障,Flink使用最新的检查点一致地恢复应用程序的状态并重新启动数据处理。图3-18显示了恢复过程。

图片

应用程序可分三步恢复:

  1. 重新启动整个应用程序。

  2. 将所有有状态任务的状态重置为最新的检查点。

  3. 恢复所有任务的数据处理。

这种检查点和恢复机制可以提供应用程序状态的精确一次一致性,前提是所有操作符都通过检查点并恢复它们的所有状态,并且所有输入流都被重置到检查点所在的位置。数据源是否可以重置其输入流取决于其实现和使用该流的外部系统或接口。例如,像Apache Kafka这样的事件日志可以提供来自流以前偏移量的记录。相反,从Socket读取数据的流不能被重置,因为Socket一旦使用了数据,就会丢弃它。因此,如果所有的输入流消费的数据都来自重置消费位置的数据源,那么应用程序就可以在精确一次一致性的情况下运行。

从检查点重新启动应用程序后,其内部状态与采取检查点时完全相同。然后它开始消费和处理检查点和故障之间处理的所有数据。尽管这意味着Flink会对某些消息进行两次处理(在失败之前和失败之后),但是该机制仍然能够实现精确一次状态一致性,因为所有操作符的状态都被重置为一个尚未看到此数据的点。

我们必须指出,Flink的检查点和恢复机制只重置流应用程序的内部状态。根据应用程序的sink操作符,某些结果记录可能会在恢复期间多次发送到下游系统,如事件日志、文件系统或数据库。对于某些存储系统,Flink提供了接收函数,这些函数只具有一次输出,例如,通过在检查点完成时提交发出的记录。另一种适用于许多存储系统的方法是幂等更新。在“应用程序一致性保证”中详细讨论了端到端的一次性应用程序和解决方法的挑战。

Flink的检查点算法

Flink的恢复机制基于一致的应用程序检查点。从流应用程序获取检查点的原始方法(停止, 获取检查点, 恢复应用程序数据处理),对于那些由于其“停止一切”的行为,而具有低延迟需求的应用程序来说是不实际的。

相反,Flink为分布式快照实现了基于Chandy-Lamport算法的检查点。该算法不会暂停整个应用程序,而是将检查点从处理中分离出来,这样一些任务可以继续处理,而另一些任务则持久化它们的状态。接下来,我们将解释这个算法是如何工作的。

Flink的检查点算法使用一种特殊类型的记录,称为检查点barrier(checkpoint barrier)。与水印类似,检查点barrier由源操作符注入到常规记录流中,不能超越或被其他记录超越。检查点barrier携带一个检查点ID来标识它所属的检查点,并在逻辑上将流分成两部分。由于barrier之前的记录所有状态修改都包含在barrier的检查点中,而由于barrier之后的记录所有状态修改都包含在后面的检查点中。

我们使用一个简单的流处理应用程序的例子来逐步解释算法。应用程序由两个源任务组成,每个源任务消费一个不断增长的数字流。源任务的输出被划分为偶数和奇数流。每个分区由一个任务处理,该任务计算所有接收到的数字的和,并将更新后的和转发给接收器(sink)。应用程序如图3-19所示。

图片

JobManager通过向每个数据源任务发送带有新检查点ID的消息来启动检查点,如图3-20所示。

图片

当数据源任务接收到消息时,它暂停发出记录,在状态后端触发其本地状态的检查点,并使用检查点ID通过所有输出流分区广播检查点barriers。状态后端在其状态检查点完成后通知任务,任务在JobManager上确认检查点。在发出所有barriers之后,源将继续其常规操作。通过将barrier注入它的输出流,源函数定义了生成检查点所在的流位置。图3-21显示了两个源任务生成它们的本地状态检查点并发出检查点barriers之后的流应用程序。

图片

源任务(task)发出的检查点barriers被发送到连接的任务(task)。与水印类似,将检查点barriers广播到所有连接的并行任务,以确保每个任务从其每个输入流接收一个barriers。当一个任务接收到一个新检查点的barriers时,它将等待来自该检查点的所有输入分区的barriers的到来。在等待期间,它将继续处理来自流分区的记录,即使这些分区还没有提供barriers。在到达的分区记录中,已经转发了某个barriers的记录不再处理,需要进行缓冲。等待所有barriers到达的过程称为barriers对齐,如图3-22所示。

图片

图3-22 任务等待在每个输入分区上接收一个barrier;已经到达barrier的输入流记录被缓冲;所有其他记录都定期处理

一旦一个任务从它的所有输入分区接收到barriers,它就在状态后端启动一个检查点barriers,并将这个检查点barriers广播给它的所有下游连接的任务,如图3-23所示。

图片

一旦发出了所有检查点barriers,任务就开始处理缓冲的记录。发出所有缓冲记录之后,任务将继续处理其输入流。图3-24显示了此时的应用程序。

图片

最终,检查点barriers到达一个sink任务。当sink任务接收到一个barriers时,它执行一个barriers对齐,生成自己的检查点状态,并向JobManager确认接收到的barriers。JobManager从应用程序的所有任务接收到检查点确认后,将应用程序的检查点记录为完成。图3-25显示了检查点算法的最后一步。如前所述,完成的检查点可用于从故障中恢复应用程序。

图片

检查点对性能的影响

Flink的检查点算法在不停止整个应用程序的情况下,从流应用程序生成一致的分布式检查点。但是,它会增加应用程序的处理延迟。Flink实现了一些调整,可以在某些条件下减轻性能影响。

当任务生成状态检查点时,它被阻塞,其输入被缓冲。由于状态可能变得非常大,而且检查点需要通过网络将数据写入远程存储系统,所以采用检查点很容易花费几秒钟到几分钟的时间—对于延迟敏感的应用程序来说太长了。在Flink的设计中,状态后端负责执行检查点。如何准确地复制任务的状态取决于状态后端的实现。例如,文件系统状态后端和RocksDB状态后端支持异步检查点。当触发检查点时,状态后端创建状态的本地副本。当本地副本完成后,任务将继续其常规处理。后台线程异步地将本地快照复制到远程存储,并在完成检查点后通知任务。异步检查点大大减少了任务继续处理数据之前的时间。此外,RocksDB状态后端还具有增量检查点功能,这减少了要传输的数据量。

另一种减少检查点算法对处理延迟影响的技术是调整barrier对齐步骤。对于那些需要非常低的延迟并且能够容忍至少一次状态保证的应用程序,可以将Flink配置为在缓冲区对齐期间处理所有到达的记录,而不是缓冲那些已经到达的记录。一旦一个检查点的所有barrier都到达了,操作符就会检查状态,现在可能还包括由通常属于下一个检查点的记录引起的修改。在出现故障时,这些记录将被再次处理,这意味着检查点提供至少一次一致性保证,而不是精确一次一致性保证。

Savepoints(保存点)

Flink的恢复算法基于状态检查点。检查点根据一个可配置的策略,定期生成和自动舍弃。由于检查点的目的是确保在出现故障时可以重新启动应用程序,所以当应用程序被显式取消时,检查点将被删除。然而,除了故障恢复之外,应用程序状态的一致快照还可以用于更多的事情。

savepoints(保存点)是Flink最有价值和最独特的特性之一。原则上,保存点是使用与检查点相同的算法创建的,因此基本上是带有一些附加元数据的检查点。Flink不会自动生成保存点,因此用户(或外部调度器)必须显式地触发它的创建。Flink也不会自动清除保存点。第10章描述了如何触发和释放保存点。

使用保存点

给定一个应用程序和一个兼容的保存点,您可以从该保存点启动应用程序。这将把应用程序的状态初始化为保存点的状态,并从保存点处运行应用程序。虽然这种行为看起来与使用检查点从故障中恢复应用程序完全相同,但故障恢复实际上只是一种特殊情况。它在相同的集群上以相同的配置启动相同的应用程序。从保存点启动应用程序允许您做更多的事情。

  • 可以从保存点启动一个不同但兼容的应用程序。因此,您可以修复应用程序逻辑中的bug,并重新处理尽可能多的事件,以便修复结果。修改后的应用程序还可以用于运行具有不同业务逻辑的A/B测试或假设场景。注意,应用程序和保存点必须是兼容的——应用程序必须能够加载保存点的状态。

  • 可以使用不同的并行度启动相同的应用程序,并将应用程序向外扩展或向内扩展。

  • 可以在不同的集群上启动相同的应用程序。这允许您将应用程序迁移到更新的Flink版本或不同的集群或数据中心。

  • 可以使用保存点暂停应用程序并稍后继续。这使得为高优先级应用程序或输入数据没有连续输出时释放集群资源成为可能。

  • 还可以使用版本保存点来存档应用程序的状态。

由于保存点是如此强大的功能,许多用户会定期创建保存点,以便能够及时返回。我们看到的最有趣的保存点应用程序之一是不断地将流应用程序迁移到提供最低实例价格的数据中心。

从保存点启动应用程序

前面提到的所有保存点用例都遵循相同的模式。首先,获取正在运行的应用程序的保存点,然后将其用于恢复启动应用程序中的状态。在本节中,我们将描述Flink如何初始化从保存点开始的应用程序的状态。

一个应用程序由多个操作符组成。每个操作符可以定义一个或多个键控状态和运算符状态。操作符由一个或多个操作符任务并行执行。因此,典型的应用程序由多个状态组成,这些状态分布可以分布在不同TaskManager进程上运行的多个操作符任务上。

图3-26显示了一个具有三个操作符的应用程序,每个操作符运行两个任务。一个操作符(OP-1)有一个操作符状态(OS-1),另一个操作符(OP-2)有两个键控状态(KS-1和KS-2)。在生成保存点时,将所有任务的状态复制到持久存储位置。

图片

保存点中的状态副本由操作符标识符和状态名组织。操作符标识符和状态名需要能够将保存点的状态数据映射到启动应用程序的操作符的状态。当从保存点启动应用程序时,Flink将保存点数据重新分发给相应操作符的任务。

                                         请注意
注意,保存点不包含有关操作符任务的信息。这是因为当应用程序以不同的并行度启动时,任务的数量可能会发生变化。在本节的前面,我们讨论了Flink扩展有状态操作符的策略。

如果修改后的应用程序是从保存点启动的,则保存点中的状态只能在包含具有相应标识符和状态名的操作符的情况下映射到应用程序。默认情况下,Flink分配唯一的操作符标识符。但是,操作符的标识符是根据前面操作符的标识符确定生成的。因此,当一个操作符的前一个版本操作符改变时,例如,当一个操作符被添加或删除时,操作符的标识符也会改变。因此,具有缺省操作符标识符的应用程序,在不丢失状态的情况下进行演化方面的功能非常有限。因此,我们强烈建议手动为操作符分配唯一标识符,不要依赖于Flink的默认分配。我们在“指定唯一操作符标识符”中详细描述了如何分配操作符标识符。

结束语

在本章中,我们讨论了Flink的高级体系结构及其网络堆栈的内部机制、事件处理模式、状态管理和故障恢复机制。这些信息在设计高级流应用程序、设置和配置集群、操作流应用程序以及对它们的性能进行推断时非常有用。

原文地址:https://www.cnblogs.com/lanblogs/p/15162678.html