spark hive 小结

依赖:

 

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.1.1</version>
            <scope>provided</scope>
        </dependency>

集成Hive

Apache Hive是Hadoop上的SQL引擎,Spark SQL编译时可以包含Hive支持,也可以不包含。包含Hive支持的Spark SQL可以支持Hive表访问、UDF(用户自定义函数)以及 Hive 查询语言(HiveQL/HQL)等。需要强调的 一点是,如果要在Spark SQL中包含Hive的库,并不需要事先安装Hive。

ps:一般来说,最好还是在编译Spark SQL时引入Hive支持,这样就可以使用这些特性了

1.使用内置hive

ps:版本为1.2.1

ps:需要注意内置hive是非常容易出现问题的
1.先启动集群/opt/software/spark-2.2.0-bin-hadoop2.7/sbin/start-all.sh
2.进入到spark-shell模式/opt/software/spark-2.2.0-bin-hadoop2.7/bin/spark-shell --master spark://hadoop01:7077
3.在spark-shell下操作hive
spark.sql("show tables").show 查询所有hive的表
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT,value STRING)") 创建表
spark.sql("LOAD DATA LOCAL INPATH '/opt/software/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/kv1.txt' INTO TABLE src") 添加数据
spark.sql("SELECT * FROM src").show 查询表中数据
会出现一个问题FileNotFoundException 没有找到文件
通过在主节点和从节点查询可以发现,主节点存在spark-warehouse目录 目录中是存在数据的
但是在从节点中没有这个文件夹,所以此时将文件夹分发到从节点
scp -r ./spark-warehouse/ root@hadoop02:$PWD
再次执行查询
ps:这样表面看查询是没什么问题了,但是实际问题在于是讲master节点上的数据分发到从节点上的,那么不可能说每次操作有了数据都执行拷贝操作,所以此时就需要使用HDFS来进行存储数据了
所以先将所有从节点上的spark-warehouse删除掉
删除后将主节点上的spark-warehouse和metastor_db删除掉
然后在启动集群的时候添加一个命令
--conf spark.sql.warehouse.dir=hdfs://hadoop01:8020/spark_warehouse
此方法只做一次启动即可 后续再启动集群的时候就无需添加这个命了 因为记录在metastore_db中了
ps:spark-sql中可以直接使用SQL语句操作

2.集成外部hive(重要)

1.将Hive中的hive-site.xml软连接到Spark安装目录下的conf目录下。[主节点有即可]
ln -s /opt/software/apache-hive-1.2.1-bin/conf/hive-site.xml /opt/software/spark-2.2.0-bin-hadoop2.7/conf/hive-site.xml
2.打开spark shell,注意带上访问Hive元数据库的JDBC客户端
将mysql驱动jar包拷贝到spark的bin目录下
./spark-shell --master spark://hadoop01:7077 --jars mysql-connector-java-5.1.36.jar

ps:做完外部hive链接需要注意,因为hive-site.xml文件是在Spark的conf目录下,若直接启动spark-shell无论是单机版还是集群版都会出现报错 Error creating transactional connection factory 原因在于,启动时会加载hive-site.xml文件,所以必须添加jar路径, 为了以后使用建议删除软连接,需要的时候在做外部hive的连接
删除软连接方式:
rm -rf 软连接方式

 

总结:

若要把Spark SQL连接到一个部署好的Hive上,你必须把hive-site.xml复制到 Spark的配置文件目录中($SPARK_HOME/conf)。即使没有部署好Hive,Spark SQL也可以运行。 需要注意的是,如果你没有部署好Hive,Spark SQL会在当前的工作目录中创建出自己的Hive 元数据仓库,叫作 metastore_db。此外,如果你尝试使用 HiveQL 中的 CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在你默认的文件系统中的 /user/hive/warehouse 目录中(如果你的 classpath 中有配好的 hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。

3.通过代码操作(重要)

ps:需要有Hadoop本地环境

import org.apache.spark.sql.{Row, SparkSession}
object HiveCode {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("HiveCode")
      .config("spark.sql.warehouse.dir", "D:\spark-warehouse")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._
    import spark.sql
   // sql("CREATE TABLE IF NOT EXISTS src_1 (key INT, value STRING)")
    sql("LOAD DATA LOCAL INPATH  'dir/kv1.txt' INTO TABLE src_1")
    sql("SELECT * FROM src_1").show()
    sql("SELECT COUNT(*) FROM src_1").show()
    val sqlDF = sql("SELECT key, value FROM src_1 WHERE key < 10 ORDER BY key")
    sqlDF.as("mixing").show()
    val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
    recordsDF.createOrReplaceTempView("records")
    sql("SELECT * FROM records r JOIN src_1 s ON r.key = s.key").show()
  }
}
case class Record(key: Int, value: String)
 

ps:本地若出现"Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder和权限异常问题

在cmd命令下进入到D:/hadoop2.7.1/bin目录下执行命令即可

winutils.exe chmod 777 /tmp/hive

连接服务器hive

ps:需要添加hive-site.xml hdfs-site.xml core-site.xml

这里出现了一个异常 Could not find the uri with key [dfs.encryption.key.provider.uri] to create a KeyProvider !! 这个异常并没有找到解决问题的办法,结果自己能执行了!!!!! 谁能解决麻烦备注说明!

我的Hadoop集群是高可用所以我在windows下配置了C:WindowsSystem32driversetc路径下的

hosts文件 只要应对我的 mycluster (这一个不配置应该是可以的)

192.168.223.111 mycluster 192.168.223.112 mycluster

import java.io.File

import org.apache.spark.sql.{Row, SparkSession}

object HiveCode {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("HiveCode")
      .config("spark.sql.warehouse.dir", "hdfs://hadoop01:9000/spark_warehouse")
      .master("spark://hadoop01:7077")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._
    import spark.sql
    //sql("CREATE TABLE IF NOT EXISTS src_1 (key INT, value STRING)")
   // sql("LOAD DATA LOCAL INPATH 'dir/kv1.txt' INTO TABLE src_1")
    sql("SELECT * FROM src_1").show()
  sql("SELECT COUNT(*) FROM src_1").show()
   val sqlDF = sql("SELECT key, value FROM src_1 WHERE key < 10 ORDER BY key")
   sqlDF.as("mixing").show()
    val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
    recordsDF.createOrReplaceTempView("records")
    sql("SELECT * FROM records r JOIN src_1 s ON r.key = s.key").show()
  }
}
case class Record(key: Int, value: String)

原文地址:https://www.cnblogs.com/lshan/p/13964168.html