spark学习进度18(SparkSQL读写)

初识 DataFrameReader:

SparkSQL 的一个非常重要的目标就是完善数据读取, 所以 SparkSQL 中增加了一个新的框架, 专门用于读取外部数据源, 叫做 DataFrameReader

 @Test
  def reader1(): Unit = {
    // 1. 创建 SparkSession
    val spark = SparkSession.builder()
      .master("local[6]")
      .appName("reader1")
      .getOrCreate()

    // 2. 框架在哪
    val reader: DataFrameReader = spark.read
  }

DataFrameReader 由如下几个组件组成

组件解释

schema

结构信息, 因为 Dataset 是有结构的, 所以在读取数据的时候, 就需要有 Schema 信息, 有可能是从外部数据源获取的, 也有可能是指定的

option

连接外部数据源的参数, 例如 JDBC 的 URL, 或者读取 CSV 文件是否引入 Header 等

format

外部数据源的格式, 例如 csvjdbcjson 等

DataFrameReader 有两种访问方式, 一种是使用 load 方法加载, 使用 format 指定加载格式, 还有一种是使用封装方法, 类似 csvjsonjdbc 等

/**
    * 初体验 Reader
    */
  @Test
  def reader2(): Unit = {
    // 1. 创建 SparkSession
    val spark = SparkSession.builder()
      .master("local[6]")
      .appName("reader1")
      .getOrCreate()

    // 2. 第一种形式
    spark.read
      .format("csv")//设置文件的类型
      .option("header", value = true)//提示包含有表头
      .option("inferSchema", value = true)//推断结构信息
      .load("dataset/BeijingPM20100101_20151231.csv")
      .show(10)

    // 3. 第二种形式
    spark.read
      .option("header", value = true)
      .option("inferSchema", value = true)
      .csv("dataset/BeijingPM20100101_20151231.csv")
      .show()
  }

 

初识 DataFrameWriter:

对于 ETL 来说, 数据保存和数据读取一样重要, 所以 SparkSQL 中增加了一个新的数据写入框架, 叫做 DataFrameWriter

DataFrameWriter 中由如下几个部分组成

组件解释

source

写入目标, 文件格式等, 通过 format 方法设定

mode

写入模式, 例如一张表已经存在, 如果通过 DataFrameWriter 向这张表中写入数据, 是覆盖表呢, 还是向表中追加呢? 通过 mode 方法设定

extraOptions

外部参数, 例如 JDBC 的 URL, 通过 optionsoption 设定

partitioningColumns

类似 Hive 的分区, 保存表的时候使用, 这个地方的分区不是 RDD 的分区, 而是文件的分区, 或者表的分区, 通过 partitionBy 设定

bucketColumnNames

类似 Hive 的分桶, 保存表的时候使用, 通过 bucketBy 设定

sortColumnNames

用于排序的列, 通过 sortBy 设定

mode 指定了写入模式, 例如覆盖原数据集, 或者向原数据集合中尾部添加等

Scala 对象表示字符串表示解释

SaveMode.ErrorIfExists

"error"

将 DataFrame 保存到 source 时, 如果目标已经存在, 则报错

SaveMode.Append

"append"

将 DataFrame 保存到 source 时, 如果目标已经存在, 则添加到文件或者 Table 中

SaveMode.Overwrite

"overwrite"

将 DataFrame 保存到 source 时, 如果目标已经存在, 则使用 DataFrame 中的数据完全覆盖目标

SaveMode.Ignore

"ignore"

将 DataFrame 保存到 source 时, 如果目标已经存在, 则不会保存 DataFrame 数据, 并且也不修改目标数据集, 类似于 CREATE TABLE IF NOT EXISTS


@Test
def writer1(): Unit = {
System.setProperty("hadoop.home.dir","C:\winutils")
// 2. 读取数据集
val df = spark.read.option("header", true).csv("dataset/BeijingPM20100101_20151231.csv")

// 3. 写入数据集两种方法
df.write.json("dataset/beijing_pm.json")

df.write.format("json").save("dataset/beijing_pm2.json")
}

读写 Parquet 格式文件

什么时候会用到 Parquet ?

00a2a56f725d86b5c27463f109c43d8c

在 ETL 中, Spark 经常扮演 T 的职务, 也就是进行数据清洗和数据转换.

为了能够保存比较复杂的数据, 并且保证性能和压缩率, 通常使用 Parquet 是一个比较不错的选择.

所以外部系统收集过来的数据, 有可能会使用 Parquet, 而 Spark 进行读取和转换的时候, 就需要支持对 Parquet 格式的文件的支持.

 @Test
  def parquet(): Unit = {
    // 1. 读取 CSV 文件的数据
    val df = spark.read.option("header", true).csv("dataset/BeijingPM20100101_20151231.csv")

    // 2. 把数据写为 Parquet 格式
    // 写入的时候, 默认格式就是 parquet
    // 写入模式, 报错, 覆盖, 追加, 忽略
    df.write//默认文件写入是Parquet
      .mode(SaveMode.Overwrite)//写入的模式
      .save("dataset/beijing_pm3")

    // 3. 读取 Parquet 格式文件
    // 默认格式是否是 paruet? 是
    // 是否可能读取文件夹呢? 是
    spark.read
      .load("dataset/beijing_pm3")
      .show()
  }

分区:

 写文件进行分区:

@Test
  def parquetPartitions(): Unit = {
    // 1. 读取数据
//    val df = spark.read
//      .option("header", value = true)
//      .csv("dataset/BeijingPM20100101_20151231.csv")

    // 2. 写文件, 表分区
//    df.write
//      .partitionBy("year", "month")
//      .save("dataset/beijing_pm4")

    // 3. 读文件, 自动发现分区
    // 写分区表的时候, 分区列不会包含在生成的文件中
    // 直接通过文件来进行读取的话, 分区信息会丢失
    // spark sql 会进行自动的分区发现
    spark.read
      .parquet("dataset/beijing_pm4")
      .printSchema()
  }

 

JSON:读写 JSON 格式文件

什么时候会用到 JSON ?

00a2a56f725d86b5c27463f109c43d8c

在 ETL 中, Spark 经常扮演 T 的职务, 也就是进行数据清洗和数据转换.

在业务系统中, JSON 是一个非常常见的数据格式, 在前后端交互的时候也往往会使用 JSON, 所以从业务系统获取的数据很大可能性是使用 JSON 格式, 所以就需要 Spark 能够支持 JSON 格式文件的读取

  @Test
  def json(): Unit = {
    val df = spark.read
      .option("header", value = true)
      .csv("dataset/BeijingPM20100101_20151231.csv")

//    df.write
//      .json("dataset/beijing_pm5.json")

    spark.read
      .json("dataset/beijing_pm5.json")
      .show()
  }

 json的两个应用场景:

 /**
    * toJSON 的场景:
    * 处理完了以后, DataFrame中如果是一个对象, 如果其他的系统只支持 JSON 格式的数据
    * SParkSQL 如果和这种系统进行整合的时候, 就需要进行转换
    */
  @Test
  def json1(): Unit = {
    val df = spark.read
      .option("header", value = true)
      .csv("dataset/BeijingPM20100101_20151231.csv")

    df.toJSON.show()
  }

  /**
    * 从消息队列中取出JSON格式的数据, 需要使用 SparkSQL 进行处理
    */
  @Test
  def json2(): Unit = {
    val df = spark.read
      .option("header", value = true)
      .csv("dataset/BeijingPM20100101_20151231.csv")

    val jsonRDD = df.toJSON.rdd

    spark.read.json(jsonRDD).show()
  }

访问 Hive

  1. 整合 SparkSQL 和 Hive, 使用 Hive 的 MetaStore 元信息库

  2. 使用 SparkSQL 查询 Hive 表

  3. 案例, 使用常见 HiveSQL

  4. 写入内容到 Hive 表

SparkSQL 整合 Hive

  1. 开启 Hive 的 MetaStore 独立进程

  2. 整合 SparkSQL 和 Hive 的 MetaStore

和一个文件格式不同, Hive 是一个外部的数据存储和查询引擎, 所以如果 Spark 要访问 Hive 的话, 就需要先整合 Hive

整合什么 ?

如果要讨论 SparkSQL 如何和 Hive 进行整合, 首要考虑的事应该是 Hive 有什么, 有什么就整合什么就可以

  • MetaStore, 元数据存储

    SparkSQL 内置的有一个 MetaStore, 通过嵌入式数据库 Derby 保存元信息, 但是对于生产环境来说, 还是应该使用 Hive 的 MetaStore, 一是更成熟, 功能更强, 二是可以使用 Hive 的元信息

  • 查询引擎

    SparkSQL 内置了 HiveSQL 的支持, 所以无需整合

为什么要开启 Hive 的 MetaStore

Hive 的 MetaStore 是一个 Hive 的组件, 一个 Hive 提供的程序, 用以保存和访问表的元数据, 整个 Hive 的结构大致如下

20190523011946

由上图可知道, 其实 Hive 中主要的组件就三个, HiveServer2 负责接受外部系统的查询请求, 例如 JDBCHiveServer2 接收到查询请求后, 交给 Driver 处理, Driver 会首先去询问 MetaStore 表在哪存, 后 Driver 程序通过 MR 程序来访问 HDFS 从而获取结果返回给查询请求者

而 Hive 的 MetaStore 对 SparkSQL 的意义非常重大, 如果 SparkSQL 可以直接访问 Hive 的 MetaStore, 则理论上可以做到和 Hive 一样的事情, 例如通过 Hive 表查询数据

而 Hive 的 MetaStore 的运行模式有三种

  • 内嵌 Derby 数据库模式

  • 这种模式不必说了, 自然是在测试的时候使用, 生产环境不太可能使用嵌入式数据库, 一是不稳定, 二是这个 Derby 是单连接的, 不支持并发

  • Local 模式

    Local 和 Remote 都是访问 MySQL 数据库作为存储元数据的地方, 但是 Local 模式的 MetaStore 没有独立进程, 依附于 HiveServer2 的进程

  • Remote 模式

    和 Loca 模式一样, 访问 MySQL 数据库存放元数据, 但是 Remote 的 MetaStore 运行在独立的进程中

我们显然要选择 Remote 模式, 因为要让其独立运行, 这样才能让 SparkSQL 一直可以访问

配置一下hive启动RunJar:

 

 

 sparkSql的配置:

 

 

sparksql的使用: 

创建表:

 

 

 

 

 访问表:

 

 sparksql创建hive表:

val createTableStr =
  """
    |CREATE EXTERNAL TABLE student
    |(
    |  name  STRING,
    |  age   INT,
    |  gpa   string
    |)
    |ROW FORMAT DELIMITED
    |  FIELDS TERMINATED BY '	'
    |  LINES TERMINATED BY '
'
    |STORED AS TEXTFILE
    |LOCATION '/dataset/hive'
  """.stripMargin

spark.sql("CREATE DATABASE IF NOT EXISTS spark_integrition1")
spark.sql("USE spark_integrition1")
spark.sql(createTableStr)
spark.sql("LOAD DATA INPATH '/dataset/studenttab10k' OVERWRITE INTO TABLE student")
spark.sql("select * from student limit 10").show()

通过sparksqi将数据写入 hive:

使用 SparkSQL 处理数据并保存进 Hive 表

前面都在使用 SparkShell 的方式来访问 Hive, 编写 SQL, 通过 Spark 独立应用的形式也可以做到同样的事, 但是需要一些前置的步骤, 如下

Step 1: 导入 Maven 依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>

Step 2: 配置 SparkSession

如果希望使用 SparkSQL 访问 Hive 的话, 需要做两件事

  1. 开启 SparkSession 的 Hive 支持

    经过这一步配置, SparkSQL 才会把 SQL 语句当作 HiveSQL 来进行解析

  2. 设置 WareHouse 的位置

    虽然 hive-stie.xml 中已经配置了 WareHouse 的位置, 但是在 Spark 2.0.0 后已经废弃了 hive-site.xml 中设置的 hive.metastore.warehouse.dir, 需要在 SparkSession 中设置 WareHouse 的位置

  3. 设置 MetaStore 的位置
val spark = SparkSession
  .builder()
  .appName("hive example")
  .config("spark.sql.warehouse.dir", "hdfs://node01:8020/dataset/hive")  
  .config("hive.metastore.uris", "thrift://node01:9083")                 
  .enableHiveSupport()                                                   
  .getOrCreate()
  设置 WareHouse 的位置
  设置 MetaStore 的位置
  开启 Hive 支持

配置好了以后, 就可以通过 DataFrame 处理数据, 后将数据结果推入 Hive 表中了, 在将结果保存到 Hive 表的时候, 可以指定保存模式

val schema = StructType(
  List(
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("gpa", FloatType)
  )
)

val studentDF = spark.read
  .option("delimiter", "	")
  .schema(schema)
  .csv("dataset/studenttab10k")

val resultDF = studentDF.where("age < 50")

resultDF.write.mode(SaveMode.Overwrite).saveAsTable("spark_integrition1.student") 
  通过 mode 指定保存模式, 通过 saveAsTable 保存数据到 Hive
package cn.itcast.spark.sql

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{FloatType, IntegerType, StringType, StructField, StructType}

object HiveAccess {

  def main(args: Array[String]): Unit = {
    // 1. 创建 SparkSession
    //    1. 开启 Hive 支持
    //    2. 指定 Metastore 的位置
    //    3. 指定 Warehouse 的位置
    val spark = SparkSession.builder()//不能指定master
      .appName("hive access1")
      .enableHiveSupport()
      .config("hive.metastore.uris", "thrift://hadooplinux01:9083")
      .config("spark.sql.warehouse.dir", "/dataset/hive")
      .getOrCreate()

    import spark.implicits._

    // 2. 读取数据
    //    1. 上传 HDFS, 因为要在集群中执行, 没办法保证程序在哪个机器中执行
    //        所以, 要把文件上传到所有的机器中, 才能读取本地文件
    //        上传到 HDFS 中就可以解决这个问题, 所有的机器都可以读取 HDFS 中的文件
    //        它是一个外部系统
    //    2. 使用 DF 读取数据

    val schema = StructType(
      List(
        StructField("name", StringType),
        StructField("age", IntegerType),
        StructField("gpa", FloatType)
      )
    )

    val dataframe = spark.read
      .option("delimiter", "	")
      .schema(schema)//传入结构信息
      .csv("hdfs://hadooplinux01:0900/dataset/studenttab10k")

    val resultDF = dataframe.where('age > 50)

    // 3. 写入数据, 使用写入表的 API, saveAsTable
    resultDF.write.mode(SaveMode.Overwrite)//写入模式
      .saveAsTable("spark03.student")//库名加表名
  }
}

只有打包在集群中运行。

 JDBC:

连接、创建、建表、创建用户、

 

  • Step 1: 连接 MySQL 数据库

    在 MySQL 所在的主机上执行如下命令

    mysql -u root -p
  • Step 2: 创建 Spark 使用的用户

    登进 MySQL 后, 需要先创建用户

    CREATE USER 'spark'@'%' IDENTIFIED BY 'Spark123!';
    GRANT ALL ON spark_test.* TO 'spark'@'%';
  • Step 3: 创建库和表

    CREATE DATABASE spark_test;
    
    USE spark_test;
    
    CREATE TABLE IF NOT EXISTS `student`(
    `id` INT AUTO_INCREMENT,
    `name` VARCHAR(100) NOT NULL,
    `age` INT NOT NULL,
    `gpa` FLOAT,
    PRIMARY KEY ( `id` )
    )ENGINE=InnoDB DEFAULT CHARSET=utf8;

     数据操作:

    使用 SparkSQL 向 MySQL 中写入数据

    其实在使用 SparkSQL 访问 MySQL 是通过 JDBC, 那么其实所有支持 JDBC 的数据库理论上都可以通过这种方式进行访问

    在使用 JDBC 访问关系型数据的时候, 其实也是使用 DataFrameReader, 对 DataFrameReader 提供一些配置, 就可以使用 Spark 访问 JDBC, 有如下几个配置可用

    属性含义

    url

    要连接的 JDBC URL

    dbtable

    要访问的表, 可以使用任何 SQL 语句中 from 子句支持的语法

    fetchsize

    数据抓取的大小(单位行), 适用于读的情况

    batchsize

    数据传输的大小(单位行), 适用于写的情况

    isolationLevel

    事务隔离级别, 是一个枚举, 取值 NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READSERIALIZABLE, 默认为 READ_UNCOMMITTED

package cn.itcast.spark.sql

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{FloatType, IntegerType, StringType, StructField, StructType}

/**
  * MySQL 的访问方式有两种: 使用本地运行, 提交到集群中运行
  *
  * 写入 MySQL 数据时, 使用本地运行, 读取的时候使用集群运行
  */
object MySQLWrite {

  def main(args: Array[String]): Unit = {
    // 1. 创建 SparkSession 对象
    val spark = SparkSession.builder()
      .master("local[6]")
      .appName("mysql write")
      .getOrCreate()

    // 2. 读取数据创建 DataFrame
    //    1. 拷贝文件
    //    2. 读取
    val schema = StructType(
      List(
        StructField("name", StringType),
        StructField("age", IntegerType),
        StructField("gpa", FloatType)
      )
    )

    val df = spark.read
      .schema(schema)
      .option("delimiter", "	")
      .csv("dataset/studenttab10k")

    // 3. 处理数据
    val resultDF = df.where("age < 30")

    // 4. 落地数据
    resultDF.write
      .format("jdbc")
      .option("url", "jdbc:mysql://hadooplinux01:3306/spark02")
      .option("dbtable", "student")
      .option("user", "root")
      .option("password", "511924!")
      .mode(SaveMode.Overwrite)
      .save()
  }

}
原文地址:https://www.cnblogs.com/dazhi151/p/14266743.html