Hive的数据倾斜

目录

  1. 什么是数据倾斜
  2. Hadoop框架的特性
  3. 主要表现
  4. 容易数据倾斜的情况
  5. 产生数据清洗的原因
  6. 业务场景
    1. 空值产生的数据倾斜
    2. 不同数据类型关联产生数据倾斜
    3. 大小表关联查询产生数据倾斜

一、什么是数据倾斜

由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点

二、Hadoop框架的特性

  1. 不怕数据大,怕数据倾斜
  2. jobs数比较多的作业运行效率相对比较低,如子查询比较多
  3. sum、count、max、min等聚合函数,通常不会有数据倾斜问题

三、主要表现

任务进度长时间维持在99%或者100%的附近,查看任务监控页面,发现只有少量reduce子任务未完成,因为其处理的数据量和其他的reduce差异多大.单一reduce处理的记录数和平均记录数相差太大,通常达到好几倍之多,最长时间远大于平均时长.

四、容易数据倾斜的情况

关键词 情形 后果
join 其中一个表较小,但是key集中 分发到某一个或几个Reduce上的数据远高于平均值
大表与大表,但是分桶的判断字段0值或空值过多 这些空值都由一个reduce处理,非常慢
group by 维度过小,某值的数量过多 处理某值的reduce非常耗时
count distinct 某特殊值过多 处理次特殊值的reduce耗时
  1. group by不和聚集函数搭配使用的时候
  2. count(distinct),在数据量大的情况下,容易数据倾斜,因为count(distinct)是按group by字段分组的,按distinct字段排序
  3. 小表关联超大表join

五、产生数据倾斜的原因

  1. key分布不均匀
  2. 业务数据本身的特性
  3. 建表考虑不周全
  4. 某些HQL语句本身就存在数据倾斜

六、业务场景

1. 空值产生的数据倾斜

场景说明

在日志中,常会有信息丢失的问题,比如日志中的user_id,如果取其中的user_id和用户表中的user_id相关联,就会陪到数据倾斜的问题

解决方案

方案一: user_id为空的不参与关联

select * from log a join user b on a.user_id is not null and a.user_id = b.user_id
union all
select * from log c where c.user_id is null;

解决方案2: 赋予空值新的key值

select * from log a left outer join user b on
case when a.user_id is null then concat('hive',rand()) else a.user_id end = b.user_id;

总结

方法2比方法1效率更好,不但IO少了,而且作业数也少了; 方案1中,log表读了两次,jobs肯定是2,而方案2是1.这个优化适合无效id(比如-99, ", null)产生的数据倾斜,把空值的key变成一个字符串加上一个随机数,就能把造成数据倾斜的数据分布到不同的reduce上解决数据倾斜的问题.

改变之处:使本身为null的所有记录不会拥挤在同一个reduce Task了,会由于由替代的随机字符串,而分散到了多个reduce Task中,由于null值关联不上,处理后并不影响最终结果.

2. 不同数据类型关联产生数据倾斜

场景说明

用户表中user_id字段为int,log表中user_id为既有string也有int的类型,当按照两个表的user_id进行join操作的时候,默认的hash操作会按照int类型的id进行分配,这样就会导致所有的string类型的id就被分到同一个reducer当中

解决方案

select * from user a left outer join log b on b.user_id = cast(a.user_id as string)

3. 大小表关联查询产生数据倾斜

注意: 使用map join解决小表关联大表造成的数据倾斜问题,这个方案使用的频率很高.

map join 概念: 将其中做连接的小表(全量数据)分发到所有Map Task端进行join,从而避免了reduce Task,前提要求是内存足以装下该全量数据

屏幕快照 2019-05-12 23.29.39

以大表a和小表b为例,所有的maptask节点都装载小表b的所有数据,然后大表a的一个数据块数据,比如说是a1去跟b全量数据做链接,就省去了reduce做汇总的过程.所以相对来说,在内存允许的条件下使用map join比直接使用MapReduce效率还高些,当然这只限于做join查询的时候.

在hive中,直接提供了能够在HQL语句指定该次查询使用map join,map join的用法是在查询/子查询的SELECT 关键字后面添加 /+ MAPJOIN(tablelist)/ 提示优化器转化为map join(早期的Hive版本的优化器是不能自动优化map join的).其中tablelist可以是一个表,或以都好连接的表的列表.tablelist中的表将会读入内存,通常应该是将小表写在这里.

MapJoin具体用法:

select /* +mapjoin(a) */ a.id aid, name, age from a join b on a.id = b.id;
select /* +mapjoin(movies) */ a.title, b.rating from movies a join ratings b on a.movieid = b.movied;

在Hive0.11版本以后会自动开启map join优化,由两个参数控制:

set hive.auto.convert.join=true; //设置MapJoin优化自动开启 set hive.mapjoin.smalltable.filesize=25000000; //设置小表不超过多大时开启mapjoin优化

如果是大大表关联呢?那就大事化小,小事化了.把大表切分成小表,然后分别map join.

那么如果小表不大不小,那该如何处理呢?

使用map join解决小表(记录数少)关联大表的数据倾斜问题,这个方法使用的频率非常高,但如果小表很大,大到map join会出现bug或异常,这是就需要特别的处理.

例如: 日志表和用户表做连接

select * from log a left outer join users b on a.user_id = b.user_id;

users表由600w+的记录,把users分发到所有的map上也是个不小的开销,而且map join不支持这么大的小表.如果用普通的join,又会碰到数据倾斜的问题.

改进方案:

select /* +mapjoin(x) */* from log a
left outer join (
 select /* +mapjoin(c)*/ d.*
 form (select distinct user_id from log ) c.join users d on c.user_id = d.user_id) x
 on a.user_id = x.user_id; 

假如,log里user_id有上百万个,这就又回到原来map join问题.所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等.所以这个放啊能解决很多情景下的数据倾斜问题.

原文地址:https://www.cnblogs.com/suixingc/p/hive-de-shu-ju-qing-xie.html