MapReduce的Reduce端JOIN(Mapper端为记录打标签,Reducer端完成连接)

  1 package com.mengyao.hadoop.mapreduce;
  2 
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Vector;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 23 
 24 public class MapJoinApp extends Configured implements Tool {
 25 
 26     static class MapJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
 27         
 28         private Text k;
 29         private Text v;
 30         
 31         @Override
 32         protected void setup(
 33                 Mapper<LongWritable, Text, Text, Text>.Context context)
 34                         throws IOException, InterruptedException {
 35             k = new Text();
 36             v = new Text();        
 37         }
 38 
 39         @Override
 40         protected void map(LongWritable key, Text value, Context context)
 41                 throws IOException, InterruptedException {
 42             Counter errorLineCounter = context.getCounter("ErrorTotal", "errorLine");
 43             Counter notFindCounter = context.getCounter("ErrorTotal", "notFind");
 44             FileSplit inputSplit = (FileSplit) context.getInputSplit();
 45             String fileName = inputSplit.getPath().getName();
 46             String userAddressTable = "userAddressTable.txt";
 47             String userTable = "userTable.txt";
 48             if (fileName != null && !fileName.isEmpty()) {
 49                 if (fileName.equals(userAddressTable)) {
 50                     String[] fields = value.toString().split("\t");
 51                     String addressId = fields[0];
 52                     String address = fields[1];
 53                     k.set(addressId);
 54                     v.set("ua@"+address);
 55                     context.write(k, v);
 56                     System.err.println(k.toString()+"\t"+v.toString());
 57                 } else if(fileName.equals(userTable)) {
 58                     String[] fields = value.toString().split("\t");
 59                     String id = fields[0];
 60                     String name = fields[1];
 61                     String addressId = fields[2];
 62                     k.set(addressId);
 63                     v.set("u@"+id+"\t"+name);
 64                     System.err.println(k.toString()+"\t"+v.toString());
 65                     context.write(k, v);
 66                 } else {
 67                     notFindCounter.increment(1L);
 68                 }
 69             } else {
 70                 errorLineCounter.increment(1L);
 71             }
 72         }
 73     }
 74     
 75     static class MapJoinReducer extends Reducer<Text, Text, NullWritable, Text> {
 76         @Override
 77         protected void reduce(Text key, Iterable<Text> value, Context context)
 78                 throws IOException, InterruptedException {
 79             Vector<String> uArr = new Vector<String>();
 80             Vector<String> uaArr = new Vector<String>();
 81             for (Text text : value) {
 82                 String item = text.toString();
 83                 if (item.contains("ua@")) {
 84                     uaArr.add(item.split("ua@")[1]);
 85                 }
 86                 if (item.contains("u@")) {
 87                     uArr.add(item.split("u@")[1]);
 88                 }
 89             }
 90             int i;
 91             for (i = 0; i < uArr.size(); i++) {
 92                 for (String uaItem : uaArr) {
 93                     context.write(NullWritable.get(), new Text(uArr.get(i) +"\t"+ uaItem));
 94                 }
 95             }
 96             
 97         }
 98     }
 99 
100     @Override
101     public int run(String[] args) throws Exception {
102         Configuration conf = getConf();
103         
104         conf.set("mapreduce.job.jvm.numtasks", "-1");        
105         conf.set("mapreduce.map.speculative", "false");        
106         conf.set("mapreduce.reduce.speculative", "false");    
107         conf.set("mapreduce.map.maxattempts", "4");            
108         conf.set("mapreduce.reduce.maxattempts", "4");        
109         conf.set("mapreduce.map.skip.maxrecords", "0");    
110         Job job = Job.getInstance(conf, MapJoinApp.class.getSimpleName());
111         job.setJarByClass(MapJoinApp.class);
112         
113         FileInputFormat.addInputPath(job, new Path(args[0]));
114         FileOutputFormat.setOutputPath(job, new Path(args[1]));
115         
116         job.setMapperClass(MapJoinMapper.class);
117         job.setMapOutputKeyClass(Text.class);
118         job.setMapOutputValueClass(Text.class);
119         
120 //        job.setCombinerClass(JoinReducer.class);
121         
122         job.setReducerClass(MapJoinReducer.class);
123         job.setOutputKeyClass(NullWritable.class);
124         job.setOutputValueClass(Text.class);
125         
126         return job.waitForCompletion(true)?0:1;
127     }
128     
129     public static int createJob(String[] params){
130         Configuration conf = new Configuration();
131         int status = 1;
132         try {
133             status = ToolRunner.run(conf, new MapJoinApp(), params);
134         } catch (Exception e) {
135             e.printStackTrace();
136             throw new RuntimeException(e);
137         }
138         
139         return status;
140     }
141     
142     public static void main(String[] args) {
143         args = new String[2];
144         args[0] = "/testdata/user*";
145         args[1] = "/job/mapreduce/"+MapJoinApp.class.getSimpleName()+"_"+new SimpleDateFormat("yyyyMMddhhMMss").format(new Date());
146         if (args.length != 2) {
147             System.out.println("Usage: "+MapJoinApp.class.getSimpleName()+" <in> <out>");
148             System.exit(2);
149         } else {
150             int status = createJob(args);
151             System.exit(status);
152         }
153     }
154 
155 
156 }
原文地址:https://www.cnblogs.com/mengyao/p/4865587.html