Flink Application Development DataStream API Overview

【翻译来源-Application Development DataStream API】

Flink中的DataStream程序是常规程序,可对数据流实现transformations(例如,过滤,更新状态,定义窗口,聚合)。最初从各种sources(例如,消息队列,套接字流,文件)创建数据流。结果通过sinks返回,sinks可能做比如将数据写入文件或标准输出(例如命令行终端)。Flink程序可在各种上下文中运行,独立运行或嵌入其他程序中。执行可以在本地JVM或许多计算机的群集中进行。

为了创建自己的Flink DataStream程序,我们建议您从Flink程序的剖析开始,并逐步添加自己的流转换。其余部分用作其他操作和高级功能的参考。

什么是数据流?

DataStream API的名称来自特殊DataStream类,该特殊类用于表示Flink程序中的数据集合。您可以将它们视为包含重复项的不可变数据集合。此数据可以是有限的,也可以是无界的,用于处理它们的API是相同的。

就用法而言,一个DataStream与常规Java Collection类似,但在某些关键方面有很大不同。它们是不可变的,这意味着一旦创建它们就无法添加或删除元素。您还不能仅检查内部元素,而只能使用DataStream API操作(也称为transformations)对其进行处理。

您可以通过在Flink程序中添加源来创建最初的DataStream。然后,您可以从中派生新的流,并通过使用API​​方法(例如map,filter等)将它们组合在一起。

Flink程序剖析

Flink程序看起来像可以转换DataStreams的常规程序。每个程序都包含相同的基本部分:

  1. 获得execution environment,
  2. 加载/创建初始数据,
  3. 指定对此数据的转换,
  4. 指定将计算结果放在何处,
  5. 触发程序执行

译者注:其实包含了定义1-4和提交执行5两部分,理解这点非常重要,代码并不是按照定义简单的顺序执行,而是由Flink解释成图之后通过调度执行。

Java

现在,我们将对每个步骤给出概述,有关更多详细信息,请参阅相应的章节。请注意,可以在org.apache.flink.streaming.api中找到Java DataStream API的所有核心类。

StreamExecutionEnvironment是所有Flink程序的基础。您可以使用以下静态方法获得一个StreamExecutionEnvironment:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String... jarFiles)

通常,您只需要使用getExecutionEnvironment(),因为他将根据上下文执行正确的操作:如果您是在IDE中执行程序或作为常规Java程序执行,它将创建一个本地环境,该环境将在本地计算机上执行您的程序。如果您是通过程序创建的JAR文件,并通过命令行调用它,则Flink集群管理器将执行您的main方法, 并且getExecutionEnvironment()返回一个用于在集群上执行程序的执行环境。

对于指定数据源的情况,执行环境有几种类方法可以使用各种方式从文件读取:您可以逐行,以CSV文件的形式读取,或者使用任何其他提供的源。要将文本文件读取为一系列行,可以使用:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");

这将为您提供一个DataStream,然后您可以在其上应用transformations以创建新派生的DataStream。

您可以通过使用transformation函数在DataStream上调用方法来应用transformations。例如,map转换如下所示:

DataStream<String> input = ...;

DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

通过将原始集合中的每个String转换为Integer,将创建一个新的DataStream。

一旦有了包含最终结果的DataStream,就可以通过创建一个sink将其写入外部系统。这些只是创建接收器的一些示例方法:

writeAsText(String path)

print()

一旦您指定的完整程序,你需要调用StreamExecutionEnvironment上的execute()触发执行程序。根据ExecutionEnvironment类型的不同,执行过程将在本地计算机上触发或将您的程序提交到群集上执行。

该execute()方法将等待job完成,然后返回一个JobExecutionResult,其中包含执行时间和累加器结果。

如果你不想等待作业完成,您可以通过调用StreamExecutionEnvironment上的executeAysnc()触发异步作业执行。它将返回一个JobClient,该JobClient可以与您刚提交的作业进行通信。例如,这是如何通过使用executeAsync()实现execute()的语义 。

final JobClient jobClient = env.executeAsync();

final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();

关于程序执行的最后一部分对于理解何时以及如何执行Flink operations 至关重要。所有Flink程序都是延迟执行的:执行程序的main方法时,不会直接进行数据加载和转换。而是,将创建每个操作并将其添加到数据流图(dataflow graph)。当在执行环境上通过execute()的调用显式触发执行时,operations会被实际执行。程序是在本地执行还是在群集上执行取决于执行环境的类型。

延迟运行使您可以构建复杂的程序,Flink执行程序时将其作为一个整体计划的单元。

范例程序

以下程序是流式窗口字数统计应用程序的一个完整工作示例,该程序在5秒的窗口中统计来自Web套接字的字数。您可以复制并粘贴代码以在本地运行。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}

要运行示例程序,请首先从终端使用netcat启动输入流:

nc -lk 9999

在终端中只需输入一些单词,然后按回车键即可输入一个新单词。这些单词将作为字数统计程序的输入。如果你想查看计数大于1,请在5秒钟内一次又一次键入相同的单词(如果无法快速键入,增加窗口大小超过5秒 ☺)。

数据源

Data Sources

Java

源是您的程序从中读取其输入的位置。您可以使用StreamExecutionEnvironment.addSource(sourceFunction)将源附加到程序。Flink附带了许多预实现的源函数,但是您可以编写自己的自定义源,非并行源实现SourceFunction ,并行源实现ParallelSourceFunction接口或扩展 RichParallelSourceFunctionfor。

有以下几个可从StreamExecutionEnvironment无障碍访问的预定义的流源。

一、基于文件:

  • readTextFile(path)-逐行读取文本文件,即符合TextInputFormat规范的文件,并将其作为Strings返回。
  • readFile(fileInputFormat, path) -根据指定的文件输入格式读取(一次)文件。
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)-这是前两个方法内部调用的方法。它在path中根据给定的fileInputFormat格式读取文件。根据提供的watchType监视类型,此源可以定期(每个周期 ms)监视路径中的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或处理一次当前路径中的数据并退出(FileProcessingMode.PROCESS_ONCE)。使用pathFilter,用户可以进一步从路径中排除文件。

实现 IMPLEMENTATION:
在后台,Flink将文件读取过程分为两个子任务,即目录监视和数据读取。这些子任务中的每一个都是由单独的实体实现的。监视由单个非并行(并行度= 1)任务实现,而读取由多个并行运行的任务执行。后者的并行性等于作业并行性。单个监视任务的作用是扫描目录(根据watchType参数定期扫描或仅扫描一次),查找要处理的文件,将其拆分,并将这些拆分分配给下游读取器。读取器将是读取实际数据的组件。每个拆分只能由一个读取器读取,而一个读取器可以一个接一个阅读多个拆分。

重要笔记:

  1. 如果将watchType设置为FileProcessingMode.PROCESS_CONTINUOUSLY,则在修改文件时,将完全重新处理其内容。这可能会破坏“完全一次”的语义,因为在文件末尾附加数据将导致重新处理其所有内容。
  2. 如果将watchType设置为FileProcessingMode.PROCESS_ONCE,则源source将扫描路径一次并退出,而无需等待读取器完成文件内容的读取。当然,阅读器将继续读取,直到读取了所有文件内容。关闭源source将导致在该点之后没有更多检查点。这可能导致节点故障后恢复速度变慢,因为作业将从上一个检查点恢复读取。

二、基于套接字:

  • socketTextStream-从套接字读取。元素可以由定界符delimiter分隔。

三、基于集合:

  • fromCollection(Collection)-从Java Java.util.Collection创建数据流。集合中的所有元素必须具有相同的类型。
  • fromCollection(Iterator, Class)-从迭代器创建数据流。Class指定迭代器返回的元素的数据类型。
  • fromElements(T ...)-从给定的对象序列创建数据流。所有对象必须具有相同的类型。
  • fromParallelCollection(SplittableIterator, Class)-并行的方式从迭代器创建数据流。Class指定迭代器返回的元素的数据类型。
  • generateSequence(from, to) -并行的方式在给定间隔内生成数字序列。

四、自定义

  • addSource-附加新的source funciton。例如,要读取Apache Kafka,可以使用 addSource(new FlinkKafkaConsumer<>(...))。有关更多详细信息,请参见连接器

DataStream Transformations

转换。
请参考operators以获取可用的流转换概述。

Data Sinks

数据接收器。

Java

Data Sinks消费DataStreams并将其转发到文件,套接字,外部系统或打印它们。Flink附带了多种内置输出格式,这些格式封装在DataStreams上的operations后面:

  • writeAsText()/ TextOutputFormat-将元素逐行写为字符串。通过调用每个元素的toString()方法获得字符串。
  • writeAsCsv(...)/ CsvOutputFormat-将元组写为逗号分隔的值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
  • print()/ printToErr() -在标准输出/标准错误流上打印每个元素的toString()值。可选地,可以提供前置输出的前缀(msg)。这可以帮助区分不同的打印调用。如果并行度大于1,输出任务还会产生前置输出标识符。
  • writeUsingOutputFormat()/ FileOutputFormat- 方法和基类用于自定义文件的输出。支持自定义对象到字节的转换。
  • writeToSocket -根据一个SerializationSchema将元素写入套接字
  • addSink-调用自定义sink function。Flink与其他系统(例如Apache Kafka)的连接器捆绑在一起,这些系统已实现为sink functions。

请注意,DataStream上的write*()方法主要用于调试目的。它们不参与Flink的检查点,这意味着这些功能通常具有至少一次语义。数据刷新到目标系统取决于OutputFormat的实现。这意味着并非所有发送到OutputFormat的元素都立即显示在目标系统中。同样,在失败的情况下,这些记录可能会丢失。

为了可靠的确保一次的将流传输到文件系统中,请使用StreamingFileSink。同样,通过该.addSink(...)方法的自定义实现可以具有Flink确保一次语义的检查点。

迭代器

[feedback解释](https://www.cnblogs.com/Springmoon-venn/p/13857002.html)

Java

反复的流程序实现了一个step函数,并将它嵌入到一个IterativeStream。由于DataStream程序可能永远不会结束,因此没有最大迭代次数。相反,您需要指定流的哪一部分反馈(fed back)给迭代,哪一部分使用侧面输出side output或filter向下游转发。在这里,我们展示了一个使用filters的示例。首先,我们定义一个IterativeStream

IterativeStream<Integer> iteration = input.iterate();

然后,我们指定在循环内执行的使用一系列转换(这里是一个简单的map转换)的逻辑

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

要关闭迭代器并定义迭代尾部,请调用IterativeStream的closeWith(feedbackStream)方法。提供给该closeWith函数的DataStream将被反馈到迭代器头部。一种常见的模式是使用过滤器将流反馈的部分和流向前传播的部分分开。这些过滤器可以做,例如,定义“终止”逻辑,一个元素被允许向下游传播而不是被反馈。

iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);

例如,以下程序从一系列整数中连续减去1,直到它们达到零为止:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
    return value - 1 ;
  }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value > 0);
  }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value <= 0);
  }
});

执行参数

StreamExecutionEnvironment包含了用于设置job运行期指定配置值的ExecutionConfig。

请参考执行配置 以获取大多数参数的说明。这些参数专门与DataStream API有关:

  • setAutoWatermarkInterval(long milliseconds):设置水印自动发出的间隔。您可以使用getAutoWatermarkInterval()获取long类型的当前间隔值

容错能力

状态和检查点描述了如何启用和配置Flink的检查点机制。

控制延迟

默认情况下,元素们不会在网络上一对一传输(这会导致不必要的网络通信),但是会进行缓冲。缓冲区(实际上是在计算机之间传输的)的大小可以在Flink配置文件中设置。尽管此方法可以优化吞吐量,但是当传入流不够快时,它可能会导致延迟问题。要控制吞吐量和延迟,您可以env.setBufferTimeout(timeoutMillis)在执行环境(或在各个操作符上)上使用来设置缓冲区填充的最大等待时间。在此时间之后,即使缓冲区未满,也会自动发送缓冲区。此超时的默认值为100毫秒。

用法:

Java

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

为了最大程度地提高吞吐量,请设置setsetBufferTimeout(-1)来消除超时,并且仅在缓冲区已满时才刷新它们。为了使延迟最小化,请将超时设置为接近0的值(例如5或10 ms)。应避免将缓冲区超时设置为0,因为它可能导致严重的性能下降。

调试

在分布式群集中运行流式程序之前,最好确保已实现的算法能按预期工作。因此,实现数据分析程序通常是检查结果,调试和改进的增量过程。

Flink通过支持从IDE内部进行本地调试,注入测试数据和收集结果数据的功能,来大大简化数据分析程序的开发过程。本节提供一些提示,说明如何简化Flink程序的开发。

本地执行环境

一个 LocalStreamEnvironment在它被创建的同一个JVM进程中启动Flink系统。如果从IDE启动LocalEnvironment,则可以在代码中设置断点并轻松调试程序。

一个LocalEnvironment被创建并按如下方式使用:

Java

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// build your program

env.execute();

收集数据源

Flink提供了由Java集合支持的特殊data sources,以简化测试。一旦测试了程序,就可以轻松地将sources和sinks替换为可读取/写入外部系统的sources和sinks。

集合data sources可以按如下方式使用:

Java

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);

注意:当前,集合data sources要求数据类型和迭代器实现 Serializable接口。此外,收集data sources不能并行执行(并行度= 1)。

迭代器数据接收器

Flink还提供sink收集DataStream结果来进行测试和调试。它可以按如下方式使用:

Java

import org.apache.flink.streaming.experimental.DataStreamUtils

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)

注意: flink-streaming-contrib模块已从Flink 1.5.0中删除。它下面的类已移至flink-streaming-java和中flink-streaming-scala。

接下来要去哪里?

操作符:可用的流操作符的规范。
事件时间:Flink时间概念的简介。
状态和容错:有关如何开发有状态应用程序的说明。
连接器:可用的输入和输出连接器的描述。

原文地址:https://www.cnblogs.com/qlxm/p/14445665.html