通过 Livy 向 CDH 集群的 Spark 提交任务

场景

应用后台远程调用 Spark 任务

简单介绍下 Livy

Submit Jobs from Anywhere

Livy enables programmatic, fault-tolerant, multi-tenant submission of Spark jobs from web/mobile apps (no Spark client needed). So, multiple users can interact with your Spark cluster concurrently and reliably.

Use Interactive Scala or Python

Livy speaks either Scala or Python, so clients can communicate with your Spark cluster via either language remotely. Also, batch job submissions can be done in Scala, Java, or Python.

No Code Changes Needed

Don’t worry, no changes to existing programs are needed to use Livy. Just build Livy with Maven, deploy the configuration file to your Spark cluster, and you’re off! 

Livy 是基于 Apache 许可的一个服务,它可以让远程应用通过 REST API 比较方便地与 Spark 集群交互。通过简单的 REST 接口或 RPC 客户端库,它可以让你轻松地提交 Spark 作业或者 Spark 代码片段,同步或者异步地检索结果,以及管理 SparkContext。Livy 还简化了 Spark 和应用程序服务器之间的交互,从而为 web/mobile 应用简化 Spark 架构。

主要功能有:
1. 多个客户端可以复用长时间运行的 SparkContext 来执行多个 Spark 作业。
2. 同时管理多个 SparkContext,让它们运行在集群(YARN/Mesos)上而不是 Livy 服务上,从而获得良好的容错性和并发性。
3. 预编译的 jar、代码片段或者 Java/Scala 客户端 API 都可以用来提交作业。
4. 支持安全认证的通信(例如 Kerberos)。

Livy 的 REST API(参考链接:https://livy.incubator.apache.org/docs/latest/rest-api.html)

代码如下

依赖:

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.4</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>

示例代码

package com.dtmobile.livy;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

/**
 * Minimal HTTP helpers for calling the Livy REST API with JSON payloads.
 *
 * <p>The request methods do not throw. They log failures and return a sentinel value instead:
 * {@code null} for {@link #getAccess}, {@code false} for {@link #deleteAccess}, and {@code ""}
 * for {@link #postAccess}. Callers must check the result.
 */
public class HttpUtils {

    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(HttpUtils.class.getName());

    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** Connect timeout set by {@link #init}, so an unreachable server fails instead of hanging. */
    private static final int CONNECT_TIMEOUT_MS = 10_000;

    /** Read timeout set by {@link #init}, so a stalled response cannot block the caller forever. */
    private static final int READ_TIMEOUT_MS = 60_000;

    /**
     * Applies the settings shared by all requests: input/output enabled, timeouts, and JSON headers.
     *
     * @param conn a connection that has not been connected yet
     * @return the same {@code conn}, for chaining
     */
    public static HttpURLConnection init(HttpURLConnection conn) {
        conn.setDoInput(true);
        conn.setDoOutput(true);
        conn.setConnectTimeout(CONNECT_TIMEOUT_MS);
        conn.setReadTimeout(READ_TIMEOUT_MS);
        // The charset is a parameter of Content-Type; a standalone "charset" header is not standard HTTP.
        conn.setRequestProperty("Content-Type", "application/json; charset=utf-8");
        conn.setRequestProperty("Accept", "application/json");
        return conn;
    }

    /**
     * Sends an HTTP GET and parses the response body as a JSON object.
     *
     * @param urlStr absolute URL to request
     * @return the parsed body, or {@code null} if the request failed, returned a non-2xx status,
     *     or had an empty body (the failure is logged)
     */
    public static JSONObject getAccess(String urlStr) {
        HttpURLConnection conn = null;
        try {
            conn = init((HttpURLConnection) new URL(urlStr).openConnection());
            conn.setRequestMethod("GET");
            return JSON.parseObject(readResponse(conn));
        } catch (Exception e) {
            logFailure("GET", urlStr, e);
            return null;
        } finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
    }

    /**
     * Sends an HTTP DELETE.
     *
     * @param urlStr absolute URL of the resource to delete
     * @return {@code true} if the server answered with a 2xx status, {@code false} otherwise
     *     (the failure is logged)
     */
    public static Boolean deleteAccess(String urlStr) {
        HttpURLConnection conn = null;
        try {
            conn = init((HttpURLConnection) new URL(urlStr).openConnection());
            conn.setRequestMethod("DELETE");
            readResponse(conn); // body is not needed; this throws on a non-2xx status
            return true;
        } catch (Exception e) {
            logFailure("DELETE", urlStr, e);
            return false;
        } finally {
            // Always release the connection, including when the request failed.
            if (conn != null) {
                conn.disconnect();
            }
        }
    }

    /**
     * Sends {@code data} as a UTF-8 JSON body via HTTP POST.
     *
     * @param urlStr absolute URL to post to
     * @param data   JSON payload; must not be {@code null}
     * @return the raw response body, or {@code ""} if the request failed or returned a non-2xx
     *     status (the failure, including the server's error message, is logged)
     */
    public static String postAccess(String urlStr, JSONObject data) {
        HttpURLConnection conn = null;
        try {
            conn = init((HttpURLConnection) new URL(urlStr).openConnection());
            conn.setRequestMethod("POST");
            byte[] payload = data.toString().getBytes(UTF_8);
            // Closing the output stream completes the request body before the response is read.
            try (DataOutputStream out = new DataOutputStream(conn.getOutputStream())) {
                out.write(payload);
            }
            return readResponse(conn);
        } catch (Exception e) {
            logFailure("POST", urlStr, e);
            return "";
        } finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
    }

    /**
     * Reads the response body of {@code conn}. This sends the request if it has not been sent yet.
     *
     * @return the body with line breaks removed (sufficient for JSON payloads)
     * @throws IOException on I/O failure, or on a non-2xx status. In the latter case the message
     *     carries the status code and the server's error body, so the cause is visible in the log.
     */
    private static String readResponse(HttpURLConnection conn) throws IOException {
        int status = conn.getResponseCode();
        if (status < 200 || status >= 300) {
            java.io.InputStream err = conn.getErrorStream();
            String detail = err == null ? "" : readFully(err);
            throw new IOException("HTTP " + status + " from " + conn.getURL() + ": " + detail);
        }
        return readFully(conn.getInputStream());
    }

    /** Reads {@code stream} as UTF-8 text, concatenating lines without separators, then closes it. */
    private static String readFully(java.io.InputStream stream) throws IOException {
        StringBuilder builder = new StringBuilder();
        try (BufferedReader in = new BufferedReader(new InputStreamReader(stream, UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                builder.append(line);
            }
        }
        return builder.toString();
    }

    /** Logs a failed request together with its cause. */
    private static void logFailure(String method, String urlStr, Exception e) {
        LOG.log(java.util.logging.Level.WARNING, method + " " + urlStr + " failed", e);
    }

}

 

package cn.com.dtmobile.livy;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;

/**
 * Demo client that submits a Spark batch job to Livy ({@code POST /batches}) and polls its state
 * ({@code GET /batches/{id}/state}) until it finishes. It can also kill the job
 * ({@code DELETE /batches/{id}}).
 *
 * <p>NOTE(review): in this listing {@code HttpUtils} is declared in package
 * {@code com.dtmobile.livy}, while this class is in {@code cn.com.dtmobile.livy}. With no import,
 * the {@code HttpUtils} references below will not compile until the two packages are aligned.
 */
public class LivyApp {

    /** Base URL (scheme, host, port) of the Livy server. */
    static String host = "http://172.xx.x.xxx:8998";

    /** Delay between two state polls in {@link #main}. */
    private static final long POLL_INTERVAL_MS = 10_000L;

    /**
     * Submits the demo Spark application as a Livy batch and prints the assigned id.
     *
     * @return the batch id assigned by Livy
     * @throws JSONException if Livy's response is not valid JSON
     * @throws IllegalStateException if the request failed or the response carries no batch id
     */
    public static int submitJob() throws JSONException {
        JSONObject data = new JSONObject();

        JSONObject conf = new JSONObject(); // Spark configuration properties
        conf.put("spark.master", "yarn");
        conf.put("spark.submit.deployMode", "cluster");

        data.put("conf", conf);
        data.put("proxyUser", "etluser");
        data.put("file", "/kong/data/jar/your_jar.jar"); // Spark application jar to run (HDFS path)
        // Extra jars the application depends on.
        data.put("jars", new String[]{"/kong/data/jar/dbscan-on-spark_2.11-0.2.0-SNAPSHOT.jar"});
        data.put("className", "cn.com.dtmobile.spark.App");
        data.put("name", "jonitsiteplan");
        data.put("executorCores", 3);
        data.put("executorMemory", "2g");
        data.put("driverCores", 1);
        data.put("driverMemory", "4g");
        data.put("numExecutors", 6);
        data.put("queue", "default");
        data.put("args", new String[]{"杭州", "yj_hangzhou", "2019041719"}); // arguments for the main class

        String res = HttpUtils.postAccess(host + "/batches", data);

        // postAccess returns "" on failure; without this check the caller would get an NPE,
        // or a bogus id 0 when the "id" key is missing.
        JSONObject resjson = JSON.parseObject(res);
        if (resjson == null || !resjson.containsKey("id")) {
            throw new IllegalStateException("Livy did not return a batch id; response: '" + res + "'");
        }
        int id = resjson.getIntValue("id");
        System.out.println("id:" + id);
        return id;
    }

    /**
     * Prints the current state of batch {@code id}. Livy also exposes the full batch description
     * at {@code /batches/{id}} and the batch log at {@code /batches/{id}/log}.
     *
     * @param id batch id returned by {@link #submitJob()}
     */
    public static void getJobInfo(int id) {
        printState(fetchJobState(id));
    }

    /**
     * Kills batch {@code id} by deleting it in Livy, and prints whether that succeeded.
     *
     * @param id batch id returned by {@link #submitJob()}
     */
    public static void killJob(int id) {
        if (HttpUtils.deleteAccess(host + "/batches/" + id)) {
            System.out.println("kill spark job success");
        } else {
            System.out.println("kill spark job failed");
        }
    }

    /**
     * Submits the batch, then prints its state every {@link #POLL_INTERVAL_MS} ms until it
     * reaches a terminal state. Call {@link #killJob(int)} to cancel a running batch.
     */
    public static void main(String[] args) {
        int id = submitJob();
        while (true) {
            String state = fetchJobState(id);
            printState(state);
            if (isTerminal(state)) {
                return;
            }
            try {
                Thread.sleep(POLL_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop polling; this does not kill the batch.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Returns the Livy state string of batch {@code id}, or {@code null} if the request failed. */
    private static String fetchJobState(int id) {
        JSONObject state = HttpUtils.getAccess(host + "/batches/" + id + "/state");
        return state == null ? null : state.getString("state");
    }

    /** Prints {@code state}, or a placeholder when it could not be fetched. */
    private static void printState(String state) {
        System.out.println(state != null ? state : "state unavailable");
    }

    /** Whether {@code state} is a final Livy batch state (the batch will not change any more). */
    private static boolean isTerminal(String state) {
        return "success".equals(state) || "dead".equals(state)
                || "killed".equals(state) || "error".equals(state);
    }

}

提交执行以后,可以在 Livy 的 UI 界面上查看任务的状态,也可以通过 API 轮询获取任务状态以及任务的输出日志。

yarn界面


查看任务日志

Spark 任务的执行日志可以在 Livy REST Server 的角色日志中查看,如下:

原文地址:https://www.cnblogs.com/zz-ksw/p/11345908.html