工作踩坑札记:2. Hadoop中MultipleInputs的使用陷阱

1. 背景

近日在一个Hadoop项目中使用MultipleInputs增加多输入文件时,发现相同路径仅会加载一次,导致后续的统计任务严重失真。本博文旨在记录异常的排查及解决方案。

2. 情景重现

(1) 准备简版的输入文件test,文件内容为"i am ws",输入的HDFS路径为/work/justTest/test

(2) 源码信息如下,主要是wordCount实现,其中/work/justTest/test作为输入路径,被输入两次:

  1 package com.ws.test;
  2 
  3 import java.io.IOException;
  4 
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.fs.Path;
  7 import org.apache.hadoop.io.LongWritable;
  8 import org.apache.hadoop.io.Text;
  9 import org.apache.hadoop.mapreduce.Job;
 10 import org.apache.hadoop.mapreduce.Mapper;
 11 import org.apache.hadoop.mapreduce.Reducer;
 12 import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
 13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 14 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 15 
 16 public class MutilInputTest {
 17 
 18 	public static void main(String[] args) {
 19 		testMultiInputs();
 20 	}
 21 
 22 	/**
 23 	 * 测试方法
 24 	 */
 25 	public static void testMultiInputs() {
 26 
 27 		Configuration conf = new Configuration();
 28 
 29 		conf.set("mapreduce.job.queuename", "default");
 30 		conf.setBoolean("mapreduce.map.output.compress", true);
 31 		conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.995f);
 32 		conf.setInt("mapreduce.task.timeout",0);
 33 		conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent",0.40f);
 34 
 35 		String input = "/work/justTest/test";
 36 		try {
 37 			createMultiInputsTestJob(conf,
 38 					input , Test1Mapper.class,
 39 					input , Test2Mapper.class,
 40 					"/work/justTest/temp", 2, TestReduce.class)
 41 					.waitForCompletion(true);
 42 		} catch (Exception e) {
 43 			e.printStackTrace();
 44 		}
 45 	}
 46 
 47 	/**
 48 	 * 任务构建
 49 	 * @param conf
 50 	 * @param input1
 51 	 * @param mapper1
 52 	 * @param input2
 53 	 * @param mapper2
 54 	 * @param outputDir
 55 	 * @param reduceNum
 56 	 * @param reducer
 57 	 * @return
 58 	 */
 59 	static Job createMultiInputsTestJob(Configuration conf,
 60 			String input1, Class<? extends Mapper> mapper1,
 61 			String input2, Class<? extends Mapper> mapper2,
 62 			String outputDir,
 63 			int reduceNum, Class<? extends Reducer> reducer) {
 64 		try {
 65 			Job job = new Job(conf);
 66 			job.setJobName("MultiInputsTest");
 67 			job.setJarByClass(MutilInputTest.class);
 68 
 69 			job.setNumReduceTasks(reduceNum);
 70 			job.setReducerClass(reducer);
 71 
 72 			job.setInputFormatClass(TextInputFormat.class);
 73 			MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, mapper1);
 74 			MultipleInputs.addInputPath(job, new Path(input2), TextInputFormat.class, mapper2);
 75 
 76 			Path outputPath = new Path(outputDir);
 77 			outputPath.getFileSystem(conf).delete(outputPath, true);
 78 
 79 			job.setOutputFormatClass(TextOutputFormat.class);
 80 			TextOutputFormat.setOutputPath(job, outputPath);
 81 
 82 			job.setMapOutputKeyClass(Text.class);
 83 			job.setMapOutputValueClass(Text.class);
 84 
 85 			job.setOutputKeyClass(Text.class);
 86 			job.setOutputValueClass(Text.class);
 87 
 88 			return job;
 89 		} catch (Exception e) {
 90 			return null;
 91 		}
 92 	}
 93 
 94 	/**
 95 	 * Mapper类
 96 	 *
 97 	 */
 98 	static class Test1Mapper extends Mapper<LongWritable, Text, Text, Text> {
 99 		Context context;
100 
101 		String type;
102 
103 		@Override
104 		protected void setup(Context context) throws IOException,
105 				InterruptedException {
106 			this.context = context;
107 			this.type = getDataType();
108 			super.setup(context);
109 		}
110 
111 		@Override
112 		protected void map(LongWritable key, Text value, Context context)
113 				throws IOException, InterruptedException {
114 			String[] words = value.toString().split("");
115 			for(String word : words){
116 				context.getCounter(this.type+"_map_total", "input").increment(1);
117 				context.write(new Text(word), new Text("1"));
118 			}
119 		}
120 
121 		protected String getDataType(){
122 			return "test1";
123 		}
124 	}
125 
126 	/**
127 	 * Mapper类继承
128 	 *
129 	 */
130 	static class Test2Mapper extends Test1Mapper{
131 		@Override
132 		protected String getDataType() {
133 			return "test2";
134 		}
135 	}
136 
137 	/**
138 	 * Reduce类
139 	 *
140 	 */
141 	static class TestReduce extends Reducer<Text, Text, Text, Text> {
142 		@Override
143 		protected void reduce(Text key, Iterable<Text> values, Context context)
144 				throws IOException, InterruptedException {
145 			int total = 0;
146 			while(values.iterator().hasNext()){
147 				total += Integer.parseInt(values.iterator().next().toString());
148 			}
149 			context.getCounter("reduce_total", key.toString()+"_"+total).increment(1);
150 		}
151 	}
152 
153 }
154 
View Code

(3) 任务执行记录如下:

  1 18/08/12 21:33:57 INFO client.RMProxy: Connecting to ResourceManager at bd-001/192.168.86.41:8032
  2 18/08/12 21:33:58 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
  3 18/08/12 21:33:59 INFO input.FileInputFormat: Total input paths to process : 1
  4 18/08/12 21:33:59 INFO mapreduce.JobSubmitter: number of splits:1
  5 18/08/12 21:34:00 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1527582903778_39623
  6 18/08/12 21:34:00 INFO impl.YarnClientImpl: Submitted application application_1527582903778_39623
  7 18/08/12 21:34:00 INFO mapreduce.Job: The url to track the job: http://bd-001:8088/proxy/application_1527582903778_39623/
  8 18/08/12 21:34:00 INFO mapreduce.Job: Running job: job_1527582903778_39623
  9 18/08/12 21:34:06 INFO mapreduce.Job: Job job_1527582903778_39623 running in uber mode : false
 10 18/08/12 21:34:06 INFO mapreduce.Job:  map 0% reduce 0%
 11 18/08/12 21:34:12 INFO mapreduce.Job:  map 100% reduce 0%
 12 18/08/12 21:34:17 INFO mapreduce.Job:  map 100% reduce 50%
 13 18/08/12 21:34:22 INFO mapreduce.Job:  map 100% reduce 100%
 14 18/08/12 21:34:22 INFO mapreduce.Job: Job job_1527582903778_39623 completed successfully
 15 18/08/12 21:34:22 INFO mapreduce.Job: Counters: 53
 16 	File System Counters
 17 		FILE: Number of bytes read=64
 18 		FILE: Number of bytes written=271730
 19 		FILE: Number of read operations=0
 20 		FILE: Number of large read operations=0
 21 		FILE: Number of write operations=0
 22 		HDFS: Number of bytes read=263
 23 		HDFS: Number of bytes written=0
 24 		HDFS: Number of read operations=9
 25 		HDFS: Number of large read operations=0
 26 		HDFS: Number of write operations=4
 27 	Job Counters
 28 		Launched map tasks=1
 29 		Launched reduce tasks=2
 30 		Rack-local map tasks=1
 31 		Total time spent by all maps in occupied slots (ms)=14760
 32 		Total time spent by all reduces in occupied slots (ms)=49344
 33 		Total time spent by all map tasks (ms)=3690
 34 		Total time spent by all reduce tasks (ms)=6168
 35 		Total vcore-seconds taken by all map tasks=3690
 36 		Total vcore-seconds taken by all reduce tasks=6168
 37 		Total megabyte-seconds taken by all map tasks=15114240
 38 		Total megabyte-seconds taken by all reduce tasks=50528256
 39 	Map-Reduce Framework
 40 		Map input records=1
 41 		Map output records=3
 42 		Map output bytes=14
 43 		Map output materialized bytes=48
 44 		Input split bytes=255
 45 		Combine input records=0
 46 		Combine output records=0
 47 		Reduce input groups=3
 48 		Reduce shuffle bytes=48
 49 		Reduce input records=3
 50 		Reduce output records=0
 51 		Spilled Records=6
 52 		Shuffled Maps =2
 53 		Failed Shuffles=0
 54 		Merged Map outputs=2
 55 		GC time elapsed (ms)=183
 56 		CPU time spent (ms)=3150
 57 		Physical memory (bytes) snapshot=1009094656
 58 		Virtual memory (bytes) snapshot=24295927808
 59 		Total committed heap usage (bytes)=2306867200
 60 	Shuffle Errors
 61 		BAD_ID=0
 62 		CONNECTION=0
 63 		IO_ERROR=0
 64 		WRONG_LENGTH=0
 65 		WRONG_MAP=0
 66 		WRONG_REDUCE=0
 67 	File Input Format Counters
 68 		Bytes Read=0
 69 	File Output Format Counters
 70 		Bytes Written=0
 71 	reduce_total
 72 		am_1=1
 73 		i_1=1
 74 		ws_1=1
 75 	test2_map_total
 76 		input=3
View Code

从日志中可以看出: 1)第三行显示"需要处理的总输入路径为1",2) map阶段的计数器显示总共的输入词数为3,且仅有test2相关计数,reduce阶段的计数器显示单词个数均为1。

由此,会引发疑问,为什么明明输入两个相同文件,hadoop仅检测到只有一个文件呢?

3. 原因排查

既然Map、Reduce阶段看到的输入文件已经只有一个,说明问题出在任务创建(输入路径注册)阶段。遂查看了与输入路径相关的MultipleInputs源码:

  1 @SuppressWarnings("unchecked") // Overload that binds `path` to both an InputFormat and a Mapper.
  2   public static void addInputPath(Job job, Path path,
  3       Class<? extends InputFormat> inputFormatClass,
  4       Class<? extends Mapper> mapperClass) {
  5 
  6     addInputPath(job, path, inputFormatClass); // delegate to the 3-arg overload to record the path/format pair
  7     Configuration conf = job.getConfiguration();
  8     String mapperMapping = path.toString() + ";" + mapperClass.getName(); // entry layout: "<path>;<mapper class name>"
  9     String mappers = conf.get(DIR_MAPPERS);
 10     conf.set(DIR_MAPPERS, mappers == null ? mapperMapping
 11        : mappers + "," + mapperMapping); // append comma-separated; nothing here de-duplicates by path
 12 
 13     job.setMapperClass(DelegatingMapper.class); // the job-level mapper is replaced by the delegating one
 14   }
 15 
 16 
 17 public static void addInputPath(Job job, Path path, // Overload that binds `path` to an InputFormat only.
 18       Class<? extends InputFormat> inputFormatClass) {
 19     String inputFormatMapping = path.toString() + ";" // entry layout: "<path>;<input format class name>"
 20        + inputFormatClass.getName();
 21     Configuration conf = job.getConfiguration();
 22     String inputFormats = conf.get(DIR_FORMATS);
 23     conf.set(DIR_FORMATS, // append comma-separated; nothing here de-duplicates by path
 24        inputFormats == null ? inputFormatMapping : inputFormats + ","
 25            + inputFormatMapping);
 26 
 27     job.setInputFormatClass(DelegatingInputFormat.class); // overrides any input format set on the job earlier
 28   }
 29 
View Code

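通过源码可以观察到,在设置DIR_FORMATS和DIR_MAPPERS属性时,均以"输入路径;文件格式类名或Mapper类名"的格式追加到配置中,此时相同路径的两条记录其实都被写入了配置。问题出在作业提交阶段:DelegatingInputFormat在计算分片时,会通过MultipleInputs.getInputFormatMap和getMapperTypeMap把上述配置解析成以Path为键的Map,相同路径的后一条记录会覆盖前一条。因此若传入路径相同,最终仅保留最后一次调用MultipleInputs.addInputPath时对应的配置信息(本例中即Test2Mapper),这也正是日志中只出现test2计数器的原因。相应的解决方案是传入不同的路径。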

4. 解决方案

将内容相同的输入文件以不同的路径传入:例如把test文件在HDFS上各保存一份为/work/justTest/test1和/work/justTest/test2,再分别注册给两个Mapper。

(1) 不同之处的代码如下:

  1 public static void testMultiInputs() {
  2 
  3 		Configuration conf = new Configuration();
  4 
  5 		conf.set("mapreduce.job.queuename", "default");
  6 		conf.setBoolean("mapreduce.map.output.compress", true);
  7 		conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.995f);
  8 		conf.setInt("mapreduce.task.timeout",0);
  9 		conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent",0.40f);
 10 
 11 		String input = "/work/justTest/";
 12 		try {
 13 			createMultiInputsTestJob(conf,
 14 					input+"test1", Test1Mapper.class,
 15 					input+"test2", Test2Mapper.class,
 16 					input+"/temp", 2, TestReduce.class)
 17 					.waitForCompletion(true);
 18 		} catch (Exception e) {
 19 			e.printStackTrace();
 20 		}
 21 	}
View Code

(2) 运行日志如下所示:

  1 18/08/12 21:58:15 INFO client.RMProxy: Connecting to ResourceManager at bd-001/192.168.86.41:8032
  2 18/08/12 21:58:15 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
  3 18/08/12 21:58:16 INFO input.FileInputFormat: Total input paths to process : 1
  4 18/08/12 21:58:16 INFO input.FileInputFormat: Total input paths to process : 1
  5 18/08/12 21:58:16 INFO mapreduce.JobSubmitter: number of splits:2
  6 18/08/12 21:58:17 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1527582903778_39628
  7 18/08/12 21:58:17 INFO impl.YarnClientImpl: Submitted application application_1527582903778_39628
  8 18/08/12 21:58:17 INFO mapreduce.Job: The url to track the job: http://bd-001:8088/proxy/application_1527582903778_39628/
  9 18/08/12 21:58:17 INFO mapreduce.Job: Running job: job_1527582903778_39628
 10 18/08/12 21:58:22 INFO mapreduce.Job: Job job_1527582903778_39628 running in uber mode : false
 11 18/08/12 21:58:22 INFO mapreduce.Job:  map 0% reduce 0%
 12 18/08/12 21:58:28 INFO mapreduce.Job:  map 100% reduce 0%
 13 18/08/12 21:58:34 INFO mapreduce.Job:  map 100% reduce 100%
 14 18/08/12 21:58:35 INFO mapreduce.Job: Job job_1527582903778_39628 completed successfully
 15 18/08/12 21:58:35 INFO mapreduce.Job: Counters: 55
 16 	File System Counters
 17 		FILE: Number of bytes read=66
 18 		FILE: Number of bytes written=362388
 19 		FILE: Number of read operations=0
 20 		FILE: Number of large read operations=0
 21 		FILE: Number of write operations=0
 22 		HDFS: Number of bytes read=528
 23 		HDFS: Number of bytes written=0
 24 		HDFS: Number of read operations=12
 25 		HDFS: Number of large read operations=0
 26 		HDFS: Number of write operations=4
 27 	Job Counters
 28 		Launched map tasks=2
 29 		Launched reduce tasks=2
 30 		Data-local map tasks=1
 31 		Rack-local map tasks=1
 32 		Total time spent by all maps in occupied slots (ms)=27332
 33 		Total time spent by all reduces in occupied slots (ms)=59792
 34 		Total time spent by all map tasks (ms)=6833
 35 		Total time spent by all reduce tasks (ms)=7474
 36 		Total vcore-seconds taken by all map tasks=6833
 37 		Total vcore-seconds taken by all reduce tasks=7474
 38 		Total megabyte-seconds taken by all map tasks=27987968
 39 		Total megabyte-seconds taken by all reduce tasks=61227008
 40 	Map-Reduce Framework
 41 		Map input records=2
 42 		Map output records=6
 43 		Map output bytes=28
 44 		Map output materialized bytes=96
 45 		Input split bytes=512
 46 		Combine input records=0
 47 		Combine output records=0
 48 		Reduce input groups=3
 49 		Reduce shuffle bytes=96
 50 		Reduce input records=6
 51 		Reduce output records=0
 52 		Spilled Records=12
 53 		Shuffled Maps =4
 54 		Failed Shuffles=0
 55 		Merged Map outputs=4
 56 		GC time elapsed (ms)=272
 57 		CPU time spent (ms)=4440
 58 		Physical memory (bytes) snapshot=1346195456
 59 		Virtual memory (bytes) snapshot=29357146112
 60 		Total committed heap usage (bytes)=3084910592
 61 	Shuffle Errors
 62 		BAD_ID=0
 63 		CONNECTION=0
 64 		IO_ERROR=0
 65 		WRONG_LENGTH=0
 66 		WRONG_MAP=0
 67 		WRONG_REDUCE=0
 68 	File Input Format Counters
 69 		Bytes Read=0
 70 	File Output Format Counters
 71 		Bytes Written=0
 72 	reduce_total
 73 		am_2=1
 74 		i_2=1
 75 		ws_2=1
 76 	test1_map_total
 77 		input=3
 78 	test2_map_total
 79 		input=3
View Code

(3) 通过日志可以看到,运行结果符合原始目标。

原文地址:https://www.cnblogs.com/mengrennwpu/p/9463458.html