Hive高级(5):优化(5)HiveSQL优化方法与实践(二)

来源:https://mp.weixin.qq.com/s/_jZr9CIEtu92kE1r6XIFzA

Order by的优化原理

如果指定了hive.mapred.mode=strict(默认值是nonstrict),这时就必须指定limit来限制输出条数,原因是:所有的数据都会在同一个reducer端进行,数据量大的情况下可能不能出结果,那么在这样的严格模式下,必须指定输出的条数。

所以数据量大的时候能不用order by就不用,可以使用sort by结合distribute by来进行实现。

sort by是局部排序;

distribute by是控制map怎么划分reducer。

cluster by=distribute by + sort by

被distribute by设定的字段为KEY,数据会被HASH分发到不同的reducer机器上,然后sort by会对同一个reducer机器上的每组数据进行局部排序。

 例如:

select mid, money, name 
from store 
cluster by mid

select mid, money, name 
from store 
distribute by mid 
sort by mid

如果需要获得与上面的中语句一样的效果:

select mid, money, name 
from store 
cluster by mid 
sort by money

注意被cluster by指定的列只能是降序,不能指定asc和desc。

不过即使是先distribute by然后sort by这样的操作,如果某个分组数据太大也会超出reduce节点的存储限制,常常会出现137内存溢出的错误,对大数据量的排序都是应该避免的。

Count(distinct …)优化

如下的sql会存在性能问题:

SELECT COUNT( DISTINCT id ) FROM TABLE_NAME WHERE ...;

主要原因是COUNT这种“全聚合(full aggregates)”计算时,它会忽略用户指定的Reduce Task数,而强制使用1,这会导致最终Map的全部输出由单个的ReduceTask处理。这唯一的Reduce Task需要Shuffle大量的数据,并且进行排序聚合等处理,这使得它成为整个作业的IO和运算瓶颈。

图形如下:

 为了避免这一结构,我们采用嵌套的方式优化sql:

SELECT COUNT(*) 
FROM (
  SELECT DISTINCT id FROM TABLE_NAME WHERE … 
) t;

这一结构会将任务切分成两个,第一个任务借用多个reduce实现distinct去重并进行初步count计算,然后再将计算结果输出到第二个任务中进行计数。

另外,再有的方法就是用group by()嵌套代替count(distinct a)。

如果能用group by的就尽量使用group by,因为group by性能比distinct更好。

HiveSQL细节优化

1) 设置合理的mapreduce的task数,能有效提升性能。

set mapred.reduce.tasks=n

2) 在sql中or的用法需要加括号,否则可能引起无分区限制:

Select x
from t 
where ds=d1 
and (province=’gd’ or province=’gx’)

3) 对运算结果进行压缩:

set hive.exec.compress.output=true;

4) 减少生成的mapreduce步骤:

4.1)使用CASE…WHEN…代替子查询;

4.2)尽量尽早地过滤数据,减少每个阶段的数据量,对于分区表要加分区,同时只选择需要使用到的字段;

5) 在map阶段读取数据前,FileInputFormat会将输入文件分割成split。split的个数决定了map的个数。

mapreduce.input.fileinputformat.split.minsize 默认值 0
mapreduce.input.fileinputformat.split.maxsize 默认值 Integer.MAX_VALUE
dfs.blockSize 默认值 128M,所以在默认情况下 map的数量=block数

6) 常用的参数:

hive.exec.reducers.bytes.per.reducer=1000000;

设置每个reduce处理的数据量,reduce个数=map端输出数据总量/参数;

set hive.mapred.mode=nonstrict;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set mapred.job.name=p_${v_date};
set mapred.job.priority=HIGH;
set hive.groupby.skewindata=true;
set hive.merge.mapredfiles=true;
set hive.exec.compress.output=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set mapred.output.compression.type=BLOCK;
set mapreduce.map.memory.mb=4096;
set mapreduce.reduce.memory.mb=4096;
set hive.hadoop.supports.splittable.combineinputformat=true;
set mapred.max.split.size=16000000;
set mapred.min.split.size.per.node=16000000;
set mapred.min.split.size.per.rack=16000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.exec.reducers.bytes.per.reducer=128000000;

7)设置map个数:
  map个数和来源表文件压缩格式有关,.gz格式的压缩文件无法切分,每个文件会生成一个map;

set hive.hadoop.supports.splittable.combineinputformat=true; 只有这个参数打开,下面的3个参数才能生效
set mapred.max.split.size=16000000; 每个map负载;
set mapred.min.split.size.per.node=100000000; 每个节点map的最小负载,这个值必须小于set mapred.max.split.size的值;
set mapred.min.split.size.per.rack=100000000; 每个机架map的最小负载;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
原文地址:https://www.cnblogs.com/qiu-hua/p/14296678.html