hivesql常用set

set hive.execution.engine = tez;  --"mr", "tez", "spark"

set tez.queue.name=root.hello;

set tez.grouping.min-size=556000000;

set tez.grouping.max-size=3221225472;

set hive.tez.auto.reducer.parallelism=true;

set mapreduce.map.cpu.vcores=6;
set mapreduce.map.memory.mb=3072;
set mapreduce.map.java.opts=-Xmx2400m -Dfile.encoding=UTF-8;
set mapreduce.reduce.cpu.vcores=6;
set mapreduce.reduce.memory.mb=6144;
set mapreduce.reduce.java.opts=-Xmx4900m -Dfile.encoding=UTF-8;
set hive.exec.max.created.files=900000;
set mapred.job.queue.name=root.ESS-GODDOG-OFFLINE;

set mapred.job.name=t_dm_relation_graph;

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=3000;
set hive.exec.max.dynamic.partitions.pernode=1000;
set hive.fetch.task.conversion=more;
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=2048;
set mapreduce.job.jvm.numtasks=-1;
set hive.exec.reducers.bytes.per.reducer=500000000;
--set mapred.reduce.tasks=1024;
set hive.auto.convert.join=true;
set hive.map.aggr=true;
set hive.groupby.skewindata=true;
set mapred.max.split.size=256000000;
set mapred.min.split.size.per.node=134217728;
set mapred.min.split.size.per.rack=134217728;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.size.per.task=256000000;
set hive.merge.smallfiles.avgsize=16000000;
set io.file.buffer.size=2097152;
set mapreduce.task.io.sort.mb=300;
set mapreduce.map.sort.spill.percent=0.8;
set mapreduce.task.io.sort.factor=10;
set mapreduce.reduce.shuffle.parallelcopies=8;
set hive.exec.compress.intermediate=true;
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
set mapreduce.reduce.merge.inmem.threshold=0;
set mapreduce.reduce.input.buffer.percent=0.8;
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
set mapreduce.output.fileoutputformat.compress.type=BLOCK;


# 关闭local hadoop
set hive.exec.mode.local.auto=false;

set mapred.job.priority=HIGH;

set mapreduce.job.reduce.slowstart.completedmaps=0.15

Map端调优

map数控制

对于map运行时间过长或者map数据倾斜比较严重时可以通过设置切片大小实现对map数的控制,hive设置如下

set mapred.max.split.size=100000000;

set mapred.min.split.size.per.node=100000000;

set mapred.min.split.size.per.rack=100000000;

 map小文件优化

使用Combinefileinputformat,将多个小文件打包作为一个整体的inputsplit,减少map任务数

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

Map 内存控制

ApplicationMaster执行在独立的jvm中以child process的方式执行 Mapper/Reducer task

子任务继承ApplicationMaster的环境,用户可以通过mapreduce.{map|reduce}.java.opts 指定child-jvm参数

set mapreduce.map.memory.mb=2048;

set mapreduce.map.java.opts=-Xmx1536m

MapJoin优化

set hive.auto.convert.join = true;

set hive.mapjoin.smalltable.filesize=25000000;
select /*+ MAPJOIN(table_a)*/,a.*,b.* from table_a a join table_b b on a.id = b.id

reduce端优化

reduce数控制

hive1.2.1自己如何确定reduce数: reduce个数的设定极大影响任务执行效率,不指定reduce个数的情况下,Hive会猜测确定一个reduce个数,基于以下两个设定:hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为256*1000*1000。hive.exec.reducers.max(每个任务最大的reduce数,默认为1009)

可以直接通过mapred.reduce.tasks(新mapreduce.job.reduces) 设置map数的大小或者通过hive.exec.reducers.bytes.per.reducer设置每个reduce task处理的数据量大小来间接控制reduce大小

set mapred.reduce.tasks = 15;

set hive.exec.reducers.bytes.per.reducer=500000000; 

其中

set mapreduce.job.reduces=number的方式优先级最高(旧参数:mapred.reduce.tasks)。

set hive.exec.reducers.max=number优先级次之,

set hive.exec.reducers.bytes.per.reducer=number 优先级最低。从hive0.14开始,一个reducer处理文件的大小的默认值是256M。

reduce内存控制

set mapreduce.reduce.memory.mb=2048;

set mapreduce.reduce.java.opts=-Xmx1536m

  

MapReduce输出压缩优化

Map端压缩

--启用中间数据压缩

set hive.exec.compress.intermediate=true

-- 启用最终数据输出压缩

set hive.exec.compress.output=true;

--开启mapreduce中map输出压缩功能

set mapreduce.map.output.compress=true;

--设置mapreduce中map输出数据的压缩方式

set mapreduce.map.output.compress.codec= org.apache.hadoop.io.compress.SnappyCodec;

 

--开启mapreduce最终输出数据压缩

set mapreduce.output.fileoutputformat.compress=true;

--设置mapreduce最终数据输出压缩方式

set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;

--设置mapreduce最终数据输出压缩为块压缩。三种压缩选择:NONE, RECORD, BLOCK。 Record压缩率低,一般建议使用BLOCK压缩。

set mapreduce.output.fileoutputformat.compress.type=BLOCK;

hive创建文件限制

Maximum number of HDFS files created by all mappers/reducers in a MapReduce job.

一个MR job中允许创建多少个HDFS文件

MR对文件创建的总数是有限制的,这个限制取决于参数:

hive.exec.max.created.files,默认值是100000,可以根据实际情况调整

  hive.error.on.empty.partition   false  当动态分区插入产生空值时是否抛出异常

set hive.exec.max.created.files=150000

MapReduce文件合并优化

--设置map端输出进行合并,默认为true  

set hive.merge.mapfiles = true  

--设置MapReduce结果输出进行合并,默认为false  

set hive.merge.mapredfiles = true  

--设置合并文件的大小  

set hive.merge.size.per.task = 256*1000*1000  

--当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。  

set hive.merge.smallfiles.avgsize=16000000

Group by优化

group by数据倾斜

hive.map.aggr=true      //map端是否聚合

hive.map.aggr.hash.force.flush.memory.threshold=0.9  //map侧聚合最大内存,超出后flush

hive.map.aggr.hash.min.reduction=0.5  //开启聚合的最小阈值

hive.map.aggr.hash.percentmemory=0.5 //map端hash表聚合内存大小

--group的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置

set hive.groupby.mapaggr.checkinterval=100000 ;

--如果是group by过程出现倾斜 应该设置为true

set hive.groupby.skewindata=true;

--join的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置

set hive.skewjoin.key=100000;

--如果是join 过程出现倾斜 应该设置为true

set hive.optimize.skewjoin=true;

空key转换

有时虽然某个key为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join的结果中,此时我们可以表a中key为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的reducer上
案例

  1. 不随机分布空null值
    1). 设置5个reduce个数
    set mapreduce.job.reduces = 5;
    2). JOIN两张表
insert overwrite table jointable select n.* from nullidtable n left join ori b on n.id = b.id;

 可以看出来,出现了数据倾斜,某些reducer的资源消耗远大于其他reducer。

  1. 随机分布空null值
    1) . 设置5个reduce个数
    set mapreduce.job.reduces = 5;
    2). JOIN两张表
insert overwrite table jointable select n.* from nullidtable n full join ori o on case when n.id is null then concat('hive', rand()) else n.id end = o.id;

 可以看出来,消除了数据倾斜,负载均衡reducer的资源消耗。

动态分区优化

hive默认关闭了动态分区,使用动态分区时候,hive.exec.dynamic.partition参数必须设置成true;nonstrict模式表示允许所有的分区字段都可以使用动态分区。一般需要设置为nonstrict;当分区数比较多时hive.exec.max.dynamic.partitions和hive.exec.max.dynamic.partitions.pernode也需要根据时间情况调大

set hive.exec.dynamic.partition=true;

set hive.exec.dynamic.partition.mode=nonstrict;

set hive.exec.max.dynamic.partitions=3000;

set hive.exec.max.dynamic.partitions.pernode=1000;

 

Fetch优化

我们在执行hive代码的时候,一条简单的命令大部分都会转换成为mr代码在后台执行,但是有时候我们仅仅只是想获取一部分数据而已,仅仅是获取数据,还需要转化成为mr去执行就比较浪费时间和内存了

hive提供了一个参数hive.fetch.task.conversion,这个参数所对应着"none", "minimal", "more":

1.none下 : disable hive.fetch.task.conversion。所有查询都启动MR。

2.minimal下,我们执行select * ,limit,filter在一个表所属的分区表上操作,这三种情况都会直接进行数据的拿去,也就是直接把数据从对应的表格拿出来,不用跑mr代码

3.more模式下,运行select,filter,limit,都是运行数据的fetch,不跑mr应用

set hive.fetch.task.conversion=more;

Insert overwrite优化

当动态分区过多时,由于需要创建太多的文件,FileSinkOperator很有可能会触发GC overlimit问题,可以启用hive.optimize.sort.dynamic.partition,动态分区列将被全局排序。这样我们就可以为每个分区值只打开一个文件写入器,可以极大的减小reduce内存的压力

set hive.optimize.sort.dynamic.partition=true;

Hive Sql并行执行优化

hive.exec.parallel参数控制在同一个sql中的不同的job是否可以同时运行,默认为false。有时候一个sql中的有些job是不相关的,设置hive.exec.parallel=true可以让这些不相关的job同时跑

set hive.exec.parallel=true;

set hive.exec.parallel.thread.number=32;    //同一个sql允许最大并行度,默认为8。

1.1.1 reduce数据倾斜导致报错?

 

 

 常见出错的原因:

通过INSERT语句插入数据到动态分区表中,也可能会超过HDFS同时打开文件数的限制。

如果没有join或聚合,INSERT ... SELECT语句会被转换为只有map任务的作业。mapper任务会读取输入记录然后将它们发送到目标分区目录。

在这种情况下,每个mapper必须为遇到的每个动态分区创建一个新的文件写入器(file writer)。

mapper在运行时所需的内存量随着它遇到的分区数量的增加而增加。

优化动作:

set hive.optimize.sort.dynamic.partition = true

通过这个优化,这个只有map任务的mapreduce会引入reduce过程,这样动态分区的那个字段比如日期在传到reducer时会被排序。由于分区字段是排序的,因此每个reducer只需要保持一个文件写入器(file writer)随时处于打开状态,在收到来自特定分区的所有行后,关闭记录写入器(record writer),从而减小内存压力。这种优化方式在写parquet文件时使用的内存要相对少一些,但代价是要对分区字段进行排序。

类似的详细说明,链接,如下:

https://www.jianshu.com/p/cfb827642905?from=groupmessage&isappinstalled=0 

原文地址:https://www.cnblogs.com/LIAOBO/p/13612752.html