Hadoop 实现多文件输出

比如word.txt内容如下:

aaa bbb aba abc

bba bbd bbbc

cc ccd cce

要求:统计每个单词出现的次数,并按单词的首字母(不区分大小写)把结果分别输出到不同的文件中,例如 a.txt、b.txt、c.txt;首字符不是字母 a–z 的单词输出到 other.txt。

代码如下:

LineRecordWriter

package com.hadoop.multi;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class LineRecordWriter<K, V> extends RecordWriter<K, V> {

	private static final String utf8 = "UTF-8";

	private static final byte[] newline;

	static {
		try {
			newline = "n".getBytes(utf8);
		} catch (UnsupportedEncodingException uee) {
			throw new IllegalArgumentException("can't find " + utf8
					+ " encoding");
		}
	}

	protected DataOutputStream out;
	private final byte[] keyValueSeparator;

	public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
		this.out = out;
		try {
			this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
		} catch (UnsupportedEncodingException uee) {
			throw new IllegalArgumentException("can't find " + utf8
					+ " encoding");
		}
	}

	public LineRecordWriter(DataOutputStream out) {
		this(out, "t");
	}

	private void writeObject(Object o) throws IOException {
		if (o instanceof Text) {
			Text to = (Text) o;
			out.write(to.getBytes(), 0, to.getLength());
		} else {
			out.write(o.toString().getBytes(utf8));
		}
	}

	public synchronized void write(K key, V value) throws IOException {
		boolean nullKey = key == null || key instanceof NullWritable;
		boolean nullValue = value == null || value instanceof NullWritable;
		if (nullKey && nullValue) {
			return;
		}
		if (!nullKey) {
			writeObject(key);
		}
		if (!(nullKey || nullValue)) {
			out.write(keyValueSeparator);
		}
		if (!nullValue) {
			writeObject(value);
		}
		out.write(newline);
	}

	public synchronized void close(TaskAttemptContext context)
			throws IOException {
		out.close();
	}

}


MultipleOutputFormat

package com.hadoop.multi;
   
import java.io.DataOutputStream;   
import java.io.IOException;   
import java.util.HashMap;   
import java.util.Iterator;   
import org.apache.hadoop.conf.Configuration;   
import org.apache.hadoop.fs.FSDataOutputStream;   
import org.apache.hadoop.fs.Path;   
import org.apache.hadoop.io.Writable;   
import org.apache.hadoop.io.WritableComparable;   
import org.apache.hadoop.io.compress.CompressionCodec;   
import org.apache.hadoop.io.compress.GzipCodec;   
import org.apache.hadoop.mapreduce.OutputCommitter;   
import org.apache.hadoop.mapreduce.RecordWriter;   
import org.apache.hadoop.mapreduce.TaskAttemptContext;   
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;   
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
import org.apache.hadoop.util.ReflectionUtils; 

public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>
        extends FileOutputFormat<K, V> {
	
	private MultiRecordWriter writer = null; 
	
	public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException,   
		   InterruptedException {   
		if (writer == null) {   
		    writer = new MultiRecordWriter(job, getTaskOutputPath(job));   
		}   
		return writer;   
	}
	
	private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {   
        Path workPath = null;   
        OutputCommitter committer = super.getOutputCommitter(conf);   
        if (committer instanceof FileOutputCommitter) {   
            workPath = ((FileOutputCommitter) committer).getWorkPath();   
        } else {   
            Path outputPath = super.getOutputPath(conf);   
            if (outputPath == null) {   
                throw new IOException("Undefined job output-path");   
            }   
            workPath = outputPath;   
        }   
        return workPath;   
    } 
	
	protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);   

	public class MultiRecordWriter extends RecordWriter<K, V> {   
        
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;   
        private TaskAttemptContext job = null;   
           
        private Path workPath = null;   
        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {   
            super();   
            this.job = job;   
            this.workPath = workPath;   
            recordWriters = new HashMap<String, RecordWriter<K, V>>();   
        }   
        @Override   
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {   
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();   
            while (values.hasNext()) {   
                values.next().close(context);   
            }   
            this.recordWriters.clear();   
        }   
        @Override   
        public void write(K key, V value) throws IOException, InterruptedException {   
            //得到输出文件名   
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());   
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);   
            if (rw == null) {   
                rw = getBaseRecordWriter(job, baseName);   
                this.recordWriters.put(baseName, rw);   
            }   
            rw.write(key, value);   
        }   
          
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)   
                throws IOException, InterruptedException {   
            Configuration conf = job.getConfiguration();   
            boolean isCompressed = getCompressOutput(job);   
            String keyValueSeparator = ",";   
            RecordWriter<K, V> recordWriter = null;   
            if (isCompressed) {   
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,   
                        GzipCodec.class);   
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);   
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());   
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);   
                recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec   
                        .createOutputStream(fileOut)), keyValueSeparator);   
            } else {   
                Path file = new Path(workPath, baseName);   
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);   
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);   
            }   
            return recordWriter;   
        }   
    }   
	
}


MultiFileOutPut

package com.hadoop.multi;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import com.hadoop.multi.MultipleOutputFormat;

/**
 * Word-count job whose results are split into one output file per initial
 * letter of the word (see {@link AlphabetOutputFormat}).
 */
public class MultiFileOutPut {

  /** Emits {@code (token, 1)} for every whitespace-separated token of a line. */
  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    // Reused for every token to avoid allocating a new Text per record.
    private Text word = new Text();
      
    /**
     * @param key     input key (unused)
     * @param value   one line of input text
     * @param context sink for the emitted pairs
     */
    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  
  /** Sums the counts received for a word; also registered as the combiner. */
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    // Reused output value.
    private IntWritable result = new IntWritable();

    /**
     * @param key     the word
     * @param values  partial counts for the word
     * @param context sink for the {@code (word, total)} pair
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
  
  /**
   * Routes each word to {@code <letter>.txt} according to its first
   * character, case-insensitively; everything else goes to {@code other.txt}.
   */
  public static class AlphabetOutputFormat extends MultipleOutputFormat<Text, IntWritable> {
      /**
       * @param key   the word
       * @param value the word's count (unused)
       * @param conf  job configuration (unused)
       * @return {@code "a.txt"} .. {@code "z.txt"} when the first character is
       *         an ASCII letter, otherwise {@code "other.txt"} (including for a
       *         null or empty key)
       */
      @Override
      protected String generateFileNameForKeyValue(Text key, IntWritable value, Configuration conf) {
          String text = (key == null) ? "" : key.toString();
          if (text.length() == 0) {
              // No first character to inspect; charAt(0) would throw.
              return "other.txt";
          }
          // Character.toLowerCase is locale-independent, unlike
          // String.toLowerCase(), which maps 'I' to dotless 'ı' under a
          // Turkish default locale and would misroute such words.
          char c = Character.toLowerCase(text.charAt(0));
          if (c >= 'a' && c <= 'z') {
              return c + ".txt";
          }
          return "other.txt";
      }
  }

  /**
   * Configures and runs the job.
   *
   * @param args generic Hadoop options followed by {@code <in> <out>}
   * @throws Exception if job setup or execution fails
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(MultiFileOutPut.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setOutputFormatClass(AlphabetOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    // Exit status 0 on success, 1 on failure.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}


原文地址:https://www.cnblogs.com/jiangu66/p/3187108.html