sparkSQL--over窗口函数(实战案例)

一、over(窗口函数)

指的是对多行数据进行处理返回普通列和聚合列的过程

详细语法:

窗口函数sql语法:窗口函数名()over (partition by 划分窗口字段 order by 窗口内的排序规则 rows between (start,end))

窗口函数分类:

  • 聚合窗口函数 aggregate 聚合类
  • 排名窗口函数 ranking 排名类
  • 数据分析窗口函数 analytic 分析类

参考链接:https://www.cnblogs.com/abc8023/p/10910741.html

Function Type SQL DataFrame API Description
Ranking rank rank rank值可能是不连续的
Ranking dense_rank denseRank rank值一定是连续的
Ranking percent_rank percentRank 相同的分组中 (rank -1) / ( count(score) - 1 )
Ranking ntile ntile 将同一组数据循环的往n个桶中放,返回对应的桶的index,index从1开始
Ranking row_number rowNumber 很单纯的行号,类似excel的行号
Analytic cume_dist cumeDist
Analytic first_value firstValue 相同的分组中最小值
Analytic last_value lastValue 相同的分组中最大值
Analytic lag lag 取前n行数据
Analytic lead lead 取后n行数据
Aggregate min min 最小值
Aggregate max max 最大值
Aggregate sum sum 求和
Aggregate avg avg 求平均

二、具体用法如下

count(...) over(partition by ... order by ...) --求分组后的总数。
sum(...) over(partition by ... order by ...) --求分组后的和。
max(...) over(partition by ... order by ...) --求分组后的最大值。
min(...) over(partition by ... order by ...) --求分组后的最小值。
avg(...) over(partition by ... order by ...) --求分组后的平均值。
rank() over(partition by ... order by ...) --rank值可能是不连续的。
dense_rank() over(partition by ... order by ...) --rank值是连续的。
first_value(...) over(partition by ... order by ...) --求分组内的第一个值。
last_value(...) over(partition by ... order by ...) --求分组内的最后一个值。
lag() over(partition by ... order by ...) --取出前n行数据。
lead() over(partition by ... order by ...) --取出后n行数据。
ratio_to_report() over(partition by ... order by ...) --Ratio_to_report() 括号中就是分子,over() 括号中就是分母。
percent_rank() over(partition by ... order by ...)

三、应用案例

问题

某app访问页面的日志详细记录字段如下:day, user_id, page_id, time

求某天每个用户访问页面次数前10的页面。

("2018-01-01",1,"www.baidu.com","10:01"),
("2018-01-01",2,"www.baidu.com","10:01"),
("2018-01-01",1,"www.sina.com","10:01"),
("2018-01-01",3,"www.baidu.com","10:01"),
("2018-01-01",3,"www.baidu.com","10:01"),
("2018-01-01",1,"www.sina.com","10:01")

思路

  1. 每个用户访问不同页面的次数

select user_id,page_id,count(page_id) from t_log group by user_id, page_id

+-------+-------------+-----+---+---|
|user_id| page_id|count|
+-------+-------------+-----+----+--|
| 2|www.baidu.com| 1|
| 3|www.baidu.com| 2|
| 1|www.baidu.com| 1|
| 1| www.sina.com| 2|
+-------+-------------+-----+----+--|

  1. 对每个用户点击页面次数降序排列,并且使用窗口函数中的排名函数,对点击页面进行排名

w1:

| 1| www.sina.com| 2| 1

| 1|www.baidu.com| 1| 2

w2:

| 2|www.baidu.com| 1| 1

w3:

| 3|www.baidu.com| 2| 1

+-------+-------------+-----+----+
|user_id| page_id|count|rank|
+-------+-------------+-----+----+
| 1| www.sina.com| 2| 1|
| 1|www.baidu.com| 1| 2|
| 3|www.baidu.com| 2| 1|
| 2|www.baidu.com| 1| 1|
+-------+-------------+-----+----+

  1. 获得每个用户访问次数前10的页面

where rank <= 10

+-------+-------------+-----+----+
|user_id| page_id|count|rank|
+-------+-------------+-----+----+
| 1| www.sina.com| 2| 1|
| 1|www.baidu.com| 1| 2|
| 3|www.baidu.com| 2| 1|
| 2|www.baidu.com| 1| 1|
+-------+-------------+-----+----+

代码

package method

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window

object SQLDemo3 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sql operation").master("local[*]").getOrCreate()
    val rdd = spark.sparkContext.makeRDD(
      List(
        ("2018-01-01",1,"www.baidu.com","10:01"),
        ("2018-01-01",2,"www.baidu.com","10:01"),
        ("2018-01-01",1,"www.sina.com","10:01"),
        ("2018-01-01",3,"www.baidu.com","10:01"),
        ("2018-01-01",3,"www.baidu.com","10:01"),
        ("2018-01-01",1,"www.sina.com","10:01")
      )
    )
    import spark.implicits._
    val df = rdd.toDF("day","user_id","page_id","time")
  df.createTempView("t_log")
    //注意:""" 包裹内容 “”“自动进行字符串的拼接
    spark
      .sql(
        """
          |select *
          |from
          | (select user_id,page_id, num,
          |   rank() over(partition by user_id order by num desc) as rank
          |   from
          |     (select
          |       user_id,
          |       page_id,
          |       count(page_id) as num
          |       from t_log
          |       group by user_id,page_id))
          | where rank <= 10
          |
          |""".stripMargin
      )
      .show()
  spark.stop()
  }
}
//结果
+-------+-------------+---+----+
|user_id|      page_id|num|rank|
+-------+-------------+---+----+
|      1| www.sina.com|  2|   1|
|      1|www.baidu.com|  1|   2|
|      3|www.baidu.com|  2|   1|
|      2|www.baidu.com|  1|   1|
+-------+-------------+---+----+
原文地址:https://www.cnblogs.com/wanpi/p/14969000.html