Submitting Spark applications with Livy

 
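Driving spark-submit from a shell script is not very flexible. Livy is another way to submit Spark jobs: it accepts submissions through a REST interface or a Java client, so it can be integrated into a web application.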

1. Submitting through the client

http://livy.incubator.apache.org/docs/latest/programmatic-api.html

Core code

LivyClient client = new LivyClientBuilder()
  .setURI(new URI(livyUrl))
  .build();

try {
  System.err.printf("Uploading %s to the Spark context...\n", piJar);
  client.uploadJar(new File(piJar)).get();

  System.err.printf("Running PiJob with %d samples...\n", samples);
  double pi = client.submit(new PiJob(samples)).get();

  System.out.println("Pi is roughly: " + pi);
} finally {
  client.stop(true);
}

2. REST API

http://livy.incubator.apache.org/docs/latest/rest-api.html

1. Example using the batches endpoint, the one used most often, and its request parameters
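
To make the request parameters concrete, here is a minimal sketch of the JSON body that a POST to /batches carries. The field names (file, className, args, executorMemory, numExecutors) are the ones the batches endpoint accepts. The class name BatchRequestSketch, the jar path, and the main class are made up for illustration, and the hand-rolled escaping only covers backslashes and double quotes. A real application would normally use a JSON library instead.

```java
import java.util.List;
import java.util.StringJoiner;

class BatchRequestSketch {
    // Minimal JSON string escaping: backslash and double quote only (enough for this sketch).
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Builds the body for POST /batches from a handful of the accepted fields.
    static String batchBody(String file, String className, List<String> args,
                            String executorMemory, int numExecutors) {
        StringJoiner argArray = new StringJoiner(",", "[", "]");
        for (String a : args) {
            argArray.add(quote(a));
        }
        StringJoiner body = new StringJoiner(",", "{", "}");
        body.add(quote("file") + ":" + quote(file));
        body.add(quote("className") + ":" + quote(className));
        body.add(quote("args") + ":" + argArray);
        body.add(quote("executorMemory") + ":" + quote(executorMemory));
        body.add(quote("numExecutors") + ":" + numExecutors);
        return body.toString();
    }

    public static void main(String[] args) {
        // Hypothetical jar path and main class, for illustration only.
        System.out.println(batchBody("hdfs:///jobs/example.jar", "com.example.Main",
                List.of("2020-12-15"), "2g", 2));
    }
}
```

Only file is strictly needed to start a batch; the other fields mirror the options one would otherwise pass to spark-submit.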

HTTP helper for the REST calls

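import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class HttpUtils {
    private static final Logger logger = LoggerFactory.getLogger(HttpUtils.class);

    // The original post does not show this declaration; a default client is assumed.
    private final CloseableHttpClient httpClient = HttpClients.createDefault();

    // POST request: sends data as a JSON body and returns the response body, or null on failure
    public String postAccess(String url, Map<String, String> headers, String data) {
        String result = null;
        HttpPost post = new HttpPost(url);
        if (headers != null && headers.size() > 0) {
            headers.forEach((k, v) -> post.addHeader(k, v));
        }
        try {
            // ContentType.APPLICATION_JSON carries the UTF-8 charset for the body
            StringEntity entity = new StringEntity(data, ContentType.APPLICATION_JSON);
            post.setEntity(entity);
            HttpResponse response = httpClient.execute(post);
            HttpEntity resultEntity = response.getEntity();
            result = EntityUtils.toString(resultEntity, StandardCharsets.UTF_8);
            return result;
        } catch (Exception e) {
            logger.error("postAccess failed: " + e.getMessage(), e);
        }
        return result;
    }

    // GET request. Not shown in the original post; reconstructed from how LivyServer calls it.
    public String getAccess(String url, Map<String, String> headers) throws IOException {
        HttpGet get = new HttpGet(url);
        if (headers != null) {
            headers.forEach((k, v) -> get.addHeader(k, v));
        }
        HttpResponse response = httpClient.execute(get);
        return EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
    }

    // Releases the client's connections. Also reconstructed from its use in LivyServer.
    public void close() throws IOException {
        httpClient.close();
    }
}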
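
For readers who would rather not pull in a third-party HTTP library, the same two requests (POST to /batches, GET on /batches/{id}) can be built with the JDK's own java.net.http package, available since Java 11. This is a sketch and not the author's code: the class name LivyRequestSketch is made up, and http://localhost:8998 is only a placeholder for the Livy address. The requests are built but not sent, so the example runs without a Livy server.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Map;

class LivyRequestSketch {
    // Builds (without sending) a POST to {livyUri}/batches carrying a JSON body.
    static HttpRequest batchPost(String livyUri, Map<String, String> headers, String json) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(livyUri + "/batches"))
                .POST(HttpRequest.BodyPublishers.ofString(json)); // ofString encodes as UTF-8
        headers.forEach(builder::header);
        return builder.build();
    }

    // Builds (without sending) a GET to {livyUri}/batches/{batchId} to read the batch state.
    static HttpRequest batchGet(String livyUri, String batchId, Map<String, String> headers) {
        HttpRequest.Builder builder =
                HttpRequest.newBuilder(URI.create(livyUri + "/batches/" + batchId)).GET();
        headers.forEach(builder::header);
        return builder.build();
    }

    public static void main(String[] args) {
        Map<String, String> headers = Map.of(
                "Content-Type", "application/json",
                "X-Requested-By", "admin");
        // Placeholder Livy address and jar path, for illustration only.
        HttpRequest post = batchPost("http://localhost:8998", headers,
                "{\"file\":\"hdfs:///jobs/example.jar\"}");
        HttpRequest get = batchGet("http://localhost:8998", "0", headers);
        System.out.println(post.method() + " " + post.uri());
        System.out.println(get.method() + " " + get.uri());
        // To actually send a request:
        // HttpClient.newHttpClient().send(post, HttpResponse.BodyHandlers.ofString()).body()
    }
}
```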

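Class that submits the Spark application through Livy. An asynchronous thread prints the job state; the state could also be monitored and returned to the web front end.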

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.wanmi.sbc.dw.utils.GsonUtil;
import com.wanmi.sbc.dw.utils.HttpUtils;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

/**
 * @ClassName: com.spark.submit.impl.livy.LivyApp
 * @Description: Submits Spark jobs through Livy
 * @Author: 小何
 * @Time: 2020/12/15 10:46
 * @Version: 1.0
 */
@Component
public class LivyServer {
    private static final Logger logger = LoggerFactory.getLogger(LivyServer.class);

    private static final List<String> FAIl_STATUS_LIST = Arrays.asList("shutting_down", "error", "dead", "killed");
    private final HashMap<String, String> headers;

    private HttpUtils httpUtils;

    public LivyServer() {
        headers = new HashMap<>();
        headers.put("Content-Type", "application/json");
        headers.put("X-Requested-By", "admin");
    }


    /**
     * Submits the batch job described by the given parameters
     *
     * @param livyParam
     * @return
     */
    @SneakyThrows
    public String batchSubmit(LivyParam livyParam) {
        this.httpUtils = new HttpUtils();
        String livyUri = livyParam.getLivyUri();
        LivyParam livyParamCopy = new LivyParam();
        BeanUtils.copyProperties(livyParam, livyParamCopy);
        livyParamCopy.setLivyUri(null);
        String request = GsonUtil.toJsonString(livyParamCopy);
        logger.info("Job submission request: {}", request);
        String result = httpUtils.postAccess(livyUri + "/batches", headers, request);
        // Check for null first: postAccess returns null when the request itself fails
        if (result == null) {
            return "error:" + "Livy address " + livyUri + " is wrong, please check it";
        }
        if (!GsonUtil.isJson(result)) {
            logger.info("Job submission error: {}", result);
            return "error:" + result;
        }
        logger.info("Job submission response: {}", result);
        JSONObject jsonObject = JSONObject.parseObject(result);
        String state = jsonObject.getString("state");
        String id = jsonObject.getString("id");
        Thread thread = new Thread(() -> {
            try {
                queryState(livyParam.getLivyUri(), id, state);
            } catch (InterruptedException | IOException e) {
                logger.error("Thread failed while polling the job state", e);
            }
        }, livyParam.getName() + System.currentTimeMillis());
        thread.start();
        return result;
    }


    // Polls the state of the submitted job until it finishes
    public void queryState(String livyUrl, String batchId, String responseState) throws InterruptedException, IOException {
        if (responseState != null && !FAIl_STATUS_LIST.contains(responseState)) {
            boolean isRunning = true;
            while (isRunning) {
                String url = livyUrl + "/batches/" + batchId;
                String batchesInfo = httpUtils.getAccess(url, headers);
                JSONObject info = JSON.parseObject(batchesInfo);
                String id = info.getString("id");
                String sta = info.getString("state");
                String appId = info.getString("appId");
                String appInfo = info.getString("appInfo");
                logger.info("livy:sessionId:{},state:{}", id, sta);
                if ("success".equals(sta)) {
                    logger.info("Job {} finished: {}", appId, appInfo);
                    httpUtils.close();
                    isRunning = false;
                } else if (FAIl_STATUS_LIST.contains(sta) || sta == null) {
                    logger.error("Job {} failed, check it and resubmit:\n{}", appId, batchesInfo);
                    httpUtils.close();
                    isRunning = false;
                } else if ("running".equals(sta) || "idle".equals(sta) || "starting".equals(sta)) {
                    logger.info("Job {} status:\n{}", appId, batchesInfo);
                } else {
                    logger.info("Job {} is in unknown state {}, stopping the status check", id, sta);
                    httpUtils.close();
                    isRunning = false;
                }
                // Only wait when another poll will follow
                if (isRunning) {
                    Thread.sleep(5000);
                }
            }
        }
    }
}
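
The polling loop above runs until Livy reports a final state. A variant that gives up after a fixed number of polls can be sketched as follows. This is an illustration under assumptions and not the author's code: BatchPollSketch, Outcome, and waitForBatch are made-up names, and the state source is passed in as a Supplier so the loop can be tried without a Livy server. In a real application the supplier would fetch /batches/{id} and extract the state field.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

class BatchPollSketch {
    enum Outcome { SUCCEEDED, FAILED, TIMED_OUT, INTERRUPTED }

    // Same failure states as FAIl_STATUS_LIST in LivyServer.
    static final Set<String> FAILED_STATES = Set.of("shutting_down", "error", "dead", "killed");

    // Polls stateSource up to maxPolls times, sleeping sleepMillis between polls.
    static Outcome waitForBatch(Supplier<String> stateSource, int maxPolls, long sleepMillis) {
        for (int i = 0; i < maxPolls; i++) {
            String state = stateSource.get();
            if ("success".equals(state)) {
                return Outcome.SUCCEEDED;
            }
            // Test for null before contains(): sets created by Set.of reject null lookups.
            if (state == null || FAILED_STATES.contains(state)) {
                return Outcome.FAILED;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
                return Outcome.INTERRUPTED;
            }
        }
        return Outcome.TIMED_OUT;
    }

    public static void main(String[] args) {
        // Simulated sequence of states standing in for successive GET /batches/{id} responses.
        Iterator<String> states = List.of("starting", "running", "success").iterator();
        System.out.println(waitForBatch(states::next, 10, 0)); // prints SUCCEEDED
    }
}
```

Returning an outcome instead of only logging makes it easier to pass the final state back to the web front end.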

Livy request parameters

import lombok.Data;

import java.util.List;
import java.util.Map;

@Data
public class LivyParam {
    /**
     * Address of the Livy server
     */
    private String livyUri;

    /**
     * Path of the jar to run
     */
    private String file;
    /**
     * User to impersonate when running the job
     */
    private String proxyUser;
    /**
     * Main class to run
     */
    private String className;
    /**
     * Arguments for the main class
     */
    private List<String> args;
    /**
     * Jars the job needs at runtime
     */
    private String thirdJarPath;
    private List<String> jars;
    private List<String> pyFiles;
    private List<String> files;
    private String driverMemory;
    private Integer driverCores;
    private String executorMemory;
    private Integer executorCores;
    private Integer numExecutors;
    private List<String> archives;
    /**
     * Queue to submit to
     */
    private String queue;
    /**
     * Application name
     */
    private String name;
    /**
     * Other Spark configuration
     */
    private Map<String, String> conf;
}

Test

        // Build the parameters
        LivyParam livyParam = new LivyParam();
        livyParam.setLivyUri(sparkSubmitParam.getLivyUri());
        livyParam.setClassName(sparkSubmitParam.getClassName());
        livyParam.setArgs(sparkSubmitParam.getArgs());
        livyParam.setConf(sparkSubmitParam.getConf());
        livyParam.setDriverCores(sparkSubmitParam.getDriverCores());
        livyParam.setDriverMemory(sparkSubmitParam.getDriverMemory());
        livyParam.setArchives(sparkSubmitParam.getArchives());
        livyParam.setExecutorCores(sparkSubmitParam.getExecutorCores());
        livyParam.setExecutorMemory(sparkSubmitParam.getExecutorMemory());
        livyParam.setJars(sparkSubmitParam.getJars());
        livyParam.setFile(sparkSubmitParam.getFile());
        livyParam.setName(sparkSubmitParam.getName());
        livyParam.setQueue(sparkSubmitParam.getQueue());
        livyParam.setProxyUser(sparkSubmitParam.getProxyUser());

        // Send the request
        String result = livyServer.batchSubmit(livyParam);

  

Original article: https://www.cnblogs.com/hejunhong/p/14248380.html