MR中使用SequenceFile作为输入文件

转换原始数据为块压缩的SequenceFile

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.hadoop.compression.lzo.LzoCodec;

/**
 * Map-only MapReduce job that rewrites a plain-text file as a block-compressed
 * SequenceFile using the LZO codec.
 *
 * <p>No mapper class is set, so the framework's default {@code Mapper} is used
 * (an identity mapper per the Hadoop API documentation). With zero reducers the
 * map output goes straight to {@code SequenceFileOutputFormat}. Output records
 * are declared as {@code LongWritable} keys and {@code Text} values.
 */
public class ToSeqFile extends Configured implements Tool {
    /** Plain-text input file that is converted. */
    private static final String INPUT_PATH = "/home/hadoop/tmp/tmplzo.txt";

    /** Output directory for the SequenceFile; removed before every run. */
    private static final String OUTPUT_PATH = "/home/hadoop/tmp/tmplzo.out";

    /**
     * Configures and runs the conversion job.
     *
     * @param arg0 command-line arguments left after generic option parsing (unused)
     * @return 0 when the job succeeds, 1 otherwise
     * @throws Exception if job setup, output cleanup or job execution fails
     */
    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = getConf();

        // Build the job from the configuration supplied through ToolRunner so that
        // generic options (-D, -fs, -jt, ...) actually reach the job. The original
        // no-arg constructor silently ignored them.
        Job job = new Job(conf);
        job.setJarByClass(getClass());

        FileInputFormat.setInputPaths(job, INPUT_PATH);

        // Remove output left by a previous run; the job refuses to start if the
        // output directory already exists.
        Path outDir = new Path(OUTPUT_PATH);
        FileSystem fs = FileSystem.get(conf);
        fs.delete(outDir, true);
        FileOutputFormat.setOutputPath(job, outDir);

        // Map-only job: no reducers, default mapper.
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // Write the output as a SequenceFile.
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        // Enable output compression.
        SequenceFileOutputFormat.setCompressOutput(job, true);
        // Compress with LZO (the original comment said gzip, but the codec is LzoCodec).
        SequenceFileOutputFormat.setOutputCompressorClass(job, LzoCodec.class);
        // Compress whole blocks of records rather than individual records.
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point; exits with the status returned by {@link #run}.
     *
     * @param args command-line arguments, including Hadoop generic options
     * @throws Exception if the tool fails
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ToSeqFile(), args);
        System.exit(res);
    }
}

MR处理压缩后的SequenceFile

import org.apache.hadoop.io.Text;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.ContextFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
//import org.apache.hadoop.mapred.DeprecatedLzoTextInputFormat;






import com.hadoop.compression.lzo.LzoCodec;
import com.hadoop.mapreduce.LzoTextInputFormat;

/**
 * MapReduce job that reads a SequenceFile and writes every input value back
 * out as a {@code (value, value)} text pair, used to time processing of
 * compressed input.
 *
 * <p>Map output is compressed with Snappy. A Snappy codec is also registered
 * for the final output, but final-output compression itself is not enabled
 * here.
 *
 * <p>Usage: {@code compress <in> <out>}
 */
public class compress extends Configured implements Tool {
	private static final Log log = LogFactory.getLog(compress.class);

	public static final String REDUCES_PER_HOST = "mapreduce.sort.reducesperhost";

	/** Emits each input value as both the output key and the output value. */
	private static class ProvinceMapper extends
			Mapper<Object, Text, Text, Text> {
		@Override
		protected void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			context.write(value, value);
		}
	}

	/** Writes one {@code (key, key)} record for every value grouped under a key. */
	private static class ProvinceReducer extends
			Reducer<Text, Text, Text, Text> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			// The value itself is not used: the mapper emits identical key and
			// value, so the loop only preserves the number of occurrences.
			for (Text ignored : values) {
				context.write(key, key);
			}
		}
	}

	/**
	 * Command-line entry point; exits with the status returned by {@link #run}
	 * so that a failed job is visible to the calling shell.
	 *
	 * @param args command-line arguments, including Hadoop generic options
	 * @throws Exception if the tool fails
	 */
	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new Configuration(), new compress(), args);
		System.exit(exitCode);
	}

	/**
	 * Configures and runs the job, then prints the elapsed wall-clock time in
	 * milliseconds.
	 *
	 * @param args expects exactly two non-generic arguments: input path and output path
	 * @return 0 when the job succeeds, 1 on a usage error or job failure
	 * @throws Exception if job setup, output cleanup or job execution fails
	 */
	@Override
	public int run(String[] args) throws Exception {
		log.info("我的服务查询开始.....................................");

		long startMillis = System.currentTimeMillis();

		// Use the configuration injected by ToolRunner so generic options
		// (-D, -fs, -jt, ...) are honoured; fall back to a fresh one when run()
		// is invoked without setConf() having been called.
		Configuration conf = getConf();
		if (conf == null) {
			conf = new Configuration();
		}

		conf.set(
				"io.compression.codecs",
				"org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,com.hadoop.compression.lzo.LzopCodec");
		conf.set("io.compression.codec.lzo.class",
				"com.hadoop.compression.lzo.LzoCodec");

		// Compress intermediate map output with Snappy.
		conf.setBoolean("mapreduce.map.output.compress", true);
		conf.setClass("mapreduce.map.output.compression.codec", SnappyCodec.class, CompressionCodec.class);
		// Codec for the final output. Final-output compression is not switched on
		// here ("mapreduce.output.fileoutputformat.compress" is left untouched).
		conf.setClass("mapreduce.output.fileoutputformat.compress.codec", SnappyCodec.class, CompressionCodec.class);

		// Kept so that direct callers of run() may still pass generic options;
		// when started through ToolRunner the arguments are already stripped.
		String[] argArray = new GenericOptionsParser(conf, args)
				.getRemainingArgs();

		if (argArray.length != 2) {
			System.err.println("Usage: compress <in> <out>");
			// Report the failure through the Tool return value instead of
			// terminating the JVM from inside run().
			return 1;
		}

		Path inputPath = new Path(argArray[0]);
		Path outputPath = new Path(argArray[1]);

		Job job = new Job(conf, "compress");
		job.setJarByClass(compress.class);
		job.setMapperClass(ProvinceMapper.class);
		job.setReducerClass(ProvinceReducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// The input is a SequenceFile (LzoTextInputFormat would be the choice for
		// indexed .lzo text files instead).
		job.setInputFormatClass(SequenceFileInputFormat.class);

		FileInputFormat.addInputPath(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);

		// Remove output left by a previous run. Resolving the file system through
		// the Path avoids URI parsing failures on paths that are not valid URIs.
		FileSystem fs = outputPath.getFileSystem(conf);
		if (fs.exists(outputPath)) {
			fs.delete(outputPath, true);
		}

		int result = job.waitForCompletion(true) ? 0 : 1;

		long elapsedMillis = System.currentTimeMillis() - startMillis;
		System.out.println("耗时:" + elapsedMillis);
		return result;
	}
}

测试结果

文件大小 544M(未使用任何压缩)
耗时:73805

使用 SequenceFile(BLOCK 模式使用 LZO 压缩,中间结果使用 Snappy 压缩)

耗时:44207(单位:毫秒)

原文地址:https://www.cnblogs.com/chengxin1982/p/3956920.html