Apache Pig Data Processing Examples

Apache Pig provides a high-level procedural language, Pig Latin, whose statements are executed as MapReduce jobs, making it well suited to querying large, semi-structured data sets.
The examples below were run on Cloudera's single-node QuickStart VM.

Reading selected columns from structured data

  • Put a file on HDFS
    [cloudera@quickstart ~]$ hdfs dfs -put /etc/passwd /user/cloudera/
  • Start Apache Pig
    [cloudera@quickstart ~]$ pig -x mapreduce
    grunt>
  • Extract the data with Pig Latin
    grunt> A = load '/user/cloudera/passwd' using PigStorage(':');
    grunt> B = foreach A generate $0, $4, $5;
    grunt> dump B;

    (root,root,/root)
    (bin,bin,/bin)
    (daemon,daemon,/sbin)
    (adm,adm,/var/adm)
    (lp,lp,/var/spool/lpd)
    (sync,sync,/sbin)
    (shutdown,shutdown,/sbin)
    (halt,halt,/sbin)
    (mail,mail,/var/spool/mail)
    (uucp,uucp,/var/spool/uucp)
    (operator,operator,/root)
    (games,games,/usr/games)
    (gopher,gopher,/var/gopher)
    (ftp,FTP User,/var/ftp)
    (nobody,Nobody,/)
    (dbus,System message bus,/)
    (vcsa,virtual console memory owner,/dev)
    (abrt,,/etc/abrt)
    (haldaemon,HAL daemon,/)
    (ntp,,/etc/ntp)
    (saslauth,Saslauthd user,/var/empty/saslauth)
    (postfix,,/var/spool/postfix)
    (sshd,Privilege-separated SSH,/var/empty/sshd)
    (tcpdump,,/)
    (zookeeper,ZooKeeper,/var/lib/zookeeper)
    (cloudera-scm,Cloudera Manager,/var/lib/cloudera-scm-server)
    (rpc,Rpcbind Daemon,/var/cache/rpcbind)
    (apache,Apache,/var/www)
    (solr,Solr,/var/lib/solr)
    (hbase,HBase,/var/lib/hbase)
    (hdfs,Hadoop HDFS,/var/lib/hadoop-hdfs)
    (yarn,Hadoop Yarn,/var/lib/hadoop-yarn)
    (hive,Hive,/var/lib/hive)
    (sentry,Sentry,/var/lib/sentry)
    (impala,Impala,/var/lib/impala)
    (mapred,Hadoop MapReduce,/var/lib/hadoop-mapreduce)
    (hue,Hue,/usr/lib/hue)
    (flume,Flume,/var/lib/flume-ng)
    (spark,Spark,/var/lib/spark)
    (sqoop,Sqoop,/var/lib/sqoop)
    (sqoop2,Sqoop 2 User,/var/lib/sqoop2)
    (oozie,Oozie User,/var/lib/oozie)
    (mysql,MySQL Server,/var/lib/mysql)
    (kms,Hadoop KMS,/var/lib/hadoop-kms)
    (llama,Llama,/var/lib/llama)
    (httpfs,Hadoop HTTPFS,/var/lib/hadoop-httpfs)
    (gdm,,/var/lib/gdm)
    (rtkit,RealtimeKit,/proc)
    (pulse,PulseAudio System Daemon,/var/run/pulse)
    (avahi-autoipd,Avahi IPv4LL Stack,/var/lib/avahi-autoipd)
    (cloudera,,/home/cloudera)
    grunt> 
    ……

Here dump writes the contents of the relation to the screen.
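
Because dump prints every tuple, the output can get long on real data sets. A minimal sketch that trims it with Pig's LIMIT operator (the alias B_few is illustrative):

    grunt> B_few = limit B 5;    -- keep only the first 5 tuples
    grunt> dump B_few;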

  • Store the result in HDFS
    grunt> store B into 'userinfo.out';
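
The relative path resolves under the user's HDFS home directory (here /user/cloudera), and store writes the result as part files; a quick sketch of reading them back from the shell:

    [cloudera@quickstart ~]$ hdfs dfs -cat /user/cloudera/userinfo.out/part*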

Grouping data (GROUP BY)

  • Load the file, using AS to specify the column names
    grunt> passwd = load '/user/cloudera/passwd' using PigStorage(':') AS (user:chararray,passwd:chararray, uid:int, gid:int, userinfo:chararray, home:chararray, shell:chararray);
  • Run the GROUP BY
    grunt> grp_shell = GROUP passwd BY shell;
    grunt> dump grp_shell;
    (/bin/bash,{(root,x,0,0,root,/root,/bin/bash),(httpfs,x,481,480,Hadoop HTTPFS,/var/lib/hadoop-httpfs,/bin/bash),(llama,x,500,481,Llama,/var/lib/llama,/bin/bash),(kms,x,482,482,Hadoop KMS,/var/lib/hadoop-kms,/bin/bash),(mysql,x,27,27,MySQL Server,/var/lib/mysql,/bin/bash),(mapred,x,489,488,Hadoop MapReduce,/var/lib/hadoop-mapreduce,/bin/bash),(impala,x,490,489,Impala,/var/lib/impala,/bin/bash),(yarn,x,493,492,Hadoop Yarn,/var/lib/hadoop-yarn,/bin/bash),(hdfs,x,494,493,Hadoop HDFS,/var/lib/hadoop-hdfs,/bin/bash),(cloudera,x,501,501,,/home/cloudera,/bin/bash)})
    (/bin/sync,{(sync,x,5,0,sync,/sbin,/bin/sync)})
    ……
  • Count the rows in each group
    grunt> counts = FOREACH grp_shell GENERATE group, COUNT(passwd);
    grunt> DUMP counts;

    (/bin/bash,10)
    (/bin/sync,1)
    (/bin/false,1)
    (/sbin/halt,1)
    (/sbin/nologin,37)
    (/sbin/shutdown,1)
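
The counts come back in no particular order; Pig's ORDER BY sorts them in one more step. A minimal sketch building on the counts relation above (the alias sorted is illustrative):

    grunt> sorted = ORDER counts BY $1 DESC;    -- $1 is the COUNT column
    grunt> DUMP sorted;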

Matching rows by pattern

  • Use FILTER to keep matching rows
    grunt> bashs = FILTER passwd by shell MATCHES '.*bash+.*';
    grunt> DUMP bashs;
  • Store the result
    grunt> store bashs into 'test_bashs.out';
    [cloudera@quickstart ~]$ hdfs dfs -ls /user/cloudera/test_bashs.out
    Found 2 items
    -rw-r--r--   1 cloudera cloudera          0 2017-02-21 18:41 /user/cloudera/test_bashs.out/_SUCCESS
    -rw-r--r--   1 cloudera cloudera        531 2017-02-21 18:41 /user/cloudera/test_bashs.out/part-m-00000
    [cloudera@quickstart ~]$ hdfs dfs -cat /user/cloudera/test_bashs.out/part-m-00000
    root    x       0       0       root    /root   /bin/bash
    hdfs    x       494     493     Hadoop HDFS     /var/lib/hadoop-hdfs    /bin/bash
    yarn    x       493     492     Hadoop Yarn     /var/lib/hadoop-yarn    /bin/bash
    impala  x       490     489     Impala  /var/lib/impala /bin/bash
    mapred  x       489     488     Hadoop MapReduce        /var/lib/hadoop-mapreduce       /bin/bash
    mysql   x       27      27      MySQL Server    /var/lib/mysql  /bin/bash
    kms     x       482     482     Hadoop KMS      /var/lib/hadoop-kms     /bin/bash
    llama   x       500     481     Llama   /var/lib/llama  /bin/bash
    httpfs  x       481     480     Hadoop HTTPFS   /var/lib/hadoop-httpfs  /bin/bash
    cloudera        x       501     501             /home/cloudera  /bin/bash
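
MATCHES takes a Java-style regular expression that has to match the whole field, which is why the pattern above is wrapped in '.*'. Other patterns drop in the same way; a small sketch (the alias nologins is illustrative):

    grunt> nologins = FILTER passwd BY shell MATCHES '.*nologin';
    grunt> DUMP nologins;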

Summary

  • Pig processes records line by line, much like awk or sed; its advantage is that it runs the work as MapReduce jobs over files in HDFS, which lowers the barrier to using MR
  • Filtering supports regular expressions, which adds flexibility when selecting data
  • Operations that need to run on a schedule can be written as a Pig script and invoked with pig -x mapreduce script.pig, or with exec script.pig from the Grunt shell; see the sketch below
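
As a rough sketch of such a script (the file name script.pig and the output path bash_users.out are illustrative), the filter step above might be bundled as:

    -- script.pig: load /etc/passwd from HDFS, keep the bash users, write the result back
    passwd = LOAD '/user/cloudera/passwd' USING PigStorage(':')
             AS (user:chararray, passwd:chararray, uid:int, gid:int,
                 userinfo:chararray, home:chararray, shell:chararray);
    bashs  = FILTER passwd BY shell MATCHES '.*bash+.*';
    STORE bashs INTO 'bash_users.out';
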
Original article: https://www.cnblogs.com/shenfeng/p/apache_pig_example.html