spark学习进度17(Catalyst优化器、dataset介绍、dataframe介绍)

 RDD 和 SparkSQL 运行时的区别

RDD 的运行流程

1e627dcc1dc31f721933d3e925fa318b

大致运行步骤

先将 RDD 解析为由 Stage 组成的 DAG, 后将 Stage 转为 Task 直接运行

问题

任务会按照代码所示运行, 依赖开发者的优化, 开发者的会在很大程度上影响运行效率

解决办法

创建一个组件, 帮助开发者修改和优化代码, 但是这在 RDD 上是无法实现的

SparkSQL 提供了什么?

72e4d163c029f86fafcfa083e6cf6eda

和 RDD 不同, SparkSQL 的 Dataset 和 SQL 并不是直接生成计划交给集群执行, 而是经过了一个叫做 Catalyst 的优化器, 这个优化器能够自动帮助开发者优化代码

也就是说, 在 SparkSQL 中, 开发者的代码即使不够优化, 也会被优化为相对较好的形式去执行Catalyst

 

为了解决过多依赖 Hive 的问题, SparkSQL 使用了一个新的 SQL 优化器替代 Hive 中的优化器, 这个优化器就是 Catalyst, 整个 SparkSQL 的架构大致如下

4d025ea8579395f704702eb94572b8de
  1. API 层简单的说就是 Spark 会通过一些 API 接受 SQL 语句

  2. 收到 SQL 语句以后, 将其交给 CatalystCatalyst 负责解析 SQL, 生成执行计划等

  3. Catalyst 的输出应该是 RDD 的执行计划

  4. 最终交由集群运行

 
67b14d92b21b191914800c384cbed439
 

总结

SparkSQL 和 RDD 不同的主要点是在于其所操作的数据是结构化的, 提供了对数据更强的感知和分析能力, 能够对代码进行更深层的优化, 而这种能力是由一个叫做 Catalyst 的优化器所提供的

Catalyst 的主要运作原理是分为三步, 先对 SQL 或者 Dataset 的代码解析, 生成逻辑计划, 后对逻辑计划进行优化, 再生成物理计划, 最后生成代码到集群中以 RDD 的形式运行

 

Dataset 的特点

目标

  1. 理解 Dataset 是什么

  2. 理解 Dataset 的特性

Dataset 是什么?

@Test
  def dataset1(): Unit = {
    // 1. 创建 SparkSession
    val spark = new sql.SparkSession.Builder()
      .master("local[6]")
      .appName("dataset1")
      .getOrCreate()

    // 2. 导入隐式转换
    import spark.implicits._

    // 3. 演示
    val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15)))
    val dataset = sourceRDD.toDS()

    // Dataset 支持强类型的 API
    dataset.filter( item => item.age > 10 ).show()
    // Dataset 支持弱类型 API
    dataset.filter( 'age > 10 ).show()
    dataset.filter( $"age" > 10 ).show()
    // Dataset 可以直接编写 SQL 表达式
    dataset.filter("age > 10").show()
  }

问题1: People 是什么?

People 是一个强类型的类

问题2: 这个 Dataset 中是结构化的数据吗?

非常明显是的, 因为 People 对象中有结构信息, 例如字段名和字段类型

问题3: 这个 Dataset 能够使用类似 SQL 这样声明式结构化查询语句的形式来查询吗?

当然可以, 已经演示过了

问题4: Dataset 是什么?

Dataset 是一个强类型, 并且类型安全的数据容器, 并且提供了结构化查询 API 和类似 RDD 一样的命令式 API

Dataset 的底层是什么?

Dataset 最底层处理的是对象的序列化形式, 通过查看 Dataset 生成的物理执行计划, 也就是最终所处理的 RDD, 就可以判定 Dataset 底层处理的是什么形式的数据

val dataset: Dataset[People] = spark.createDataset(Seq(People("zhangsan", 9), People("lisi", 15)))
val internalRDD: RDD[InternalRow] = dataset.queryExecution.toRdd

dataset.queryExecution.toRdd 这个 API 可以看到 Dataset 底层执行的 RDD, 这个 RDD 中的范型是 InternalRowInternalRow 又称之为 Catalyst Row, 是 Dataset 底层的数据结构, 也就是说, 无论 Dataset 的范型是什么, 无论是 Dataset[Person] 还是其它的, 其最底层进行处理的数据结构都是 InternalRow

所以, Dataset 的范型对象在执行之前, 需要通过 Encoder 转换为 InternalRow, 在输入之前, 需要把 InternalRow 通过 Decoder 转换为范型对象

cc610157b92466cac52248a8bf72b76e
@Test
  def dataset2(): Unit = {
    // 1. 创建 SparkSession
    val spark = new sql.SparkSession.Builder()
      .master("local[6]")
      .appName("dataset1")
      .getOrCreate()

    // 2. 导入隐式转换
    import spark.implicits._

    // 3. 演示
    val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15)))
    val dataset = sourceRDD.toDS()

//    dataset.explain(true)
    // 无论Dataset中放置的是什么类型的对象, 最终执行计划中的RDD上都是 InternalRow
    val executionRdd: RDD[InternalRow] = dataset.queryExecution.toRdd
  }

 可以获取 Dataset 对应的 RDD 表示

在 Dataset 中, 可以使用一个属性 rdd 来得到它的 RDD 表示, 例如 Dataset[T] → RDD[T]

def dataset3(): Unit = {
    // 1. 创建 SparkSession
    val spark = new sql.SparkSession.Builder()
      .master("local[6]")
      .appName("dataset1")
      .getOrCreate()

    // 2. 导入隐式转换
    import spark.implicits._

    // 3. 演示
//    val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15)))
//    val dataset = sourceRDD.toDS()
    val dataset: Dataset[Person] = spark.createDataset(Seq(Person("zhangsan", 10), Person("lisi", 15)))

    //    dataset.explain(true)
    // 无论Dataset中放置的是什么类型的对象, 最终执行计划中的RDD上都是 InternalRow
    // 直接获取到已经分析和解析过的 Dataset 的执行计划, 从中拿到 RDD
    val executionRdd: RDD[InternalRow] = dataset.queryExecution.toRdd

    // 通过将 Dataset 底层的 RDD[InternalRow] 通过 Decoder 转成了和 Dataset 一样的类型的 RDD
    val typedRdd: RDD[Person] = dataset.rdd

    println(executionRdd.toDebugString)
    println()
    println()
    println(typedRdd.toDebugString)
  }
  使用 Dataset.rdd 将 Dataset 转为 RDD 的形式
  Dataset 的执行计划底层的 RDD

可以看到 (1) 对比 (2) 对了两个步骤, 这两个步骤的本质就是将 Dataset 底层的 InternalRow 转为 RDD 中的对象形式, 这个操作还是会有点重的, 所以慎重使用 rdd 属性来转换 Dataset 为 RDD

总结

  1. Dataset 是一个新的 Spark 组件, 其底层还是 RDD

  2. Dataset 提供了访问对象中某个特定字段的能力, 不用像 RDD 一样每次都要针对整个对象做操作

  3. Dataset 和 RDD 不同, 如果想把 Dataset[T] 转为 RDD[T], 则需要对 Dataset 底层的 InternalRow 做转换, 是一个比价重量级的操作

 

DataFrame 的作用和常见操作

目标

  1. 理解 DataFrame 是什么

  2. 理解 DataFrame 的常见操作

 

DataFrame 是 SparkSQL 中一个表示关系型数据库中  的函数式抽象, 其作用是让 Spark 处理大规模结构化数据的时候更加容易. 一般 DataFrame 可以处理结构化的数据, 或者是半结构化的数据, 因为这两类数据中都可以获取到 Schema 信息. 也就是说 DataFrame 中有 Schema 信息, 可以像操作表一样操作 DataFrame.

eca0d2e1e2b5ce678161438d87707b61

DataFrame 由两部分构成, 一是 row 的集合, 每个 row 对象表示一个行, 二是描述 DataFrame 结构的 Schema.

238c241593cd5b0fd06d4d74294680e2

DataFrame 支持 SQL 中常见的操作, 例如: selectfilterjoingroupsortjoin 等

@Test
  def dataframe1(): Unit = {
    // 1. 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("dataframe1")
      .master("local[6]")
      .getOrCreate()

    // 2. 创建 DataFrame
    import spark.implicits._

    val dataFrame: DataFrame = Seq(Person("zhangsan", 15), Person("lisi", 20)).toDF()

    // 3. 看看 DataFrame 可以玩出什么花样
    // select name from ... t where t.age > 10
    dataFrame.where('age > 10)
      .select('name)
      .show()
  }

 

 DataFrame如何创建:

  @Test
  def dataframe2(): Unit = {
    val spark = SparkSession.builder()
      .appName("dataframe1")
      .master("local[6]")
      .getOrCreate()

    import spark.implicits._

    val personList = Seq(Person("zhangsan", 15), Person("lisi", 20))

    // 1. toDF
    val df1 = personList.toDF()
    val df2 = spark.sparkContext.parallelize(personList).toDF()

    // 2. createDataFrame
    val df3 = spark.createDataFrame(personList)

    // 3. read
    val df4 = spark.read.csv("dataset/BeijingPM20100101_20151231_noheader.csv")
    df4.show()
  }

DataFrame介绍:

 @Test
  def dataframe3(): Unit = {
    // 1. 创建 SparkSession
    val spark = SparkSession.builder()
      .master("local[6]")
      .appName("pm analysis")
      .getOrCreate()

    import spark.implicits._

    // 2. 读取数据集
    val sourceDF: DataFrame = spark.read
      .option("header", value = true)//把第一行弄成header
      .csv("dataset/BeijingPM20100101_20151231.csv")

    // 查看 DataFrame 的 Schema 信息, 要意识到 DataFrame 中是有结构信息的, 叫做 Schema
    sourceDF.printSchema()

    // 3. 处理
    //     1. 选择列
    //     2. 过滤掉 NA 的 PM 记录
    //     3. 分组 select year, month, count(PM_Dongsi) from ... where PM_Dongsi != NA group by year, month
    //     4. 聚合
    // 4. 得出结论
//    sourceDF.select('year, 'month, 'PM_Dongsi)
//      .where('PM_Dongsi =!= "NA")
//      .groupBy('year, 'month)
//      .count()
//      .show()

    // 是否能直接使用 SQL 语句进行查询
    // 1. 将 DataFrame 注册为临表
    sourceDF.createOrReplaceTempView("pm")

    // 2. 执行查询
    val resultDF = spark.sql("select year, month, count(PM_Dongsi) from pm where PM_Dongsi != 'NA' group by year, month")

    resultDF.show()

    spark.stop()
  }

 

 Dataset 和 DataFrame 的异同

目标

  1. 理解 Dataset 和 DataFrame 之间的关系

DataFrame 就是 Dataset

根据前面的内容, 可以得到如下信息

  1. Dataset 中可以使用列来访问数据, DataFrame 也可以

  2. Dataset 的执行是优化的, DataFrame 也是

  3. Dataset 具有命令式 API, 同时也可以使用 SQL 来访问, DataFrame 也可以使用这两种不同的方式访问

所以这件事就比较蹊跷了, 两个这么相近的东西为什么会同时出现在 SparkSQL 中呢?

44fb917304a91eab99d131010448331b

确实, 这两个组件是同一个东西, DataFrame 是 Dataset 的一种特殊情况, 也就是说 DataFrame 是 Dataset[Row] 的别名

DataFrame 和 Dataset 所表达的语义不同

第一点: DataFrame 表达的含义是一个支持函数式操作的 , 而 Dataset 表达是是一个类似 RDD 的东西, Dataset 可以处理任何对象

第二点: DataFrame 中所存放的是 Row 对象, 而 Dataset 中可以存放任何类型的对象

val spark: SparkSession = new sql.SparkSession.Builder()
  .appName("hello")
  .master("local[6]")
  .getOrCreate()

import spark.implicits._

val df: DataFrame = Seq(People("zhangsan", 15), People("lisi", 15)).toDF()       

val ds: Dataset[People] = Seq(People("zhangsan", 15), People("lisi", 15)).toDS() 
  DataFrame 就是 Dataset[Row]
  Dataset 的范型可以是任意类型

第三点: DataFrame 的操作方式和 Dataset 是一样的, 但是对于强类型操作而言, 它们处理的类型不同

DataFrame 在进行强类型操作时候, 例如 map 算子, 其所处理的数据类型永远是 Row

df.map( (row: Row) => Row(row.get(0), row.getAs[Int](1) * 10) )(RowEncoder.apply(df.schema)).show()

但是对于 Dataset 来讲, 其中是什么类型, 它就处理什么类型

ds.map( (item: People) => People(item.name, item.age * 10) ).show()

第三点: DataFrame 只能做到运行时类型检查, Dataset 能做到编译和运行时都有类型检查

  1. DataFrame 中存放的数据以 Row 表示, 一个 Row 代表一行数据, 这和关系型数据库类似

  2. DataFrame 在进行 map 等操作的时候, DataFrame 不能直接使用 Person 这样的 Scala 对象, 所以无法做到编译时检查

  3. Dataset 表示的具体的某一类对象, 例如 Person, 所以再进行 map 等操作的时候, 传入的是具体的某个 Scala 对象, 如果调用错了方法, 编译时就会被检查出来

val ds: Dataset[People] = Seq(People("zhangsan", 15), People("lisi", 15)).toDS()
ds.map(person => person.hello) 
  这行代码明显报错, 无法通过编译
 
 
 @Test
  def dataframe4(): Unit = {
    val spark = SparkSession.builder()
      .appName("dataframe1")
      .master("local[6]")
      .getOrCreate()

    import spark.implicits._

    val personList = Seq(Person("zhangsan", 15), Person("lisi", 20))

    // DataFrame 是弱类型的
    val df: DataFrame = personList.toDF()
    df.map( (row: Row) => Row(row.get(0), row.getAs[Int](1) * 2) )(RowEncoder.apply(df.schema))
      .show()

    // DataFrame 所代表的弱类型操作是编译时不安全
//    df.groupBy("name, school")

    // Dataset 是强类型的
    val ds: Dataset[Person] = personList.toDS()
    ds.map( (person: Person) => Person(person.name, person.age * 2) )
      .show()

    // Dataset 所代表的操作, 是类型安全的, 编译时安全的
//    ds.filter( person => person.school )
  }

  

 Row

 
 @Test
  def row(): Unit = {
    // 1. Row 如何创建, 它是什么
    // row 对象必须配合 Schema 对象才会有 列名
    val p = Person("zhangsan", 15)
    val row = Row("zhangsan", 15)

    // 2. 如何从 Row 中获取数据
    row.getString(0)
    row.getInt(1)

    // 3. Row 也是样例类
    row match {
      case Row(name, age) => println(name, age)
    }

  }

 
 
原文地址:https://www.cnblogs.com/dazhi151/p/14264820.html