记录工作中groovy动态生成Flink任务

工作中的痛点:有一个计算的任务,需要配置成前端配置好一些简单的信息,例如名字,计算间隔,计算规则(这个是需要提前写好,开放给用户选择的),然后通过提交到我们的计算引擎中心生成对应的任务jar包提交到服务器上去计算(Flink+groovy)

我这里仅仅记录流程和关键点。

1:将前端传入引擎的信息进行封装

// 解析定义的参数,并封装运算脚本所需的其他参数值(key-value形式)。 
Map<String, Object> ruleParams = parseParamsToMap(taskDef.getRuleParams());

//接下来就是个中封装数据 等等 ,下面粗略的给了一个脚本内容 内容就是计算的方法 scriptContentInfo就是计算方法中需要的一些参数
 ruleParams.put("ScriptContentInfo",taskDef.ScriptContentInfo);

2:自行构建一个groovy的脚本类

class GetRuleTask {

    static void getJobInfo(String ruleName, String scriptContentBody, Map<String, Object> inputParams, String gScriptPath){

// gScriptPath 是脚本所在的路径 通常是idea是放在resource下面,ruleName是创建动态任务类的方法名,scripteContenBody就是类的方法体 inputParams就是方法中使用到的各种参数 GroovyScriptEngine engine = new GroovyScriptEngine(gScriptPath); Binding bind = new Binding(); bind.setVariable("ScriptContentBody",ScriptContentBody); bind.setVariable("ruleName", ruleName); bind.setVariable("inputParams", inputParams);
//CodeLoader下面会介绍,是一个将参数和类绑定在一起生成完整类的操作 engine.run(
"CodeLoader.groovy",bind); AnnotationConfigApplicationContext cxt = new AnnotationConfigApplicationContext(); cxt.scan("com.my.boke.boven.loader"); cxt.refresh();
//这个地方的RuleSet.class 下面会介绍,就是动态生成的类 RuleSet rules
= cxt.getBean(RuleSet.class);
//调用动态类的动态方法 rules.invokeMethod(ruleName,
null);

3:提前创建一个空类 RuleSet

package com.my.boke.boven.loader

class RuleSet {

}

4:CodeLoader介绍

ScriptEngineManager factory = new ScriptEngineManager()
ScriptEngine engine = factory.getEngineByName("groovy")
// 绑定规则的输入参数
Bindings binds = engine.createBindings();
for (Entry<String, Object> entry : inputParams.entrySet()) {
    binds.put(entry.getKey(), entry.getValue());
}
//导入参数所用的到的所有包,其实也不多,也就10几个。
def code_import = """import com.my.boke.boven.common.*;
import com.alibaba.fastjson.JSONObject;
...
...
...
;
"""

//这个地方就是将方法包和方法体进行一个绑定,生成一个完整类
Reader reader = new StringReader(code_import + methodBody);
RuleSet.metaClass.'static'."${methodName}" = {
    ->
    engine.eval(reader, binds)
}

我故意将2步骤写在了前面,就是想体现一点,其实我们的服务运行当中,如果没有启动任务的时候,类中的方法是不存在的,只有当我启动的时候才会去动态的生成。

到这一步只需要remote将jar包提交到flink即可。

以上只提供思路,具体实现根据自己业务实现。

原文地址:https://www.cnblogs.com/yeszero/p/11573264.html