Dolphin Scheduler增加Spark任务实例停止功能

api模块直接对server模块进行依赖是个很糟糕的做法,以下内容,仅供娱乐。

------------

DS任务实例面板的操作项仅提供了日志查看功能,当要关停Spark类型的任务实例时,如果YARN和worker不在同一台服务器上,停止工作流并不能同时关停任务实例,因此就想到专门针对Spark类型的任务增加一个任务停止功能。

基本效果如下:

前端

首先在src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue添加停止按钮

<td>
    <!--
      Stop button: shown only when the task type is SPARK and disabled unless the
      instance state equals 'RUNNING_EXEUTION'; a click calls _stopTask(item).
      NOTE(review): 'RUNNING_EXEUTION' is spelled exactly as it is compared here;
      presumably it mirrors the state name sent by the backend, so confirm before "correcting" it.
    -->
    <x-button
        v-show="item.taskType==='SPARK'"
        :disabled="item.state !== 'RUNNING_EXEUTION'"
        type="error"
        shape="circle"
        size="xsmall"
        data-toggle="tooltip"
        :title="$t('Stop')"
        icon="ans-icon-stop"
        @click="_stopTask(item)">
    </x-button>
</td>

其中_stopTask方法实现如下

_stopTask(item) {
        console.log(item)

        let prom = new Promise((resolve, reject) => {
          io.post(`projects/${projectName}/task-instance/kill`, {  // 发送任务停止请求
            host: item.host,
            taskInstanceId: item.id
          }, res => {
            resolve(res)
          }).catch(e => {
            reject(e)
          })
        })
        prom.then(() => {
          console.log("killed task, will update task list")
          new Promise((resolve, reject) => {
            io.get(`projects/${projectName}/task-instance/list-paging`, this.searchParams, res => {  // 刷新任务列表,杀死任务之后任务状态没那么快变为FAILED,立即刷新任务状态也不一定显示为停止,多刷新一次一般就好了,废话。。
              resolve(res.data)
            }).catch(e => {
              reject(e)
            })
          }).then(res => {
            console.log(res)
            this.taskInstanceList = res.totalList
          }, err => {
            console.log("get task list failed " + err)
          })
        })
      }

完整代码如下

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
<template>
  <div class="list-model">
    <div class="table-box">
      <table class="fixed">
        <tr>
          <!-- Header row. Each data row renders 14 cells (12 data cells, the log button cell and the stop button cell), so the Operation header spans the two button columns. -->
          <th scope="col">
            <span>{{$t('#')}}</span>
          </th>
          <th scope="col">
            <span>{{$t('Name')}}</span>
          </th>
          <th scope="col">
            <span>{{$t('Process Instance')}}</span>
          </th>
          <th scope="col" width="70">
            <span>{{$t('Executor')}}</span>
          </th>
          <th scope="col" width="90">
            <span>{{$t('Node Type')}}</span>
          </th>
          <th scope="col" width="40">
            <span>{{$t('State')}}</span>
          </th>
          <th scope="col" width="140">
            <span>{{$t('Submit Time')}}</span>
          </th>
          <th scope="col" width="140">
            <span>{{$t('Start Time')}}</span>
          </th>
          <th scope="col" width="140">
            <span>{{$t('End Time')}}</span>
          </th>
          <th scope="col" width="110">
            <span>{{$t('host')}}</span>
          </th>
          <th scope="col" width="74">
            <span>{{$t('Duration')}}(s)</span>
          </th>
          <th scope="col" width="84">
            <span>{{$t('Retry Count')}}</span>
          </th>
          <!-- Two button columns of 50px each (the width the single log-button column had before) -->
          <th scope="col" width="100" colspan="2">
            <span>{{$t('Operation')}}</span>
          </th>
        </tr>
        <tr v-for="(item, $index) in list" :key="item.id">
          <!-- One row per entry of `list`, keyed by the task instance id. -->
          <!-- Sequence number: continues across pages by offsetting the row index with pageSize * (pageNo - 1). -->
          <td>
            <span>{{parseInt(pageNo === 1 ? ($index + 1) : (($index + 1) + (pageSize * (pageNo - 1))))}}</span>
          </td>
          <td>
            <span class="ellipsis" :title="item.name">{{item.name}}</span>
          </td>
          <!-- Process instance name; clicking it navigates through _go(item). -->
          <td><a href="javascript:" class="links" @click="_go(item)"><span
            class="ellipsis">{{item.processInstanceName}}</span></a></td>
          <td>
            <span v-if="item.executorName">{{item.executorName}}</span>
            <span v-else>-</span>
          </td>
          <td><span>{{item.taskType}}</span></td>
          <!-- State icon: the HTML string returned by _rtState is injected with v-html. -->
          <td><span v-html="_rtState(item.state)" style="cursor: pointer;"></span></td>
          <!-- Time columns fall back to "-" when the value is missing. -->
          <td>
            <span v-if="item.submitTime">{{item.submitTime | formatDate}}</span>
            <span v-else>-</span>
          </td>
          <td>
            <span v-if="item.startTime">{{item.startTime | formatDate}}</span>
            <span v-else>-</span>
          </td>
          <td>
            <span v-if="item.endTime">{{item.endTime | formatDate}}</span>
            <span v-else>-</span>
          </td>
          <td><span>{{item.host || '-'}}</span></td>
          <td><span>{{item.duration}}</span></td>
          <td><span>{{item.retryTimes}}</span></td>
          <!-- Log button: opens the log dialog through _refreshLog(item). -->
          <td>
            <x-button
              type="info"
              shape="circle"
              size="xsmall"
              data-toggle="tooltip"
              :title="$t('View log')"
              icon="ans-icon-log"
              @click="_refreshLog(item)">
            </x-button>
          </td>
          <!-- Stop button: shown only for SPARK tasks, disabled unless the state equals 'RUNNING_EXEUTION'; calls _stopTask(item). -->
          <td>
            <x-button
              v-show="item.taskType==='SPARK'"
              :disabled="item.state !== 'RUNNING_EXEUTION'"
              type="error"
              shape="circle"
              size="xsmall"
              data-toggle="tooltip"
              :title="$t('Stop')"
              icon="ans-icon-stop"
              @click="_stopTask(item)">
            </x-button>
          </td>
        </tr>
      </table>
    </div>
  </div>
</template>
<script>
  import Permissions from '@/module/permissions'
  import mLog from '@/conf/home/pages/dag/_source/formModel/log'
  import {tasksState} from '@/conf/home/pages/dag/_source/config'
  import io from '@/module/io'
  import localStore from '@/module/util/localStorage'

  // Read the current project name from localStore once, when this module is evaluated.
  // It is interpolated into the request URLs built in _stopTask.
  let projectName = localStore.getItem('projectName')

  /**
   * Task instance list table.
   *
   * Renders the rows held in `list` (seeded from the `taskInstanceList` prop),
   * opens the log dialog for a row, navigates to the owning process instance and
   * lets the user request a kill of a task instance.
   */
  export default {
    name: 'list',
    data() {
      return {
        // Rows currently rendered by the table
        list: [],
        isAuth: Permissions.getAuth(),
        backfillItem: {},
        // Query parameters sent to `list-paging` when the list is reloaded after a kill request
        searchParams: {
          // page size
          pageSize: 10,
          // page index
          pageNo: 1,
          // Query name
          searchVal: '',
          // Process instance id
          processInstanceId: '',
          // host
          host: '',
          // state
          stateType: '',
          // start date
          startDate: '',
          // end date
          endDate: '',
          // Executor name
          executorName: ''
        }
      }
    },
    props: {
      taskInstanceList: Array,
      pageNo: Number,
      pageSize: Number
    },
    methods: {
      /**
       * Build the HTML of the state icon for a task state code.
       *
       * @param {string} code key into `tasksState`
       * @return {string} an `<em>` element string using the entry's icon class, color and description
       */
      _rtState(code) {
        let o = tasksState[code]
        return `<em class="${o.icoUnicode} ${o.isSpin ? 'as as-spin' : ''}" style="color:${o.color}" data-toggle="tooltip" data-container="body" title="${o.desc}"></em>`
      },
      /**
       * Ask the backend to kill the given task instance and, once the request has
       * succeeded, reload the task list.
       *
       * @param {Object} item task instance row; `item.host` and `item.id` are sent to the kill endpoint
       */
      _stopTask(item) {
        // Send the task stop request
        const killRequest = new Promise((resolve, reject) => {
          io.post(`projects/${projectName}/task-instance/kill`, {
            host: item.host,
            taskInstanceId: item.id
          }, res => {
            resolve(res)
          }).catch(e => {
            reject(e)
          })
        })

        killRequest.then(() => {
          // The task state does not change immediately after the kill, so this first
          // refresh may still show the task as running.
          // NOTE(review): the refresh uses this component's own `searchParams`
          // defaults, not the `pageNo`/`pageSize` props; confirm that is intended.
          return new Promise((resolve, reject) => {
            io.get(`projects/${projectName}/task-instance/list-paging`, this.searchParams, res => {
              resolve(res.data)
            }).catch(e => {
              reject(e)
            })
          }).then(data => {
            // Update local `list` data; `taskInstanceList` is a prop owned by the
            // parent and must not be assigned from inside the component.
            this.list = data.totalList
          }, err => {
            console.log("get task list failed " + err)
          })
        }, err => {
          // Without this handler a failed kill request ends as an unhandled promise rejection.
          console.error("kill task failed", err)
        })
      },
      /**
       * Open the log dialog (mLog) for a task instance.
       *
       * @param {Object} item task instance row; `item.id` is passed as `logId`
       */
      _refreshLog(item) {
        let self = this
        let instance = this.$modal.dialog({
          closable: false,
          showMask: true,
          escClose: true,
          className: 'v-modal-custom',
          transitionName: 'opacityp',
          render(h) {
            return h(mLog, {
              on: {
                ok() {
                },
                close() {
                  // Remove the dialog when the log view asks to close
                  instance.remove()
                }
              },
              props: {
                self: self,
                source: 'list',
                logId: item.id
              }
            })
          }
        })
      },
      /**
       * Navigate to the process instance that owns the task instance.
       *
       * @param {Object} item task instance row; `item.processInstanceId` selects the target route
       */
      _go(item) {
        this.$router.push({path: `/projects/instance/list/${item.processInstanceId}`})
      },
    },
    watch: {
      // When the parent passes a new list, clear the table first and fill it in a later task.
      taskInstanceList(a) {
        this.list = []
        setTimeout(() => {
          this.list = a
        })
      }
    },
    created() {
    },
    mounted() {
      this.list = this.taskInstanceList
    },
    components: {}
  }
</script>
View Code

 后端

主要是修改api模块的代码。

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java中添加任务停止接口

/**
 * Kills a task instance through {@code taskInstanceService.killTask}.
 *
 * <p>NOTE(review): no login-user or project permission check is visible in this
 * handler; confirm whether one is required before exposing the endpoint.
 *
 * @param host   host sent by the client; passed through to the service
 * @param taskId id of the task instance to kill
 * @return a success result carrying the status message, or an error result with
 *         the status code and a non-null message
 */
@PostMapping("/kill")
@ResponseStatus(HttpStatus.OK)
public Result kill(@RequestParam("host") String host, @RequestParam("taskInstanceId") Integer taskId) {
    logger.info("start kill task");
    Map<String, Object> result = taskInstanceService.killTask(host, taskId);
    Status status = (Status) result.get(Constants.STATUS);
    if (status == Status.SUCCESS) {
        return success(status.getMsg());
    }

    Integer code = status.getCode();
    String msg = (String) result.get(Constants.MSG);
    if (msg == null) {
        // Not every failure path of the service fills in Constants.MSG; fall back to
        // the status' own message so the client never receives a null message.
        msg = status.getMsg();
    }
    return error(code, msg);
}
killTask方法在dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java中实现
/**
 * Kills a task instance; only task instances whose type is SPARK are supported.
 *
 * @param host           host sent by the client; not used by this method
 * @param taskInstanceId id of the task instance to kill
 * @return a map holding {@code Constants.STATUS} and {@code Constants.MSG}:
 *         {@code TASK_INSTANCE_NOT_FOUND} when the id is unknown,
 *         {@code REQUEST_PARAMS_NOT_VALID_ERROR} when the task is not a Spark task,
 *         {@code TASK_KILL_ERROR} when the kill attempt throws, {@code SUCCESS} otherwise
 */
public Map<String, Object> killTask(String host, Integer taskInstanceId) {
    Map<String, Object> result = new HashMap<>(5);
    TaskInstance taskInstance = processDao.getTaskInstanceDetailByTaskId(taskInstanceId);
    if (taskInstance == null) {
        logger.error("cannot find the task to kill:" + taskInstanceId);
        result.put(Constants.STATUS, Status.TASK_INSTANCE_NOT_FOUND);
        // Every failure path carries a message so callers never read a null MSG.
        result.put(Constants.MSG, Status.TASK_INSTANCE_NOT_FOUND.getMsg());
        return result;
    }

    // A missing task type is treated as "unsupported" instead of failing with a NullPointerException.
    String taskType = taskInstance.getTaskType();
    if (taskType == null || !taskType.toLowerCase().equals(TaskType.SPARK.getDescp())) {
        logger.error("cannot kill the task type: " + taskType);
        result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
        result.put(Constants.MSG, "not support kill the task type " + taskType);
        return result;
    }

    try {
        ProcessUtils.killSparkTask(taskInstance);
    } catch (Exception e) {
        logger.error("kill spark task failed : " + e.getMessage(), e);
        result.put(Constants.STATUS, Status.TASK_KILL_ERROR);
        result.put(Constants.MSG, "kill spark task failed");
        return result;
    }

    result.put(Constants.STATUS, Status.SUCCESS);
    result.put(Constants.MSG, "kill task success");
    return result;
}
Status.TASK_KILL_ERROR是新加的,在原来的代码中没有。

ProcessUtils.killSparkTask的实现如下
public static void killSparkTask(TaskInstance taskInstance) {
        try {
           
            String state = "{"state": "KILLED"}";
            String appid = ProcessUtils.getYarnAppId(taskInstance);
            String appAddress = HadoopUtils.getAppAddress();

            String url = String.format(appAddress, appid)+"/state";
            logger.info("kill yarn task request url is " + url);

            String response = HttpUtils.put(url, state);
            logger.info("kill yarn task response: " + response);
        } catch (Exception e) {
            throw e;
        }
    }

其中ProcessUtils.getYarnAppId(taskInstance);是从已有代码中抽取出来的

/**
 * Looks up the YARN application id of a task instance in its task log.
 *
 * <p>The log is fetched through a {@code LogClient} connected to the task's host,
 * the application ids are extracted with {@code LoggerUtils.getAppIds}, and the
 * last one found is returned.
 *
 * @param taskInstance task instance whose log is inspected
 * @return the last application id found in the log, or {@code null} when the log
 *         is empty, contains no id, the execute path is empty, or the lookup fails
 */
private static String getYarnAppId(TaskInstance taskInstance) {
    String appId = null;
    try {
        // NOTE(review): this pause was carried over from the code this method was
        // extracted from; confirm it is still needed on this path.
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        int port = PropertyUtils.getInt(Constants.LOGGER_SERVER_RPC_PORT);
        LogClient logClient = new LogClient(taskInstance.getHost(), port);

        String log = logClient.viewLog(taskInstance.getLogPath());
        if (StringUtils.isNotEmpty(log)) {
            List<String> appIds = LoggerUtils.getAppIds(log, logger);
            String workerDir = taskInstance.getExecutePath();
            if (StringUtils.isEmpty(workerDir)) {
                // Same outcome as before (null result), without throwing an exception
                // only to catch and log it again a few lines below.
                logger.error("task instance work dir is empty");
                return null;
            }
            if (!appIds.isEmpty()) {
                appId = appIds.get(appIds.size() - 1);
            }
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
        logger.error(e.getMessage(), e);
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }

    return appId;
}
HttpUtils.put(url, state);的实现代码如下
/**
 * Sends an HTTP PUT request with a JSON body and returns the response body.
 *
 * <p>Failures are logged, not thrown: the method returns {@code null} when the
 * request fails or the response carries no entity.
 *
 * @param url  target url
 * @param json request body, sent as {@code application/json;charset=UTF-8}
 * @return the response body decoded as UTF-8, or {@code null} on failure
 */
public static String put(String url, String json) {
    String responseContent = null;

    // try-with-resources closes both the response and the client on every path.
    // Previously a failed request left the response null, the finally block threw a
    // NullPointerException on it, and the client was never closed.
    try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
        // create http put request
        HttpPut httpPut = new HttpPut(url);
        httpPut.setHeader("Content-Type", "application/json;charset=UTF-8");

        // Encode the body with the charset announced in the Content-Type header.
        httpPut.setEntity(new StringEntity(json, "UTF-8"));

        // execute
        try (CloseableHttpResponse response = httpclient.execute(httpPut)) {
            if (response.getEntity() != null) {
                responseContent = EntityUtils.toString(response.getEntity(), "UTF-8");
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }

    return responseContent;
}


总体来说,实现起来不难,基本都是借用已有代码,就是需要前期理清原代码的思路。

原文地址:https://www.cnblogs.com/144823836yj/p/12566995.html