深入Apache NiFi 之源码学习

前言

要问 Hortonworks 这家公司最有产品力的产品是什么,我觉得是 Apache NiFi.去年Cloudera 和 Hortonworks 合并之后,以 Cloudera 为主,两家公司进行产品整合.Cloudera 从 Hortonworks 家没拿啥东西,唯独拿来了 Apache NiFi ,并借鉴了 HDF 平台,整合成了全新的产品 Cloudera DataFlow(CDF)产品,并且大有把 CDF 做成自家拳头产品之势.Cloudera 官网 PRODUCTS 展示的第一个产品就是 Data Flow.Apache NiFi 是数据领域非常有潜力,也很优秀的一款产品.但目前来看,NiFi 在数据处理领域还不火.一是目前数据处理领域内生态很繁荣,选择很多.另一个是我们离真正的 data driven 还有一些时间和距离.目前很多公司都热衷于搭建各自的数据 pipline,这方面的技术选型着实很多.我觉得 NiFi 在这方面是比较超前的,NiFi 把数据 pipline 在往前推进成--data flow.这也是 NiFi 的核心思想.随着5G和DT时代的到来,data flow 将会成为一个非常普遍的技术概念.

NiFi 是什么

NiFi 的简介这里不赘述,详细可见 上一篇文章 或者是 官网 .这里从源码的角度上来讲讲 NiFi 的本质,在我看来,NiFi 是一套借鉴 flow-based programming (FBP) 编程范式写成的 Java 编程工具箱.首先它是用 Java 语言写成的系统级应用或者说是框架,对于 FBP,它是一种编程范式,如面向过程,面向对象,函数式,流式编程等一样.FBP是流式编程的一种,简单理解就是把程序看作是信息数据包,处理器以及处理器间的连接组成的有向图.这是 NiFi 的使用方式,也可以称这是使用 NiFi 来进行数据编程.说它是工具箱,是因为它内置了很多的 Java 组件,框架和技术.它将各种 Java 框架技术包装成 NiFi 中的处理器或者服务等组件,开箱即用,组合成不同的 data flow ,对于开发者来说,也可以叫编写成不同的应用程序.

比如说,NiFi 把 jetty server 包装成 handlehttprequesthandlehttpresponse 两个处理器,我们可以基于这两个处理器加上一个 execute sql 处理器来连一个有向图,这其实就是一个 restful 接口了.

为什么要研究 NiFi 源码

研究 NiFi 的源码主要有两个目的,也可以说是有两个好处.

  1. 进行 NiFi 的二次开发,对于 NiFi 的二次开发其实有两个层面.
    • 第一个层面是利用 NiFi 提供的扩展机制,如前所述, NiFi 是一个 java 工具箱,包装了很多丰富的 java 工具.那么我们也可以开发我们自己的 java 工具,把它放进 NiFi 的工具箱里面来进行使用.在这个层面上,也是可以完成很多特定的二次开发的需求的.
    • 第二个层面那就是修改 NiFi 的源码,然后重新进行编译,打包了.
  2. NiFi 源码是绝好的 Java 学习项目. NiFi 核心框架对 IO,并发,异步,缓冲缓存,容错等技术方面的处理和设计都是非常值得学习和研究的. 此外, NiFi Java 工具箱里的工具也是很多 Java 框架和技术的封装(这里深刻明白了 Java 的生态是如此的强大和繁荣),通过阅读处理器的代码也能加深对各种 Java 框架和技术的理解.

编译 NiFi 源码

要研究 NiFi 源码,首先得编译 NiFi 源码. NiFi 源码可以从 github 上面 clone 下来,它是一个 maven 工程,使用 maven 编译打包.官网给出了非常详细的 编译步骤 ,这也是我非常喜欢 NiFi 的一点,它的官方文档真的非常非常细致. 为了便于概览和阅读,我将 NiFi 工程导入到 eclipse 里面进行编译.在 windows 平台下使用 eclipse 编译 NiFi 有几处地方要注意下:

  • 可能会由于 eclipse 的插件版本过低导致报错
  • 可能要反复下载 pom 声明的依赖. NiFi 工程编译要下载比较多的 jar 包依赖和 build 插件.所以要保证网络畅通,修改默认的 maven 仓库镜像,建议使用阿里云的镜像.
    <mirror> <id>nexus-ali</id> <mirrorOf>central</mirrorOf> <name>Nexus aliyun</name> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> </mirror>
  • 另外工程比较繁杂,有些 pom 里面的 maven 插件生命周期有重叠,可以忽略一些插件的错误.

NiFi 源码框架分析

当 NiFi 源码编译好之后,就开始正式进入源码分析.在 eclipse 里面左边视图拉一下列表,可以看到,NiFi 总共有 394 个 pom 工程.一下子感觉无从下手,这个时候我们还是要回过头来看 NiFi 官网上的架构图:

记住这个图, NiFi 首先是一个 JVM 应用,其次最上层是一个 web server.我们就从这里入手开始阅读 NiFi 的源码.
再回到 eclipse 里面来,要分析 maven 工程的源码,首要看的就是 pom 文件,通过 pom 文件,我们能够得知这个工程的大概.可以发现,这394个 pom 工程大部分是子工程,都是一层套一层的父子pom 工程.那么我们首先找到最顶级的pom 工程,也就是 nifi:

这么一看,整个 NiFi 工程还是清晰了一些.

  • nifi-api 就是 nifi 的应用程序接口,里面就是定义了整个工程所需要用到的接口,注解,抽象类和枚举等基本的接口和信息.
  • nifi-assembly 负责 nifi 的成品装配, 工程打包最后形成的可供部署的压缩包就在这个工程里的 target 目录内.
  • nifi-bootstarp 负责 nifi 这个 jvm 应用程序的启动相关事宜
  • nifi-commons nifi 诸多特性,比如data-provenance,expression-language,s2s 传输 的实现就在这里,同时也是 nifi 的工具类集合
  • nifi-docker nifi 的 docker 应用相关
  • nifi-docs nifi 的文档 实现相关
  • nifi-external nifi 内部元信息和外部交换,主要用于集群模式下
  • nifi-framework-api 这就是nifi 核心框架层的api,也就是架构图中的 Flow Controller 那一层,注意这里只是各种接口信息定义,不是实现.
  • nifi-maven-archetypes 这里只是为了生成两个 maven archetype,一个是 nifi 自定义处理器的脚手架,一个是 nifi 自定义服务的脚手架.这些脚手架在 maven 的中央仓库都有提供.
  • nifi-mock 用于 nifi 的 mock 测试
  • nifi-nar-bundles 之前一直说的 nifi java 工具箱就是这里了.整个 nifi 里面大部分的 maven 工程都是这个工程的子工程.在这个工程里面,一个 bundle 就是一个工具,也对应着上面架构图里的 Extension
  • nifi-toolkit 这里面是 nifi 的命令行工具的实现.nifi 也提供了比较丰富的命令行指令.

以上就是 NiFi 源码的总体结构了.有了总体的概览以后,我们需要研究哪一方面的源码实现,就可以直接去相应的 module 里面看了.

进入源码

接下来,正式进入源码阅读.很直观的,对于 NiFi 这个应用程序,我们首先关心的是它是如何启动的,以及它的启动和初始化流程.所以,我们首先进入的是 nifi-bootstrap 这个 module :

我们很快就找到了 RunNiFi 这个类了,从名字上我们就能很直观的了解到就是这个类启动了 NiFi .这里说下,阅读框架源码的时候,不一定要每一行代码都去读,只看关键部分或者看注释,一般质量比较高的源码都会有比较详尽的注释和准确的命名.拉开 eclipse 右边 outline 视图,发现了 main 方法,这里就是 jvm 应用的入口了.

我们再回头看 nifi 的 启动脚本 nifi.sh 里面,可以找到这个命令:

正好对上了.
接着看这个 main 方法, 里面获取上面启动脚本的命令行输入参数,找到具体的 start 方法:

switch (cmd.toLowerCase()) { case "start": runNiFi.start(); break;

接着看 start 方法,里面做了很多前期的准备性工作,主要是加载 bootstrap.conf 里配置的属性,以及在里面构建另外一个 java cmd 命令:
final ProcessBuilder builder = new ProcessBuilder();
builder.command(cmd);
Process process = builder.start();

这个构建的命令行如下(可从 nifi 启动日志里面找到):

/opt/jdk1.8.0_131/bin/java
-classpath /opt/nifi-1.7.1/./conf:/opt/nifi-1.7.1/./lib/logback-core-1.2.3.jar:/opt/nifi-1.7.1/./lib/jetty-schemas-3.1.jar:/opt/nifi-1.7.1/./lib/logback-classic-1.2.3.jar:/opt/nifi-1.7.1/./lib/jul-to-slf4j-1.7.25.jar:/opt/nifi-1.7.1/./lib/jcl-over-slf4j-1.7.25.jar:/opt/nifi-1.7.1/./lib/nifi-properties-1.7.1.jar:/opt/nifi-1.7.1/./lib/nifi-runtime-1.7.1.jar:/opt/nifi-1.7.1/./lib/nifi-framework-api-1.7.1.jar:/opt/nifi-1.7.1/./lib/nifi-nar-utils-1.7.1.jar:/opt/nifi-1.7.1/./lib/javax.servlet-api-3.1.0.jar:/opt/nifi-1.7.1/./lib/log4j-over-slf4j-1.7.25.jar:/opt/nifi-1.7.1/./lib/slf4j-api-1.7.25.jar:/opt/nifi-1.7.1/./lib/nifi-api-1.7.1.jar
-Dorg.apache.jasper.compiler.disablejsr199=true
-Xmx3g -Xms3g
-Djavax.security.auth.useSubjectCredsOnly=true
-Djava.security.egd=file:/dev/urandom
-Dsun.net.http.allowRestrictedHeaders=true
-Djava.net.preferIPv4Stack=true
-Djava.awt.headless=true -XX:+UseG1GC
-Djava.protocol.handler.pkgs=sun.net.www.protocol
-Duser.timezone=Asia/Shanghai
-Dnifi.properties.file.path=/opt/nifi-1.7.1/./conf/nifi.properties
-Dnifi.bootstrap.listen.port=56653
-Dapp=NiFi
-Dorg.apache.nifi.bootstrap.config.log.dir=/opt/nifi-1.7.1/logs org.apache.nifi.NiFi

所以这个 start 方法总的做的事只是启动了另外一个 java 进程,这个进程才是真正的 NiFi runtime ,注意上面命令的最后一行, org.apache.nifi.NiFi 这就是 NiFi 真正的实例程序.

NiFi 实例的本质探索

这个 NiFi 在哪里了?我们可以逐步找到它,这个 NiFi 在 nifi-framework 里面找到,nifi-framework-api 在根工程下,但 nifi-framework 的实现在 nifi-nar-bundles 里面,也就是 nifi-framework-bundle:

nifi-framework 里面就是 nifi 的核心框架代码了:

找到里面的 nifi-runtime , org.apache.nifi.NiFi 就在这个里面了:

可以看到 nifi 的命名非常规范,通过命名我们基本就可以找到对应的功能和源码.
打开 NiFi ,看到 main 方法,非常简单:

   /**
     * Main entry point of the application.
     *
     * @param args things which are ignored
     */
    public static void main(String[] args) {
        LOGGER.info("Launching NiFi...");
        try {
            NiFiProperties properties = convertArgumentsToValidatedNiFiProperties(args);
            new NiFi(properties);
        } catch (final Throwable t) {
            LOGGER.error("Failure to launch NiFi due to " + t, t);
        }
    }

生成了一个 NiFi 实例.这个构造方法调用了另外一个构造方法:

下面这个就是 NiFi 的真正实例构造方法,里面进行了很多初始化的操作.
看这个方法源码,也是直接先看注释,不需要一行行去读.先找到关键性的代码:

  final Bundle systemBundle = SystemBundle.create(properties);

        // expand the nars
        final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle);

        // load the extensions classloaders
        NarClassLoaders narClassLoaders = NarClassLoaders.getInstance();

        narClassLoaders.init(rootClassLoader,
                properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());

上面就是非常关键性的代码,上面说过,NiFi 是一个 java 工具箱,里面包装了很多 java 工具.从这里开始,我们就来探究这些 java 工具的本质.
这个 java 工具箱在 NiFi 在用户看来其实就是安装目录下面的 lib 文件夹:

java 工具其实就是 lib 目录下的一个个的 nar 包:

这些 nar 其实就是一种压缩文件,我们可以把它解压缩:

可以看到,里面的本质还是 jar 包:

可以得出结论, nar 包 其实就是 nifi 包装了其他额外信息的 jar 包集合的压缩包而已.所以 nar 包本质还是 jar 包,所以就回到了我们熟悉的领域了.看上面的源码注释.第一步就是先把这些 nar 包全部解开 ExtensionMapping 里面去.
然后加载并初始化这些 nar 包扩展的类加载器:

 // load the extensions classloaders
        NarClassLoaders narClassLoaders = NarClassLoaders.getInstance();

        narClassLoaders.init(rootClassLoader,
                properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());

        // load the framework classloader
        final ClassLoader frameworkClassLoader = narClassLoaders.getFrameworkBundle().getClassLoader();
        if (frameworkClassLoader == null) {
            throw new IllegalStateException("Unable to find the framework NAR ClassLoader.");
        }

        final Set<Bundle> narBundles = narClassLoaders.getBundles();

在 NiFi 的官方介绍中,有两处它的特性介绍是扩展和类加载隔离,这里我们可以对它这两个特性的实现一探究竟了.它为每一个 nar 包构造了一个独立的自定义的类加载器: NarClassLoader

目前基本清晰, NiFi 的 扩展性是由自定义的压缩文件 nar 包 和 自定义的类加载器来提供的. 接着往下看:

// load the server from the framework classloader
        Thread.currentThread().setContextClassLoader(frameworkClassLoader);
        Class<?> jettyServer = Class.forName("org.apache.nifi.web.server.JettyServer", true, frameworkClassLoader);
        Constructor<?> jettyConstructor = jettyServer.getConstructor(NiFiProperties.class, Set.class);

        final long startTime = System.nanoTime();
        nifiServer = (NiFiServer) jettyConstructor.newInstance(properties, narBundles);
        nifiServer.setExtensionMapping(extensionMapping);
        nifiServer.setBundles(systemBundle, narBundles);

看到这里回想到一开始的架构图,图中 JVM 的最上层是 web server , 这个 web server 就是在这里被加载了,可以看到,这是一个 jetty server ,接着往下看:

if (shutdown) {
            LOGGER.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller");
        } else {
            nifiServer.start();

            if (bootstrapListener != null) {
                bootstrapListener.sendStartedStatus(true);
            }

            final long duration = System.nanoTime() - startTime;
            LOGGER.info("Controller initialization took " + duration + " nanoseconds "
                    + "(" + (int) TimeUnit.SECONDS.convert(duration, TimeUnit.NANOSECONDS) + " seconds).");
        }

start 这个 nifiServer ,这个 NiFi 对象的构造方法这里就全部走完了.

NiFi Web

上面的代码走完之后, NiFi 实例化并没有完全完成,我们先放一放,回头理一下工程项目结构.上面的 NiFiServer 的 start() 方法之后,代码就跳转到了 nifi-framework 下面的另外一个子工程里面了,这个子工程就是 nifi-web :

可以看到, 跟 nifi web 相关的源码都在这个子工程里面了,包括 nifi 的 server 代码和界面代码,而上面所说的 NiFiServer 这个类就在 nifi-jetty 工程里面:

接着看 JettyServer 这个类,上面的 NiFi 构造方法里面最后是先实例化了这个 JettyServer ,然后调用了 start 方法.先看它的构造方法,只看注释,找到了核心方法:

     // load wars from the bundle
      Handler warHandlers = loadWars(bundles);

大致可以看到,其实就是把 war 包加载进来了,这些 war 包就是 nifi-web 下面的子工程,有几个子工程的 pom 文件中配置的就是 <packaging>war</packaging>

接着看这个 start 方法:
第一句就是 ExtensionManager.discoverExtensions(systemBundle, bundles); 就是这里把所有的扩展类加载进 JVM 了, 看到看到 ExtensionManager 的注释,这个注释就说明了一切

Scans through the classpath to load all FlowFileProcessors, FlowFileComparators, and ReportingTasks using the service provider API and running through all classloaders (root, NARs).
@ThreadSafe - is immutable

这个 ExtensionManager 在加载类的时候,用到了java 的一种比较高级的机制, java SPI(service provider interface),这种机制在很多框架中比如 spring 中大量使用
final ServiceLoader<?> serviceLoader = ServiceLoader.load(entry.getKey(), bundle.getClassLoader());
有兴趣的可以自己去查阅相关资料,这个机制解释了为什么写自定义的处理器的时候要在 /resources /META-INF/services 目录下面写上配置文件.在自定义处理开发的时候,一定要注意写这个配置文件,否则类是加载不进来的(我第一次写自定义处理器就跳坑了)
接着 start 这个 jetty server,接着往下看,只看注释,可以看到,大致就是做了 server context 以及 filter 的注入工作了:
// ensure the appropriate wars deployed successfully before injecting the NiFi context and security filters // this must be done after starting the server (and ensuring there were no start up failures)

基本到这里, NiFi 的实例化和初始化流程基本就有个大致了解了.我们可以接着再进一步,看到 nifi-web-api 这个工程,这个工程其实就是 nifi 的 restful 接口工程,nifi 的所有 restful 接口都是这里实现的,包括处理器的新增,处理器的连接以及处理器的 start 等.
在里面随便打开一个以 resource 结尾的类:

这里我们看到了非常熟悉的注解了,接着打开 resources 文件夹,看到了 context.xml 文件 :

原来这是一个 spring 的 web 工程.然后找到一个关键的 configuration 类:

这里基本就清楚了, NiFi 实例内所有的对象都是通过 spring ioc 注入的.

总结

现在为止,从开发角度对 NiFi 就有了一个基本的认识了,它是一个 JVM 应用,它通过独立的类加载器加载类,使用 spring ioc 注入和管理对象.从以上的分析,我们了解到了 NiFi 的扩展性特性的大致实现,也了解了架构图最上面的一部分源码.至于它其他诸多特性的源码和实现,则需要花更多的时间研究 nifi-framework-core 工程了.

原文地址:https://www.cnblogs.com/hdpdriver/p/10610654.html