Spark SQL入门之wordcount案例

Spark SQL 是Spark的核心模块,主要用以对结构化的数据(流数据&批数据)进行处理。Spark SQL依然是建立在RDD之上的ETL工具(数据源到数据仓库的一系列处理过程)。
学习官网:http://spark.apache.org/docs/latest/sql-programming-guide.html

一、Spark SQL数据抽象

  • Spark SQL提供了DataFrame和DataSet的数据抽象。
  • DataFrame就是RDD+Schema,可以认为是一张二维表格,劣势在于编译器不进行表格中的字段的类型检查,在运行期进行检查。
  • DataSet是Spark最新的数据抽象,Spark的发展会逐步将DataSet作为主要的数据抽象,弱化RDD和DataFrame.DataSet包含了DataFrame所有的优化机制。除此之外提供了以样例类为Schema模型的强类型。
  • DataFrame=DataSet[Row]。

二、Spark SQL查询方式

DataFrame查询方式

DataFrame支持两种查询方式:一种是DSL风格,另外一种是SQL风格
(1)、DSL风格:
需要引入import spark.implicit. _ 这个隐式转换,可以将RDD隐式转换成DataFrame
(2)、SQL风格:
a、需要将DataFrame注册成一张表格,如果通过CreateTempView这种方式来创建,那么该表格Session有效,如果通过CreateGlobalTempView来创建,那么该表格跨Session有效,但是SQL语句访问该表格的时候需要加上前缀global_temp
b、需要通过sparkSession.sql方法来运行你的SQL语句

DataSet查询方式

定义一个DataSet,先定义一个Case类

Dataset和DataFrame的区别

Dataset:分布式数据集【数据类型Any】
DataFrame:分布式数据集【数据类型Row】
RDD --------> Dataset(加强版的RDD) -------> DataFrame(特殊Dataset【Row】)

三、入门案例--WordCount

导入依赖

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.7</version>
    </dependency>
</dependencies>

开发应用

package quickstart

import org.apache.spark.sql.SparkSession

object SparkSQLWordCount {
  def main(args: Array[String]): Unit = {
    //1.构建Spark SQL中核心对象SparkSession
    val spark = SparkSession.builder().appName("wordcount").master("local[*]").getOrCreate()
    //2.通过SparkSession对象构建dataset,或者dataframe
    val rdd = spark.sparkContext.makeRDD(List("Hello Hadoop","Hello Kafka"))
    //rdd转换为ds或者df
    //scala隐式转换
    import spark.implicits._
    val dataset  = rdd.toDS()
    //如:强类型操作(操作的是类型)
    //方法一:
    /**
    dataset
      .flatMap(_.split("\s"))
      .map((_,1))
      //无类型操作
      .groupBy("_1")
      //无类型操作
      .sum("_2")
      .show()
    */
    //方法二:
    val flatMapDS = dataset.flatMap(_.split("\s"))
    flatMapDS.createTempView("t_word")
    //sql语句中,先执行后面的group  by,再执行前面的select
    spark
      .sql("select value as word ,count(value) from t_word group by value")
      .show()

    spark.stop()
  }
}
//结果
+------+------------+
|  word|count(value)|
+------+------------+
| Kafka|           1|
| Hello|           2|
|Hadoop|           1|
+------+------------+
原文地址:https://www.cnblogs.com/wanpi/p/14961057.html