dolphin 增加钉钉通知-功能

dolphin 1.6已经增加了企业微信的功能。

公司用的钉钉,那么增加功能到钉钉指定分组,可以针对分组将错误的任务或者被杀死的任务进行提示。

alert模块,增加配置:

1  enterprise.dingtalk.enable=true
2  enterprise.dingtalk.secret=钉钉秘钥
3  enterprise.dingtalk.url=钉钉地址
4 # enterprise.dingtalk.url=https://oapi.dingtalk.com/robot/send?access_token=token

增加配置,Constants增加配置

1     public static final String ENTERPRISE_DINGTALK_ENABLE="enterprise.dingtalk.enable";
2 
3     public static final String ENTERPRISE_DINGTALK_SECRET="enterprise.dingtalk.secret";
4 
5     public static final String ENTERPRISE_DINGTALK_URL="enterprise.dingtalk.url";

增加工具类

  1 package org.apache.dolphinscheduler.alert.utils;
  2 
  3 import com.alibaba.fastjson.JSON;
  4 import com.alibaba.fastjson.JSONObject;
  5 import org.apache.dolphinscheduler.common.utils.HttpUtils;
  6 import org.apache.http.HttpEntity;
  7 import org.apache.http.client.ClientProtocolException;
  8 import org.apache.http.client.methods.CloseableHttpResponse;
  9 import org.apache.http.client.methods.HttpPost;
 10 import org.apache.http.entity.StringEntity;
 11 import org.apache.http.impl.client.CloseableHttpClient;
 12 import org.apache.http.impl.client.HttpClients;
 13 import org.apache.http.util.EntityUtils;
 14 import org.slf4j.Logger;
 15 import org.slf4j.LoggerFactory;
 16 
 17 import javax.crypto.Mac;
 18 import javax.crypto.spec.SecretKeySpec;
 19 import java.io.IOException;
 20 import java.net.URLEncoder;
 21 import java.util.Base64;
 22 
 23 public class EnterpriseDingTalkUtils {
 24     public static final Logger logger = LoggerFactory.getLogger(EnterpriseWeChatUtils.class);
 25 
 26 //    private static final boolean ENTERPRISE_DINGTALK_ENABLE = PropertyUtils.getBoolean(Constants.ENTERPRISE_DINGTALK_ENABLE);
 27 
 28     private static final String ENTERPRISE_DINGTALK_URL = PropertyUtils.getString(Constants.ENTERPRISE_DINGTALK_URL);
 29 
 30     private static final String ENTERPRISE_DINGTALK_SECRET = PropertyUtils.getString(Constants.ENTERPRISE_DINGTALK_SECRET);
 31 
 32     /**
 33      * get Enterprise Ding Ding is enable
 34      * @return isEnable
 35      */
 36     public static boolean isEnable(){
 37         return PropertyUtils.getBoolean(Constants.ENTERPRISE_DINGTALK_ENABLE);
 38     }
 39 
 40     /**
 41      * get Secret string and time number
 42      */
 43     public static String encodeKey(){
 44         try {
 45             //获取时间戳
 46             Long timestamp = System.currentTimeMillis();
 47             //把时间戳和密钥拼接成字符串,中间加入一个换行符
 48             String stringToSign = timestamp + "
" + ENTERPRISE_DINGTALK_SECRET;
 49             //声明一个Mac对象,用来操作字符串
 50             Mac mac = null;
 51             mac = Mac.getInstance("HmacSHA256");
 52 
 53             //初始化,设置Mac对象操作的字符串是UTF-8类型,加密方式是SHA256
 54             mac.init(new SecretKeySpec(ENTERPRISE_DINGTALK_SECRET.getBytes("UTF-8"), "HmacSHA256"));
 55             //把字符串转化成字节形式
 56             byte[] signData = mac.doFinal(stringToSign.getBytes("UTF-8"));
 57             //新建一个Base64编码对象
 58             Base64.Encoder encoder = Base64.getEncoder();
 59             //把上面的字符串进行Base64加密后再进行URL编码
 60             String sign = URLEncoder.encode(new String(encoder.encodeToString(signData)),"UTF-8");
 61             System.out.println(timestamp);
 62             System.out.println(sign);
 63 
 64             String result = "&timestamp=" + timestamp + "&sign=" + sign;
 65             return result;
 66         } catch (Exception e) {
 67             e.printStackTrace();
 68             return null;
 69         }
 70     }
 71     public static String getJsonBodyString(String msg){
 72         JSONObject result = new JSONObject();
 73         JSONObject text = new JSONObject();
 74         text.put("content", msg);
 75         result.put("text", text);
 76         result.put("msgtype", "text");
 77         String jsonString = JSON.toJSONString(result);
 78         return jsonString;
 79     }
 80 
 81     public static void sendMessageToDingTalk(String msg){
 82         String enterDingTalkUrl = ENTERPRISE_DINGTALK_URL + encodeKey();
 83         String jsonBodyStr = getJsonBodyString(msg);
 84         CloseableHttpClient httpClient = HttpClients.createDefault();
 85         try {
 86             HttpPost httpPost = new HttpPost(enterDingTalkUrl);
 87             httpPost.setHeader("Content-Type", "application/json;charset=utf8");
 88             httpPost.setEntity(new StringEntity(jsonBodyStr, Constants.UTF_8));
 89             CloseableHttpResponse response = httpClient.execute(httpPost);
 90             String resp;
 91             try {
 92                 HttpEntity entity = response.getEntity();
 93                 resp = EntityUtils.toString(entity, Constants.UTF_8);
 94                 EntityUtils.consume(entity);
 95             } finally {
 96                 response.close();
 97             }
 98             logger.info("Enterprise DingTalk send [{}], param:{}, resp:{}",
 99                     ENTERPRISE_DINGTALK_URL, msg, resp);
100         } catch (ClientProtocolException e) {
101             e.printStackTrace();
102         } catch (IOException e) {
103             e.printStackTrace();
104         } finally {
105             try {
106                 httpClient.close();
107             } catch (IOException e) {
108                 e.printStackTrace();
109             }
110         }
111     }
112 
113 }

在server模块的worker运行逻辑(TaskExecuteThread)加入:

    @Override
    public void run() {
        String result = "";
        TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId());
        try {
            logger.info("script path : {}", taskExecutionContext.getExecutePath());
            // task node
            TaskNode taskNode = JSONObject.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);

            // copy hdfs/minio file to local
            downloadResource(taskExecutionContext.getExecutePath(),
                    taskExecutionContext.getResources(),
                    logger);

            taskExecutionContext.setTaskParams(taskNode.getParams());
            taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
            taskExecutionContext.setDefinedParams(getGlobalParamsMap());

            // set task timeout
            setTaskTimeout(taskExecutionContext, taskNode);

            taskExecutionContext.setTaskAppId(String.format("%s_%s_%s",
                    taskExecutionContext.getProcessDefineId(),
                    taskExecutionContext.getProcessInstanceId(),
                    taskExecutionContext.getTaskInstanceId()));

            task = TaskManager.newTask(taskExecutionContext, taskLogger);

            // task init
            task.init();
            result = String.format("task name=%s,task-id=%s,type=%s error",taskNode.getName(),taskNode.getId(),taskNode.getType());
            // task handle
            task.handle();

            // task result process
            task.after();
            responseCommand.setStatus(task.getExitStatus().getCode());
            responseCommand.setEndTime(new Date());
            responseCommand.setProcessId(task.getProcessId());
            responseCommand.setAppIds(task.getAppIds());
            logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
        } catch (Exception e) {
            logger.error("task scheduler failure", e);
            kill();
            responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
            responseCommand.setEndTime(new Date());
            responseCommand.setProcessId(task.getProcessId());
            responseCommand.setAppIds(task.getAppIds());
        } finally {
            taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
            ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
            taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
            clearTaskExecPath();
            //
            if(EnterpriseDingTalkUtils.isEnable()) {
                if (responseCommand.getStatus() == ExecutionStatus.FAILURE.getCode() ||
                        responseCommand.getStatus() == ExecutionStatus.KILL.getCode()) {
                    EnterpriseDingTalkUtils.sendMessageToDingTalk(result);
                }
            }
        }
    }

参考:https://segmentfault.com/a/1190000022077236

有朝一日同风起,扶摇直上九万里
原文地址:https://www.cnblogs.com/wind-man/p/15132971.html