Mastering-Spark-SQL学习笔记01 SparkSQL

SparkSQL可以让开发人员使用关系化查询对大规模结构化数据进行处理。

像Apache Spark一样,Spark SQL特别适合大规模的分布式内存计算。SparkSQL将关系型处理与Spark的函数式编程API进行整合。

SparkSQL和SparkCore的计算模型的主要区别是注入、查询和持久化(半)结构化数据的关系化框架,使用可以由SQL(带有许多HiveQL的功能)和高级的类SQL的函数式定义的DataSet API进行关系型查询(即结构化查询)。

这里的(半)结构化数据是指可以由列名、列类型以及列是否为空组成的Schema所描述的数据集。

当使用SQL或Query DSL时,查询都会成为一个有强制Encoder的DataSet。

DataSet是带有transformation和action算子进行结构化查询执行管道编程的接口。结构化查询在内部是一棵(逻辑的和物理的)关系型算子和表达式组成的Catalyst tree。

当一个action算子(如直接的show、count,间接的save、saveAsTable)在DataSet上被执行时,(在DataSet背后的)结构化查询会通过以下执行阶段:

1)Logical Analysis 逻辑分析

2)Caching Replacement 缓存替换

3)Logical Query Optimization,逻辑查询优化使用 rule-based和cost-based优化器

4)Physical Planning 物理计划

5)Physical Optimization 物理优化(如 Whole-Stage Java Code Generation 或 Adaptive Query Execution)

6)Constructing the RDD of Internal Binary Rows(根据Spark Core的RDD API表示结构化查询)

从Spark 2.0开始,Spark SQL实际上就是Spark底层内存分布式平台的主要和功能丰富的接口。将Spark Core的RDDs隐藏在更高层次的抽象之后,即使没有您的同意,也可以使用逻辑和物理查询优化策略。

换句话说,Spark SQL的Dataset API描述了一个分布式计算,它最终将被转换为RDDs的DAG(有向无环图)来执行。在幕后,结构化查询被自动编译成相应的RDD操作。

Spark SQL支持批处理和流模式下的结构化查询(后者是Spark SQL的独立模块,称为Spark Structured Streaming)

Spark SQL编程模型:// Define the schema using a case class

case class Person(name: String, age: Int)
// you could read people from a CSV file
// It's been a while since you saw RDDs, hasn't it?
// Excuse me for bringing you the old past.
import org.apache.spark.rdd.RDD
val peopleRDD: RDD[Person] = sc.parallelize(Seq(Person("Jacek",10)))
// Convert RDD[Person] to Dataset[Person] and run a query
// Automatic schema inferrence from existing RDDs
scala> val people = peopleRDD.toDS
people: org.apache.spark.sql.Dataset[Person] = [name: string, ag
e: int]
// Query for teenagers using Scala Query DSL
scala> val teenagers = people.where('age >= 10').where('age <= 19').select('name').as[String]
teenagers: org.apache.spark.sql.Dataset[String] = [name: string]
scala> teenagers.show
+-----+
| name|
+-----+
|Jacek|
+-----+// You could however want to use good ol' SQL, couldn't you?
// 1. Register people Dataset as a temporary view in Catalog 要先将DataSet注册为View,才可以使用SQL
people.createOrReplaceTempView("people") // 2. Run SQL query val teenagers = sql("SELECT * FROM people WHERE age >= 10 AND age <= 19") scala> teenagers.show +-----+---+ | name|age| +-----+---+ |Jacek| 10| +-----+---+

Spark SQL提供使用了基于规则的查询优化器Wole-Stage Codegen(全流程代码生成,在运行时动态生成比手写更好的代码)、使用内部二进制行格式的Tungsten执行引擎

自Spark SQL 2.2起,结构化查询可以使用Hint框架进一步优化。

Spark SQL引入了一个名为Dataset(以前是DataFrame)的表格数据抽象。DataSet数据抽象的目的是使在Spark基础设施上处理大量结构化表格数据更加简单和快速。

// 处理JSON文件并将它的子集存为CSV
spark.read
.format("json")
.load("input-json")
.select("name", "score")
.where($"score" > 15)
.write
.format("csv")
.save("output-csv")
// 结构化流式处理代码
import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) ::
  StructField("score", DoubleType, nullable = false) :: Nil)

spark.readStream
  .format("json")
  .schema(schema)
  .load("input-json")
  .select("name", "score")
  .where('score > 15)
  .writeStream
  .format("console")
  .start
// -------------------------------------------
// Batch: 1
// -------------------------------------------
// +-----+-----+
// | name|score|
// +-----+-----+
// |Jacek| 20.5|
// +-----+-----+

在Spark 2.0中,Spark SQL的主要数据抽象是Dataset。它表示具有已知模式的记录的结构化数据。这个结构化数据表示数据集支持使用压缩列格式的紧凑二进制表示,这种格式存储在JVM堆之外的托管对象中。

它应该通过减少内存的使用和GC来加快计算速度

Spark SQL支持谓词下推来优化DataSet查询的性能,也可以在运行时生成经过优化的代码。

Spark SQL附带了不同的API:

1. Dataset API(前身是DataFrame API)带有强类型的类LINQ的查询特定领域语言

2. 结构化流API(即流Dataset)用于持续增量地执行结构化查询

3. 非程序员可能会通过与Hive的直接集成来使用SQL作为查询语言。

4. JDBC/ODBC爱好者可以使用JDBC接口(通过Thrift JDBC/ODBC)并将它们的工具连接到Spark的分布式查询引擎。

Spark SQL使用特定的DataFrameReader和DataFrameWriter对象提供了统一的接口以访问分布式存储,如Cassandra、HDFS(Hive、Parquet、JSON)。

你可以使用类SQL在HDFS、S3上的大量数据,能访问的数据可以来自不同的数据源(文件或表)。

Spark SQL定义了以下类型的函数:
1. 标准函数或用户定义函数(UDF),从单个行获取值作为输入,为每个输入行生成单个返回值。

2. 对一组行进行操作并计算每个组的单个返回值的基本聚合函数

3. 窗口聚合函数,对一组行进行操作,并为一组中的每一行计算单个返回值。

有两个支持的目录实现—内存(默认)和hive—您可以使用spark.sql.catalogImplementation来设置。

您可以解析来自外部数据源的数据,并让模式推断者推断模式。

// Example 1
val df = Seq(1 -> 2).toDF("i", "j")
scala> df.show()
+---+---+
|  i|  j|
+---+---+
|  1|  2|
+---+---+

val query = df.groupBy("i").agg(max("j").as("aggOrdering")).orderBy(sum("j")).as[(Int, Int)]
scala> query.show()
+---+-----------+                                                               
|  i|aggOrdering|
+---+-----------+
|  1|          2|
+---+-----------+
query.collect contains (1, 2) // true

// Example 2
val df = Seq((1, 1), (-1, 1)).toDF("key", "value")
df.createOrReplaceTempView("src")
scala> sql("SELECT IF(a > 0, a, 0) FROM (SELECT key a FROM src) temp").show
+-------------------+
|(IF((a > 0), a, 0))|
+-------------------+
| 1|
| 0|
+-------------------+

Spark SQL支持两种“模式”来编写结构化查询:Dataset API和SQL。RuntimeReplaceable表达式只能使用SQL模式,如nvl、nvl2、ifnull、nullif等SQL函数。

Spark SQL应用开发的需要以下步骤:

1. 建立开发环境(IDEA,Scala 和 sbt) 

2. 指定库依赖关系

3. 创建 SparkSession

4. 从外部数据源加载 Dataset

5. 转变Dataset

6. 保存Dataset以持久化存储

原文地址:https://www.cnblogs.com/sunspeedzy/p/9431577.html