Flink:简单上手

简介

官网:https://flink.apache.org/

image-20210831145112869

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.

Apache Flink 是一个框架分布式处理引擎,用于对无界和有界数据流进行状态计算。

流数据更真实地反映了我们的生活方式

传统的数据架构是基于有限数据集的

我们的目标:低延迟、高吞吐、结果的准确性和良好的容错性

传统数据处理架构:事务处理

image-20210831151845002

分析处理架构:将数据从业务数据库复制到数仓,再进行分析和查询

image-20210831152248048

有状态的流式处理

image-20210831152558366

流处理的演变:

lambda架构:用两套系统,同时保证低延迟和结果准确

image-20210831152942613

fink

image-20210831153235369

Flink的主要特点

  • 事件驱动(Event-driven)

image-20210831153510640

  • 基于流的世界观

    在flink的世界观中,一切都是由流组成的,离线数据是有界的流;

    实时数据是一个没有界限的流:这就是所谓的有界流和无界流

image-20210831153955065

  • 分层API

    越顶层越抽象,表达含义越简明,使用越方便

    越底层越具体,表达能力越丰富,使用越灵活

image-20210831155538615

Flink的其他特点

• 支持事件时间(event-time)和处理时间(processing-time) 语义

• 精确一次(exactly-once)的状态一致性保证

• 低延迟,每秒处理数百万个事件,毫秒级延迟

• 与众多常用存储系统的连接

• 高可用,动态扩展,实现7*24小时全天候运行

  • 流(stream)和微批(micro-batching)

image-20210831160139833

  • 数据模型

spark 采用 RDD 模型,spark streaming 的 DStream 实际上也就是一组组小批 数据 RDD 的集合

flink 基本数据模型是数据流,以及事件(Event)序列

  • 运行时架构

spark 是批计算,将 DAG 划分为不同的 stage,一个完成后才可以计算下一个

flink 是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理

简单上手

批处理

下面提供一个例子:使用flink读取文件并计算每一个单词出现次数

pom.xml

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <!--2.12指的是scala版本-->
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>

测试:

//批处理word count
public class WordCount {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String inputPath = "D:\project\flink-demo\src\main\resources\hello.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);
        //对数据集进行处理,按空格分词展开,转换成(word,1)二元组进行统计
        DataSet<Tuple2<String, Integer>> dataSet = inputDataSet.flatMap(new MyFlatMap())
                .groupBy(0)//按照第一个位置的word分组
                .sum(1);//将第二个位置上的数据求和

        dataSet.print();
    }

}

public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] words = value.split(" ");
        //遍历所有word,包成二元组
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
}

hello.txt:

hello word
hello flink
hello java
hello scala
hello spring
how are you
are you ok

输出结果:

image-20210831191626564

流处理

public class StreamWordCount {

    public static void main(String[] args) throws Exception {
        //创建流处理执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String inputPath = "D:\project\flink-demo\src\main\resources\hello.txt";
        DataStream<String> dataStream = env.readTextFile(inputPath);
        //基于数据流进行转换计算
        DataStream<Tuple2<String, Integer>> resultStream = dataStream.flatMap(new MyFlatMap())
                .keyBy(0).sum(1);

        resultStream.print();
        //执行任务
        env.execute();
    }
}

输出结果:

image-20210831192949254

输出结果前面的数字:表示当前并行执行的线程编号,默认并行度跟自己电脑有有关,当然也可以自己设置并行度。

env.setParallelism(4);

image-20210831193311253

流式数据源测试

linux上安装nc

yum install -y nc
nc -lk 7777

代码:

    public static void main(String[] args) throws Exception {
        //创建流处理执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
//        String inputPath = "D:\project\flink-demo\src\main\resources\hello.txt";
//        DataStream<String> dataStream = env.readTextFile(inputPath);

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");

        DataStream<String> dataStream = env.socketTextStream(host, port);
        //基于数据流进行转换计算
        DataStream<Tuple2<String, Integer>> resultStream = dataStream.flatMap(new MyFlatMap())
                .keyBy(0).sum(1);

        resultStream.print();
        //执行任务
        env.execute();
    }

设置命令行参数:

image-20210831194805584

启动方法后,我们在nc控制台输入的数据,flink都能进行实时的流式计算

image-20210831195051079

原文地址:https://www.cnblogs.com/wwjj4811/p/15211936.html