[db] hadoop && pig

hadoop && pig

Hadoop

最近需要用到hadoop操作,发现hadoop的官网真的良心的,没那么多废话,直接说明白怎么用,而且还是中文的,简单粗暴啊!!!

hadoop document

在mapreduce中,map的输出是具有自动排序功能的!!!

在reduce阶段 ,0-33%阶段是 shuffle 阶段,就是根据键值来将本条记录发送到指定的reducer,这个阶段应该是在map还没有完全完成的时候就已经开始了,提高了程序的执行效率。34%-65%阶段是sort阶段,就是reduce根据收到的键值进行排序。map阶段也会发生排序,map的输出结果是以键值为顺序排序后输出,可以通过只有map阶段处理的输出来验证。66%-100%阶段是处理阶段,这个阶段才是真正的处理阶段,如果程序卡在66%,那就是reduce程序有问题了。

pig

另外还有一个pig语言,是一种过程语言,类似于mysql(这玩意也不熟~尴尬)。
总结下pig的用法,顺带实验例子。

这个网站也不错

pig数据类型

  • double | float | long | int | chararry
    | bytearray
  • tuple | bag | map
    tuple类似matlab cell,元素类型可以不同('haha',1)
    bag相当于tuple的集合,用{}表示,{('haha',1),('hehe',2)}
    field是列数据标识
    map相当于hash表,key为chararray,value为任意类型

运行和注释

运行:
本地模式:pig -x local
集群模式:pig -x mapreduce 或者 pig
批处理pig文件,上两行命令后接pig文件名,pig xx.pig
注释:
行注释 --
段注释 /**/

pig latin

>> cat 1.txt
a 1 2 3 4.2 9.8
a 3 0 5 3.5 2.1
b 7 9 9 - -
a 7 9 9 2.6 6.2
a 1 2 5 7.7 5.9
a 1 2 3 1.4 0.2

[]代表可选项
pig命令不区分大小写

LOAD & Schema

load = LOAD 'data_path' [USING function] [AS schema]
A = LOAD '1.txt' USING PigStorage(' ') AS (col1:chararray, col2:int, col3:int, col4:int, col5:double, col6:double);

将1.txt的每行数据用' '分割,分装到对应的col1,col2的列名进行数据解析,如果没有指定,可以用$0 $n来索引。

pig默认PigStorage()读入本地磁盘或者hadoop路径数据,org.apache.hcatalog.pig.HCatLoader()读取hive表;()中的是分隔符。

Pig的schema运行我们指定relation为特定的结构,为字段指定名称和类型。如果省略类型信息,则默认为bytearray,也可以完全不指定schema。

但是不指定SCHEMA有时候会出问题,所以建议在load时写上,实在用不到的话用A,B,C...啥的随便写一些就好

describe A;
out:
A: {col1: chararray,col2: int,col3: int,col4: int,col5: double,col6: double}
A = LOAD '1.txt' USING PigStorage(' ') AS (col1, col2, col3, col4, col5, col6);
out:
A: {col1: bytearray,col2: bytearray,col3: bytearray,col4: bytearray,col5: bytearray,col6: bytearray}
A = LOAD '1.txt' USING PigStorage(' ');
out:
Schema for A unknown.
hive

Hive的组件HCatalog为解决这个问题提供了一种方案,通过提供Hive metastore的访问接口,Pig查询可以通过名字来引用Schema。

pig -useHCatalog
records = LOAD "records" USING org.apache.hcatalog.pig.HCatLoader();

GROUP

B = GROUP A BY (col2, col3, col4);
out:
((1,2,3),{(a,1,2,3,1.4,0.2),(a,1,2,3,4.2,9.8)})
((1,2,5),{(a,1,2,5,7.7,5.9)})
((3,0,5),{(a,3,0,5,3.5,2.1)})
((7,9,9),{(a,7,9,9,2.6,6.2),(b,7,9,9,,)})

用(col2, col3, col4)对A分组,然后按组将每条tuple汇聚成一个bag.B:{group:(col2,col3,col4),A:bag:{tuple,tuple}}

group分组操作,将数据分组为group_col:bag,第一个字段被命名为'group',第二个字段是bag,包含'group'对应的值的所有tuple集合。

FOREACH

C = FOREACH B GENERATE group, AVG(A.col5), AVG(A.col6);
out:
((1,2,3),2.8,5.0)
((1,2,5),7.7,5.9)
((3,0,5),3.5,2.1)
((7,9,9),2.6,6.2)
其中(1.4+4,2)/2=2.8

foreach是遍历每个组中的tuple,并对其进行处理。

AVG(平均),COUNT(计数),MIN,MAX基本和excel的缩写一致。

C:{group:(col2,col3,col4),double,double}.

一般foreach和generate是一块使用的,在数据较大时,建议尽早使用foreach generate过滤掉多余信息以减少数据交换。

FILTER

d = filter A by $0 == 'a';
out:
(a,1,2,3,4.2,9.8)
(a,3,0,5,3.5,2.1)
(a,7,9,9,2.6,6.2)
(a,1,2,5,7.7,5.9)
(a,1,2,3,1.4,0.2)

按照要求过滤数据,可以使用and or连接多个条件,滤除无用信息null 0 -1...

CONCAT & SUBSTRING

B = FOREACH A GENERATE CONCAT($0, (chararray)$1,(chararray)$2,(chararray)$3);
out:
(a123)
(a305)
(b799)
(a799)
(a125)
(a123)

C = foreach B generate (chararray)SUBSTRING($0,0,2);
out:
(a1)
(a3)
(b7)
(a7)
(a1)
(a1)

concat拼接两字符串,substring按长度截取字符串[0,2)左闭右开区间。

order

c = ORDER b BY cnt ASC;
ASC升序	DESC降序

显示和存储

DUMP C;
STORE C INTO 'output_path'

DUMP显示,上边所有的out部分。store是存储。output_path必须是不存在的路径,pig自己新建。

JOIN,UNION,COGROUP,CROSS

a.txt:
(2,Tie)
(4,Coat)
(3,Hat)
(1,Scarf)
b.txt:
(Joe,2)
(Hank,4)
(Ali,0)
(Eve,3)
(Hank,2)
A = LOAD 'a.txt' USING PigStorage(',');
B = LOAD 'b.txt' USING PigStorage(',');

JOIN

C = JOIN A BY $0, B BY $1;
out:
(2,Tie,Hank,2)
(2,Tie,Joe,2)
(3,Hat,Eve,3)
(4,Coat,Hank,4)

根据key得到行加入。inner join,一般用A小表join B大表,起到部分过滤的作用。

还有一个left join: left outer

UNION

D = UNION A, B;
out:
(Joe,2)
(Hank,4)
(Ali,0)
(Eve,3)
(Hank,2)
(2,Tie)
(4,Coat)
(3,Hat)
(1,Scarf)

可以对不同字段数的数据集进行union操作。

COGROUP

E = COGROUP A BY $0, B BY $1;
E = COGROUP A BY $0, B BY $1 outer;
out:
(0,{},{(Ali,0)})
(1,{(1,Scarf)},{})
(2,{(2,Tie)},{(Hank,2),(Joe,2)})
(3,{(3,Hat)},{(Eve,3)})
(4,{(4,Coat)},{(Hank,4)})
F = COGROUP A BY $0 inner, B BY $1;
out:
(1,{(1,Scarf)},{})
(2,{(2,Tie)},{(Hank,2),(Joe,2)})
(3,{(3,Hat)},{(Eve,3)})
(4,{(4,Coat)},{(Hank,4)})

输出一组嵌套的tuple结构。COGROUP为每个不同的key生成一个tuple。每个tuple的第一个字段就是key。其他字段是各个关系中匹配该键值的元组所组成的 bag。第一个bag中是A中的匹配tuple,第二个bag是B的,没有匹配的则为空{}。
COGROUP的默认类型outer连接。

CROSS

F = CROSS A, B;
out:
(1,Scarf,Hank,2)
(1,Scarf,Eve,3)
(1,Scarf,Ali,0)
(1,Scarf,Hank,4)
(1,Scarf,Joe,2)
(3,Hat,Hank,2)
(3,Hat,Eve,3)
(3,Hat,Ali,0)
(3,Hat,Hank,4)
(3,Hat,Joe,2)
(4,Coat,Hank,2)
(4,Coat,Eve,3)
(4,Coat,Ali,0)
(4,Coat,Hank,4)
(4,Coat,Joe,2)
(2,Tie,Hank,2)
(2,Tie,Eve,3)
(2,Tie,Ali,0)
(2,Tie,Hank,4)
(2,Tie,Joe,2)

CROSS 笛卡尔积。会将第一个关系中的每个元组和第二中的所有元组进行连接。这个操作的输出结果的大小是输入关系的大小的乘积。

功能code块

1)两张数据表中某个字段的交集、差集

a.txt
uidk,12,3
hfd,132,99
bbN,463,231
UFD,13,10
b.txt
908,uidk,888
345,hfd,557
28790,re,00000
A = LOAD 'a.txt' USING PigStorage(',') AS (acol1:chararray, acol2:int, acol3:int);
B = LOAD 'b.txt' USING PigStorage(',') AS (bcol1:int, bcol2:chararray, bcol3:int);
C = COGROUP A BY acol1, B BY bcol2;

out:
(re,{},{(28790,re,0)})
(UFD,{(UFD,13,10)},{})
(bbN,{(bbN,463,231)},{})
(hfd,{(hfd,132,99)},{(345,hfd,557)})
(uidk,{(uidk,12,3)},{(908,uidk,888)})

DESCRIBE C;
out:
C: {group: chararray,A: {(acol1: chararray,acol2: int,acol3: int)},B: {(bcol1: int,bcol2: chararray,bcol3: int)}}

D = filter C by NOT IsEmpty(A) ;
D = filter C by IsEmpty(B);
out:
(UFD,{(UFD,13,10)},{})
(bbN,{(bbN,463,231)},{})

D = filter C by NOT IsEmpty(A) AND NOT IsEmpty(B);
(hfd,{(hfd,132,99)},{(345,hfd,557)})
(uidk,{(uidk,12,3)},{(908,uidk,888)})

2)对表中某个字段的字符串切分

b.txt
908,uidk-haha,888
345,hfd-hehe,557
28790,re-hehe,00000
B = LOAD 'b.txt' USING PigStorage(',') AS (bcol1:int, bcol2:chararray, bcol3:int);
A = FOREACH B GENERATE FLATTEN(STRSPLIT(bcol2,'-',2)) AS (name,joy);
out:
(uidk,haha)
(hfd,hehe)
(re,hehe)
A = FOREACH B GENERATE FLATTEN(STRSPLIT(bcol2,'-',2).$0) AS name;

STRSPLIT('原始字符串', '分隔符(默认空格,特殊字符分割使用反斜杠 )', '限制返回的个数(超出舍弃)')

  1. 使用UDF来hash字段
    多记录一步,在UDF的返回值,如果不是字符串就不要用chararray的类型,因为在大数据量的时候会出问题。
cat func_hash.py 

@outputSchema('hash_product:int')
def get_hash(product):
    if len(product) >= 50:
        return 0
    return hash(product)  

pig:
REGISTER 'hdfs://ns3/hadoop_path/func_hash.py' USING jython AS myUdfs;
DEFINE tohash myUdfs.get_hash();
A = FOREACH A   GENERATE 
		b        AS b,
		tohash(c)     AS c;

Reference

https://www.w3cschool.cn/apache_pig/
https://www.zybuluo.com/BrandonLin/note/449340
http://blog.csdn.net/bingduanlbd/article/details/52049683
http://blog.csdn.net/zythy/article/details/18426347
http://blackproof.iteye.com/blog/1791980
http://www.360doc.com/content/15/0520/20/13670635_472030452.shtml
http://blog.csdn.net/gg584741/article/details/51712242
https://www.codelast.com/原创pig中的一些基础概念总结/
http://www.aboutyun.com/thread-6713-1-1.html
https://wenku.baidu.com/view/d530e025c5da50e2524d7fa7.html
http://blog.csdn.net/duchang110/article/details/17781865

原文地址:https://www.cnblogs.com/zhanxiage1994/p/7991992.html