MapReduce进阶:多路径输入输出

前言

当我们得意于 MapReduce 从一个数据输入目录,把数据经过程序处理之后输出到另一个目录时。可能你正在错过一些更好的方案,因为 MapReduce 是支持多路径的输入与输出的。比如,你一个项目中的多个 Job 产生了多个输出路径,后面又需要另一个 Job 去处理这些不路径下的数据。你要怎么办?暂停程序后,手动处理?看完本文,我想你会给你的这种想法来上一记耳光。(说笑了,别当真)


版权说明

著作权归作者所有。
商业转载请联系作者获得授权,非商业转载请注明出处。
本文作者:Q-WHai
发表日期: 2016年6月18日
本文链接:http://blog.csdn.net/lemon_tree12138/article/details/51707283
来源:CSDN
更多内容:分类 >> 大数据之 Hadoop


多路径输入

写了这么多的 MapReudce 的程序,我想你一定已经了解了 MapReduce 是如何将输入的数据加载到程序中进行计算的了。一般情况下,我们是通过 FileInputFormat 类的 addInputPath 方法。看到这个 add 关键字,就可能产生很多联想,事实上这种联想是正确的。我们的确可以使用多个目录共同输入数据,并且还不止一种方式。

方式一

可以多添加几个输入目录,只要按照之前添加一个目录的方式,继续添加就 ok 了。就像下面这样:

FileInputFormat.addInputPath(job, new Path(inputPath_1));
FileInputFormat.addInputPath(job, new Path(inputPath_2));
FileInputFormat.addInputPath(job, new Path(inputPath_3));

这里如果你是一个重视代码细节的人,你肯定会重构这段代码:

private void setInputPathMothed1(Job job) throws IOException {
    FileInputFormat.addInputPath(job, new Path(inputPath_1));
    FileInputFormat.addInputPath(job, new Path(inputPath_2));
    FileInputFormat.addInputPath(job, new Path(inputPath_3));
}

方式二

如果你嫌上面的代码太多了,你还有另外一种选择:

FileInputFormat.addInputPaths(job, String.join(",", inputPath_1, inputPath_2, inputPath_3));

通过上面的代码,你可以一次性全部加载这些不同的目录,很方便。
当我们打开 FileInputFormat.addInputPaths() 的源码,看到 addInputPaths() 的代码:

/**
 * Add the given comma separated paths to the list of inputs for
 *  the map-reduce job.
 * 
 * @param job The job to modify
 * @param commaSeparatedPaths Comma separated paths to be added to
 *        the list of inputs for the map-reduce job.
 */
public static void addInputPaths(Job job, 
                               String commaSeparatedPaths
                               ) throws IOException {
    for (String str : getPathStrings(commaSeparatedPaths)) {
        addInputPath(job, new Path(str));
    }
}

这里看似方便的 FileInputFormat.addInputPaths(),其实只是 hadoop 给我们这些懒惰的开发者的进一层封装罢了。

方式三:

这种方式有一些特殊,也是我推荐你去使用的一种方式。你可以先看代码感受一下。

private void setInputPathMothed3(Job job) throws IOException {
    MultipleInputs.addInputPath(job, new Path(inputPath_1), TextInputFormat.class, CoreComputer.CoreMapper.class);
    MultipleInputs.addInputPath(job, new Path(inputPath_2), TextInputFormat.class, CoreComputer.CoreMapper.class);
    MultipleInputs.addInputPath(job, new Path(inputPath_3), TextInputFormat.class, CoreComputer.CoreMapper.class);
}

上面的代码中使用一个新的类 MultipleInputs。从类的命名上就可以看到这是一个专门处理多路径输入的问题的。在上面的代码中,我们看到 MultipleInputs.addInputPath() 多了两个不同的参数。进入源码可以看到他们分别是输入数据的格式,以及数据处理的 Mapper。
其实这两个参数是可以让你通过更加灵活的方式来处理数据。inputFormatClass 是可以让你输入不同类型的数据,mapperClass 是可以让你使用不同的 Mapper 来处理不同的数据。正因为这种可选择性,你的程序就更加的灵活了。不过上面的代码中,我并没有采用不同的 Mapper,如果你感兴趣,可以尝试一下。

小结

看到这里,你可能会有疑惑,难道在 Mapper 和 Reducer 里面就不用设置了么?是的,我们不需要调整 Mapper 和 Reducer 的核心代码就可以实现多路径输入。


多路径输出

核心代码修改

多路径的输出没有多路径输入那么多可选择的方案,且在多路径输出中,需要编写的代码量也比多路径输入要多一些。其中还包括了对 Reducer 的修改。详细的参考下面的代码。

public static class CoreReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private MultipleOutputs<Text, IntWritable> multipleOutputs = null;

    @Override
    protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<Text, IntWritable>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
        ( ... 省略无关的 N 行 ... )
        multipleOutputs.write(splitKeys[1], new Text(splitKeys[0]), count);
    }

    @Override
    protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}

上面的代码中,setup() 与 cleanup() 模块只是对 MultipleOutputs 的初始化与关闭操作,需要说明的地方不多。主要有以下两点:
1. 将 MultipleOutputs 的初始化放在 setup() 中,因为在 setup() 只会被调用一次,如果放在 reduce() 中,则 MultipleOutputs 可能被 reduce 方法初始化 N 次,而你全然不知;
2. 你需要在 cleanup() 方法中关闭 MultipleOutputs。通过源码我们了解到,关闭 MultipleOutputs,也就是关闭 RecordWriter,并且是一堆 RecordWriter,因为这里会有很多 reduce 被调用。

/**
 * Closes all the opened outputs.
 * 
 * This should be called from cleanup method of map/reduce task.
 * If overridden subclasses must invoke <code>super.close()</code> at the
 * end of their <code>close()</code>
 * 
 */
@SuppressWarnings("unchecked")
public void close() throws IOException, InterruptedException {
for (RecordWriter writer : recordWriters.values()) {
  writer.close(context);
}
}

还有一个是你需要重点关注的,那就是 reduce() 方法里的 multipleOutputs.write(…)。你需要把以前的 context.write(…) 替换成现在的这个。

调用代码修改

客户端调用方面,只需要在代码

FileOutputFormat.setOutputPath(job, new Path(outputPath));

之前添加多路径的设置,即可。如下:

public class ComputerClient {

    public static void main(String[] args) throws Exception {
        ( ... 省略无关的 N 行 ... )
    }

    private void execute() throws Exception {
        runFirstJob();
    }

    private int runFirstJob() throws Exception {
        ( ... 省略无关的 N 行 ... )        
        addNamedOutput(job);
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    private void addNamedOutput(Job job) {
        addNamedOutput(job, "android");
        addNamedOutput(job, "hadoop");
        addNamedOutput(job, "ios");
        addNamedOutput(job, "java");
        addNamedOutput(job, "python");
    }

    private void addNamedOutput(Job job, String pathName) {
        MultipleOutputs.addNamedOutput(job, pathName, TextOutputFormat.class, Text.class, IntWritable.class);
    }
}

效果展示

通过上面的学习并编写正确的程序,这样就可以获得如下的效果。
这里写图片描述


工程源码下载

原文地址:https://www.cnblogs.com/fengju/p/6335977.html