Spark Dataset and DataFrame Operations

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions.{max, min}
 
object sparkSession {
  case class Person(name: String, age: BigInt)
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[4]")
      .appName("Spark SQL Example")
      .getOrCreate()
    // Read a JSON file and get back a DataFrame (note the escaped backslash in the Windows path).
    val df = spark.read.json("D:\\people.json")
    import spark.implicits._
    // Show the contents of the DataFrame; 20 rows by default.
    df.show()
    //---------------------- Converting between DataFrame and Dataset ----------------------
    // type DataFrame = Dataset[Row]  // how DataFrame relates to Dataset
    val ds = df.as[Person]
    val ds_1 = df.toJSON
    val df_new = ds.toDF()
    //---------------------------------------------------------------------------------------
    // Print the schema of the DataFrame; think of it as the data model.
    df.printSchema()
    // Select a single column: age.
    df.select("age").show()
    // Select columns with an expression: age + 1.
    df.select($"name", $"age" + 1).show
    // Alternative syntax for selecting age and name.
    df.select(df("age"), df("name")).show()
    // Filter rows with age greater than 20; show 10 rows.
    df.filter($"age" > 20).show(10)
    // Count the people older than 20.
    df.filter(df("age") > 20).count()
    // Group by name and count.
    df.groupBy("name").count().show()
    // collect() fetches all rows to the driver as an Array[Row].
    df.collect().foreach(println)
    // Similar to the above, but returns a java.util.List[Row].
    val list = df.collectAsList()
    // Summary statistics for the given columns.
    df.describe("name", "age").show()
    // Equivalent to a SQL WHERE clause.
    df.where("age = 18 and name = 'jason'").show()
    // Filter on a single column.
    df.filter("name = 'jason'").show()
    // Select columns with expressions; aliases are allowed.
    df.selectExpr("age", "name as n").show()
    // Get a single column (one at a time).
    val age = df.col("age")
    println(age)
    // Same as above, another way to get a column.
    val name = df.apply("name")
    println(name)
    // Drop a column.
    df.drop("age").show()
    // Like SQL LIMIT: show the first few rows.
    df.limit(5).show()
    // Sort by a column.
    df.orderBy(df("age").desc).show()
    // Sort within each partition.
    df.sortWithinPartitions("age").show()
    // Group by a column, just like SQL GROUP BY.
    df.groupBy("name").count().show()
    // Aggregations combined with groupBy.
    df.groupBy("age").max().show()
    // Deduplicate all columns.
    df.distinct().show()
    // Deduplicate on specific columns.
    df.dropDuplicates("name").show()
    // Aggregations. The ("col" -> "func") pairs are folded into a Map, so repeating
    // the same column name can collapse to one aggregate; use column functions instead.
    df.agg(max("age"), min("age")).show()
    // Append one result set to another.
    df.union(df).show()
    // Joins work as in SQL (left, right, inner, ...); the API is rich, so not every variant is listed here.
    df.join(df, Seq("age"), "left").show()
    // Rows present in both DataFrames, like SQL INTERSECT.
    df.intersect(df).show()
    // Rename a column.
    df.withColumnRenamed("name", "name1").show()
    // Add a new column copied from an existing one.
    df.withColumn("name1", df("age")).show()
    // A common question: how to add a new column filled with 0? Multiply a numeric column by 0.
    df.withColumn("name2", df("age") * 0).show()
  }
}
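The example above assumes a people.json in the line-delimited JSON format that spark.read.json expects; a hypothetical sample (names and ages made up):

{"name":"jason","age":18}
{"name":"tom","age":25}
{"name":"anna","age":22}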

1. Spark 2 Dataset/DataFrame: handling null and NaN values

1.1 Show the first 10 rows

The source DataFrame, data, is assumed to exist already (its construction is not shown in this section) and contains null values:

val data1 = data.toDF("affairs", "gender", "age", "yearsmarried", "children", "religiousness", "education", "occupation", "rating")
data1.limit(10).show
+-------+------+---+------------+--------+-------------+---------+----------+------+
|affairs|gender|age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+---+------------+--------+-------------+---------+----------+------+
|      0|  male| 37|          10|      no|            3|       18|         7|     4|
|      0|  null| 27|        null|      no|            4|       14|         6|  null|
|      0|  null| 32|        null|     yes|            1|       12|         1|  null|
|      0|  null| 57|        null|     yes|            5|       18|         6|  null|
|      0|  null| 22|        null|      no|            2|       17|         6|  null|
|      0|  null| 32|        null|      no|            2|       17|         5|  null|
|      0|female| 22|        null|      no|            2|       12|         1|  null|
|      0|  male| 57|          15|     yes|            2|       14|         4|     4|
|      0|female| 32|          15|     yes|            4|       16|         1|     2|
|      0|  male| 22|         1.5|      no|            4|       14|         4|     5|
+-------+------+---+------------+--------+-------------+---------+----------+------+

1.2 Drop rows containing null or NaN in any column

val resNull=data1.na.drop()
resNull.limit(10).show()
+-------+------+---+------------+--------+-------------+---------+----------+------+
|affairs|gender|age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+---+------------+--------+-------------+---------+----------+------+
|      0|  male| 37|          10|      no|            3|       18|         7|     4|
|      0|  male| 57|          15|     yes|            2|       14|         4|     4|
|      0|female| 32|          15|     yes|            4|       16|         1|     2|
|      0|  male| 22|         1.5|      no|            4|       14|         4|     5|
|      0|  male| 37|          15|     yes|            2|       20|         7|     2|
|      0|  male| 27|           4|     yes|            4|       18|         6|     4|
|      0|  male| 47|          15|     yes|            5|       17|         6|     4|
|      0|female| 22|         1.5|      no|            2|       17|         5|     4|
|      0|female| 27|           4|      no|            4|       14|         5|     4|
|      0|female| 37|          15|     yes|            1|       17|         5|     5|
+-------+------+---+------------+--------+-------------+---------+----------+------+

1.3 Drop rows with null or NaN in specific columns

// Drop rows with a null or NaN in the given columns
val res=data1.na.drop(Array("gender","yearsmarried"))

1.4 Drop rows with fewer than 10 non-null, non-NaN values in the given columns

// Keep only rows with at least 10 non-null, non-NaN values among the given columns
data1.na.drop(10,Array("gender","yearsmarried"))
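With only two columns listed, a row can contain at most 2 non-null values, so a threshold of 10 drops every row. A satisfiable variant of the same call (the threshold here is chosen for illustration):

// Keep rows where both gender and yearsmarried are non-null and non-NaN
val bothPresent = data1.na.drop(2, Array("gender", "yearsmarried"))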

1.5 Fill nulls in all columns

// Fill every null value with a constant
val res123=data1.na.fill("wangxiao123")
res123.limit(10).show()
+-------+-----------+---+------------+--------+-------------+---------+----------+-----------+
|affairs|     gender|age|yearsmarried|children|religiousness|education|occupation|     rating|
+-------+-----------+---+------------+--------+-------------+---------+----------+-----------+
|      0|       male| 37|          10|      no|            3|       18|         7|          4|
|      0|wangxiao123| 27| wangxiao123|      no|            4|       14|         6|wangxiao123|
|      0|wangxiao123| 32| wangxiao123|     yes|            1|       12|         1|wangxiao123|
|      0|wangxiao123| 57| wangxiao123|     yes|            5|       18|         6|wangxiao123|
|      0|wangxiao123| 22| wangxiao123|      no|            2|       17|         6|wangxiao123|
|      0|wangxiao123| 32| wangxiao123|      no|            2|       17|         5|wangxiao123|
|      0|     female| 22| wangxiao123|      no|            2|       12|         1|wangxiao123|
|      0|       male| 57|          15|     yes|            2|       14|         4|          4|
|      0|     female| 32|          15|     yes|            4|       16|         1|          2|
|      0|       male| 22|         1.5|      no|            4|       14|         4|          5|
+-------+-----------+---+------------+--------+-------------+---------+----------+-----------+

1.6 Fill nulls in specific columns

// Fill nulls only in the given columns
val res2=data1.na.fill(value="wangxiao111", cols=Array("gender","yearsmarried"))
 res2.limit(10).show()
 +-------+-----------+---+------------+--------+-------------+---------+----------+------+
|affairs|     gender|age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+-----------+---+------------+--------+-------------+---------+----------+------+
|      0|       male| 37|          10|      no|            3|       18|         7|     4|
|      0|wangxiao111| 27| wangxiao111|      no|            4|       14|         6|  null|
|      0|wangxiao111| 32| wangxiao111|     yes|            1|       12|         1|  null|
|      0|wangxiao111| 57| wangxiao111|     yes|            5|       18|         6|  null|
|      0|wangxiao111| 22| wangxiao111|      no|            2|       17|         6|  null|
|      0|wangxiao111| 32| wangxiao111|      no|            2|       17|         5|  null|
|      0|     female| 22| wangxiao111|      no|            2|       12|         1|  null|
|      0|       male| 57|          15|     yes|            2|       14|         4|     4|
|      0|     female| 32|          15|     yes|            4|       16|         1|     2|
|      0|       male| 22|         1.5|      no|            4|       14|         4|     5|
+-------+-----------+---+------------+--------+-------------+---------+----------+------+
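na.fill also accepts a Map to fill different columns with different values; a sketch (the replacement values are arbitrary, and the columns here are string-typed):

val res3 = data1.na.fill(Map("gender" -> "unknown", "yearsmarried" -> "0"))
res3.limit(10).show()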

1.7 Query rows where a column is null

// Rows where gender is null
data1.filter("gender is null").select("gender").limit(10).show
+------+
|gender|
+------+
|  null|
|  null|
|  null|
|  null|
|  null|
+------+
 data1.filter( data1("gender").isNull ).select("gender").limit(10).show
 +------+
|gender|
+------+
|  null|
|  null|
|  null|
|  null|
|  null|
+------+

1.8 Query rows where a column is not null

data1.filter("gender is not null").select("gender").limit(10).show
+------+
|gender|
+------+
|  male|
|female|
|  male|
|female|
|  male|
|  male|
|  male|
|  male|
|female|
|female|
+------+
data1.filter("gender<>''").select("gender").limit(10).show
+------+
|gender|
+------+
|  male|
|female|
|  male|
|female|
|  male|
|  male|
|  male|
|  male|
|female|
|female|
+------+
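Note that the gender<>'' predicate also excludes nulls: under SQL's three-valued logic, null <> '' evaluates to unknown, which filter treats as false.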

2. Dataset row and column operations, and execution plans

A Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel with functional or relational operations. Every Dataset also has an untyped view called a DataFrame, which is a Dataset of Row, i.e. Dataset[Row].
Datasets are lazy: computation is only triggered by an action. Internally, a Dataset represents a logical plan describing the computation needed to produce the data. When an action is invoked, Spark's query optimizer optimizes the logical plan and generates an efficient parallel, distributed physical plan.
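A minimal sketch of this laziness, using the sample data built in section 2.2 below:

// filter and select only build up a logical plan; nothing runs yet
val plan = data.filter($"age" > 30).select("gender", "age")
// the action triggers optimization and execution of a physical plan
plan.show()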

2.1 Common imports

import scala.math._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Column
import org.apache.spark.sql.DataFrameReader
import org.apache.spark.sql.functions._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.DataFrameStatFunctions

2.2 Create a SparkSession and load sample data

val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()
 
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
 
val dataList: List[(Double, String, Double, Double, String, Double, Double, Double, Double)] = List(
      (0, "male", 37, 10, "no", 3, 18, 7, 4),
      (0, "female", 27, 4, "no", 4, 14, 6, 4),
      (0, "female", 32, 15, "yes", 1, 12, 1, 4),
      (0, "male", 57, 15, "yes", 5, 18, 6, 5),
      (0, "male", 22, 0.75, "no", 2, 17, 6, 3),
      (0, "female", 32, 1.5, "no", 2, 17, 5, 5),
      (0, "female", 22, 0.75, "no", 2, 12, 1, 3),
      (0, "male", 57, 15, "yes", 2, 14, 4, 4),
      (0, "female", 32, 15, "yes", 4, 16, 1, 2),
      (0, "male", 22, 1.5, "no", 4, 14, 4, 5))
 
val data = dataList.toDF("affairs", "gender", "age", "yearsmarried", "children", "religiousness", "education", "occupation", "rating")
 
data.printSchema()
root
 |-- affairs: double (nullable = false)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = false)
 |-- yearsmarried: double (nullable = false)
 |-- children: string (nullable = true)
 |-- religiousness: double (nullable = false)
 |-- education: double (nullable = false)
 |-- occupation: double (nullable = false)
 |-- rating: double (nullable = false)

2.3 Working with specific columns and rows

// Show the first n rows (output as rendered in spark-shell)
data.show(7)
+-------+------+----+------------+--------+-------------+---------+----------+------+
|affairs|gender| age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+----+------------+--------+-------------+---------+----------+------+
|    0.0|  male|37.0|        10.0|      no|          3.0|     18.0|       7.0|   4.0|
|    0.0|female|27.0|         4.0|      no|          4.0|     14.0|       6.0|   4.0|
|    0.0|female|32.0|        15.0|     yes|          1.0|     12.0|       1.0|   4.0|
|    0.0|  male|57.0|        15.0|     yes|          5.0|     18.0|       6.0|   5.0|
|    0.0|  male|22.0|        0.75|      no|          2.0|     17.0|       6.0|   3.0|
|    0.0|female|32.0|         1.5|      no|          2.0|     17.0|       5.0|   5.0|
|    0.0|female|22.0|        0.75|      no|          2.0|     12.0|       1.0|   3.0|
+-------+------+----+------------+--------+-------------+---------+----------+------+
only showing top 7 rows
 
 
// Take the first n rows
val data3=data.limit(5)
 
 
// Filter rows on a predicate
data.filter("age>50 and gender=='male' ").show
+-------+------+----+------------+--------+-------------+---------+----------+------+
|affairs|gender| age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+----+------------+--------+-------------+---------+----------+------+
|    0.0|  male|57.0|        15.0|     yes|          5.0|     18.0|       6.0|   5.0|
|    0.0|  male|57.0|        15.0|     yes|          2.0|     14.0|       4.0|   4.0|
+-------+------+----+------------+--------+-------------+---------+----------+------+
 
// All column names of the DataFrame
 
val columnArray=data.columns
columnArray: Array[String] = Array(affairs, gender, age, yearsmarried, children, religiousness, education, occupation, rating)
 
// Select specific columns
data.select("gender", "age", "yearsmarried", "children").show(3)
+------+----+------------+--------+
|gender| age|yearsmarried|children|
+------+----+------------+--------+
|  male|37.0|        10.0|      no|
|female|27.0|         4.0|      no|
|female|32.0|        15.0|     yes|
+------+----+------------+--------+
only showing top 3 rows
 
 
val colArray=Array("gender", "age", "yearsmarried", "children")
colArray: Array[String] = Array(gender, age, yearsmarried, children)
 
data.selectExpr(colArray:_*).show(3)
+------+----+------------+--------+
|gender| age|yearsmarried|children|
+------+----+------------+--------+
|  male|37.0|        10.0|      no|
|female|27.0|         4.0|      no|
|female|32.0|        15.0|     yes|
+------+----+------------+--------+
only showing top 3 rows
 
 
// Operate on specific columns and sort the result
// data.selectExpr("gender", "age+1","cast(age as bigint)").orderBy($"gender".desc, $"age".asc).show
data.selectExpr("gender", "age+1 as age1","cast(age as bigint) as age2").sort($"gender".desc, $"age".asc).show
+------+----+----+
|gender|age1|age2|
+------+----+----+
|  male|23.0|  22|
|  male|23.0|  22|
|  male|38.0|  37|
|  male|58.0|  57|
|  male|58.0|  57|
|female|23.0|  22|
|female|28.0|  27|
|female|33.0|  32|
|female|33.0|  32|
|female|33.0|  32|
+------+----+----+

2.4 Inspect the Spark SQL logical and physical execution plans

val data4=data.selectExpr("gender", "age+1 as age1","cast(age as bigint) as age2").sort($"gender".desc, $"age".asc)
data4: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [gender: string, age1: double ... 1 more field]
 
// Show the physical plan
data4.explain()
== Physical Plan ==
*Project [gender#20, age1#135, age2#136L]
+- *Sort [gender#20 DESC, age#21 ASC], true, 0
   +- Exchange rangepartitioning(gender#20 DESC, age#21 ASC, 200)
      +- LocalTableScan [gender#20, age1#135, age2#136L, age#21]
 
// Show both the logical and physical plans
data4.explain(extended=true)
== Parsed Logical Plan ==
'Sort ['gender DESC, 'age ASC], true
+- Project [gender#20, (age#21 + cast(1 as double)) AS age1#135, cast(age#21 as bigint) AS age2#136L]
   +- Project [_1#9 AS affairs#19, _2#10 AS gender#20, _3#11 AS age#21, _4#12 AS yearsmarried#22, _5#13 AS children#23, _6#14 AS religiousness#24, _7#15 AS education#25, _8#16 AS occupation#26, _9#17 AS rating#27]
      +- LocalRelation [_1#9, _2#10, _3#11, _4#12, _5#13, _6#14, _7#15, _8#16, _9#17]
 
== Analyzed Logical Plan ==
gender: string, age1: double, age2: bigint
Project [gender#20, age1#135, age2#136L]
+- Sort [gender#20 DESC, age#21 ASC], true
   +- Project [gender#20, (age#21 + cast(1 as double)) AS age1#135, cast(age#21 as bigint) AS age2#136L, age#21]
      +- Project [_1#9 AS affairs#19, _2#10 AS gender#20, _3#11 AS age#21, _4#12 AS yearsmarried#22, _5#13 AS children#23, _6#14 AS religiousness#24, _7#15 AS education#25, _8#16 AS occupation#26, _9#17 AS rating#27]
         +- LocalRelation [_1#9, _2#10, _3#11, _4#12, _5#13, _6#14, _7#15, _8#16, _9#17]
 
== Optimized Logical Plan ==
Project [gender#20, age1#135, age2#136L]
+- Sort [gender#20 DESC, age#21 ASC], true
   +- LocalRelation [gender#20, age1#135, age2#136L, age#21]
 
== Physical Plan ==
*Project [gender#20, age1#135, age2#136L]
+- *Sort [gender#20 DESC, age#21 ASC], true, 0
   +- Exchange rangepartitioning(gender#20 DESC, age#21 ASC, 200)
      +- LocalTableScan [gender#20, age1#135, age2#136L, age#21]
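The * prefix on Project and Sort marks operators that run inside whole-stage code generation.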

3. Dataset deduplication, set difference, and intersection

import org.apache.spark.sql.functions._

3.1 Deduplicate the entire DataFrame

// Deduplicate the entire DataFrame
data.distinct()
data.dropDuplicates()

3.2 Deduplicate on specific columns

// Deduplicate on the given columns
val colArray=Array("affairs", "gender")
data.dropDuplicates(colArray)
//data.dropDuplicates("affairs", "gender")

3.3 Set difference

val df=data.filter("gender=='male' ")
// Rows in data that are not in df (data minus df)
data.except(df).show
+-------+------+----+------------+--------+-------------+---------+----------+------+
|affairs|gender| age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+----+------------+--------+-------------+---------+----------+------+
|    0.0|female|32.0|        15.0|     yes|          1.0|     12.0|       1.0|   4.0|
|    0.0|female|32.0|         1.5|      no|          2.0|     17.0|       5.0|   5.0|
|    0.0|female|32.0|        15.0|     yes|          4.0|     16.0|       1.0|   2.0|
|    0.0|female|22.0|        0.75|      no|          2.0|     12.0|       1.0|   3.0|
|    0.0|female|27.0|         4.0|      no|          4.0|     14.0|       6.0|   4.0|
+-------+------+----+------------+--------+-------------+---------+----------+------+

3.4 Intersection

// Rows common to data and df
data.intersect(df)

4. Dataset aggregation operations

data.groupBy("gender").agg(count($"age"),max($"age").as("maxAge"), avg($"age").as("avgAge")).show
+------+----------+------+------+                                              
|gender|count(age)|maxAge|avgAge|
+------+----------+------+------+
|female|         5|  32.0|  29.0|
|  male|         5|  57.0|  39.0|
+------+----------+------+------+
 
 
data.groupBy("gender").agg("age"->"count","age" -> "max", "age" -> "avg").show
+------+----------+--------+--------+                                          
|gender|count(age)|max(age)|avg(age)|
+------+----------+--------+--------+
|female|         5|    32.0|    29.0|
|  male|         5|    57.0|    39.0|
+------+----------+--------+--------+
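Note: the ("column" -> "function") pairs passed to agg are gathered into a Map in some Spark versions, so repeating the same column name may keep only the last aggregate; when in doubt, prefer the Column-based form shown in the first example.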

4.1 Grouped statistics on a DataFrame: the groupBy and agg operators

Verified first-hand. The groupBy aggregation operator accepts a Column, a $-interpolated column, or a plain column name (a self-contained sketch of the people and newPeople DataFrames follows at the end of this section):
people.unionAll(newPeople).groupBy(col("name")).count.show
or
people.unionAll(newPeople).groupBy($"name").count.show
or
people.unionAll(newPeople).groupBy("name").count.show

Call toDF on the result to rename all of its columns at once and make them more readable:
people.groupBy("depId").agg(Map("age" -> "max", "gender" -> "count"))
.toDF("depId","maxAge","countGender").show

Group the union of the two DataFrames in a functional style, then filter and show the result:
people.unionAll(newPeople).groupBy($"name").count.filter($"count" < 2).show

Calling groupBy on the merged DataFrame to group it by the "name" column returns a GroupedData instance; the result automatically carries the grouping column along with the "count" column.

The GroupedData class was renamed to RelationalGroupedDataset in Spark 2.0.x.
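The snippets above reference people and newPeople DataFrames that the original post never defines. A minimal self-contained sketch, assuming two small DataFrames with matching schemas (the names, values, and depId column are made up for illustration):

import org.apache.spark.sql.functions.col
import spark.implicits._
 
// Hypothetical stand-ins for the people / newPeople DataFrames referenced above.
val people = Seq(("jason", 30, "male", 1), ("anna", 25, "female", 2))
  .toDF("name", "age", "gender", "depId")
val newPeople = Seq(("jason", 31, "male", 1), ("tom", 40, "male", 2))
  .toDF("name", "age", "gender", "depId")
 
// union replaces the deprecated unionAll in Spark 2.x.
people.union(newPeople).groupBy(col("name")).count.show
 
// Rename the aggregated columns for readability.
people.groupBy("depId").agg(Map("age" -> "max", "gender" -> "count"))
  .toDF("depId", "maxAge", "countGender").show
 
// Keep only names that appear fewer than twice across the union.
people.union(newPeople).groupBy($"name").count.filter($"count" < 2).show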

5. Datasets: temporary views and SQL

// Create a temporary view
data.createOrReplaceTempView("Affairs")
 
val df1 = spark.sql("SELECT * FROM Affairs WHERE age BETWEEN 20 AND 25")
df1: org.apache.spark.sql.DataFrame = [affairs: double, gender: string ... 7 more fields]
 
// Subquery
val df2 = spark.sql("select gender, age,rating from  ( SELECT * FROM Affairs WHERE age BETWEEN 20 AND 25 ) t ")
df2: org.apache.spark.sql.DataFrame = [gender: string, age: double ... 1 more field]
 
df2.show
+------+----+------+
|gender| age|rating|
+------+----+------+
|  male|22.0|   3.0|
|female|22.0|   3.0|
|  male|22.0|   5.0|
+------+----+------+

6. collect_set and collect_list

collect_set removes duplicate elements; collect_list keeps them:
select gender,
concat_ws(',', collect_set(children)),
concat_ws(',', collect_list(children))
from Affairs
group by gender

// Create a temporary view
data.createOrReplaceTempView("Affairs")
 
val df3= spark.sql("select gender,concat_ws(',',collect_set(children)),concat_ws(',',collect_list(children)) from Affairs group by gender")
df3: org.apache.spark.sql.DataFrame = [gender: string, concat_ws(,, collect_set(children)): string ... 1 more field]
 
df3.show  // collect_set removes duplicates; collect_list does not
+------+-----------------------------------+------------------------------------+
|gender|concat_ws(,, collect_set(children))|concat_ws(,, collect_list(children))|
+------+-----------------------------------+------------------------------------+
|female|                             no,yes|                    no,yes,no,no,yes|
|  male|                             no,yes|                    no,yes,no,yes,no|
+------+-----------------------------------+------------------------------------+
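The same result can be written with the DataFrame API instead of SQL; a sketch using the built-in collect_set and collect_list functions:

import org.apache.spark.sql.functions.{collect_list, collect_set, concat_ws}
 
data.groupBy("gender")
  .agg(
    concat_ws(",", collect_set($"children")).as("children_set"),   // deduplicated
    concat_ws(",", collect_list($"children")).as("children_list")  // keeps duplicates
  )
  .show(false)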

7. Multi-dimensional statistics with cube and rollup

val df6 = spark.sql("select gender,children,max(age),avg(age),count(age) from Affairs group by Cube(gender,children) order by 1,2")
df6.show
+------+--------+--------+--------+----------+                                 
|gender|children|max(age)|avg(age)|count(age)|
+------+--------+--------+--------+----------+
|  null|    null|    57.0|    34.0|        10|
|  null|      no|    37.0|    27.0|         6|
|  null|     yes|    57.0|    44.5|         4|
|female|    null|    32.0|    29.0|         5|
|female|      no|    32.0|    27.0|         3|
|female|     yes|    32.0|    32.0|         2|
|  male|    null|    57.0|    39.0|         5|
|  male|      no|    37.0|    27.0|         3|
|  male|     yes|    57.0|    57.0|         2|
+------+--------+--------+--------+----------+
 
 
val df7 = spark.sql("select gender,children,max(age),avg(age),count(age) from Affairs group by rollup(gender,children) order by 1,2")
 
df7.show
+------+--------+--------+--------+----------+                                 
|gender|children|max(age)|avg(age)|count(age)|
+------+--------+--------+--------+----------+
|  null|    null|    57.0|    34.0|        10|
|female|    null|    32.0|    29.0|         5|
|female|      no|    32.0|    27.0|         3|
|female|     yes|    32.0|    32.0|         2|
|  male|    null|    57.0|    39.0|         5|
|  male|      no|    37.0|    27.0|         3|
|  male|     yes|    57.0|    57.0|         2|
+------+--------+--------+--------+----------+
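Both queries can also be expressed with the DataFrame API, since cube and rollup are methods on Dataset; a sketch:

import org.apache.spark.sql.functions.{avg, count, max}
 
// equivalent of GROUP BY CUBE(gender, children)
data.cube("gender", "children").agg(max("age"), avg("age"), count("age")).orderBy("gender", "children").show()
 
// equivalent of GROUP BY ROLLUP(gender, children)
data.rollup("gender", "children").agg(max("age"), avg("age"), count("age")).orderBy("gender", "children").show()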

8. Ranking window functions: row_number, rank, dense_rank, percent_rank

select gender,
age,
row_number() over(partition by gender order by age) as rowNumber,
rank() over(partition by gender order by age) as ranks,
dense_rank() over(partition by gender order by age) as denseRank,
percent_rank() over(partition by gender order by age) as percentRank
from Affairs

val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()
 
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
     
val dataList: List[(Double, String, Double, Double, String, Double, Double, Double, Double)] = List(
      (0, "male", 37, 10, "no", 3, 18, 7, 4),
      (0, "female", 27, 4, "no", 4, 14, 6, 4),
      (0, "female", 32, 15, "yes", 1, 12, 1, 4),
      (0, "male", 57, 15, "yes", 5, 18, 6, 5),
      (0, "male", 22, 0.75, "no", 2, 17, 6, 3),
      (0, "female", 32, 1.5, "no", 2, 17, 5, 5),
      (0, "female", 22, 0.75, "no", 2, 12, 1, 3),
      (0, "male", 57, 15, "yes", 2, 14, 4, 4),
      (0, "female", 32, 15, "yes", 4, 16, 1, 2),
      (0, "male", 22, 1.5, "no", 4, 14, 4, 5),
      (0, "male", 37, 15, "yes", 2, 20, 7, 2),
      (0, "male", 27, 4, "yes", 4, 18, 6, 4),
      (0, "male", 47, 15, "yes", 5, 17, 6, 4),
      (0, "female", 22, 1.5, "no", 2, 17, 5, 4),
      (0, "female", 27, 4, "no", 4, 14, 5, 4),
      (0, "female", 37, 15, "yes", 1, 17, 5, 5),
      (0, "female", 37, 15, "yes", 2, 18, 4, 3),
      (0, "female", 22, 0.75, "no", 3, 16, 5, 4),
      (0, "female", 22, 1.5, "no", 2, 16, 5, 5),
      (0, "female", 27, 10, "yes", 2, 14, 1, 5),
      (0, "female", 22, 1.5, "no", 2, 16, 5, 5),
      (0, "female", 22, 1.5, "no", 2, 16, 5, 5),
      (0, "female", 27, 10, "yes", 4, 16, 5, 4),
      (0, "female", 32, 10, "yes", 3, 14, 1, 5),
      (0, "male", 37, 4, "yes", 2, 20, 6, 4))
 
val data = dataList.toDF("affairs", "gender", "age", "yearsmarried", "children", "religiousness", "education", "occupation", "rating")
 
data.printSchema()
 
// Create a temporary view
data.createOrReplaceTempView("Affairs")
 
val s1="row_number() over(partition by gender order by age) as rowNumber,"
val s2="rank() over(partition by gender order by age) as ranks,"
val s3="dense_rank() over(partition by gender order by age) as denseRank,"
val s4="percent_rank() over(partition by gender order by age) as percentRank"
val df8=spark.sql("select gender,age,"+s1+s2+s3+s4+" from Affairs")
 
df8.show(50)
+------+----+---------+-----+---------+------------------+                     
|gender| age|rowNumber|ranks|denseRank|       percentRank|
+------+----+---------+-----+---------+------------------+
|female|22.0|        1|    1|        1|               0.0|
|female|22.0|        2|    1|        1|               0.0|
|female|22.0|        3|    1|        1|               0.0|
|female|22.0|        4|    1|        1|               0.0|
|female|22.0|        5|    1|        1|               0.0|
|female|22.0|        6|    1|        1|               0.0|
|female|27.0|        7|    7|        2|               0.4|
|female|27.0|        8|    7|        2|               0.4|
|female|27.0|        9|    7|        2|               0.4|
|female|27.0|       10|    7|        2|               0.4|
|female|32.0|       11|   11|        3|0.6666666666666666|
|female|32.0|       12|   11|        3|0.6666666666666666|
|female|32.0|       13|   11|        3|0.6666666666666666|
|female|32.0|       14|   11|        3|0.6666666666666666|
|female|37.0|       15|   15|        4|0.9333333333333333|
|female|37.0|       16|   15|        4|0.9333333333333333|
|  male|22.0|        1|    1|        1|               0.0|
|  male|22.0|        2|    1|        1|               0.0|
|  male|27.0|        3|    3|        2|              0.25|
|  male|37.0|        4|    4|        3|             0.375|
|  male|37.0|        5|    4|        3|             0.375|
|  male|37.0|        6|    4|        3|             0.375|
|  male|47.0|        7|    7|        4|              0.75|
|  male|57.0|        8|    8|        5|             0.875|
|  male|57.0|        9|    8|        5|             0.875|
+------+----+---------+-----+---------+------------------+
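The same ranking columns can be computed without SQL via the DataFrame API; a sketch using org.apache.spark.sql.expressions.Window:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{dense_rank, percent_rank, rank, row_number}
 
val byGenderAge = Window.partitionBy("gender").orderBy("age")
 
data.select($"gender", $"age")
  .withColumn("rowNumber", row_number().over(byGenderAge))
  .withColumn("ranks", rank().over(byGenderAge))
  .withColumn("denseRank", dense_rank().over(byGenderAge))
  .withColumn("percentRank", percent_rank().over(byGenderAge))
  .show(50)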

9. Creating new rows with flatMap

val dfList = List(("Hadoop", "Java,SQL,Hive,HBase,MySQL"), ("Spark", "Scala,SQL,DataSet,MLlib,GraphX"))
dfList: List[(String, String)] = List((Hadoop,Java,SQL,Hive,HBase,MySQL), (Spark,Scala,SQL,DataSet,MLlib,GraphX))
 
case class Book(title: String, words: String)
 
val df=dfList.map{p=>Book(p._1,p._2)}.toDS()
df: org.apache.spark.sql.Dataset[Book] = [title: string, words: string]
 
df.show
+------+--------------------+
| title|               words|
+------+--------------------+
|Hadoop|Java,SQL,Hive,HBa...|
| Spark|Scala,SQL,DataSet...|
+------+--------------------+
 
df.flatMap(_.words.split(",")).show
+-------+
|  value|
+-------+
|   Java|
|    SQL|
|   Hive|
|  HBase|
|  MySQL|
|  Scala|
|    SQL|
|DataSet|
|  MLlib|
| GraphX|
+-------+
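An untyped alternative that yields the same rows is the explode function combined with split; a sketch:

import org.apache.spark.sql.functions.{explode, split}
 
df.select(explode(split($"words", ",")).as("value")).show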
Source: https://www.cnblogs.com/aixing/p/13327371.html