Flink 源码(二): Flink Client 实现原理与源码解析(一)

来源:https://mp.weixin.qq.com/s/WiRyQEoDfuowT3LNfZ-NSw

0 本文大纲:

一、我们本次的目的是什么?

这次我们的目的是,在本地的 IDEA 中去 debug flink-clients 代码,然后远程提交给 flink standalone 集群上去执行,看一看 flink 客户端在提交代码之前都干了什么。就像下面这样:

 很简单是不是,瞬间自信心爆棚!在开始之前,我想要提两个发人深省的问题,你平时一定没有考虑过:

问题 1:用户代码如何变成 DAG 图

DataStream<Tuple2<String, Integer>> counts =
   text.flatMap(new Tokenizer())
   .keyBy(value -> value.f0).sum(1);

这个过程就是你经常刷源码解析博客看到的一个问题,“ StreamGraph 是如何生成的?”

问题2:如何保证 Flink 框架依赖和用户代码依赖不冲突

不知道你有没有想过,Flink 框架本身是要依赖很多类库的,但我们自己写的代码也是要依赖很多类库的,万一 Flink 框架依赖的类库,跟我们自己的类库冲突了怎么办?

举个例子,Flink 框架会去依赖一个 2.0 版本的 kafka,你自己的代码中依赖的是 2.1 版本的 kafka,那很有可能就类冲突了啊。这个问题该如何解决?

问题1,可能要下次再解答;问题2,这次就解答!

二、把环境弄好先跑一个,有一个直观感受

之前我已经写了一篇文章,关于搭建环境的,这次就不细说了,只想补充两个跟上次不一样的点。

(1)Virtual Box 虚拟机,装好 linux,并且下载了官方发布包(1.12.0),解压到服务器某个目录上

(2)IDEA 环境中,已经导入了 flink 1.12.0 的源码(注意这里是源码,上面是发布包)

File - Project Structure - Project 改成 1.8

 File - Settings - Build,Execution,Deployment - Compiler - Java Compiler  这里全部改成 8

 然后把下面这些全部删掉

 把 Maven 标签,这里的 java11 勾掉

 然后重新 reload 工程

 (3)重新 reload 之后,把这个工程 package 一下

 (4)IDEA 中新建一个运行配置

 参数如下:

几个比较长的字符如下(根目录换成你自己的)
org.apache.flink.client.cli.CliFrontend
 run D:Codeflinkflinkflink-examplesflink-examples-streaming	argetWordCount.jar
FLINK_CONF_DIR=D:Codeflinkflinkflink-distsrcmain
esources

(5)这个 JobManager 修改成你虚拟机的地址

 (6)直接 debug 运行

 (7)可以顺利执行。

 

原文地址:https://www.cnblogs.com/qiu-hua/p/14383545.html