hive 总结四(优化)

本文参考:黑泽君相关博客
本文是我总结日常工作中遇到的坑,结合黑泽君相关博客,选取、补充了部分内容。

表的优化

小表join大表、大表join小表

将key相对分散,并且数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的几率;
再进一步,可以使用map join让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce。

实际测试发现:新版的hive已经对小表JOIN大表和大表JOIN小表进行了优化。小表放在左边和右边已经没有明显区别

hive> set hive.auto.convert.join;
hive.auto.convert.join=true

这一方面的优化个人觉得也就这个样了。


大表join大表

有时join超时是因为某些key对应的数据太多,而相同key对应的数据都会发送到相同的reducer上,从而导致内存不够。
出现上述问题的主要原因有两个:
数据倾斜
存在大量异常值,比如空值,坏数据等。

  • 异常值
    下面是两个大表
    nullbigtable:key包含null
    bigtable:关联表
hive> insert overwrite table resulttable 
    >select n.* 
    >from nullbigtable n 
    >left join 
    >bigtable o 
    >on 
    >n.id=o.id;
.
.
.
Time taken: 40.346 seconds, Fetched: 1342933 row(s)

hive> insert overwrite table resulttable 
    >select n.* 
    >from 
    >(select * from nullbigtable where id is not null) n 
    >left join 
    >bigtable o 
    >on 
    >n.id=o.id;
.
.
.
Time taken: 38.346 seconds, Fetched: 1321347 row(s)

上面看起来稍微有点作用,可能是我的测试null设置的比较少。
  • 数据倾斜
    如果本身null数据是合法的,或异常数据需要保留,那么就要换种方式处理

给异常值设置随机数,目的是为了让null或一场数据均匀分布到各个reducer中

insert overwrite table jointable
hive>select n.* 
    >from nullbigtable n 
    >full join 
    >bigtable o 
    >on 
    >case when n.id is null then concat('hive', rand()) else n.id end=o.id;

笛卡尔积

尽量避免笛卡尔积,join的时候不加on条件,或者无效的on条件。
Hive只能使用1个reducer来完成笛卡尔积(笛卡尔积过大会撑爆内存)。


MapJoin

MapJoin工作机制

通过MapReduce Local Task,将小表读入内存
生成HashTableFiles上传至Distributed Cache中,这里会对HashTableFiles进行压缩。

MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中
顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务。

测试
关闭Mapjoin功能
hive> set hive.auto.convert.join=false;
hive> select
    > big.*
    > from 
    > tmp.orclocal small
    > join 
    > tmp.orcTest  big
    > on small.userid=big.userid
    > limit 1;
Query ID = hdfs_20190716161515_f462647a-14fd-4808-8eb5-4c60dc5663f1
Total jobs = 1
Launching Job 1 out of 1
.
.
.
Stage-Stage-1: Map: 6  Reduce: 4   Cumulative CPU: 222.55 sec   HDFS Read: 251052684 HDFS Write: 1138 SUCCESS
Total MapReduce CPU Time Spent: 3 minutes 42 seconds 550 msec
OK
02012138
Time taken: 48.529 seconds, Fetched: 1 row(s)

hive> select
    > big.*
    > from 
    > tmp.orcTest  big
    > join 
    > tmp.orclocal small
    > on small.userid=big.userid
    > limit 1;
Query ID = hdfs_20190716161515_6bac9e09-f579-42c9-84ab-2fb9ef5c8760
Total jobs = 1
Launching Job 1 out of 1
.
.
.
Stage-Stage-1: Map: 6  Reduce: 4   Cumulative CPU: 219.75 sec   HDFS Read: 251048180 HDFS Write: 1138 SUCCESS
Total MapReduce CPU Time Spent: 3 minutes 39 seconds 750 msec
OK
02012138
Time taken: 47.599 seconds, Fetched: 1 row(s)

开启Mapjoin功能
hive> set hive.auto.convert.join=true;
hive> select
    > big.*
    > from 
    > tmp.orclocal small
    > join 
    > tmp.orcTest  big
    > on small.userid=big.userid
    > limit 1;
Query ID = hdfs_20190716161818_85f3ca54-c06b-4d21-8597-7feb86541f54
Total jobs = 1
.
.
.
Total MapReduce CPU Time Spent: 39 seconds 800 msec
OK
02012138
Time taken: 20.849 seconds, Fetched: 1 row(s)

hive> 
    > select
    > big.*
    > from 
    > tmp.orcTest  big
    > join 
    > tmp.orclocal small
    > on small.userid=big.userid
    > limit 1;
Query ID = hdfs_20190716161919_8774a3a0-baa4-4d87-afdf-4701cd2dcba6
Total jobs = 1
.
.
.
Stage-Stage-3: Map: 5   Cumulative CPU: 38.98 sec   HDFS Read: 175115579 HDFS Write: 1440 SUCCESS
Total MapReduce CPU Time Spent: 38 seconds 980 msec
OK
02012138
Time taken: 19.672 seconds, Fetched: 1 row(s)

对比两个结果,开启mapjoin几乎效率提升了一倍


group by

默认情况下,Map阶段key相同的数据会全部分发给一个Reduce,
当一个key数据过大时就会出现数据倾斜导致任务耗时长甚至失败。

但是并不是所有的聚合操作都需要在Reduce端完成,
很多聚合操作都可以先在Map端进行部分聚合(预处理),
最后在Reduce端得出最终结果。(类似于Combine)

是否在Map端进行聚合,默认为true
hive> set hive.map.aggr;
hive.map.aggr=true

设置在Map端进行聚合操作的条目数目
hive> set hive.groupby.mapaggr.checkinterval;
hive.groupby.mapaggr.checkinterval=100000

有数据倾斜的时候进行负载均衡(默认是false)
hive> set hive.groupby.skewindata;
hive.groupby.skewindata=false

设置对应值 老规矩

当选项设定为shive.map.aggr=true,生成的查询计划会有两个MR Job。

第一个MRJob中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的group by key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;

第二个MR Job再根据预处理的数据结果按照group by key分布到Reduce中(这个过程可以保证相同的group by key被分布到同一个Reduce中),最后完成最终的聚合操作。

案例:
设置5个reduce个数
hive> set mapreduce.job.reduces=5;

执行去重id查询(只能用一个Reduce)
hive> select count(distinct userid ) from tmp.orcTest;
Query ID = hdfs_20190716163434_54ae4ccc-a95f-4a5c-924f-6fb8e50c8393
Total jobs = 1
.
.
.
Stage-Stage-1: Map: 5  Reduce: 1   Cumulative CPU: 233.47 sec   HDFS Read: 13234944 HDFS Write: 8 SUCCESS
Total MapReduce CPU Time Spent: 3 minutes 53 seconds 470 msec
OK
1260351
Time taken: 76.007 seconds, Fetched: 1 row(s)

采用GROUP BY去重id(可以使用多个Reduce,子查询帮我们做去重工作,把数据分发给了5个Reduce进行去重处理,大数据量的情况下效率高)
hive> select count(userid) from (select userid from tmp.orcTest group by userid) a;
Query ID = hdfs_20190716163535_5136be23-60bc-4808-a87f-338975588bb2
Total jobs = 2
.
.
.
1260351
Time taken: 90.242 seconds, Fetched: 1 row(s)

从结果看,貌似翻车了,这个是因为数据量问问题,一共也就才1000W的数据量,效果不明显,如果数据量大几个量级效果就很明显了
count distinct() 踩坑

如果有个一需求是对 两个字段进行去重 计数,
相信很多人都会踩到我遇到的坑。
直接用count (distinct c1,c2)
但是这样统计是错的

hive> select count(distinct userid ,errorcode) from tmp.orcTest;
Query ID = hdfs_20190716164242_b8198330-e1a7-4e23-ab35-2533dc228507
Total jobs = 1
.
.
.
OK
2196359
Time taken: 124.874 seconds, Fetched: 1 row(s)

hive> 
    > select count(1) from (select userid,errorcode from tmp.orcTest group by userid,errorcode) a;
Query ID = hdfs_20190716164444_08c11673-6530-4c1a-bdfd-b2768fc7083f
Total jobs = 2
OK
2196359
Time taken: 137.943 seconds, Fetched: 1 row(s)
我擦 又翻车了  等我下次找个案例

count ( distinct ) 中 distinct 的字段只能有一个,
如果要对多个字段去重使用 grouy by


动态分区

关系型数据库中,对分区表insert数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中,
Hive中也提供了类似的机制,即动态分区(Dynamic Partition)的概念,
在使用Hive的动态分区前,需要进行相应的配置。

查看、设置相关属性

开启动态分区功能(默认true,开启)
hive> set hive.exec.dynamic.partition;
hive.exec.dynamic.partition=true

设置为非严格模式(动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,  
nonstrict模式表示允许所有的分区字段都可以使用动态分区。)
hive> set hive.exec.dynamic.partition.mode;
hive.exec.dynamic.partition.mode=strict

在所有执行MR的节点上,最大一共可以创建多少个动态分区。
hive> set hive.exec.max.dynamic.partitions;
hive.exec.max.dynamic.partitions=1000

在每个执行MR的节点上,最大可以创建多少个动态分区。该值要大于可能分区的个数。  
比如一周七天有7个分区,如果设置为6就会报错
hive> set hive.exec.max.dynamic.partitions.pernode;
hive.exec.max.dynamic.partitions.pernode=100

整个MR Job中,最大可以创建多少个HDFS文件。
hive> set hive.exec.max.created.files;
hive.exec.max.created.files=100000

当有空分区生成时,是否抛出异常。一般默认即可
hive> set hive.error.on.empty.partition;
hive.error.on.empty.partition=false

数据倾斜

Map数量

Q:决定Map数量的因素?

A:主要有三个因素:

  1. input的文件总个数
  2. input的文件大小
  3. 集群设置的文件块大小
  4. 依稀记得还有一个map数量限制(待确认)

Q:map数量越多越好?

A:并不是,如果有很多小文件(远小于一个block的大小),每个小文件会被看作一个块当做一个map任务来完成。这个时候可能启动map任务耗费的代价远高于逻辑处理的代价,造成资源浪费严重。

Q:是不是保证每个map处理接近一个block块大小的数据量即可?

A:并不是,正常情况下一个block对应一个map,但是如果对应的数据只有一两个字段,且字段内容很短,那么可能一个block包含的记录条数就会达到几千万,如果处理的逻辑还是相对复杂的,那么一个map很大概率也是吃不消的。

对于上述问题,我们要根据实际的情况增加或者减少map的数量。

  • 合并小文件(减少map)
    在map执行前合并小文件,减少map数
    CombineHiveInputFormat 系统默认的格式,具有小文件合并的功能
hive> set hive.input.format;
hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

HiveInputFormat没有对小文件合并功能。
  • 复杂文件(增加Map数)

Q:什么情况下考虑增加map?

A: 下面几个情况

  1. 当input的文件都很大
  2. 任务逻辑处理复杂
  3. map执行非常慢的时候
  4. 文件列数量较少
    当上述原因可能导致任务处理的速度变慢时候,可以考虑尝试增加map

当然有个公式也可以来判断,不过这个公式暂时没研究

computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M

Reduce 数量
  • 调整reduce个数方法

方法一: 通过设置参数

每个Reduce处理的数据量(默认是256MB,args1) 
hive> set hive.exec.reducers.bytes.per.reducer;
hive.exec.reducers.bytes.per.reducer=67108864
每个任务最大的reduce数(默认为1009,args2)
hive> set hive.exec.reducers.max;
hive.exec.reducers.max=1099

计算公式


N=min(args2,总输入数据量/args1)

方法二: 通过配置文件

hadoop的mapred-default.xml中修改
mapreduce.job.reduces=15;

Q: reduce是不是越多越好?

A: 不是reduce太多也会带来一些列问题:

  1. 启动、初始化reduce会消耗大量资源和时间。
  2. 一个reduce对应一个文件,如果生成的是小文件会对后续任务或结果文件产生不好的影响(hadoop 小文件问题)
    要让reduce处理数据量大小要合适,而不要禁锢自己的思维,一定要多少,合适就好

并行执行
打开任务并行执行(默认关闭)
hive> set hive.exec.parallel;
hive.exec.parallel=false

同一个sql允许最大并行度(默认为8)
hive> set hive.exec.parallel.thread.number;
hive.exec.parallel.thread.number=8

并行执行在系统资源比较空闲的时候才有优势,否则,没资源,也谈不上并行了。

严格模式

严格模式是为了防止用户执行那些可能意想不到的、不好的影响的查询。
设置方法一:

直接设置参数
hive> set hive.mapred.mode;
hive.mapred.mode=nonstrict

设置方法二:

修改配置文件
hive-default.xml.template
<property>
    <name>hive.mapred.mode</name>
    <value>strict</value>
    <description>
      执行Hive操作的模式。
      在严格模式下,不允许运行一些有风险的查询。它们包括:
      笛卡儿积
      没有为查询选择分区。
      比较bigint和字符串。
      比较 bigints 和 doubles.
      Order by 没有 limit(只是  Order by  一般的查询 没有limit 也是可以的).
    </description>
</property>
Order by 没有 limit
hive> set hive.mapred.mode=strict;
hive> select * from  tmp.orcTest order by userid desc;
FAILED: SemanticException 1:36 In strict mode, if ORDER BY is specified, LIMIT must also be specified. Error encountered near token 'userid'
没有为查询选择分区
hive> set hive.mapred.mode=strict;
hive> select *  from orcTest_d;
FAILED: SemanticException [Error 10041]: No partition predicate found for Alias "orcTest_d" Table "orcTest_d"

JVM重用

场景:难避免小文件的场景或task特别多的场景,这类场景大多数执行时间都很短。

问题:Hadoop的默认配置通常是使用派生JVM来执行map和Reduce任务的。这时JVM的启动过程可能会造成相当大的开销,尤其是执行的job包含有成百上千task任务的情况。

解决方法:JVM重用可以使得JVM实例在同一个job中重新使用N次

配置方法一:

hive> set mapreduce.job.jvm.numtasks;
mapreduce.job.jvm.numtasks=1

配置方法二:

Hadoop的mapred-site.xml
<property>
  <name>mapreduce.job.jvm.numtasks</name>
  <value>10</value>
  <description>How many tasks to run per jvm. If set to -1, there is
  no limit. 
  </description>
</property>

一般设置10-20,需要经过根据具体业务确定

Q:jvm重用没有缺点吗?

A:开启JVM重用将一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。
如果出现数据倾斜导致某几个task 执行极度缓慢,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。


推测执行

推测执行简单的解释就是,某个task执行的进度明显慢于其他的task,这可能是程序问题,也可能是该节点bug、或者资源问题,于是在其他地方重新启动一个相同的task处理相同数据, 让他们赛跑,谁赢了用谁的,结束慢的。

配置方法一:

如果为true,则可以并行执行某些map任务的多个实例。
hive> set mapreduce.map.speculative;
mapreduce.map.speculative=false

如果为true,则可以并行执行某些reduce任务的多个实例。
hive> set mapreduce.reduce.speculative;
mapreduce.reduce.speculative=true

配置方法二:

<property>
    <name>hive.mapred.reduce.tasks.speculative.execution</name>
    <value>true</value>
    <description>是否开启reduce的推测执行机制 </description>
</property>
原文地址:https://www.cnblogs.com/lillcol/p/11198054.html