Connecting Spark SQL to Hive

1. Copy $HIVE_HOME/conf/hive-site.xml into $SPARK_HOME/conf so Spark can find the Hive metastore configuration

cp $HIVE_HOME/conf/hive-site.xml $SPARK_HOME/conf
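
Spark picks up the Hive metastore connection from this file. Since the metastore in this setup lives in MySQL (which is why the MySQL connector jar is passed in the steps below), hive-site.xml would typically contain a property along these lines; the URL and database name here are placeholders:

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://192.168.104.94:3306/hive?createDatabaseIfNotExist=true</value>
</property>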

2. Simply start spark-shell and it will connect to Hive automatically

./spark-shell --master local[2] --jars /usr/local/jar/mysql-connector-java-5.1.47.jar    # --jars: adds the specified jar(s) to the classpath
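
Once the shell is up, Hive tables can be queried straight away through the SparkSession; a minimal sanity check (the pk table is just the example table used later in this post):

spark.sql("show databases").show()
spark.table("default.pk").show()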

3. Or start spark-sql directly, which also connects automatically

./spark-sql --master local[2] --jars /usr/local/jar/mysql-connector-java-5.1.47.jar --driver-class-path /usr/local/jar/mysql-connector-java-5.1.47.jar
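
At the spark-sql> prompt, HiveQL statements can then be executed directly; for example (the table name is illustrative):

spark-sql> show tables;
spark-sql> select * from pk limit 10;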

4. We can also start the Thrift Server, a long-running service that stays up 24/7

cd $SPARK_HOME/sbin
./start-thriftserver.sh --master local --jars /usr/local/jar/mysql-connector-java-5.1.47.jar    # listens on port 10000 by default
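
The listening port can be overridden with a Hive configuration property when starting the server; a sketch, assuming you want port 10001 instead of the default:

./start-thriftserver.sh --master local --jars /usr/local/jar/mysql-connector-java-5.1.47.jar --hiveconf hive.server2.thrift.port=10001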

5. Connect using the built-in beeline client

cd $SPARK_HOME/bin
./beeline -u jdbc:hive2://192.168.104.94:10000
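
If HiveServer2 expects a user name, it can be passed with -n; queries are then issued at the beeline prompt (the user name hadoop below is an assumption):

./beeline -u jdbc:hive2://192.168.104.94:10000 -n hadoop
0: jdbc:hive2://192.168.104.94:10000> show tables;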

6. You can also connect programmatically over JDBC

package com.imooc.bigdata.chapter06

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

object JDBCClientApp {

  def main(args: Array[String]): Unit = {

    // Load the Hive JDBC driver
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    val conn: Connection = DriverManager.getConnection("jdbc:hive2://192.168.104.94:10000")
    val pstmt: PreparedStatement = conn.prepareStatement("show tables")
    val rs: ResultSet = pstmt.executeQuery()

    // Against the Spark Thrift Server, "show tables" returns (database, tableName, isTemporary)
    while (rs.next()) {
      println(rs.getObject(1) + " : " + rs.getObject(2))
    }

    // Release JDBC resources
    rs.close()
    pstmt.close()
    conn.close()
  }
}
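
To compile and run this client, the Hive JDBC driver must be on the classpath. With sbt that is typically something like the following (the version number is an assumption; match it to your Hive installation):

// build.sbt (illustrative)
libraryDependencies += "org.apache.hive" % "hive-jdbc" % "1.1.0"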

The example below goes one step further: it reads a table from MySQL over JDBC, filters it, and writes the result into Hive (input MySQL ==> processing ==> output Hive).

package com.imooc.bigdata.chapter06

import java.util.Properties

import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.{DataFrame, SparkSession}

object HiveSourceApp {

  def main(args: Array[String]): Unit = {


    // To access Hive from Spark, Hive support must be enabled on the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local").appName("HiveSourceApp")
      .enableHiveSupport() // remember: this is mandatory
      .getOrCreate()
      .getOrCreate()


    // Reads the pk table in the default database; for a table in another database, use the same "database.table" form
    //spark.table("default.pk").show()


    // input (Hive/MySQL/JSON...) ==> processing ==> output (Hive)


    import spark.implicits._

    val config = ConfigFactory.load()
    val url = config.getString("db.default.url")
    val user = config.getString("db.default.user")
    val password = config.getString("db.default.password")
    val driver = config.getString("db.default.driver")
    val database = config.getString("db.default.database")
    val table = config.getString("db.default.table")
    val sinkTable = config.getString("db.default.sink.table")

    val connectionProperties = new Properties()
    connectionProperties.put("user", user)
    connectionProperties.put("password", password)

    val jdbcDF: DataFrame = spark.read
      .jdbc(url, s"$database.$table", connectionProperties).filter($"cnt" > 100)

    //jdbcDF.show()

    // saveAsTable creates the Hive table browser_stat_hive (errors if it already exists under the default SaveMode)
    jdbcDF.write.saveAsTable("browser_stat_hive")

    // insertInto appends into an existing Hive table; browser_stat_hive_1 must already exist with a matching schema
    jdbcDF.write.insertInto("browser_stat_hive_1")

    spark.stop()

  }
}
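
HiveSourceApp loads its MySQL connection settings with Typesafe Config, so an application.conf on the classpath must provide the db.default.* keys read above. A minimal sketch with placeholder values:

# src/main/resources/application.conf (all values are placeholders)
db.default.driver = "com.mysql.jdbc.Driver"
db.default.url = "jdbc:mysql://192.168.104.94:3306"
db.default.user = "root"
db.default.password = "root"
db.default.database = "spark"
db.default.table = "browser_stat"
db.default.sink.table = "browser_stat_hive_1"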


Original source: https://www.cnblogs.com/yoyo1216/p/13533312.html