【NIFI】 Apache NiFI 之 ExecuteScript处理(一)

   本例介绍NiFI ExecuteScript处理器的使用,使用的脚本引擎ECMScript

FlowFile I / O简介

  NiFi中的流文件由两个主要组件构成,即属性和内容。属性是关于内容/流文件的元数据,我们在本系列的第1部分中看到了如何使用ExecuteScript来操作它们流文件的内容本质上只是一个字节集合,没有固有的结构,模式,格式等。各种NiFi处理器假设传入的流文件具有特定的模式/格式(或者从属性中确定它作为“mime.type”或以其他方式推断它。然后,这些处理器可以基于文件确实具有该格式的假设来对内容起作用(并且如果它们不这样,则经常转移到“失败”关系)。处理器也可以输出指定格式的流文件,这在处理器中有描述。NiFi文档

流文件内容的输入和输出(I / O)通过ProcessSession API提供,因此是ExecuteScript的“session”变量(有关更多信息,请参阅第1部分)。一种机制是将回调对象传递给对session.read()或session.write()的调用。将为FlowFile对象创建InputStream和/或OutputStream,并使用相应的回调接口调用回调对象,并传入InputStream和/或OutputStream引用以供回调使用。有三个主要的回调接口,每个接口都有自己的用例:

  InputStreamCallback

    session.read(flowFileinputStreamCallback)方法使用此接口 提供InputStream,从中读取流文件的内容。界面有一个方法:

1 void process(InputStream in) throws IOException

   此接口提供托管输入流以供使用。虽然可以手动关闭流,但输入流会自动打开和关闭。如果您只是从特定的流文件中读取而不是写回来,那么这是您将使用的表单。

   例如,当您想要处理传入的流文件,但创建许多输出流文件时,如 SplitText处理器。

  OutputStreamCallback

    session.write(flowFileoutputStreamCallback)方法使用此接口 来提供要写入流文件内容的OutputStream。界面有一个方法:

1 void process(OutputStream out) throws IOException

    此接口提供托管输出流以供使用。尽管可以手动关闭流,但输出流会自动打开和关闭 - 如果包含这些流的任何流打开应该清除的资源,则非常重要。

    例如,ExecuteScript将从内部或外部文件生成数据,但不生成流文件。然后你将使用session.create()创建一个新的FlowFile,然后使用session.write( flowFileoutputStreamCallback)来插入内容。

  StreamCallback

    session.write(flowFilestreamCallback)方法使用此接口 来提供InputStream和OutputStream,从中读取和/或写入流文件的内容。界面有一个方法:

1 void process(InputStream in, OutputStream out) throws IOException

 

    此接口提供托管输入和输出流以供使用。虽然可以手动关闭流,但输入流会自动打开和关闭 - 如果包含这些流的任何流打开应该清除的资源,则非常重要。

    例如,当您想要处理传入的流文件并用新的东西覆盖其内容时,例如 EncryptContent处理器。

  由于这些回调是Java对象,因此脚本必须创建一个并将其传递给会话方法,还有其他读取和写入流文件的方法,包括:

    • 使用session.read(flowFile)返回一个InputStream。这减轻了对InputStreamCallback的需求,而是返回可以读取的InputStream。作为交换,您必须手动管理(关闭,例如)InputStream。
    • 使用session.importFrom(inputStreamflowFile)从InputStream写入FlowFile。这取代了传递了OutputStreamCallback的session.write()的需要。

 ExecuteScript介绍

  ExecuteScript是一个多功能处理器,允许用户使用编程语言编写自定义逻辑,每次触发ExecuteScript处理器时都会执行该编程语言。为脚本提供以下变量绑定以启用对NiFi组件的访问:

  session:这是对分配给处理器的ProcessSession的引用。该会话允许您对流文件执行操作,如create(),putAttribute()和transfer(),以及read()和write()。

  context:这是对处理器的ProcessContext的引用。它可用于检索处理器属性,关系,Controller Services和StateManager。

  log:这是对处理器的ComponentLog的引用。使用它将消息记录到NiFi,例如log.info('Hello world!')

  REL_SUCCESS:这是对为处理器定义的“成功”关系的引用。它也可以通过引用父类的静态成员(ExecuteScript)来继承,但是某些引擎(如Lua)不允许引用静态成员,因此这是一个便利变量。它还节省了必须使用关系的完全限定名称。

  REL_FAILURE:这是对为处理器定义的“失败”关系的引用。与REL_SUCCESS一样,它也可以通过引用父类的静态成员(ExecuteScript)来继承,但是某些引擎(如Lua)不允许引用静态成员,因此这是一个便利变量。它还节省了必须使用关系的完全限定名称。

  动态属性:ExecuteScript中定义的任何动态属性都将作为设置为与动态属性对应的PropertyValue对象的变量传递给脚本引擎。这允许您获取属性的String值,还可以根据NiFi表达式语言评估属性,将值转换为适当的数据类型(例如布尔值等)等。因为动态属性名称变为脚本的变量名,您必须知道所选脚本引擎的变量命名属性

ExecuteScript使用

  1、从会话中获取传入的流文件

    目的:有到ExecuteScript的传入连接,并希望从队列中检索一个流文件以进行处理

    方法:使用会话对象中的get()方法。此方法返回要处理的下一个最高优先级FlowFile的FlowFile。如果没有要处理的FlowFile,则该方法将返回null。请注意,即使FlowFiles有稳定的流入处理器,也可能返回null。如果处理器有多个并发任务,并且其他任务已经检索到FlowFiles,则会发生这种情况。如果脚本需要FlowFile继续处理,那么如果从session.get()返回null,它应立即返回

    Examples

      Javascript

1 var flowFile = session.get();
2 if (flowFile != null) {
3     // All processing code goes here
4 }

  2、从会话中获取多个传入流文件

    目的:有到ExecuteScript的传入连接,并希望从队列中检索多个流文件以进行处理

    方法:使用会话对象中的get(maxResults)方法。此方法从工作队列返回maxResults FlowFiles。如果没有可用的FlowFiles,则返回一个空列表(该方法不返回null)。注意:如果存在多个传入队列,则根据是否在单个调用中轮询所有队列或仅轮询单个队列,未指定行为。话虽如此,这里描述了观察到的行为(对于NiFi 1.1.0+和之前)

    Examples

      Javascript

1 flowFileList = session.get(100)
2 if(!flowFileList.isEmpty()) {
3   for each (var flowFile in flowFileList) { 
4        // Process each FlowFile here
5   }
6 }

  3、创建一个新的FlowFile

    目的:生成一个新的FlowFile以发送到下一个处理器

    方法:使用会话对象中的create()方法。此方法返回一个新的FlowFile对象,您可以对其执行进一步处理

    Examples

      Javascript

1 var flowFile = session.create();
2 // Additional processing here

  4、从父FlowFile创建新的FlowFile

    目的:希望基于传入的FlowFile生成新的FlowFile

    方法:使用会话对象中的create(parentFlowFile)方法。此方法采用父FlowFile引用并返回新的子FlowFile对象。新创建的FlowFile将继承除UUID之外的所有父属性。此方法将自动生成Provenance FORK事件或Provenance JOIN事件,具体取决于在提交ProcessSession之前是否从同一父级生成其他FlowFiles

    Examples

      Javascript

1 var flowFile = session.get();
2 if (flowFile != null) {
3     var newFlowFile = session.create(flowFile);
4     // Additional processing here
5 }

  5、向流文件添加属性

    目的:有一个要添加自定义属性的流文件

    方法:使用会话对象中的putAttribute(flowFileattributeKeyattributeValue)方法。此方法使用给定的键/值对更新给定的FlowFile属性。注意:“uuid”属性对于FlowFile是固定的,不能修改; 如果密钥名为“uuid”,则将被忽略。

    Examples

      Javascript

1 var flowFile = session.get();
2 if (flowFile != null) {
3     flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
4 }

  6、向流文件添加多个属性

    目的:有一个要添加自定义属性的流文件

    方法:使用会话对象中的putAllAttributes(flowFileattributeMap)方法。此方法使用给定Map中的键/值对更新给定的FlowFile属性。注意:“uuid”属性对于FlowFile是固定的,不能修改; 如果密钥名为“uuid”,则将被忽略。

    Examples

      Javascript

1 var number2 = 2;
2 var attrMap = {'myAttr1':'1', 'myAttr2': number2.toString()}
3 var flowFile = session.get() 
4 if (flowFile != null) {
5     flowFile = session.putAllAttributes(flowFile, attrMap)
6 }

  7、从流文件中获取属性

    目的:有一个流文件,您可以从中检查属性

    方法:使用FlowFile对象中的getAttribute(attributeKey)方法。此方法返回给定attributeKey的String值,如果未找到attributeKey,则返回null。这些示例显示了“filename”属性值的检索。

    Examples

      Javascript

1 var flowFile = session.get() 
2 if (flowFile != null) {
3     var myAttr = flowFile.getAttribute('filename')
4 }
1 var flowFile = session.get() 
2 if (flowFile != null) {
3     var attrs = flowFile.getAttributes();
4     for each (var attrKey in attrs.keySet()) { 
5        // Do something with attrKey (the key) and/or attrs[attrKey] (the value)
6   }
7 }

  8、将流文件传输到关系

    目的:处理流文件(新文件或传入文件)后,您希望将流文件传输到关系(“成功”或“失败”)。在这个简单的情况下,让我们假设有一个名为“errorOccurred”的变量,它指示FlowFile应该传输到哪个关系。

    方法:使用会话对象中的transfer(flowFilerelationship)方法。从文档中:此方法根据给定的关系将给定的FlowFile传输到适当的目标处理器工作队列。如果关系导致多个目标,则复制FlowFile的状态,使得每个目标都接收FlowFile的精确副本,尽管每个目标都具有其自己的唯一标识。

    注意:ExecuteScript将在每次执行结束时执行session.commit()以确保已提交操作。您不需要(也不应该)在脚本中执行session.commit()。

    Examples

      Javascript

 1 var flowFile = session.get();
 2 if (flowFile != null) {
 3    // All processing code goes here
 4    if(errorOccurred) {
 5      session.transfer(flowFile, REL_FAILURE)
 6    }
 7    else {
 8      session.transfer(flowFile, REL_SUCCESS)
 9    }
10 }

  9、以指定的日志记录级别向日志发送消息

    目的:将处理期间发生的某些事件报告给日志记录框架。

    方法:将log变量与warn(),trace(),debug(),info()或error()方法一起使用。这些方法可以使用单个String,或者后跟对象数组的String,或者后跟对象数组后跟Throwable的String。第一个用于简单消息。当您有一些要记录的动态对象/值时,将使用第二个。要在消息字符串中引用这些,请在消息中使用“{}”。这些是按照外观的顺序针对Object数组进行评估的,因此如果消息显示为“Found these things:{} {} {}”并且Object数组为['Hello',1,true],则记录的消息将为“找到这些东西:你好1真的”。这些日志记录方法的第三种形式也采用Throwable参数
    
Examples

      Javascript

1 var ObjectArrayType = Java.type("java.lang.Object[]");
2 var objArray = new ObjectArrayType(3);
3 objArray[0] = 'Hello';
4 objArray[1] = 1;
5 objArray[2] = true;
6 log.info('Found these things: {} {} {}', objArray)

   10、使用回调读取传入流文件的内容

    目的:有到ExecuteScript的传入连接,并希望从队列中检索流文件的内容以进行处理

    方法:使用read(flowFileinputStreamCallback)来自会话对象的方法。传入read()方法需要一个InputStreamCallback对象。请注意,因为InputStreamCallback是一个对象,所以默认情况下内容只对该对象可见。如果需要使用read()方法之外的数据,请使用更全局范围的变量。这些示例将传入流文件的完整内容存储到String中(使用Apache Commons的IOUtils类)。注意:对于大流量文件,这不是最好的技术; 相反,您应该只读取您需要的数据,并根据需要进行处理。对于像SplitText这样的东西,你可以一次读取一行并在InputStreamCallback中处理它,或者使用前面提到的session.read(flowFile)方法来获得在回调之外使用的InputStream引用。

    Examples

      Javascript

 1 var InputStreamCallback =  Java.type("org.apache.nifi.processor.io.InputStreamCallback")
 2 var IOUtils = Java.type("org.apache.commons.io.IOUtils")
 3 var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")
 4  
 5 var flowFile = session.get();
 6 if(flowFile != null) {
 7   // Create a new InputStreamCallback, passing in a function to define the interface method
 8   session.read(flowFile,
 9     new InputStreamCallback(function(inputStream) {
10         var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
11         // Do something with text here
12     }));
13 }

   11、使用回调将内容写入传出流文件

    目的:传出流文件生成内容

    方法:使用会话对象中的write(flowFileoutputStreamCallback)方法。传递给write()方法需要一个OutputStreamCallback对象。请注意,因为OutputStreamCallback是一个对象,所以默认情况下内容只对该对象可见。如果需要使用write()方法之外的数据,请使用更全局范围的变量。这些示例将示例String写入flowFile。

    Examples

      Javascript

 1 var OutputStreamCallback =  Java.type("org.apache.nifi.processor.io.OutputStreamCallback");
 2 var IOUtils = Java.type("org.apache.commons.io.IOUtils");
 3 var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
 4  
 5 var flowFile = session.get();
 6 if(flowFile != null) {
 7   // Create a new OutputStreamCallback, passing in a function to define the interface method
 8   flowFile = session.write(flowFile,
 9     new OutputStreamCallback(function(outputStream) {
10         outputStream.write("Hello World!".getBytes(StandardCharsets.UTF_8))
11     }));
12 }

   12、使用回调覆盖带有更新内容的传入流文件

    目的:重用传入的流文件,但希望修改其传出流文件的内容。

    方法:使用write(flowFilestreamCallback)来自会话对象的方法。传递给write()方法需要StreamCallback对象。StreamCallback提供InputStream(来自传入流文件)和outputStream(用于该流文件的下一个版本),因此您可以使用InputStream获取流文件的当前内容,然后修改它们并将它们写回到流文件。这会覆盖流文件的内容,因此对于追加,您必须通过附加到读入内容来处理它,或者使用不同的方法(使用session.append()而不是session.write())。请注意,由于StreamCallback是一个对象,因此默认情况下内容仅对该对象可见。如果需要使用write()方法之外的数据,请使用更全局范围的变量

    Examples

      Javascript

var StreamCallback =  Java.type("org.apache.nifi.processor.io.StreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
 
var flowFile = session.get();
if(flowFile != null) {
  // Create a new StreamCallback, passing in a function to define the interface method
  flowFile = session.write(flowFile,
    new StreamCallback(function(inputStream, outputStream) {
        var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        outputStream.write(text.split("").reverse().join("").getBytes(StandardCharsets.UTF_8))
    }));
}

   13、处理脚本处理过程中的错误

    目的:脚本中发生错误(通过数据验证或抛出异常),并且您希望脚本正常处理它。

    方法:对于异常,使用脚本语言的异常处理机制(通常它们是try / catch块)。对于数据验证,您可以使用类似的方法,但定义一个布尔变量,如“valid”和if / else子句而不是try / catch子句。ExecuteScript定义“成功”和“失败”关系; 通常,您的处理将“好”流文件转移到成功,“坏”流文件转换为失败(在后一种情况下记录错误)

    Examples

      Javascript

 1 var flowFile = session.get();
 2 if(flowFile != null) {
 3   try {
 4     // Something that might throw an exception here
 5  
 6     // Last operation is transfer to success (failures handled in the catch block)
 7     session.transfer(flowFile, REL_SUCCESS)
 8 } catch(e) {
 9   log.error('Something went wrong', e)
10   session.transfer(flowFile, REL_FAILURE)
11 }
12 }

 ExecuteScript-Demo

  1、页面如下图

  

  2、GenerateFlowFile

    

  2、ExecuteScript

    

    脚本内容:

 1 var InputStreamCallback =  Java.type("org.apache.nifi.processor.io.InputStreamCallback");
 2 var OutputStreamCallback =  Java.type("org.apache.nifi.processor.io.OutputStreamCallback");
 3 var IOUtils = Java.type("org.apache.commons.io.IOUtils");
 4 var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
 5  
 6 var flowFile = session.get();
 7 
 8 
 9 if(flowFile != null) {
10 
11     try {
12 
13         var text = "";
14 
15         // 读取flowFile中内容
16         session.read(flowFile,new InputStreamCallback(function(inputStream) {
17             var str = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
18             
19 
20             //由JSON字符串转换为JSON对象
21             var obj = JSON.parse(str); 
22             obj.age = 18
23 
24             //将JSON对象转化为JSON字符
25             text = JSON.stringify(obj); 
26 
27         }));
28 
29         // 向flowFile中写入内容
30         flowFile = session.write(flowFile, new OutputStreamCallback(function(outputStream) {
31 
32             outputStream.write(text.getBytes(StandardCharsets.UTF_8))
33 
34         }));
35 
36         session.transfer(flowFile, REL_SUCCESS)
37 
38     } catch(e) {
39         log.error('Something went wrong', e)
40         session.transfer(flowFile, REL_FAILURE)
41     }
42     
43 }

  3、PutFile

    

    输出文件内容:{"id":1,"name":"god","age":18}

  

  其他脚本引擎,参考以下地址 

  参考文档链接:https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html

原文地址:https://www.cnblogs.com/h--d/p/10111850.html