通过java代码进行impala和kudu的对接

对于impala而言,开发人员是可以通过JDBC连接impala的,有了JDBC,开发人员可以通过impala来间接操作kudu;

maven导包:

<!-- https://mvnrepository.com/artifact/com.cloudera/ImpalaJDBC41 -->
    <dependency>
        <groupId>com.cloudera</groupId>
        <artifactId>ImpalaJDBC41</artifactId>
        <version>2.5.41</version>
    </dependency>

通过JDBC连接impala操作kudu

使用JDBC连接impala操作kudu,与JDBC连接mysql做更重增删改查基本一样

创建实体类

package com.impala;

/**
 * Created by angel;
 */
public class Person {

    private int companyId;
    private int workId ;
    private String name;
    private String gender;
    private String photo;
    public Person(int companyId , int workId , String name , String gender , String photo){
        this.companyId = companyId ;
        this.workId = workId ;
        this.name = name ;
        this.gender = gender ;
        this.photo = photo ;
    }
    public int getCompanyId() {
        return companyId;
    }

    public int getWorkId() {
        return workId;
    }

    public String getGender() {
        return gender;
    }

    public String getName() {
        return name;
    }

    public String getPhoto() {
        return photo;
    }

    public void setCompanyId(int companyId) {
        this.companyId = companyId;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setPhoto(String photo) {
        this.photo = photo;
    }

    public void setWorkId(int workId) {
        this.workId = workId;
    }

    @Override
    public String toString() {
        return super.toString();
    }
}
View Code

JDBC连接impala对kudu进行增删改查

package com.impala;

import java.sql.*;

/**
 * Created by angel;
 */
public class Contants {
    private static String JDBC_DRIVER = "com.cloudera.impala.jdbc41.Driver";
    private static String CONNECTION_URL = "jdbc:impala://hadoop01:21050/default;auth=noSasl";
    static Connection con = null;
    static ResultSet rs = null;
    static PreparedStatement ps = null;
    //连接
    public static Connection getConn() {


        try {
            Class.forName(JDBC_DRIVER);
            con = DriverManager.getConnection(CONNECTION_URL);
        }catch (Exception e){
            e.printStackTrace();
        }
        return con;
    }
    //查询
    public static ResultSet QueryRows(String sql){
        try {
            ps = con.prepareStatement(sql);
            rs = ps.executeQuery();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return rs;
    }
    //打印
    public static void printRows(ResultSet rs){
        try{

            while (rs.next()) {
            final int companyId = rs.getInt("companyid");
            final int workId = rs.getInt("workid");
            final String name = rs.getString("Name");
            final String gender = rs.getString("gender");
            final String photo = rs.getString("photo");
            System.out.println(companyId + "----" + workId + "----" + name+ "----" +gender+ "----" + photo);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭rs、ps和con
            if(rs != null){
                try {
                    rs.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if(ps != null){
                try {
                    ps.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if(con != null){
                try {
                    con.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    //插入
    public static void insertRows(Person person){
        con = getConn();
        String sql = "insert into external_table2 (companyid,workid,name,gender,photo) values(?,?,?,?,?)";
        PreparedStatement pstmt;
        try {

            pstmt = (PreparedStatement) con.prepareStatement(sql);
            pstmt.setInt(1 , person.getCompanyId());
            pstmt.setInt(2, person.getWorkId());
            pstmt.setString(3, person.getName());
            pstmt.setString(4, person.getGender());
            pstmt.setString(5, person.getPhoto());
            pstmt.executeUpdate();

        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    //更新
    public static void updateRows(Person person){
        Connection conn = getConn();
        String sql = "update external_table2 set photo='" + person.getPhoto() + "' , name='"+person.getName()+"' where workid=" + person.getWorkId();

        PreparedStatement pstmt;
        try {
            pstmt = (PreparedStatement) conn.prepareStatement(sql);
            pstmt.executeUpdate();

        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    //删除
    public static void deleteRows(int workID){
        Connection conn = getConn();
        String sql = "delete from external_table2 where workid="+workID;
        PreparedStatement pstmt;
        try {
            pstmt = (PreparedStatement) conn.prepareStatement(sql);
            pstmt.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }


}
View Code

测试

package com.impala;

import org.apache.log4j.Logger;
import java.sql.*;


public class ImpalaJdbcClient {
    private static String JDBC_DRIVER = "com.cloudera.impala.jdbc41.Driver";
    //默认数据库default,默认用户名就是登录账号 密码 为空
    private static String CONNECTION_URL = "jdbc:impala://hadoop01:21050/default;auth=noSasl";
    private static final Logger log = Logger.getLogger(ImpalaJdbcClient.class);
    public static void main(String[] args) throws Exception {
        final Connection con = Contants.getConn();
        //查询
//        String sql = "select * from external_table2;";
//        final ResultSet rs = Contants.QueryRows(sql);
//        Contants.printRows(rs);
        //插入
//        Contants.insertRows(new Person(11 , 11 , "zhansan" , "female" , "photo10"));
//        String sql = "select * from external_table2;";
//        final ResultSet rs = Contants.QueryRows(sql);
//        Contants.printRows(rs);
        //删除
//        Contants.deleteRows(10);
//        String sql = "select * from external_table2;";
//        final ResultSet rs = Contants.QueryRows(sql);
//        Contants.printRows(rs);
        //更新
//        Contants.updateRows(new Person(1,1,"aaaa" , "male" , "pppppp"));
//        String sql = "select * from external_table2;";
//        final ResultSet rs = Contants.QueryRows(sql);
//        Contants.printRows(rs);
    }
}
View Code
原文地址:https://www.cnblogs.com/niutao/p/10555566.html