hadoop用mutipleInputs实现map读取不同格式的文件

mapmap读取不同格式的文件这个问题一直就有,之前的读取方式是在map里获取文件的名称,依照名称不同分不同的方式读取,比如以下的方式

//取文件名
InputSplit inputSplit = context.getInputSplit();
String fileName = ((FileSplit) inputSplit).getPath().toString();
			
if(fileName.contains("track")) {
} else if(fileName.contains("complain3")) {
}

这样的方式有两个问题,一是在每读入一条数据的时候都要获取文件的名称,二是要依据名称推断依照什么样的格式进行解析,显得非常丑陋,事实上hadoop提供了解决问题的方法

使用mutipleInputs来解决

public class MutipleInputsTest {
	
	private static String complain = "/dsap/rawdata/operate/complain3/";
	private static String csOperate = "/dsap/rawdata/creditSystemSearchLog/";
	private static String output = "/dsap/rawdata/mutipleInputsTest/result1";
	
	public static class Mapper1 
    extends Mapper<Object, Text, Text, Text>{
		
		public void map(Object key, Text value, Context context
                 ) throws IOException, InterruptedException {
			Counter counter = context.getCounter("myCounter", "counter1");
			counter.increment(1l);
			
		}
	}
	
	public static class Mapper2 
    extends Mapper<Object, Text, Text, Text>{
		
		public void map(Object key, Text value, Context context
                 ) throws IOException, InterruptedException {
			Counter counter = context.getCounter("myCounter", "counter2");
			counter.increment(1l);
			
		}
	}
	 
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
		Job job = new Job(conf, "mutipleInputsTest");
		job.setJarByClass(MutipleInputsTest.class);
		
		MultipleInputs.addInputPath(job, new Path(complain + "20141217"), TextInputFormat.class, Mapper1.class);
		MultipleInputs.addInputPath(job, new Path(csOperate + "20141217"), TextInputFormat.class, Mapper2.class);
		FileOutputFormat.setOutputPath(job, new Path(output));
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.waitForCompletion(true);
		
		/**获取自己定义counter的大小。假设等于质心的大小。说明质心已经不会发生变化了,则程序停止迭代*/
		long counter1 = job.getCounters().getGroup("myCounter").findCounter("counter1").getValue();
		long counter2 = job.getCounters().getGroup("myCounter").findCounter("counter2").getValue();
		System.out.println("counter:" + counter1 + "	" + counter2);

	}
}

看一下执行结果


能够看到两个不同格式的文件进入了两个不同的mapper进行处理。这样在两个mapper里就能够仅仅针对某一种格式的文件进行解析了

原文地址:https://www.cnblogs.com/zsychanpin/p/6919791.html