hadoop streaming 多路输出 [转载]

转载 http://www.cnblogs.com/shapherd/archive/2012/12/21/2827860.html

hadoop 支持reduce多路输出的功能,一个reduce可以输出到多个part-xxxxx-X文件中,其中X是A-Z的字母之一,程序在输出<key,value>对的时候,在value的后面追加"#X"后缀,比如#A,输出的文件就是part-00000-A,不同的后缀可以把key,value输出到不同的文件中,方便做输出类型分类, #X仅仅用做指定输出文件后缀, 不会体现到输出的内容中

使用方法

启动脚本中需要指定-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat或者-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleSequenceFileOutputFormat, 输出就会按照多路输出的方式进行分文件输出

所有标准输出的value中都要加上 #X后缀,X代表A-Z, 不然会报invalid suffix错误 

$HADOOP_HOME_PATH/bin/hadoop streaming 
      -Dhadoop.job.ugi="$HADOOP_JOB_UGI" 
      -file ./map.sh 
      -file ./red.sh 
      -file ./config.sh 
      -mapper "sh -x map.sh" 
      -reducer "sh -x red.sh" 
      -input $NEW_INPUT_PATH 
      -input $OLD_INPUT_PATH 
      -output  $OUTPUT_PATH 
      -jobconf stream.num.map.output.key.fields=1 
      -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 
      -outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat 
      -jobconf mapred.job.name="test-shapherd-dist-diff" 
      -jobconf mapred.job.priority=HIGH 
      -jobconf mapred.job.map.capacity=100 
      -jobconf mapred.job.reduce.capacity=100 
      -jobconf mapred.reduce.tasks=3

在red脚本中可以所以的输出都加上后缀, 这样输出就是分part的了,比如大数据diff对比的脚本

map.sh如下:

source ./config.sh
 
awk 'BEGIN{
}
{
    if(match("'${map_input_file}'","'$OLD_INPUT_PATH'"))
    {
        print $0"	"0
    next
    }
    if(match("'${map_input_file}'","'$NEW_INPUT_PATH'"))
    print $0"	"1
}'
 
exit 0

red.sh如下

awk  -F"	" 'BEGIN{
    key=""
        flag=0
        num=0
        old_num=0
        new_num=0
        diff_num=0
}
{
    if($NF == "0")
        old_num++
    else
        new_num++
    if($1 != key)
    {
        if(key != "")
        {
            if(num <= 1)
            {
                diff_num++
                if(flag == "0")
                    print $0"#A"
                else
                    print $0"#B"
            }
        }
        key=$1
        flag=$NF
        num=1
        next
    }
 
    if(key == $1)
    {
        num++
        next   
    }
 
}
END{
        if(num  == 1)
        {
            if(flag == "0")
                print $0"#A"
            else
                print $0"#B"
        }
 
        print old_num"	shapherd#C"
        print new_num"	shapherd#D"
        print diff_num"	shapherd#E"
}'
 
 
exit 0

我的两个大数据没有diff, 所以输出就是:

part-00000-C
part-00000-D
part-00000-E
part-00001-C
part-00001-D
part-00001-E
part-00002-C
part-00002-D
part-00002-E

没有A和B结尾的

注意事项

    • 多路输出最多支持26路, 也就是字母只能是A-Z范围。
    • reduce的输入key和value的分隔符默认是 , 如果输出中没有 ,reduce脚本会把整行当作key, value就是空的,这时如果加了#X,会报invalid suffix错误,因为#X作为了key的一部分,这种问题一种是保证你的key和value是按照 分隔的, 一种是指定自己想要的分隔符。
原文地址:https://www.cnblogs.com/sailrancho/p/4930156.html