Spark:The Definitive Book第七章笔记

分组的类型:

The simplest grouping is to just summarize a complete DataFrame by performing an aggregation in a select statement.

A “group by” allows you to specify one or more keys as well as one or more aggregation functions to transform the value columns.

A “window” gives you the ability to specify one or more keys as well as one or more aggregation functions to transform the value columns. However, the rows input to the function are somehow related to the current row.

A “grouping set,” which you can use to aggregate at multiple different levels. Grouping sets are available as a primitive in SQL and via rollups and cubes in DataFrames.

A “rollup” makes it possible for you to specify one or more keys as well as one or more aggregation functions to transform the value columns, which will be summarized hierarchically.

A “cube” allows you to specify one or more keys as well as one or more aggregation functions to transform the value columns, which will be summarized across all combinations of columns.

每次分组产生RelationalGroupedDataset。

在交互式查询与hot Analysis中,Spark提供了精度与速度的权衡。

count() 最简单的聚合,是一个action,不是transformation。


val dataPath = "data/retail-data/all/*.csv"

val df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(dataPath).coalesce(5)

df.cache()

df.createOrReplaceTempView("dfTable")

df.count()


Aggregation Functions

  • 转换版 的count()

we can do one of two things: specify a specific column to count, or all the columns by using count(*) or count(1) to represent that we want to count every row as the literal one


import org.apache.spark.sql.functions.count

df.select(count("StockCode")).show() 

当count(*),Spark会计算包含null的行,当单独在某些列上用count时,不计算null

  • countDistinct

// in Scala

import org.apache.spark.sql.functions.countDistinct

df.select(countDistinct("StockCode")).show() // 4070

  • approx_count_distinct

// in Scala

import org.apache.spark.sql.functions.approx_count_distinct

df.select(approx_count_distinct("StockCode", 0.1)).show() // 3364

You will notice that approx_count_distinct took another parameter with which you can specify the maximum estimation error allowed.这样有很大性能提升。

  • first、last

This will be based on the rows in the DataFrame, not on the values in the DataFrame


// in Scala

import org.apache.spark.sql.functions.{first, last}

df.select(first("StockCode"), last("StockCode")).show()

  • min、max

  • sum

  • sumDistinct

  • avg、mean

  • Variance and Standard Deviation

By default, Spark performs the formula for the sample standard deviation or variance if you use the variance or stddev functions.


// in Scala

import org.apache.spark.sql.functions.{var_pop, stddev_pop}

import org.apache.spark.sql.functions.{var_samp, stddev_samp}

df.select(var_pop("Quantity"), var_samp("Quantity"),

  stddev_pop("Quantity"), stddev_samp("Quantity")).show()

  • skewness and kurtosis

Skewness and kurtosis are both measurements of extreme points in your data. Skewness measures the asymmetry of the values in your data around the mean, whereas kurtosis is a measure of the tail of data.


import org.apache.spark.sql.functions.{skewness, kurtosis}

df.select(skewness("Quantity"), kurtosis("Quantity")).show()

  • Covariance and Correlation

cov() 协方差

corr()相关性


import org.apache.spark.sql.functions.{corr, covar_pop, covar_samp}

df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"),

    covar_pop("InvoiceNo", "Quantity")).show()

  • Aggregating to Complex Types

collect_set

collect_list

agg


import org.apache.spark.sql.functions.{collect_set, collect_list}

df.agg(collect_set("Country"), collect_list("Country")).show()


Grouping

First we specify the column(s) on which we would like to group, and then we specify.the aggregation(s). The first step returns a RelationalGroupedDataset, and the second step returns a DataFrame.

  • Grouping with Expressions

Rather than passing that function as an expression into a select statement, we specify it as within agg. This makes it possible for you to pass-in arbitrary expressions that just need to have some aggregation specified.


import org.apache.spark.sql.functions.{count, expr}

df.groupBy("InvoiceNo").agg(count("Quantity").alias("quan"),expr("count(Quantity)")).show()

  • Grouping with Maps

Sometimes, it can be easier to specify your transformations as a series of Maps for which the key is the column, and the value is the aggregation function (as a string) that you would like to perform. You can reuse multiple column names if you specify them inline, as well.


// in Scala

df.groupBy("InvoiceNo").agg("Quantity"->"avg", "Quantity"->"stddev_pop").show()

Window Functions

Spark supports three kinds of window functions: ranking functions, analytic functions, and aggregate functions.


import org.apache.spark.sql.functions.{col, to_date}

val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"),

  "MM/d/yyyy H:mm"))

dfWithDate.createOrReplaceTempView("dfWithDate")

The first step to a window function is to create a window specification. Note that the partition by is unrelated to the partitioning scheme concept that we have covered thus far. It’s just a similar concept that describes how we will be breaking up our group. The ordering determines the ordering within a given partition, and, finally, the frame specification (the rowsBetween statement) states which rows will be included in the frame based on its reference to the current input row.In the following example, we look at all previous rows up to the current row:


// in Scala

import org.apache.spark.sql.expressions.Window

import org.apache.spark.sql.functions.col

val windowSpec = Window

  .partitionBy("CustomerId", "date")

  .orderBy(col("Quantity").desc)

  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

Now we want to use an aggregation function to learn more about each specific customer. An example might be establishing the maximum purchase quantity over all time. To answer this, we use the same aggregation functions that we saw earlier by passing a column name or expression. In addition, we indicate the window specification that defines to which frames of data this function will apply:


import org.apache.spark.sql.functions.max

val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

You will notice that this returns a column (or expressions). We can now use this in a DataFrame select statement.Before doing so, though, we will create the purchase quantity rank. To do that we use the dense_rank function to determine which date had the maximum purchase quantity for every customer. We use dense_rank as opposed to rank to avoid gaps in the ranking sequence when there are tied values (or in our case, duplicate rows):


// in Scala

import org.apache.spark.sql.functions.{dense_rank, rank}

val purchaseDenseRank = dense_rank().over(windowSpec)

val purchaseRank = rank().over(windowSpec)

Now we can perform a select to view the calculated window values:


// in Scala

import org.apache.spark.sql.functions.col



dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")

  .select(

    col("CustomerId"),

    col("date"),

    col("Quantity"),

    purchaseRank.alias("quantityRank"),

    purchaseDenseRank.alias("quantityDenseRank"),

    maxPurchaseQuantity.alias("maxPurchaseQuantity")).show(100)


Grouping Sets

sometimes we want something a bit more complete—an aggregation across multiple groups. We achieve this by using grouping sets. Grouping sets are a low-level tool for combining sets of aggregations together. They give you the ability to create arbitrary aggregation in their group-by statements.


// in Scala

val dfNoNull = dfWithDate.drop()

dfNoNull.createOrReplaceTempView("dfNoNull")

//传统方法

spark.sql("""SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull

GROUP BY customerId, stockCode

ORDER BY CustomerId DESC, stockCode DESC""").show(100)

//Grouping Sets

spark.sql("""SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull

GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode))

ORDER BY CustomerId DESC, stockCode DESC""").show()

警告:Grouping sets depend on null values for aggregation levels. If you do not filter-out null values, you will get incorrect results. This applies to cubes, rollups, and grouping sets.

if you also want to include the total number of items, regardless of customer or stock code? With a conventional group-by statement, this would be impossible. But,it’s simple with grouping sets: we simply specify that we would like to aggregate at that level, as well, in our grouping set. This is, effectively, the union of several different groupings together: ?:什么意思


spark.sql("""

SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull

GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode),())

ORDER BY CustomerId DESC, stockCode DESC

""").show()

//?: 与传统方法的结果一样

The GROUPING SETS operator is only available in SQL. To perform the same in DataFrames, you use the rollup and cube operators—which allow us to get the same results.

## Rollups

When we set our grouping keys of multiple columns, Spark looks at those as well as the actual combinations that are visible in the dataset. A rollup is a multidimensional aggregation that performs a variety of group-by style calculations for us.

```scala

val rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))

  .selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")

  .orderBy("Date")

rolledUpDF.show()

A null in both rollup columns specifies the grand total across both of those columns:


rolledUpDF.where("Country IS NULL").show()

rolledUpDF.where("Date IS NULL").show()

Cube

A cube takes the rollup to a level deeper. Rather than treating elements hierarchically, a cube does the same thing across all dimensions. This means that it won’t just go by date over the entire time period, but also the country.


// in Scala

dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))

  .select("Date", "Country", "sum(Quantity)").orderBy("Date").show()

Grouping Metadata

Sometimes when using cubes and rollups, you want to be able to query the aggregation levels so that you can easily filter them down accordingly. We can do this by using the grouping_id, which gives us a column specifying the level of aggregation that we have in our result set.


// in Scala

import org.apache.spark.sql.functions.{grouping_id, sum, expr}

dfNoNull.cube("customerId", "stockCode").agg(grouping_id(), sum("Quantity"))

.orderBy(expr("grouping_id()").desc)

.show()

Pivot

Pivots make it possible for you to convert a row into a column.


// in Scala

val pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

This DataFrame will now have a column for every combination of country, numeric variable, and a column specifying the date.


pivoted.where("date > '2011-12-05'").select("date" ,"`USA_sum(Quantity)`").show()


User-Defined Aggregation Functions

User-defined aggregation functions (UDAFs) are a way for users to define their own aggregation functions based on custom formulae or business rules. You can use UDAFs to compute custom calculations over groups of input data (as opposed to single rows). Spark maintains a single AggregationBuffer to store intermediate results for every group of input data.

To create a UDAF, you must inherit from the UserDefinedAggregateFunction base class and implement the following methods:

  • inputSchema represents input arguments as a StructType

  • bufferSchema represents intermediate UDAF results as a StructType

  • dataType represents the return DataType

  • deterministic is a Boolean value that specifies whether this UDAF will return the same result for a given input

  • initialize allows you to initialize values of an aggregation buffer

  • update describes how you should update the internal buffer based on a given row

  • merge describes how two aggregation buffers should be merged

  • evaluate will generate the final result of the aggregation

定义UADF


// in Scala

import org.apache.spark.sql.expressions.MutableAggregationBuffer

import org.apache.spark.sql.expressions.UserDefinedAggregateFunction

import org.apache.spark.sql.Row

import org.apache.spark.sql.types._

class BoolAnd extends UserDefinedAggregateFunction {

  def inputSchema: org.apache.spark.sql.types.StructType =

    StructType(StructField("value", BooleanType) :: Nil)

  def bufferSchema: StructType = StructType(

    StructField("result", BooleanType) :: Nil

  )

  def dataType: DataType = BooleanType

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {

    buffer(0) = true

  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {

    buffer(0) = buffer.getAs[Boolean](0) && input.getAs[Boolean](0)

  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {

      buffer1(0) = buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)

  }

  def evaluate(buffer: Row): Any = {

    buffer(0)

  }

}

注册及使用


// in Scala

val ba = new BoolAnd

spark.udf.register("booland", ba)

import org.apache.spark.sql.functions._

spark.range(1)

  .selectExpr("explode(array(TRUE, TRUE, TRUE)) as t")

  .selectExpr("explode(array(TRUE, FALSE, TRUE)) as f", "t")

 .select(ba(col("t")), expr("booland(f)"))

  .show()

in Spark 2.3, you will also be able to call Scala or Java UDFs and UDAFs by registering the function just as we showed in the UDF.

原文地址:https://www.cnblogs.com/DataNerd/p/10399778.html