MapReduce数据处理两表join连接 (Reduce端连接)

http://blog.csdn.net/qq272936993/article/details/7457553


现在这里有两个text文档, 需要把它们合并成一个文档, 并且合并后的数据不能有冗余。


user.txt文件: 

UserId       UserName   DepNo
10000000     Li         1001
10000001     Wang       1001
10000002     Zhang      1002
10000003     Wei        1004
10000004     He         1003
10000005     Jin        1002



depart.txt文件: 

DepNo        DepName
1001         Develop
1002         Test
1003         HR
1004         Market 



生成文件: 

10000000     Li         1001       Develop
10000001     Wang       1001       Develop
10000002     Zhang      1002       Test
10000003     Wei        1004       Market
10000004     He         1003       HR
10000005     Jin        1002       Test



因为user.txt文档的第3个字段(DepNo)与depart.txt的第1个字段(DepNo)是相同的, 所以我把它们作为连接用的key值. 

 

public class Advanced extends Configured implements Tool {

	public static class AdMap extends Mapper<LongWritable, Text, Text, TextPair>{

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
		 	String filePath = ((FileSplit)context.getInputSplit()).getPath().toString();	
		 	String line = value.toString();
		 	String[] childline = line.split(" ");    //以空格截取			       
                        if(filePath.contains("user.txt") ){  //判断是哪一张表
				TextPair pair = new TextPair();
				pair.setFlag("0");         //这是个标识   0.表示 user.txt     1表示depart.txt
				pair.setKey(childline[2]);
				pair.setValue(childline[0]+" "+childline[1]);
				pair.setContent(pair.toString());	
                           context.write(new Text(pair.getKey()), pair);
			}else if (filePath.contains("depart.txt")){
			 	TextPair pair = new  TextPair();
				pair.setFlag("1");
				pair.setKey(childline[0]);
				pair.setValue(childline[0]+" " +childline[1]);
				pair.setContent(pair.toString()); 
				context.write(new Text(pair.getKey()), pair);
		       }		
			
		}		
	}
	
	public static class AdReduce extends Reducer<Text, TextPair, Text, Text>{

		@Override
		public void reduce(Text key, Iterable<TextPair> values,
				Context context)
				throws IOException, InterruptedException {
			 
                        List<Text> listUser = new ArrayList<Text>();     
			List<Text> listDepart = new ArrayList<Text>();
			Iterator<TextPair> it = values.iterator();
			TextPair pair = new TextPair();
			while(it.hasNext()){
				pair = it.next();
				if("0".equals(pair.getFlag())){
				    listUser.add(new Text(pair.getValue()));
                            }
				else {
				    listDepart.add(new Text(pair.getValue()));
                            }
			}
			
			for(int i = 0 ; i<listUser.size(); i++){
			    for(int j = 0 ;j<listDepart.size();j++){ i 
				context.write(key, new Text(listUser.get(j)+" " +listDepart.get(i)));
			    }
			}
			
			
		}
		
	}
	 
	public static void main(String[] args) {
		try {
                   int res = ToolRunner.run(new Configuration(), new Advanced(), args);
			System.exit(res);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = new Configuration();
		
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(new Path(args[2]))){
			System.out.println("error : file is exists");
			System.exit(-1);
		}
		
		Job job = new Job(conf , "Advanced");
		job.setJarByClass(Advanced.class);
		job.setMapperClass(AdMap.class);
		job.setReducerClass(AdReduce.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(TextPair.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		FileInputFormat.setInputPaths(job, new Path(args[0]),new Path(args[1]));
		FileOutputFormat.setOutputPath(job, new Path(args[2]));
		return job.waitForCompletion(true) ? 0 : 1;
	} 
}
 		


/**
 * Tagged record passed from the mapper to the reducer of the join job.
 *
 * <p>{@code flag} identifies the source table ("0" for user.txt, "1" for
 * depart.txt), {@code key} is the join key, {@code value} is the payload
 * that ends up in the joined output, and {@code content} holds the
 * {@link #toString()} form of the record.
 *
 * <p>All fields are kept non-null (a null argument is stored as the empty
 * string) so that serialization with {@code writeUTF} cannot fail with a
 * {@code NullPointerException}.
 */
class TextPair implements WritableComparable<TextPair>{

	private String flag = "";
	private String key = "";
	private String value = "";
	private String content = "";

	/** Creates an empty record; required for deserialization. */
	public TextPair() {
	}

	/**
	 * Creates a fully populated record.
	 *
	 * @param flag    source-table tag
	 * @param key     join key
	 * @param value   payload
	 * @param content textual form of the record
	 */
	public TextPair(String flag, String key, String value, String content) {
		this.flag = nullToEmpty(flag);
		this.key = nullToEmpty(key);
		this.value = nullToEmpty(value);
		this.content = nullToEmpty(content);
	}

	/** Maps null to the empty string so that fields are never null. */
	private static String nullToEmpty(String s) {
		return s == null ? "" : s;
	}

	public String getValue() {
		return value;
	}

	public void setValue(String value) {
		this.value = nullToEmpty(value);
	}

	public String getFlag() {
		return flag;
	}

	public void setFlag(String flag) {
		this.flag = nullToEmpty(flag);
	}

	public String getKey() {
		return key;
	}

	public void setKey(String key) {
		this.key = nullToEmpty(key);
	}

	public String getContent() {
		return content;
	}

	public void setContent(String content) {
		this.content = nullToEmpty(content);
	}

	/** Returns the key and value, each preceded by a single space. */
	@Override
	public String toString() {
		return " " + key + " " + value;
	}

	/** Serializes the four fields in the order flag, key, value, content. */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(this.flag);
		out.writeUTF(this.key);
		out.writeUTF(this.value);
		out.writeUTF(this.content);
	}

	/** Restores the four fields in the same order that {@link #write} emits them. */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.flag = in.readUTF();
		this.key = in.readUTF();
		this.value = in.readUTF();
		this.content = in.readUTF();
	}

	/**
	 * Orders records by key, then flag, then value, then content, so that
	 * the ordering is consistent with {@link #equals(Object)}.
	 */
	@Override
	public int compareTo(TextPair o) {
		int c = this.key.compareTo(o.key);
		if (c != 0) {
			return c;
		}
		c = this.flag.compareTo(o.flag);
		if (c != 0) {
			return c;
		}
		c = this.value.compareTo(o.value);
		if (c != 0) {
			return c;
		}
		return this.content.compareTo(o.content);
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (!(obj instanceof TextPair)) {
			return false;
		}
		TextPair other = (TextPair) obj;
		return flag.equals(other.flag)
				&& key.equals(other.key)
				&& value.equals(other.value)
				&& content.equals(other.content);
	}

	@Override
	public int hashCode() {
		int h = flag.hashCode();
		h = 31 * h + key.hashCode();
		h = 31 * h + value.hashCode();
		h = 31 * h + content.hashCode();
		return h;
	}

}


原文地址:https://www.cnblogs.com/leeeee/p/7276278.html