Flink 学习(一)

摘自Flink官网https://flink.apache.org/

最近看到公司有Flink平台,正好做过storm和spark streaming上的业务,借着这个机会把flink也学了。正好比较下他们之间的优缺点。

一、流式处理平台

1.Storm

Topology为处理拓扑图

组成:

(1)Spout. 数据分发中心。

(2)Bolt. 数据处理中心

数据单元为Tuple。在Bolt处理完的数据可以发射给下一个Bolt。此时接收到的为Tuple。

缺点:

(1)消息传输保证为At least once. 但是可能出现重复发消息的情况。对每一条数据都做ack,所以容错的开销很大。

(2)延迟比flink大。

(3)吞吐量不如flink 

(4)不支持批处理 

2.Spark Streaming

(1)比较主流的实时计算引擎。但是是居于micro batch处理,并不是纯正的流式处理。

(2)支持处理时间,Structured streaming 支持处理时间和事件时间,同时支持 watermark 机制处理滞后数据。

(3)与Hadoop家族组件交互良好,例如Hbase等。

(4)容错机制,checkpoint。

(5)Spark Streaming 跟 kafka 结合是存在背压机制的,目标是根据当前 job 的处理情况来调节后续批次的获取 kafka 消息的条数.

(6)数据单元是RDD,新增了Dstream.直接度kafka获得。

(7)处理过程大致是transformation和action。

3.Flink

(1)数据形式DataStream(Streaming),DataSet(Batch)。

(2)处理过程是Source,Transformation 和 sink。

(3)时间。创建时间EventTime, 进入Flink DataFlow的时间。IngestionTime,对事件进行处理的本地系统时间Processing Time。

(4)窗口。按分割标准划分:timeWindow、countWindow。按窗口行为划分:Tumbling Window、Sliding Window、自定义窗口。

(5)轻量级容错机制。保证Execatly once执行。使用stream replay 和 checkpointing容错。

二、各个组件的介绍

1.JobManager用来分配任务,也就是常说的master

2.TaskManager用来分发task,缓存和交换数据流

3.Slot,把TaskManager根据task把内存抽象很多个slot,用来执行task。

三、Mac系统下安装Flink

Mac下很方便,mac装东西确实是方便。------brew install apache-flink

四、启动

1.启动本地集群环境,很快就能启动起来。在/usr/local/Cellar/apache-flink/1.7.0/libexec目录下。

./bin/start-cluster.sh 

2.然后在 http://localhost:8081/#/overview 就可以看见Flink的监控平台。

可以看到Task Managers是1个。Slots也是一个。

下面还有好几个选项,可以看到你的集群配置环境。

五、Example

WordCount

(1)Code分析

 1 package flinkjob;
 2 
 3 import org.apache.flink.api.common.functions.FlatMapFunction;
 4 import org.apache.flink.api.common.functions.ReduceFunction;
 5 import org.apache.flink.api.java.utils.ParameterTool;
 6 import org.apache.flink.streaming.api.datastream.DataStream;
 7 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 8 import org.apache.flink.streaming.api.windowing.time.Time;
 9 import org.apache.flink.util.Collector;
10 
11 /**
12  * Created by adrian.wu on 2018/12/17.
13  */
14 public class SocketWindowWordCount {
15     public static class WordWithCount {
16 
17         public String word;
18         public long count;
19 
20         public WordWithCount() {}
21 
22         public WordWithCount(String word, long count) {
23             this.word = word;
24             this.count = count;
25         }
26 
27         @Override
28         public String toString() {
29             return word + " : " + count;
30         }
31     }
32 
33 
34     public static void main(String[] args) throws Exception{
35         final int port;
36         try {
37             //得到提交时候的参数
38             final ParameterTool params = ParameterTool.fromArgs(args);
39             //得到端口号,因为这个例子是监听9000端口的例子
40             port = params.getInt("port");
41         } catch (Exception e) {
42             System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
43             return;
44         }
45         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
46         
47         //数据单元 DataStream
48         DataStream<String> text = env.socketTextStream("localhost", port, "
");
49         DataStream<WordWithCount> windowCounts = text
50                 .flatMap(new FlatMapFunction<String, WordWithCount>() {   //map
51                     @Override
52                     public void flatMap(String value, Collector<WordWithCount> out) {
53                         for (String word : value.split("\s")) {
54                             out.collect(new WordWithCount(word, 1L));
55                         }
56                     }
57                 })
58                 .keyBy("word")
59                 .timeWindow(Time.seconds(5), Time.seconds(1))  //Window function, 5秒一个window,间隔1
60                 .reduce(new ReduceFunction<WordWithCount>() {
61                     @Override
62                     public WordWithCount reduce(WordWithCount a, WordWithCount b) {  //reduce
63                         return new WordWithCount(a.word, a.count + b.count);
64                     }
65                 });
66         windowCounts.print().setParallelism(1);
67 
68         env.execute("Socket Window WordCount");
69 
70     }
71 }

(2)打包提交代码

 ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000 #提交job
 nc -l 9000 #监听端口
tail -f log/flink-*-taskexecutor-*.out #查看log

(3)在监控平台可以看到你的job情况

谢谢!
原文地址:https://www.cnblogs.com/ylxn/p/10038546.html