Using SequenceFileOutputFormat in MapReduce
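This post shows how to turn an ordinary text file into a SequenceFile with a map-only MapReduce job: the mapper passes every (byte offset, line) record through unchanged, setNumReduceTasks(0) skips the reduce phase, and SequenceFileOutputFormat writes the records out in SequenceFile format. The comment block in run() lists the three compression choices (NONE, RECORD, BLOCK). On a cluster you would run it with two arguments, the input file and the output directory, for example: hadoop jar your-app.jar com.mengyao.hadoop.mapreduce.SequenceFileOutputFormatApp /mapreduces/bookOutline.txt /mapreduces/out (the jar name here is a placeholder for your own build).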

package com.mengyao.hadoop.mapreduce;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Converts a plain text file into a SequenceFile.
 * @author mengyao
 *
 */
public class SequenceFileOutputFormatApp extends Configured implements Tool {
    
    private static final String APP_NAME = SequenceFileOutputFormatApp.class.getSimpleName();
    
    static class SequenceFileOutputFormatAppMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        
        // Output key/value objects are created once in setup() and reused
        // across map() calls to avoid per-record allocation.
        private LongWritable outputKey;
        private Text outputValue;
        
        @Override
        protected void setup(
                Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                throws IOException, InterruptedException {
            this.outputKey = new LongWritable();
            this.outputValue = new Text();
        }
        
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                throws IOException, InterruptedException {
            // Pass the record through unchanged: the key is the line's byte
            // offset, the value is the line text.
            this.outputKey.set(key.get());
            this.outputValue.set(value.toString());
            context.write(outputKey, outputValue);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, APP_NAME);
        job.setJarByClass(SequenceFileOutputFormatApp.class);
        
        // Plain text input: keys are byte offsets, values are lines.
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPaths(job, args[0]);
        job.setMapperClass(SequenceFileOutputFormatAppMapper.class);

        // Map-only job: with zero reducers, mapper output goes straight to
        // the OutputFormat.
        job.setNumReduceTasks(0);
        
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        
        /**
         * SequenceFile output supports three compression types:
         *     1. No compression:
         *        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.NONE);
         *     2. Record-level compression (applied per record; only values are
         *        compressed, keys are not), roughly the same size as uncompressed:
         *        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.RECORD);
         *        SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
         *     3. Block-level compression (compresses many records at once, more efficient):
         *        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
         *        SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
         */
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.NONE);
        
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        return job.waitForCompletion(true) ? 0 : 1;
    }
    
    public static int createJob(String[] args) {
        Configuration conf = new Configuration();
        // Raise the DataNode socket write timeout to 2 hours (milliseconds).
        conf.set("dfs.datanode.socket.write.timeout", "7200000");
        // Keep input splits between 256 MB and 512 MB.
        conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
        conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");
        int status = 0;
        
        try {
            status = ToolRunner.run(conf, new SequenceFileOutputFormatApp(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        return status;
    }
    
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            // No paths supplied on the command line: fall back to a sample
            // input file and a timestamped output directory.
            args = new String[]{
                    "/mapreduces/bookOutline.txt",
                    "/mapreduces/"+APP_NAME+"/"+new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())};
        }
        int status = createJob(args);
        System.exit(status);
    }
}
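
To check the result, you can read the SequenceFile back with SequenceFile.Reader. The following is a minimal sketch, not part of the original program: the SequenceFileDump class name is made up for illustration, and the path argument should point at a part file produced by the job above (a map-only job writes part-m-00000, part-m-00001, and so on).

package com.mengyao.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

/** Minimal sketch: dumps the key/value pairs of a SequenceFile to stdout. */
public class SequenceFileDump {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical usage: args[0] is a part file written by the job above.
        Path path = new Path(args[0]);
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(path))) {
            LongWritable key = new LongWritable();
            Text value = new Text();
            while (reader.next(key, value)) {
                System.out.println(key.get() + "\t" + value.toString());
            }
        }
    }
}

For a quick look without writing any code, the built-in command hadoop fs -text <path> also decodes SequenceFiles directly.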
Original post: https://www.cnblogs.com/mengyao/p/4865598.html