flink流处理内容

Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能

Flink提供了诸多更高抽象层的API以便用户编写分布式任务:

DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和PythonDataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala

Table API,结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala

对于一个流处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,然后立刻通过网络传输到下一个节点,由下一个节点继续处理

而对于一个批处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,并不会立刻通过网络传输到下一个节点,当缓存写满,就持久化到本地硬盘上,当所有数据都被处理完成后,才开始将处理后的数据通过网络传输到下一个节点

这两种数据传输模式是两个极端,对应的是流处理系统对低延迟的要求和批处理系统对高吞吐量的要求

Flink的执行引擎同时支持了这两种数据传输模型,Flink以固定的缓存块为单位进行网络数据传输,用户可以通过缓存块超时值指定缓存块的传输时机,超时值为0,则是流处理系统的标准模型,此时可以获得最低的处理延迟,缓存块的超时值为无限大,则Flink的数据传输方式类似上文所提到批处理系统的标准模型。

缓存块的超时阈值越小,则流处理数据的延迟越低,但吞吐量也会变低。根据超时阈值来灵活权衡系统延迟和吞吐量。Flink基于分布式快照与可部分重发的数据源实现了容错。

用户可自定义对整个Job进行快照的时间间隔,当任务失败时,Flink会将整个Job恢复到最近一次快照,并从数据源重发快照之后的数据。

按照用户自定义的快照间隔时间,flink会定时在数据源中插入快照标记的消息,快照消息和普通消息都在DAG中流动,但不会被用户定义的逻辑所处理,每一个快照消息都将其所在的数据流分成2部分:本次快照数据和下次快照数据。当操作符处理到快照标记消息,对自己的状态进行快照标记并缓存。操作符对自己的快照和状态可以是异步,增量操作,并不阻塞消息处理。当所有的终点操作符都收到快照标记信息并对自己的状态快照和存储后,整个分布式快照就完成了。同时通知数据源释放该快照标记消息之前的所有消息。若之后的节点崩溃等异常,就可以恢复分布式快照状态。并从数据源重发该快照以后的消息。

flink基于分布式快照实现了一次性。

目前大部分流处理系统来说,时间窗口一般是根据Task所在节点的本地时钟进行切分,

是可能无法满足某些应用需求,比如:

消息本身带有时间戳,用户希望按照消息本身的时间特性进行分段处理。

由于不同节点的时钟可能不同,以及消息在流经各个节点延迟不同,在某个节点属于同一个时间窗口处理的消息,流到下一个节点时可能被切分到不同的时间窗口中,从而产生不符合预期的结果

Flink支持3种类型的时间窗口:

1.Operator Time。根据Task所在节点的本地时钟来切分的时间窗口

2.Event Time。消息自带时间戳,根据消息的时间戳进行处理,确保时间戳在同一个时间窗口的所有消息一定会被正确处理。由于消息可能乱序流入Task,所以Task需要缓存当前时间窗口消息处理的状态,直到确认属于该时间窗口的所有消息都被处理,才可以释放,如果乱序的消息延迟很高会影响分布式系统的吞吐量和延迟

3.ingress Time。有时消息本身并不带有时间戳信息,但用户依然希望按照消息而不是节点时钟划分时间窗口,例如避免上面提到的第二个问题,此时可以在消息源流入Flink流处理系统时自动生成增量的时间戳赋予消息,之后处理的流程与Event Time相同。Ingress Time可以看成是Event Time的一个特例,由于其在消息源处时间戳一定是有序的,所以在流处理系统中,相对于Event Time,其乱序的消息延迟不会很高,因此对Flink分布式系统的吞吐量和延迟的影响也会更小。

操作符通过基于Event Time的时间窗口来处理数据时,它必须在确定所有属于该时间窗口的消息全部流入此操作符后才能开始数据处理。但是由于消息可能是乱序的,所以操作符无法直接确认何时所有属于该时间窗口的消息全部流入此操作符。WaterMark包含一个时间戳,Flink使用WaterMark标记所有小于该时间戳的消息都已流入

一个可能的优化措施是,对于聚合类的操作符,可以提前对部分消息进行聚合操作,当有属于该时间窗口的新消息流入时,基于之前的部分聚合结果继续计算,这样的话,只需缓存中间计算结果即可,无需缓存该时间窗口的所有消息

flink基于watermark实现了基于时间戳的全局排序:

排序操作:排序操作符缓存所有流入的消息,当接收到watermark时,对时间戳小于该watermark的消息进行排序,并发送到下一个节点。在此排序操作符中释放所有时间戳小于该watermark的消息,继续缓存流入的消息。等待下一次watermark触发下一次排序。

watermark保证了其之后不会出现时间戳比它小的消息,因此可以保证排序的正确性。请注意:排序操作符有多个节点,只能保证每个节点流出的消息有序,节点之间的消息不能有序,要实现全局有序,则只能有一个排序操作符节点。

Java对象的存储密度相对偏低,例如[1],“abcd”这样简单的字符串在UTF-8编码中需要4个字节存储

采用了UTF-16编码存储字符串的Java则需要8个字节,同时Java对象还有header等其他额外信息,一个4字节字符串对象在Java中需要48字节的空间来存储。对于大部分的大数据应用,内存都是稀缺资源,更有效率地内存存储,意味着CPU数据访问吞吐量更高,以及更少磁盘落地的存在。

垃圾回收也是Java应用的不定时炸弹,有时秒级甚至是分钟级的垃圾回收极大影响了Java应用的性能和可用性。

通过JVM参数调优提高垃圾回收效率需要用户对应用和分布式计算框架以及JVM的各参数有深入了解,而且有时候这也远远不够:

 

为了解决以上提到的问题,高性能分布式计算框架通常需要以下技术:

Flink的处理策略:

定制的序列化工具,显式内存管理的前提步骤就是序列化,用的序列化框架,如Java默认使用java.io.Serializable

制的序列化框架,如Hadoop的org.apache.hadoop.io.Writable需要用户实现该接口并自定义类的序列化和反序列化方法。这种方式效率最高。

对于计算密集的数据结构和算法,直接操作序列化后的二进制数据,而不是将对象反序列化后再进行操作。

缓存友好的数据结构和算法对于计算密集的数据结构和算法,直接操作序列化后的二进制数据,而不是将对象反序列化后再进行操作。同时,只将操作相关的数据连续存储,可以最大化的利用L1/L2/L3缓存,减少Cache miss的概率,提升CPU计算的吞吐量。以排序为例,由于排序的主要操作是对Key进行对比,如果将所有排序数据的Key与Value分开并对Key连续存储,那么访问Key时的Cache命中率会大大提高

分布式计算框架可以使用定制序列化工具的前提是要待处理数据流通常是同一类型,由于数据集对象的类型固定,从而可以只保存一份对象Schema信息,节省大量的存储空间

对于固定大小的类型,也可通过固定的偏移位置存取。在需要访问某个对象成员变量时,通过定制的序列化工具,并不需要反序列化整个Java对象,而是直接通过偏移量,从而只需要反序列化特定的对象成员变量。如果对象的成员变量较多时,能够大大减少Java对象的创建开销,以及内存数据的拷贝大小Flink数据集支持任意Java或是Scala类型,通过自动生成定制序列化工具,既保证了API接口对用户友好(不用像Hadoop那样数据类型需要继承实现org.apache.hadoop.io.Writable接口),也达到了和Hadoop类似的序列化效率

Flink对数据集的类型信息进行分析,然后自动生成定制的序列化工具类。Flink支持任意的Java或是Scala类型,通过Java Reflection框架分析基于Java的Flink程序UDF(User Define Function)的返回类型的类型信息,通过Scala Compiler分析基于Scala的Flink程序UDF的返回类型的类型信息。类型信息由TypeInformation类表示,这个类有诸多具体实现类

例如

1.BasicTypeInfo任意Java基本类型(装包或未装包)和String类型

 2.BasicArrayTypeInfo任意Java基本类型数组(装包或未装包)和String数组

3.WritableTypeInfo任意Hadoop的Writable接口的实现类

4.TupleTypeInfo任意的Flink tuple类型(支持Tuple1 to Tuple25)Flink tuples是固定长度固定类型的Java Tuple实

 5.CaseClassTypeInfo任意的 Scala CaseClass(包括 Scala tuples)

 6.PojoTypeInfo任意的POJO (Java or Scala),Java对象的所有成员变量,要么是public修饰符定义,要么有getter/setter方法

7.GenericTypeInfo任意无法匹配之前几种类型的类。

前6种类型数据集几乎覆盖了绝大部分的Flink程序,针对前6种类型数据集,Flink皆可以自动生成对应的TypeSerializer定制序列化工具,非常有效率地对数据集进行序列化和反序列化

对于第7种类型,Flink使用Kryo进行序列化和反序列化

对于可被用作Key的类型,Flink还同时自动生成TypeComparator,用来辅助直接对序列化后的二进制数据直接进行compare、hash等操作

对于Tuple、CaseClass、Pojo组合类型,Flink自动生成的TypeSerializerTypeComparator同样是组合的,并把其成员的序列化/反序列化代理给其成员对应的TypeSerializer、TypeComparator,如图6所示:

此外如有需要,用户可通过集成TypeInformation接口定制实现自己的序列化工具

JDK8的G1算法改善了JVM垃圾回收的效率和可用范围

通过JVM进行内存管理的话,OutOfMemoryError也是一个很难解决的问题

在JVM内存管理中,Java对象有潜在的碎片化存储问题

Flink将内存分为3个部分,每个部分都有不同用途:

1.Network buffers: 一些以32KB Byte数组为单位buffer,主要被网络模块用于数据的网络传输,基于Netty的网络传输

2.Memory Manager pool大量以32KB Byte数组为单位的内存池,所有的运行时算法(例如Sort/Shuffle/Join)都从这个内存池申请内存并将序列化后的数据存储其中,结束后释放回内存池。通常会配置为最大的一块内存,

3. Remaining (Free) Heap主要留给UDF中用户自己创建的Java对象,由JVM管理。Flink也不鼓励用户在UDF中缓存很多数据。。 Remaining Heap的内存虽然由JVM管理,但是由于其主要用来存储用户处理的流式数据,生命周期非常短,速度很快的Minor GC就会全部回收掉,一般不会触发Full GC

在Flink中,内存池由多个MemorySegment组成,每个MemorySegment代表一块连续的内存,底层存储是byte[],默认32KB大小。

MemorySegment提供了根据偏移量访问数据的各种方法,如get/put int、long、float、double等,MemorySegment之间数据拷贝等方法和java.nio.ByteBuffer类似。

于Flink的数据结构,通常包括多个向内存池申请的MemeorySegment,所有要存入的对象通过TypeSerializer序列化之后,将二进制数据存储在MemorySegment中,在取出时通过TypeSerializer反序列化

数据结构通过MemorySegment提供的set/get方法访问具体的二进制数据

Flink这种看起来比较复杂的内存管理方式带来的好处主要有:

1.二进制的数据存储大大提高了数据存储密度,节省了存储空间。所有的运行时数据结构和算法只能通过内存池申请内存,保证了其使用的内存大小是固定的,不会因为运行时数据结构和算法而发生OOM

Flink当前的内存管理在最底层是基于byte[],

flink排序算法的实现:

1.将待排序的数据经过序列化存储在两个不同的MemorySegment集中,数据全部的序列化值存放于其中一个MemorySegment集中。数据序列化后的Key和指向第一个MemorySegment集中值的指针存放于第二个MemorySegment集中。对第二个MemorySegment集中的Key进行排序,如需交换Key位置,只需交换对应的Key+Pointer的位置,第一个MemorySegment集中的数据无需改变。 当比较两个Key大小时,TypeComparator提供了直接基于二进制数据的对比方法,无需反序列化任何数据。排序完成后,访问数据时,按照第二个MemorySegment集中Key的顺序访问,并通过Pointer值找到数据在第一个MemorySegment集中的位置,通过TypeSerializer反序列化成Java对象返回。

通过Key和Full data分离存储的方式尽量将被操作的数据最小化,提高Cache命中的概率,从而提高CPU的吞吐量。 移动数据时,只需移动Key+Pointer,而无须移动数据本身,大大减少了内存拷贝的数据量。 TypeComparator直接基于二进制数据进行操作,节省了反序列化的时间。

DataSet API级别的执行计划优化器,原生的迭代操作符等,

原文地址:https://www.cnblogs.com/huiandong/p/10090912.html