函数（org.apache.spark.sql.functions 中的常用函数）：
需求一：按类别（category）分组，聚合每个类别的总价（revenue 之和）：
val spark = SparkSession.builder()
  .appName("window")
  .master("local[6]")
  .getOrCreate()
import spark.implicits._

// Sample rows: (product, category, revenue)
val rows = Seq(
  ("Thin", "Cell phone", 6000),
  ("Normal", "Tablet", 1500),
  ("Mini", "Tablet", 5500),
  ("Ultra thin", "Cell phone", 5000),
  ("Very thin", "Cell phone", 6000),
  ("Big", "Tablet", 2500),
  ("Bendable", "Cell phone", 3000),
  ("Foldable", "Cell phone", 3000),
  ("Pro", "Tablet", 4500),
  ("Pro2", "Tablet", 6500)
)
val source = rows.toDF("product", "category", "revenue")

// Requirement 1: total revenue per category.
// Step 1 groups the rows by category; step 2 sums revenue inside each group.
import org.apache.spark.sql.functions._
val totalsPerCategory = source
  .groupBy(col("category"))
  .agg(sum(col("revenue")))
totalsPerCategory.show()
需求二：把产品名称（product）变成小写：
val spark = SparkSession.builder()
  .appName("window")
  .master("local[6]")
  .getOrCreate()
import spark.implicits._

// Sample rows: (product, category, revenue)
val rows = Seq(
  ("Thin", "Cell phone", 6000),
  ("Normal", "Tablet", 1500),
  ("Mini", "Tablet", 5500),
  ("Ultra thin", "Cell phone", 5000),
  ("Very thin", "Cell phone", 6000),
  ("Big", "Tablet", 2500),
  ("Bendable", "Cell phone", 3000),
  ("Foldable", "Cell phone", 3000),
  ("Pro", "Tablet", 4500),
  ("Pro2", "Tablet", 6500)
)
val source = rows.toDF("product", "category", "revenue")

import org.apache.spark.sql.functions._
// Requirement 1 (total revenue per category) is disabled in this step.

// Requirement 2: project only the product name, converted to lower case.
val lowerNames = source.select(lower(col("product")))
lowerNames.show()
需求三：把价格变为字符串形式（例如 6000 → 6K）：
val spark = SparkSession.builder()
  .appName("window")
  .master("local[6]")
  .getOrCreate()
import spark.implicits._

// Sample rows: (product, category, revenue)
val rows = Seq(
  ("Thin", "Cell phone", 6000),
  ("Normal", "Tablet", 1500),
  ("Mini", "Tablet", 5500),
  ("Ultra thin", "Cell phone", 5000),
  ("Very thin", "Cell phone", 6000),
  ("Big", "Tablet", 2500),
  ("Bendable", "Cell phone", 3000),
  ("Foldable", "Cell phone", 3000),
  ("Pro", "Tablet", 4500),
  ("Pro2", "Tablet", 6500)
)
val source = rows.toDF("product", "category", "revenue")

import org.apache.spark.sql.functions._
// Requirements 1 and 2 are disabled in this step.

// Requirement 3: render revenue as a string; the notes give 6000 -> 6K as the target format.
// NOTE(review): `toStr` is not defined in this snippet; it must be in scope from the
// enclosing code — confirm its definition there.
// Wrapping the plain function in a UDF lets Spark apply it to every row.
val toStrUDF = udf(toStr _)

source
  .select(col("product"), col("category"), toStrUDF(col("revenue")))
  .show()
} // closes the enclosing method, whose header is not part of this snippet
定义窗口进行操作：取每个类别（category）中收入（revenue）排名前二的产品：
val spark = SparkSession.builder()
  .appName("window")
  .master("local[6]")
  .getOrCreate()
import spark.implicits._

// Sample rows: (product, category, revenue)
val rows = Seq(
  ("Thin", "Cell phone", 6000),
  ("Normal", "Tablet", 1500),
  ("Mini", "Tablet", 5500),
  ("Ultra thin", "Cell phone", 5000),
  ("Very thin", "Cell phone", 6000),
  ("Big", "Tablet", 2500),
  ("Bendable", "Cell phone", 3000),
  ("Foldable", "Cell phone", 3000),
  ("Pro", "Tablet", 4500),
  ("Pro2", "Tablet", 6500)
)
val source = rows.toDF("product", "category", "revenue")

// 1. Window spec: one partition per category, rows ordered by revenue, highest first.
val window = Window
  .partitionBy($"category")
  .orderBy($"revenue".desc)

// 2. Rank every row inside its category window, then keep ranks 1 and 2.
//    dense_rank gives tied revenues the same rank (no gaps), so a category can
//    return more than two rows when revenues tie.
import org.apache.spark.sql.functions._
val ranked = source.select(
  $"product",
  $"category",
  dense_rank().over(window).as("rank")
)
ranked
  .where($"rank" <= 2)
  .show()
方法二（用 Spark SQL 语句实现同样的"每个类别前二"查询）：
def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder()
    .appName("window")
    .master("local[6]")
    .getOrCreate()
  import spark.implicits._

  // Sample rows: (product, category, revenue)
  val rows = Seq(
    ("Thin", "Cell phone", 6000),
    ("Normal", "Tablet", 1500),
    ("Mini", "Tablet", 5500),
    ("Ultra thin", "Cell phone", 5000),
    ("Very thin", "Cell phone", 6000),
    ("Big", "Tablet", 2500),
    ("Bendable", "Cell phone", 3000),
    ("Foldable", "Cell phone", 3000),
    ("Pro", "Tablet", 4500),
    ("Pro2", "Tablet", 6500)
  )
  val source = rows.toDF("product", "category", "revenue")

  // Method 2: express the "top two per category" query in SQL instead of the DataFrame DSL.
  // Registering a temp view makes the DataFrame addressable by name from SQL.
  source.createOrReplaceTempView("productRevenue")

  // The inner query ranks rows per category by revenue (descending); the outer query keeps
  // ranks 1 and 2. The three parts are joined with single spaces.
  val topTwoQuery = Seq(
    "select product, category, revenue from",
    "(select *, dense_rank() over (partition by category order by revenue desc) as rank from productRevenue)",
    "where rank <= 2"
  ).mkString(" ")

  spark.sql(topTwoQuery).show()
}
窗口函数的逻辑（由"窗口定义"和"作用在窗口上的函数"两部分组成）：
窗口定义部分：用 Window.partitionBy 指定分组，用 orderBy 指定组内排序，必要时用 rowsBetween 指定帧范围
函数部分：通过 over(window) 作用在窗口上的函数，例如排名函数 dense_rank 或聚合函数 max
每个商品与它所属类别中最高价之间的差值：
package cn.itcast.spark.sql

import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window

/**
 * Window-function demo: for each product, compute how far its revenue is below
 * the highest revenue in the same category.
 */
object WindowFun1 {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("window")
      .master("local[6]")
      .getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // Sample rows: (product, category, revenue)
    val data = Seq(
      ("Thin", "Cell phone", 6000),
      ("Normal", "Tablet", 1500),
      ("Mini", "Tablet", 5500),
      ("Ultra thin", "Cell phone", 5500),
      ("Very thin", "Cell phone", 6000),
      ("Big", "Tablet", 2500),
      ("Bendable", "Cell phone", 3000),
      ("Foldable", "Cell phone", 3000),
      ("Pro", "Tablet", 4500),
      ("Pro2", "Tablet", 6500)
    )
    val source = data.toDF("product", "category", "revenue")

    // 1. Window: one partition per category, ordered by revenue descending.
    //    The frame is set explicitly to the whole partition. An ordered window
    //    otherwise defaults to a running frame (unbounded preceding .. current row),
    //    so max() would be the category maximum only because of the descending sort.
    val window = Window.partitionBy('category)
      .orderBy('revenue.desc)
      .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    // 2. Highest revenue within each category.
    val maxPrice: sql.Column = max('revenue) over window

    // 3. Difference between the category maximum and each product's own revenue.
    source.select('product, 'category, 'revenue, (maxPrice - 'revenue) as "revenue_difference")
      .show()

    // Release the local Spark context once the demo output has been printed.
    spark.stop()
  }
}