Spark权威指南读书笔记(四) 聚合与连接

Spark权威指南读书笔记(四) 聚合与连接

image-20200818094742912

一、聚合函数

大多数聚合函数位于org.apache.spark.sql.functions。当给定多个输入值时,聚合函数给每个分组计算出一个结果。

count

使用count对指定列进行计数或者使用count(*)或count(1)对所有列进行计数。需要注意的是,当执行count(*)时,Spark会对null值进行计数;而当对某指定列计数时,则不会对null值进行计数。

image-20200818100546379

countDistinct

获取某列的唯一值数量计数

image-20200818100905695

approx_count_distinct

在处理大数据计算时,获得 一个精确的结果开销会很大,但是计算一个近似结果相对容易很多。此时可使用approx_count_distinct。approx_count_distinct具有两个参数,第一个为列,第二个参数指定可容忍的最大误差。

image-20200818102056966

**first 和 last **

基于DataFrame行的顺序而非中值顺序,获取DataFrame的第一个值和最后一个值。

image-20200818102947837

min 和 max

从DataFrame中获取最小值和最大值

image-20200818103628099

sum

累加一行中的所有值

image-20200818103917699

sumDistinct

对一组去重值进行求和

image-20200818104840040

avg

使用avg或mean方法获取平均值

image-20200818105638905

方差与标准差

image-20200818110156445

偏度系数(skewness)与峰度系数(kurtosis)

image-20200818111419564

协方差与相关性

image-20200818113827511

聚合输出复杂类型

image-20200818160942448

二、分组

分组 groupBy

分两个阶段进行分组,首先指定要对其分组的一列或多列,然后指定一个或多个聚合操作。第一步返回一个RelationalGroupedDataset,第二步返回 DataFrame。

image-20200818161147721

使用表达式分组

image-20200818161436822

使用Map进行分组

转换操作指定一系列Map更方便,其中键为列,值为要执行的字符串形式的聚合函数。若以inline方式指定可重用多个列名。

image-20200818162519883

window函数

window函数具体是指在指定数据窗口上执行聚合操作,并使用对当前数据的引用来定义它,此窗口指定将哪些行传递给此函数。不同于groupBy处理数据分组的方式 每一行只能进入一个分组,窗口函数基于Frame的一组行,计算表中的每一输入行的返回值,每一行属于一个或多个Frame。

配置窗口函数第一步是创建一个窗口规范。

image-20200818165610853

rowsBetween参数说明

rowBetween含有两参数,第一个参数指定操作起始位置,第二个参数指定操作的最后位置。

Window.unboundPreceding 分区开始位置

Window.currentRow 分区计算当前位置

Window.unboundedFollowing 分区的最后位置

负数 — 前有元素的情况下向前追加

0 — 等价于currentRow

正数 — 后有元素的情况下向后追加

分组集

分组集用于多组聚合操作组合在一起的底层工具,使得能够在group-by语句中创建任意得聚合操作。

image-20200818171512039

注:分组集取决于聚合级别的null值,如果不过滤空值,则会得到不正确的结果。

rollup

image-20200818174716873

cube

image-20200818174908567

使用grouping_id对元数据分组

image-20200818175931992

透视转换

使用透视转换后,DataFrame会为每一个Country和数值类型列组合产生一个新列。

image-20200818180755793

三、用户自定义聚合函数UDAF

UDAF是用户根据自定义公式或业务逻辑定义自己的聚合函数的一种。可以使用UDAF计算输入数据组(与单行相对)的自定义计算。Spark维护单个AggregateBuffer,用于存储每组输入数据的中间结果。

若要创建UDAF,必须继承UserDefineAggregateFunction基类实现以下方法:

  • inputSchema用于指定输入参数,输入参数类型为StructType
  • bufferSchema用于指定UDAF中间结果,中间结果类型为StructType
  • dataType用于指定返回结果,返回结果类型为DataType
  • deterministic是一个布尔值,它指定此UDAF对于某个输入是否会返回相同的结果
  • initialize初始化聚合缓冲区的初始值
  • update描述应如何根据给定行更新内部缓冲区
  • merge描述应如何合并两个聚合缓冲区
  • evaluate将生成聚合最终结果

image-20200818184434129

四、连接

连接类型

  • inner join: 内部连接,保留左、右数据集内某个键都存在的行
  • outer join:外部连接, 保留左侧或右侧数据集中具有某个键的行
  • left outer join:左外部连接, 保留左侧数据集中具有某个键的行
  • right outer join: 右外部连接, 保留右侧数据集中具有某个键的行
  • left semi join: 左半连接,如果某键在右侧数据行中出现,则保留且仅保留左侧数据行
  • left anti join:左反连接, 如果某键在右侧数据行中没出现,则保留且仅保留左侧数据行
  • natural join: 自然连接, 通过隐式匹配两个数据集之间具有相同名称的列来执行连接
  • cross join:笛卡尔连接,又称为交叉连接,将左侧数据集中的每一行与右侧数据集中每一行匹配

内连接

image-20200818215521416

外连接

image-20200818215614760

左外连接

image-20200818215652272

右外连接

image-20200818215749677

左半连接

image-20200818215937647

左反连接

image-20200818220029305

笛卡尔连接

image-20200818220108960

复杂类型连接操作

image-20200818220343583

处理连接中的重复列

image-20200818220538260

五、Spark通信策略

在连接过程中,Spark以两种不同方式处理集群通信问题。要么执行导致all-to-all通信的shuffle join, 要么采用broadcast join。

大表与大表连接(shuffle join)

大表连接大表时,执行shuffle join。则每个结点都与所有其他结点进行通信,并根据哪个结点具有用于连接的某个键或某一组键来共享数据。由于网络会因通信量而阻塞,所以这种方式很耗时,特别是如果数据没有合理分区的情况下

大表与小表连接(broadcast join)

通过将数据量较小的DataFrame复制到集群中的所有工作结点上,只需开始时执行一次,然后让每个工作节点独立执行作业,无需等待其他工作节点,无需与其他工作结点通信。

通过DataFrame API可以显式告知优化器,使用broadcast函数作用于较小的DataFrame上,并执行广播通信模式的连接操作。

image-20200818225242861

image-20200818225255520

小表与小表连接

最好让Spark自行决定

六、补充 SparkSQL join的三种实现

这部分内容参考博客:https://www.linkedin.com/pulse/spark-sql-3-common-joins-explained-ram-ghadiyaram

http://hbasefly.com/2017/03/19/sparksql-basic-join/

先说结论:

  1. Broadcast Hash Join 适合一张较小的表与一张大表进行join
  2. Shuffle Hash Join 适合一张小表和一张大表进行join,或者两张小表之间的join
  3. Sort Merge Join: 适合两张较大的表进行join

注:前两者基于Hash Join, 不同点在于优先进行Shuffle或Broadcast。

Hash Join

整个过程分为三步:

  1. 确定BuildTable,ProbeTable。BuildTable使用join key构建Hash Table,Probe Table使用join key进行探测,探测成功则可Join。通常情况下,小表为BuildTable, 大表为ProbeTable。
  2. 构建HashTable。依次读取BuildTable的数据,对于每一行数据根据join key进行hash,hash到对应的Bucket,生成HashTable中的一条记录,数据缓存在内存中,若内存放不下需要dump到外存。
  3. 探测。再依次扫描ProbeTable数据,使用相同的hash函数映射HashTable中记录。映射成功后再检测Join条件,若匹配成功,则将二者进行Join。

broadcast join(广播表要足够小)

broadcast Join的条件:

  1. 被广播的表需要小于spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M
  2. 基表不能被广播,如left outer join,只能广播右表

broadcast join步骤:

  1. broadcast阶段:将小表广播分发到大表所在的所有结点。
  2. hash join阶段:在每个executor上执行单机版hash join,小表映射,大表试探

0

shuffle hash join

shuffle hash join的条件:

  1. 分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M
  2. 基表不能被广播,比如left outer join时,只能广播右表
  3. 一侧的表要明显小于另外一侧,小的一侧将被广播(明显小于的定义为3倍小,此处为经验值)

shuffle hash join步骤:

  1. 对两张表分别按照join keys进行重分区,即shuffle,目的是为了让有相同join keys值的记录分到对应的分区中
  2. 对对应分区中的数据进行join,对各个分区内先将小表分区构造为一张hash表,然后根据大表分区中记录的join keys值拿出来进行匹配

sort merge join

sort merge join步骤:

  1. shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理
  2. sort阶段:对单个分区节点的两表数据,分别进行排序
  3. merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,否则取更小一边

原文地址:https://www.cnblogs.com/ganshuoos/p/13527018.html