数据倾斜处理及使用场景

  数据倾斜含义:   

1.1、是指shuffle过程中,必须将各个节点上相同key拉取到某个节点上的一个task来进行处理,此时如果某个key对应的数据特别大的话,就会发生数据倾斜。

      1.2、数据倾斜举例:

 
 

二、数据倾斜现象

      2.1、同一个stage中相同task绝大部分task执行时间快,少数几个执行时间慢。往往慢task耗时是快task耗时的2-3倍以上。

      2.2、原本正常运行的任务某天突然oom,也可能是发生了数据倾斜。

三、定位数据倾斜的代码

      3.1、数据倾斜是发生在shuffle过程中,定位代码中具有 shuffle的算子。例如:reduceBykey,groupByKey,aggregateByKey,distinct,cogroup,join,partionBy,repartion.

    3.2、结合任务管理界面,比对同一个task的执行时间,与数据量。如果有个别相差较大就说有数据倾斜,从而根据task定位到对应stage的shuffle算子。

    3.3、对数据进行无放回采样,查看key的分布,得到倾斜的key值,再从代码中定位哪些地方会有这些key参与从而定位到对应的代码块。

四、数据倾斜解决方案

4.1、对数据进行etl 预处理数据

              使用场景:

                    a、hive中文件大小不均匀,有的大有的小。spark在读取大文件时会对大文件按。照block快进行切分,小文件不会切分。如果不进行预处理,那么小文件处理速度快,大文件处理慢、资源没有得到充分利用,可以先对hive数据进行清洗、去重、重新分区等操作来将原本不均匀的数据重新均匀的存放在多个文件中。以简化后面依赖此数据源的任务。

                    b、hive中key分布不均匀,可以将shuffle类操作在先进行处理。处理完毕之后,spark应用不必进行重复的shuffle,直接用处理后的结果就可以。在频繁调用spark作业并且有实效要求的场景中,如果今天作业要用到昨天数据的聚合数据,可以每天进行一次预处理,将数据聚合好,从而保证今天作业的实效要求。

4.2、过滤少数导致倾斜的key

              使用场景:

                      a、倾斜key没有业务有意义,比如存在很多key是‘-’(‘-’在我们系统代表空)的记录,那么久可以直接filter掉来解决’-‘带来的数据倾斜。

                      b、倾斜key是有意义的,那么就需要单独拎出来进行单独处理。

4.3、提高shuffle操作的并行度

使用场景:同一个task被分配了多个倾斜的key.试图增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。实现起来比较简单,可以有效缓解和减轻数据倾斜的影响,原理如下图

sqlContext.setConf("spark.sql.shuffle.partitions", "1000"); 
 
 

4.4、两阶段聚合(局部聚合+全局聚合)

              使用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。如果是join类的shuffle操作,还得用其他的解决方案。

              实现方式:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。

在sparksql 中 编写udf 函数 实现随机前缀   
在rdd 中使用
Random random = new Random();
random.next(10)  给要处理的一般数据量大时 根据实际数据进行设置 添加到要聚合的key 拼接,然后进行聚合 下一步split掉 key中的随机数,在进行二次聚合
 
 
 

4.5、将reduce join转为map join

           将小表进行广播共享

               使用场景:join类操作,存在小表join大表的场景。可以将小表进行广播从而避免shuffle

4.6、采样倾斜key并分拆join操作

               使用场景:适用于join类操作中,由于相同key过大占内存,不能使用4.5方案,但倾斜key的种数不是很多的场景。

                实现方式:

                        第一步、 对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key.

      使用sample 抽样算子 (false 0.1) 值可根据数据量调  计算出量大的key

                        第二步、将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。

      拆分出数据量较多的key 然后按照 4.4 方法做就好

                      第三步、 接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD。

       膨胀数据的理解就是相当于 inner join  可以参考 另外一篇rdd join 进行理解

                      第四步、再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。

                      第五步、 而另外两个普通的RDD就照常join即可。最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。

具体原理见下图:

 
 

4.7、使用随机前缀和扩容RDD进行join

              使用场景:如果在进行join操作时,RDD中有大量的key导致数据倾斜。

              实现方式:同4.6,不同的是它不需要对原先rdd进行倾斜key过滤将原来rdd形成含倾斜key的rdd,与不含倾斜key的rdd。直接对整个原本的rdd的key一边进行加随机数,另一边进行相应倍数的扩容。而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。

原文地址:https://www.cnblogs.com/Mr--zhao/p/12799989.html