package com.mengyao.hadoop.mapreduce;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Reads plain text files and writes them back out in SequenceFile format.
 *
 * @author mengyao
 */
public class SequenceFileOutputFormatApp extends Configured implements Tool {

    private static final String APP_NAME = SequenceFileOutputFormatApp.class.getSimpleName();

    /**
     * Map-only pass-through: emits each line's byte offset as the key
     * and the line itself as the value.
     */
    static class SequenceFileOutputFormatAppMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

        private LongWritable outputKey;
        private Text outputValue;

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            this.outputKey = new LongWritable();
            this.outputValue = new Text();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            this.outputKey.set(key.get());
            this.outputValue.set(value.toString());
            context.write(outputKey, outputValue);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, APP_NAME);
        job.setJarByClass(SequenceFileOutputFormatApp.class);

        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPaths(job, args[0]);
        job.setMapperClass(SequenceFileOutputFormatAppMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        /*
         * The output SequenceFile supports three compression modes:
         * 1. No compression:
         *    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.NONE);
         * 2. Record compression (per record; only values are compressed, not keys),
         *    which yields roughly the same size as no compression:
         *    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.RECORD);
         *    SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
         * 3. Block compression (compresses batches of records at once; more efficient):
         *    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
         *    SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
         */
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.NONE);
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static int createJob(String[] args) {
        Configuration conf = new Configuration();
        conf.set("dfs.datanode.socket.write.timeout", "7200000");               // 2-hour datanode write timeout (ms)
        conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456"); // 256 MB minimum input split
        conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912"); // 512 MB maximum input split
        int status = 1; // default to failure so an exception is not reported as success
        try {
            status = ToolRunner.run(conf, new SequenceFileOutputFormatApp(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return status;
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            // No arguments supplied: fall back to the sample paths used during development.
            args = new String[]{
                    "/mapreduces/bookOutline.txt",
                    "/mapreduces/" + APP_NAME + "/" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())};
        }
        if (args.length != 2) {
            System.err.println("Usage: " + APP_NAME + " <inputPath> <outputPath>");
            System.exit(2);
        }
        int status = createJob(args);
        System.exit(status);
    }
}
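
/*
 * A minimal read-back sketch, not part of the original job: it dumps the
 * key/value pairs of the SequenceFile produced above using SequenceFile.Reader.
 * The class name and the input path below are hypothetical examples; a real run
 * writes to the timestamped directory chosen in main(), with map output files
 * named part-m-00000 and so on.
 */
class SequenceFileDumpSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical location of one mapper's output from the job above.
        Path path = new Path("/mapreduces/SequenceFileOutputFormatApp/20150101000000/part-m-00000");
        try (org.apache.hadoop.io.SequenceFile.Reader reader =
                new org.apache.hadoop.io.SequenceFile.Reader(conf,
                        org.apache.hadoop.io.SequenceFile.Reader.file(path))) {
            LongWritable key = new LongWritable();
            Text value = new Text();
            // next() fills key and value, returning false at end of file.
            while (reader.next(key, value)) {
                System.out.println(key.get() + "\t" + value.toString());
            }
        }
    }
}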