Spark SQL编程之DataFrame篇

             Spark SQL编程之DataFrame篇

                                     作者:尹正杰

版权声明:原创作品,谢绝转载!否则将追究法律责任。

一.DataFrame的创建

  在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:
    (1)通过Spark的数据源进行创建;
    (2)从一个存在的RDD进行转换;
    (3)还可以从Hive Table进行查询返回。

1>.从Spark数据源进行创建

[root@hadoop101.yinzhengjie.org.cn ~]# vim /tmp/user.json
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# cat /tmp/user.json
{"name":"yinzhengjie","passwd":"2020"}
{"name":"Jason","passwd":"666666"}
{"name":"Liming","passwd":"123"}
{"name":"Jenny","passwd":"456"}
{"name":"Danny","passwd":"789"}
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# vim /tmp/user.json        #创建测试数据
[root@hadoop101.yinzhengjie.org.cn ~]# spark-shell 
20/07/13 03:03:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop101.yinzhengjie.org.cn:4040
Spark context available as 'sc' (master = local[*], app id = local-1594580701441).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _ / _ / _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_   version 2.4.6
      /_/
         
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sc                            #Spark-shell内置的sc变量
res2: org.apache.spark.SparkContext = org.apache.spark.SparkContext@40cd02fc

scala> spark                          #spark-shell内置的spark变量
res3: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@16ef07fa

scala> val df = spark.read.json("/tmp/user.json")     #读取本地的json文件会返回一个DataFrame对象,我们命名为df。
df: org.apache.spark.sql.DataFrame = [name: string, passwd: string]

scala> df.show                         #展示读取到的结果
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
|      Jason|666666|
|     Liming|   123|
|      Jenny|   456|
|      Danny|   789|
+-----------+------+


scala> 

2>.从RDD进行转换

  博主推荐阅读:
    https://www.cnblogs.com/yinzhengjie2020/p/13185272.html
    https://www.cnblogs.com/yinzhengjie2020/p/13200300.html

3>.从Hive Table进行查询返回

  博主推荐阅读:
    https://www.cnblogs.com/yinzhengjie2020/p/13211015.html

二.SQL风格语法

  临时表是Session范围内的,Session退出后,表就失效了。如果想应用范围内有效,可以使用全局表。

  需要注意的是,使用全局表时需要全路径访问,如:"global_temp.user2"

1>.创建临时视图

scala> val df = spark.read.json("/tmp/user.json")     #读取本地的json文件会返回一个DataFrame对象,我们命名为df。
df: org.apache.spark.sql.DataFrame = [name: string, passwd: string]

scala> df.createTempView("user")              #创建临时视图

scala> spark.sql("select * from user").show        #使用Spark SQL来查询数据
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
|      Jason|666666|
|     Liming|   123|
|      Jenny|   456|
|      Danny|   789|
+-----------+------+


scala> spark.sql("select * from user where passwd=2020").show      #当然,我们也可以进行过滤操作。
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
+-----------+------+


scala> 

2>.创建全局视图

scala> df.createGlobalTempView("user2")                    #创建一个全局视图

scala> spark.sql("select * from global_temp.user2").show          #默认使用当前的session查询全局视图数据
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
|      Jason|666666|
|     Liming|   123|
|      Jenny|   456|
|      Danny|   789|
+-----------+------+


scala> spark.sql("select * from global_temp.user2 user where passwd=2020").show
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
+-----------+------+


scala> spark.newSession().sql("select * from global_temp.user2").show      #使用一个新session来查询全局视图数据
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
|      Jason|666666|
|     Liming|   123|
|      Jenny|   456|
|      Danny|   789|
+-----------+------+


scala> spark.newSession().sql("select * from global_temp.user2 user where passwd=2020").show
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
+-----------+------+


scala> 

三.DSL风格语法

1>.查看DataFrame的Schema信息

scala> val df = spark.read.json("/tmp/user.json")        #创建一个DataFrame
df: org.apache.spark.sql.DataFrame = [name: string, passwd: string]

scala> df.printSchema                        #查看DataFrame的Schema信息
root
 |-- name: string (nullable = true)
 |-- passwd: string (nullable = true)


scala> 

2>.只查看"name"列数据

scala> df.select("name").show()
+-----------+
|       name|
+-----------+
|yinzhengjie|
|      Jason|
|     Liming|
|      Jenny|
|      Danny|
+-----------+


scala> 

3>.查看”name”列数据以及”passwd+30”数据

scala> df.select($"name", $"passwd" + 10).show()
+-----------+-------------+
|       name|(passwd + 10)|
+-----------+-------------+
|yinzhengjie|       2030.0|
|      Jason|     666676.0|
|     Liming|        133.0|
|      Jenny|        466.0|
|      Danny|        799.0|
+-----------+-------------+


scala> 

4>.查看”passwd”大于”2020”的数据

scala> df.filter($"passwd" > 2020).show()
+-----+------+
| name|passwd|
+-----+------+
|Jason|666666|
+-----+------+


scala> 

5>.按照”passwd”分组,查看数据条数

scala> df.groupBy("passwd").count().show()
+------+-----+
|passwd|count|
+------+-----+
|  2020|    1|
|   789|    1|
|666666|    1|
|   456|    1|
|   123|    1|
+------+-----+


scala> 

四.RDD转换为DataFrame

  温馨提示:  
    如果需要RDD与DF或者DS之间操作,那么都需要引入"import spark.implicits._"(spark不是包名,而是sparkSession对象的名称),下面是具体的案例。


scala
> import spark.implicits._ #导入隐式转换 import spark.implicits._ scala> val listRDD = sc.makeRDD(List((1,"YinZhengjie",18),(2,"Jason Yin",20),(3,"Danny",28)))      #创建一个RDD listRDD: org.apache.spark.rdd.RDD[(Int, String, Int)] = ParallelCollectionRDD[84] at makeRDD at <console>:27 scala> val df = listRDD.toDF("Id","Name","Age")      #将RDD转换成DataFrame df: org.apache.spark.sql.DataFrame = [Id: int, Name: string ... 1 more field] scala> df.show                         #查看将RDD转换成DataFrame后的数据 +---+-----------+---+ | Id| Name|Age| +---+-----------+---+ | 1|YinZhengjie| 18| | 2| Jason Yin| 20| | 3| Danny| 28| +---+-----------+---+ scala>

五.DataFrame转换为RDD

scala> df          #注意观察此时df是DataFrame
res33: org.apache.spark.sql.DataFrame = [Id: int, Name: string ... 1 more field]

scala> df.show
+---+-----------+---+
| Id|       Name|Age|
+---+-----------+---+
|  1|YinZhengjie| 18|
|  2|  Jason Yin| 20|
|  3|      Danny| 28|
+---+-----------+---+


scala> df.rdd       #直接调用rdd方法即可将DataFrame转换为RDD
res35: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[97] at rdd at <console>:29

scala> res35.collect   #查看DataFrame转换rdd后的数据(注意哈,这个res36是上一条命令执行的返回结果)
res36: Array[org.apache.spark.sql.Row] = Array([1,YinZhengjie,18], [2,Jason Yin,20], [3,Danny,28])

scala> 
原文地址:https://www.cnblogs.com/yinzhengjie2020/p/13193293.html