Spark SQL Performance Optimization: Dynamically Applying the Same Function to Multiple Columns

When applying the same function to multiple columns of a DataFrame, can the target columns be specified dynamically?

For example:

Compute a per-group cumulative sum over the three columns A, B, and C (partitioned by ID, ordered by time).

1. Initialize Spark and build the DataFrame

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("name")
    .master("local[2]")
    .getOrCreate()
  // Use forward slashes: backslashes would be treated as escape
  // sequences inside a Scala string literal.
  val df = spark.read.json("src/main/resources/json.txt")
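
The original post does not show json.txt; a minimal in-memory stand-in (assuming the schema the later examples imply: ID, time, and numeric columns A, B, C) lets every snippet run without the file:

  import spark.implicits._

  // Hypothetical data: a per-ID time series with values A, B, C.
  val df = Seq(
    (1, 1, 10, 20, 30),
    (1, 2, 11, 21, 31),
    (2, 1, 5, 6, 7)
  ).toDF("ID", "time", "A", "B", "C")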

2. Static implementation

  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.sum

  // One withColumn call per column: correct, but repetitive.
  val newDF = df
    .withColumn("cumA", sum("A").over(Window.partitionBy("ID").orderBy("time")))
    .withColumn("cumB", sum("B").over(Window.partitionBy("ID").orderBy("time")))
    .withColumn("cumC", sum("C").over(Window.partitionBy("ID").orderBy("time")))

3. Dynamic implementation

3.1 Method 1: select

  import org.apache.spark.sql.{Column, DataFrame}
  import spark.implicits._

  // Build all three windowed columns in one select: keep the existing
  // columns ($"*") and append one aliased cumulative sum per column name.
  df.select($"*" +: Seq("A", "B", "C").map(c =>
    sum(c).over(Window.partitionBy("ID").orderBy("time")).alias(s"cum$c")
  ): _*)

  // Generalized helper: apply f to every column name and append the results.
  // Requires spark.implicits._ in scope for the $"*" syntax.
  def withColumns(cols: Seq[String], df: DataFrame, f: String => Column) = {
    df.select($"*" +: cols.map(c => f(c)): _*)
  }
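
A usage sketch for this helper, reproducing the cumulative sums from the static version (the cum prefix and window spec follow the earlier examples):

  val w = Window.partitionBy("ID").orderBy("time")

  val cumDF = withColumns(Seq("A", "B", "C"), df,
    c => sum(c).over(w).alias(s"cum$c"))
  cumDF.show()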

3.2 Method 2: foldLeft

  
Seq("A", "B", "C").foldLeft(df)((df, c) => df.withColumn(s"cum$c", sum(c).over(Window.partitionBy("ID").orderBy("time"))) )
//定义函数 def withColumn(cols : Seq[String],df : DataFrame,f : String => Column, name : String => String = identity) = { cols.foldLeft(df)((df,c) => df.withColumn(name(c),f(c))) }
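
A matching usage sketch for the foldLeft-based helper; passing a non-identity name function keeps the original columns and adds prefixed ones:

  val w = Window.partitionBy("ID").orderBy("time")

  val cumDF = withColumn(Seq("A", "B", "C"), df,
    c => sum(c).over(w), c => s"cum$c")
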
Original post: https://www.cnblogs.com/yyy-blog/p/10530739.html