MapReduce-MulitipleOutputs实现自己定义输出到多个文件夹

输入源数据例子:

Source1-0001
Source2-0002
Source1-0003
Source2-0004
Source1-0005
Source2-0006
Source3-0007
Source3-0008
描写叙述:
  • Source1开头的数据属于集合A。
  • Source2开头的数据属于集合B;
  • Source3开头的数据即属于集合A,也属于集合B。

输出要求:

  • 完整保留集合A数据(包括Source1、Source3开头数据)
  • 完整保留集合B数据(包括Source2、Source3开头数据)

程序实现:

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.mahout.common.AbstractJob;

import com.yhd.common.util.HadoopUtil;

/**
 * AbstractJob 是mahout的Job模板,能够不使用该模板,
 * 实则的核心部分在于MultipleOutputs部分
 * 
 * @author ouyangyewei
 *
 */
public class TestMultipleOutputsJob extends AbstractJob {
    @Override
    public int run(String[] args) throws Exception {
        addInputOption();
        addOutputOption();
        
        Map<String, List<String>> parseArgs = parseArguments(args);
        if(parseArgs==null){
            return -1;
        }
        
        HadoopUtil.delete(getConf(), getOutputPath());
        
        Configuration conf = new Configuration();
        conf.setInt("mapred.reduce.tasks", 4);
        conf.set("mapred.job.queue.name", "pms");
        conf.set("mapred.child.java.opts", "-Xmx3072m");
        conf.set("mapreduce.reduce.shuffle.memory.limit.percent", "0.05");

        Job job = new Job(new Configuration(conf));
        job.setJobName("TestMultipleOutputsJob");
        job.setJarByClass(TestMultipleOutputsJob.class);
        job.setMapperClass(MultipleMapper.class);
        job.setNumReduceTasks(0);
        FileInputFormat.setInputPaths(job, this.getInputPath());
        FileOutputFormat.setOutputPath(job, this.getOutputPath());
        
        /** 输出文件格式将为:Source1-m-**** */
        MultipleOutputs.addNamedOutput(job, "Source1", TextOutputFormat.class, Text.class, Text.class);
        /** 输出文件格式将为:Source2-m-**** */
        MultipleOutputs.addNamedOutput(job, "Source2", TextOutputFormat.class, Text.class, Text.class);
        
        boolean suceeded = job.waitForCompletion(true);
        if(!suceeded) {
            return -1;
        }
        return 0;
    }
    
    /**
     * 
     * @author ouyangyewei
     *
     */
    public static class MultipleMapper extends Mapper<LongWritable, Text, Text, Text> {
        private MultipleOutputs<Text, Text> mos = null;

        @Override
        protected void setup(Context context
                             ) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, Text>(context);
        }

        public void map(LongWritable key, Text value, Context context
                        ) throws IOException, InterruptedException {
            String line = value.toString();
            String[] tokenizer = line.split("-");

            if (tokenizer[0].equals("Source1")) {
                /** 集合A的数据 */
                mos.write("Source1", new Text(tokenizer[0]), tokenizer[1]);
            } else if (tokenizer[0].equals("Source2")) {
                /** 集合B的数据 */
                mos.write("Source2", new Text(tokenizer[0]), tokenizer[1]);
            }
            
            /** 集合A交集合B的数据 */
            if (tokenizer[0].equals("Source3")) {
                mos.write("Source1", new Text(tokenizer[0]), tokenizer[1]);
                mos.write("Source2", new Text(tokenizer[0]), tokenizer[1]);
            }
        }

        protected void cleanup(Context context
                               ) throws IOException, InterruptedException {
            mos.close();
        }
    }
    
    /**
     * @param args
     */
    public static void main(String[] args) {
        System.setProperty("javax.xml.parsers.DocumentBuilderFactory",
            "com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl");
        System.setProperty("javax.xml.parsers.SAXParserFactory", 
            "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl");
        
        TestMultipleOutputsJob instance = new TestMultipleOutputsJob();
        try {
            instance.run(args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

使用hadoop jar命令调度执行jar包代码:
hadoop jar bigdata-datamining-1.0-user-trace-jar-with-dependencies.jar com.yhd.datamining.data.usertrack.offline.job.mapred.TestMultipleOutputsJob 
--input /user/pms/workspace/ouyangyewei/testMultipleOutputs 
--output /user/pms/workspace/ouyangyewei/testMultipleOutputs/output

程序执行以后,输出的结果:
[pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei]
$hadoop fs -ls /user/pms/workspace/ouyangyewei/testMultipleOutputs/output
Found 4 items
-rw-r--r--   3 pms pms         65 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source1-m-00000
-rw-r--r--   3 pms pms         65 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source2-m-00000
-rw-r--r--   3 pms pms          0 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/_SUCCESS
-rw-r--r--   3 pms pms          0 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/part-m-00000

[pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei]
$hadoop fs -cat /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source1-m-00000
Source1	0001
Source1	0003
Source1	0005
Source3	0007
Source3	0008

[pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei]
$hadoop fs -cat /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source2-m-00000
Source2	0002
Source2	0004
Source2	0006
Source3	0007
Source3	0008


补充于2014-12-18:

这样的方式的缺陷是会产生非常多类似Source1或Source2开头的子文件,一种非常好的方式就是指定baseOutputPath,将Source1开头的文件放在同一个文件夹中管理

对上述代码进行改写实现文件夹管理:

public void map(LongWritable key, Text value, Context context
                        ) throws IOException, InterruptedException {
            String line = value.toString();
            String[] tokenizer = line.split("-");

            if (tokenizer[0].equals("Source1")) {
                /** 集合A的数据 */
                mos.write("Source1", 
                          new Text(tokenizer[0]), 
                          tokenizer[1],
                          "Source1/part");
            } else if (tokenizer[0].equals("Source2")) {
                /** 集合B的数据 */
                mos.write("Source2", 
                          new Text(tokenizer[0]), 
                          tokenizer[1],
                          "Source2/part");
            }
            
            /** 集合A交集合B的数据 */
            if (tokenizer[0].equals("Source3")) {
                mos.write("Source1", 
                          new Text(tokenizer[0]), 
                          tokenizer[1],
                          "Source1/part");
                
                mos.write("Source2", 
                          new Text(tokenizer[0]), 
                          tokenizer[1],
                          "Source2/part");
            }
        }
程序执行以后,输出的结果:
$hadoop fs -ls /user/pms/workspace/ouyangyewei/testUsertrack/job1Output
Found 4 items
-rw-r--r--   3 pms pms          0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/_SUCCESS
-rw-r--r--   3 pms pms          0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/part-r-00000
drwxr-xr-x   - pms pms          0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source1
drwxr-xr-x   - pms pms          0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source2

[pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei/testUsertrack]
$hadoop fs -ls /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source1
Found 1 items
-rw-r--r--   3 pms pms   65 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source1/part-r-00000

[pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei/testUsertrack]
$hadoop fs -ls /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source2
Found 1 items
-rw-r--r--   3 pms pms   65 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source2/part-r-00000

能够參考下:http://dirlt.com/mapred.html

原文地址:https://www.cnblogs.com/yangykaifa/p/6773904.html