SparkSQL访问Hive源,MySQL源

作者:黑暗行动

一、SparkSQL访问Hive源

软件环境

hadoop2.7.6
spark-2.3.0
scala-2.11.12
hive-2.1.1

SparkSQL命令行模式可以直接连接Hive的

将hive目录中的 D:Softapache-hive-2.1.1-binconfhive-site.xml 文件拷贝贝到 D:Softsparkconf spark目录中

D:softsparkjars 目录中放 mysql-connector-java-5.1.30.jar 包

Java程序SparkSQL连接Hive

1)将hive目录中的 D:Softapache-hive-2.1.1-binconfhive-site.xml 文件拷贝到 srcmain esources 资源目录中

2)添加依赖

 <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.3.1</version>
      <scope>provided</scope>
    </dependency>
 
 <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.30</version>
    </dependency>

3) 创建SparkSession

 /**
     * SparkSession
     * 支持数据源:hive
     * @return
     */
    public static SparkSession getSparkSessionForHive() {
        return SparkSession
                .builder()
                .appName("SparkSQLForHive")
                .master("local[*]")
                .enableHiveSupport()
                .getOrCreate();
    }
  1. 测试代码
 public static void main(String[] args) {
        SparkSession spark = SparkUtil.getSparkSessionForHive();
        spark.sql("show tables").show();
        spark.sql("select * from test1").show();
    }
  1. 运行结果
18/11/18 22:36:44 INFO CodeGenerator: Code generated in 234.231366 ms
18/11/18 22:36:44 INFO CodeGenerator: Code generated in 11.285122 ms
+--------+--------------+-----------+
|database|     tableName|isTemporary|
+--------+--------------+-----------+
| default|bucket_persion|      false|
| default|   bucket_temp|      false|
| default|         hdfs1|      false|
| default|         hdfs2|      false|
| default|           pt1|      false|
| default|        tbcsv1|      false|
| default|        tbcsv2|      false|
| default|         test1|      false|
| default|  test_table_2|      false|
+--------+--------------+-----------+
 
.........
 
18/11/18 22:36:46 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1346 bytes result sent to driver
18/11/18 22:36:46 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 237 ms on localhost (executor driver) (1/1)
18/11/18 22:36:46 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
18/11/18 22:36:46 INFO DAGScheduler: ResultStage 0 (show at redHive.java:14) finished in 0.313 s
18/11/18 22:36:46 INFO DAGScheduler: Job 0 finished: show at redHive.java:14, took 0.352593 s
+-------+---+-------+------+
|   name|age|address|school|
+-------+---+-------+------+
|    chy|  1|     芜湖|    示范|
|    zjj|  2|     南京|    南开|
|gaoxing|  3|    马鞍山|   安工大|
+-------+---+-------+------+
 
18/11/18 22:36:46 INFO SparkContext: Invoking stop() from shutdown hook

二、SparkSQL访问MySql源

Spark环境

spark-2.3.0

添加依赖

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.0</version>
</dependency>
 
<dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.22</version>
</dependency>

创建SparkSession

/**
     * SparkSession
     * 支持数据源:textFile,load,csv,json,text,format,jdbc
     * @return
     */
    public static SparkSession getSparkSession() {
        return SparkSession
                .builder()
                .appName("SparkSQL")
                .master("local[*]")
                .getOrCreate();
    }

访问Mysql方式1:

public static void test(){
        String url="jdbc:mysql://localhost:3306/sparksql?user=root&password=123456";
        String tableName="users";
        SparkSession spark= SparkUtil.getSparkSession();
        Map<String,String> map=new HashMap<>();
        map.put("driver","com.mysql.jdbc.Driver");
        map.put("url",url);
        map.put("dbtable",tableName);
        map.put("fetchSize","100");
 
        //读取users信息
        Dataset<Row> jdbcDF = spark.read()
                .format("jdbc")
                .options(map)
                .load();
 
        //读取users信息,保存到users_copy表
        jdbcDF.write()
                .format("jdbc")
                .option("url", url)
                .option("dbtable", "users_copy")
                .save();
       
}

访问Mysql方式2:

public static void test2(){
        String url="jdbc:mysql://localhost:3306/sparksql";
        String tempTableName=" (select id,name from users) as u";
        SparkSession spark= SparkUtil.getSparkSession();
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "root");
        connectionProperties.put("password", "123456");
        connectionProperties.put("isolationLevel","REPEATABLE_READ");
        //读取users信息
        Dataset<Row> jdbcDF2 = spark.read()
                .jdbc(url, tempTableName, connectionProperties);
        //读取users信息,保存到users1表
        jdbcDF2.write()
                .jdbc(url, "users1", connectionProperties);
 
}
原文地址:https://www.cnblogs.com/aixing/p/13327403.html