hive order by,sort by, distribute by, cluster by作用以及用法

1. order by

    Hive中的order by跟传统的sql语言中的order by作用是一样的,会对查询的结果做一次全局排序,所以说,只有hive的sql中制定了order by所有的数据都会到同一个reducer进行处理(不管有多少map,也不管文件有多少的block只会启动一个reducer)。但是对于大量数据这将会消耗很长的时间去执行。
    这里跟传统的sql还有一点区别:如果指定了hive.mapred.mode=strict(默认值是nonstrict),这时就必须指定limit来限制输出条数,原因是:所有的数据都会在同一个reducer端进行,数据量大的情况下可能不能出结果,那么在这样的严格模式下,必须指定输出的条数。

2. sort by

    Hive中指定了sort by,那么在每个reducer端都会做排序,也就是说保证了局部有序(每个reducer出来的数据是有序的,但是不能保证所有的数据是有序的,除非只有一个reducer),好处是:执行了局部排序之后可以为接下去的全局排序提高不少的效率(其实就是做一次归并排序就可以做到全局排序了)。

3. distribute by和sort by一起使用

    ditribute by是控制map的输出在reducer是如何划分的,举个例子,我们有一张表,mid是指这个store所属的商户,money是这个商户的盈利,name是这个store的名字

store:

mid money name
AA 15.0 商店1
AA 20.0 商店2
BB 22.0 商店3
CC 44.0 商店4

    执行hive语句:

[delphi] view plain copy
  1. select mid, money, name from store distribute by mid sort by mid asc, money asc  
我们所有的mid相同的数据会被送到同一个reducer去处理,这就是因为指定了distribute by mid,这样的话就可以统计出每个商户中各个商店盈利的排序了(这个肯定是全局有序的,因为相同的商户会放到同一个reducer去处理)。这里需要注意的是distribute by必须要写在sort by之前。

4. cluster by

    cluster by的功能就是distribute by和sort by相结合,如下2个语句是等价的:

    

[sql] view plain copy
  1. select mid, money, name from store cluster by mid  
[sql] view plain copy
  1. select mid, money, name from store distribute by mid sort by mid  
    如果需要获得与3中语句一样的效果:

[sql] view plain copy
  1. select mid, money, name from store cluster by mid sort by money  

    注意被cluster by指定的列只能是降序,不能指定asc和desc。


mysql中有order by函数,而且是使用频率相当高的一个函数。之前看过一个数据,说计算机25%的工作量都用在排序上面(数据的真伪性没有考证)。从这也就不难看出为什么数据库里order by的操作这么重要了。

hive中除了order by以外,还有sort by。这两有什么区别,跟mysql里的order by又有些什么不同,本博主结合实际使用场景,跟大家稍微絮叨絮叨。

1.order by的使用方式

order by的使用上与mysql最大的不同,请看以下sql语句:

select cardno,count(*)
from tableA
group by idA
order by count(*) desc limit 10
  • 1
  • 2
  • 3
  • 4

这个语句在mysql中查询的时候,肯定是没有问题的,而且我们实际上也经常这么干。但是如果将上述语句提交给hive,会报以下错误:

FAILED: SemanticException [Error 10128]: Line 4:9 Not yet supported place for UDAF 'count'
  • 1

怎么样可以呢?将count(*)给一个别名就好:

select cardno,count(*) as num
from tableA
group by idA
order by num desc limit 10
  • 1
  • 2
  • 3
  • 4

这样就可以了。本博主没查源码,估计是因为hive查询的时候起的是mr任务,mr任务里排序的时候,不认得count(*)是什么东东,所以给个别名就好。

2.order by处理大数据量时候的无力

select col1,col2...
from tableA
where condition
order by col1,col2 desc(or asc)
  • 1
  • 2
  • 3
  • 4

上述sql按col1,col2排序。不过order by是做全局排序,全局排序就意味着在reduce端进行操作的时候,只能有一个reduce。不管如何配置,只能有一个reduce。那当数据量很大的时候,这个reduce就成为了单点,速度会很慢很慢。。。

3.distribute by sort by配合select top N

distribute by,顾名思义,是起分散数据作用的。distribute by col,则是按照col列为key分散到不同的reduce里去,默认采取的是hash算法。 
看到这里,大家有没有似曾相识的感觉?是不是跟group by很像呢?其实他两是很像的。唯一的区别,是distribute by只是分发数据到reduce,而group by将数据分发完以后,后面必须只能跟count,sum,avg等聚合操作。

sort by是局部排序,只确保每个reduce上输出的数据为有序。当然如果只有一个reduce的时候,跟order by是一样的。。。 
如果我们想取top 10,完全可以用sort by代替order by。请看:

select idA from tableA sort by idA limit 10
  • 1

将代码提交上去,首先会有如下输出:

Total jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Defaulting to jobconf value of: 5
  • 1
  • 2
  • 3

由此可见,reduce的数量不是编译sql时候确定的,而是根据我们之前指定的reduce数确定的。如果没指定,则是根据输入文件大小动态确定。

对比order by

select idA from tableA order by idA limit 10
  • 1

输出如下:

Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
  • 1
  • 2
  • 3

由此可见,order by的reduce数是在编译期间就确定为1了。

再看看sort by 的执行计划:

explain select idA from tableA sort by idA limit 10
  • 1
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: memberaddress
            Statistics: Num rows: 48553436 Data size: 388427488 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: cardno (type: bigint)
              outputColumnNames: _col0
              Statistics: Num rows: 48553436 Data size: 388427488 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: _col0 (type: bigint)
                sort order: +
                Statistics: Num rows: 48553436 Data size: 388427488 Basic stats: COMPLETE Column stats: NONE
                value expressions: _col0 (type: bigint)
      Reduce Operator Tree:
        Extract
          Statistics: Num rows: 48553436 Data size: 388427488 Basic stats: COMPLETE Column stats: NONE
          Limit
            Number of rows: 10
            Statistics: Num rows: 10 Data size: 80 Basic stats: COMPLETE Column stats: NONE
            File Output Operator
              compressed: false
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col0 (type: bigint)
              sort order: +
              Statistics: Num rows: 10 Data size: 80 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col0 (type: bigint)
      Reduce Operator Tree:
        Extract
          Statistics: Num rows: 10 Data size: 80 Basic stats: COMPLETE Column stats: NONE
          Limit
            Number of rows: 10
            Statistics: Num rows: 10 Data size: 80 Basic stats: COMPLETE Column stats: NONE
            File Output Operator
              compressed: false
              Statistics: Num rows: 10 Data size: 80 Basic stats: COMPLETE Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: 10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

根据执行计划很容易看出:相对于order by的一个job,sort by起了有两个job。第一个job现在每个reduce内部做局部排序,取top10。假设job1起了M个reduce,则第二个job再对M个reduce的输出做排序,但此时输入的数据量只有M*10条,最后取前10条,就得到了我们要的top10。这样与order by的全局排序相比,如果数据量很大的话,效率将大大提高。

4.distribute by 与sort by配合使用

hive (test)> select * from sort_by_test;
OK
1   10
1   20
2   10
2   20
2   30
3   10
3   15
3   40
3   20
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
hive (test)> desc sort_by_test;
OK
id                      string
age                     string
  • 1
  • 2
  • 3
  • 4

表中有id与age两个字段。

hive (test)> set mapred.reduce.tasks=2;
hive (test)> select * from sort_by_test
           > sort by id;
  • 1
  • 2
  • 3

结果如下:

1   10
2   30
2   20
2   10
3   40
1   20
3   20
3   15
3   10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
hive (test)> set mapred.reduce.tasks=2;
hive (test)> select * from sort_by_test
           > distribute by id
           > sort by id;
  • 1
  • 2
  • 3
  • 4
2   30
2   20
2   10
1   20
1   10
3   20
3   40
3   15
3   10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

两个语句对比,很容易看出,加上distribute by以后,distribute by后面的col相同的值都被分配到了同一个reduce里。


正因为当初对未来做了太多的憧憬,所以对现在的自己尤其失望。生命中曾经有过的所有灿烂,终究都需要用寂寞来偿还。
原文地址:https://www.cnblogs.com/candlia/p/11920243.html