简介
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。
为什么选择Flink
流数据更真实地反映了我们的生活方式
传统的数据架构是基于有限数据集的
我们的目标:低延迟、高吞吐、结果的准确性和良好的容错性
传统数据处理架构:事务处理
分析处理架构:将数据从业务数据库复制到数仓,再进行分析和查询
有状态的流式处理:
流处理的演变:
lambda架构:用两套系统,同时保证低延迟和结果准确
fink:
Flink的主要特点
- 事件驱动(Event-driven)
-
基于流的世界观
在flink的世界观中,一切都是由流组成的,离线数据是有界的流;
实时数据是一个没有界限的流:这就是所谓的有界流和无界流
-
分层API
越顶层越抽象,表达含义越简明,使用越方便
越底层越具体,表达能力越丰富,使用越灵活
Flink的其他特点
• 支持事件时间(event-time)和处理时间(processing-time) 语义
• 精确一次(exactly-once)的状态一致性保证
• 低延迟,每秒处理数百万个事件,毫秒级延迟
• 与众多常用存储系统的连接
• 高可用,动态扩展,实现7*24小时全天候运行
Flink vs Spark Streaming
- 流(stream)和微批(micro-batching)
- 数据模型
spark 采用 RDD 模型,spark streaming 的 DStream 实际上也就是一组组小批 数据 RDD 的集合
flink 基本数据模型是数据流,以及事件(Event)序列
- 运行时架构
spark 是批计算,将 DAG 划分为不同的 stage,一个完成后才可以计算下一个
flink 是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理
简单上手
批处理
下面提供一个例子:使用flink读取文件并计算每一个单词出现次数
pom.xml
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<!--2.12指的是scala版本-->
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
</dependencies>
测试:
//批处理word count
public class WordCount {
public static void main(String[] args) throws Exception {
//创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
String inputPath = "D:\project\flink-demo\src\main\resources\hello.txt";
DataSet<String> inputDataSet = env.readTextFile(inputPath);
//对数据集进行处理,按空格分词展开,转换成(word,1)二元组进行统计
DataSet<Tuple2<String, Integer>> dataSet = inputDataSet.flatMap(new MyFlatMap())
.groupBy(0)//按照第一个位置的word分组
.sum(1);//将第二个位置上的数据求和
dataSet.print();
}
}
public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" ");
//遍历所有word,包成二元组
for (String word : words) {
out.collect(new Tuple2<>(word, 1));
}
}
}
hello.txt:
hello word
hello flink
hello java
hello scala
hello spring
how are you
are you ok
输出结果:
流处理
public class StreamWordCount {
public static void main(String[] args) throws Exception {
//创建流处理执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String inputPath = "D:\project\flink-demo\src\main\resources\hello.txt";
DataStream<String> dataStream = env.readTextFile(inputPath);
//基于数据流进行转换计算
DataStream<Tuple2<String, Integer>> resultStream = dataStream.flatMap(new MyFlatMap())
.keyBy(0).sum(1);
resultStream.print();
//执行任务
env.execute();
}
}
输出结果:
输出结果前面的数字:表示当前并行执行的线程编号,默认并行度跟自己电脑有有关,当然也可以自己设置并行度。
env.setParallelism(4);
流式数据源测试
linux上安装nc
yum install -y nc
nc -lk 7777
代码:
public static void main(String[] args) throws Exception {
//创建流处理执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// String inputPath = "D:\project\flink-demo\src\main\resources\hello.txt";
// DataStream<String> dataStream = env.readTextFile(inputPath);
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String host = parameterTool.get("host");
int port = parameterTool.getInt("port");
DataStream<String> dataStream = env.socketTextStream(host, port);
//基于数据流进行转换计算
DataStream<Tuple2<String, Integer>> resultStream = dataStream.flatMap(new MyFlatMap())
.keyBy(0).sum(1);
resultStream.print();
//执行任务
env.execute();
}
设置命令行参数:
启动方法后,我们在nc控制台输入的数据,flink都能进行实时的流式计算