Mapreduce设置多路径输入输出

转载请注明:http://www.cnblogs.com/demievil/p/4058279.html

我的github博客:http://demievil.github.io/

最近写Mapreduce程序时,想用到多路径输入,一次输入多个文件夹下的数据。并且希望输出路径也可以区分,修改输出文件的名称。查了相关资料,已实现。

  • 多路径输入

设置Mapreduce的输入是HDFS上多个文件夹下的数据,在main函数下稍作配置即可,示例代码如下:

public static void main(String[] args) throws Exception {        
        String ioPath[] = {
                "hdfs://10.1.2.3:8020/user/me/input/folder1",
                "hdfs://10.1.2.3:8020/user/me/input/folder2",
                "hdfs://10.1.2.3:8020/user/me/output"
                
        };
        
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://10.1.2.3:8020");
        conf.set("mapreduce.jobtracker.address", "10.1.2.3:8021");

        Job job = Job.getInstance(conf, "Job-Name");    
        job.setJarByClass(TestMain.class);
        job.setReducerClass(TestReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        FileSystem fs = FileSystem.get(conf);
        fs.delete(new Path(ioPath[2]),true);
        
        MultipleInputs.addInputPath(job, new Path(ioPath[0]), TextInputFormat.class, TagUrlMapper.class);
        MultipleInputs.addInputPath(job, new Path(ioPath[1]), TextInputFormat.class, TagUrlMapper.class);

        FileOutputFormat.setOutputPath(job, new Path(ioPath[2]));
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }

使用MultipleInputs.addInputPath()方法添加输入路径,输入类型和Mapper类。

  • 多路径输出

在reducer中配置多路径输出及输出文件名的开头,示例代码:

public class TestReducer extends Reducer<Text, Text, Text, Text> {
    private  MultipleOutputs<Text, Text> mos;
   @Override
    protected void setup(Context context) 
                   throws IOException, InterruptedException {
      super.setup(context);
      mos = new MultipleOutputs<Text, Text>(context);
    }
    
   @Override
   protected void cleanup(Context context) 
                  throws IOException, InterruptedException {
       super.cleanup(context);
       mos.close();
   }    
  @Override
  public void reduce(Text key, Iterable<Text> values,
      Context context)throws IOException, InterruptedException {
    
      while(values.iterator().hasNext()){
          tag = values.iterator().next();
          if (......){
              mos.write(key, new Text("taged"), "taged/taged");
          }
          else{
              mos.write(key, new Text("untaged"), "untaged/untaged");
          }
      }
}

使用MultipleOutputs类来控制输出路径。重写Reducer的setup()和cleanup()方法,如示例代码所示。

输出路径示例如下:image

我的github博客:http://demievil.github.io/

原文地址:https://www.cnblogs.com/kodyan/p/4058279.html