MapReduce的自定义排序

  1 package com.mengyao.hadoop.mapreduce;
  2 
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6 import java.text.SimpleDateFormat;
  7 import java.util.Date;
  8 
  9 import org.apache.hadoop.conf.Configuration;
 10 import org.apache.hadoop.conf.Configured;
 11 import org.apache.hadoop.fs.Path;
 12 import org.apache.hadoop.io.LongWritable;
 13 import org.apache.hadoop.io.Text;
 14 import org.apache.hadoop.io.WritableComparable;
 15 import org.apache.hadoop.mapreduce.Job;
 16 import org.apache.hadoop.mapreduce.Mapper;
 17 import org.apache.hadoop.mapreduce.Reducer;
 18 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 19 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 20 import org.apache.hadoop.util.Tool;
 21 import org.apache.hadoop.util.ToolRunner;
 22 
 23 public class SortApp extends Configured implements Tool {
 24 
 25     static class Num2 implements WritableComparable<Num2> {
 26 
 27         private long c1;
 28         private long c2;
 29         
 30         public Num2(){
 31             
 32         }
 33         
 34         public Num2(long c1, long c2) {
 35             this.c1 = c1;
 36             this.c2 = c2;
 37         }
 38 
 39         @Override
 40         public void write(DataOutput out) throws IOException {
 41             out.writeLong(this.c1);
 42             out.writeLong(this.c2);
 43         }
 44 
 45         @Override
 46         public void readFields(DataInput in) throws IOException {
 47             this.c1 = in.readLong();
 48             this.c2 = in.readLong();
 49         }
 50 
 51         @Override
 52         public int compareTo(Num2 num2) {
 53             long min = this.c1 - num2.c1;
 54             if (min != 0) {
 55                 return (int)min;
 56             }
 57             
 58             return (int)(this.c2 - num2.c2);
 59         }
 60 
 61         public void set(long c1, long c2) {
 62             this.c1 = c1;
 63             this.c2 = c2;
 64         }
 65         
 66     }
 67     
 68     static class SortMapper extends Mapper<LongWritable, Text, Num2, LongWritable> {
 69         
 70         private Num2 k = null;
 71         private LongWritable v = null;
 72         
 73         @Override
 74         protected void setup(Mapper<LongWritable, Text, Num2, LongWritable>.Context context)
 75                 throws IOException, InterruptedException {
 76             k = new Num2();
 77             v = new LongWritable();
 78         }
 79 
 80         @Override
 81         protected void map(LongWritable key, Text value, Context context)
 82                 throws IOException, InterruptedException {
 83             String[] fields = value.toString().split("\t");
 84             if (fields != null && fields.length == 2) {
 85                 k.set(Long.parseLong(fields[0]), Long.parseLong(fields[1]));
 86                 v.set(Long.parseLong(fields[0]));
 87                 context.write(k, v);                
 88             }
 89         }
 90     }
 91     
 92     static class SortReducer extends Reducer<Num2, LongWritable, LongWritable, LongWritable> {
 93         @Override
 94         protected void reduce(Num2 key, Iterable<LongWritable> value, Context context)
 95                 throws IOException, InterruptedException {
 96             context.write(new LongWritable(key.c1), new LongWritable(key.c2));
 97         }
 98     }
 99     
100     @Override
101     public int run(String[] args) throws Exception {
102         Configuration conf = getConf();
103         conf.set("mapreduce.job.jvm.numtasks", "-1");        
104         conf.set("mapreduce.map.speculative", "false");        
105         conf.set("mapreduce.reduce.speculative", "false");    
106         conf.set("mapreduce.map.maxattempts", "4");            
107         conf.set("mapreduce.reduce.maxattempts", "4");        
108         conf.set("mapreduce.map.skip.maxrecords", "0");    
109         Job job = Job.getInstance(conf, SortApp.class.getSimpleName());
110         job.setJarByClass(SortApp.class);
111         
112         FileInputFormat.addInputPath(job, new Path(args[0]));
113         FileOutputFormat.setOutputPath(job, new Path(args[1]));
114         
115         job.setMapperClass(SortMapper.class);
116         job.setMapOutputKeyClass(Num2.class);
117         job.setMapOutputValueClass(LongWritable.class);
118         
119         job.setReducerClass(SortReducer.class);
120         job.setOutputKeyClass(LongWritable.class);
121         job.setOutputValueClass(LongWritable.class);
122         
123         return job.waitForCompletion(true)?0:1;
124     }
125 
126     public static int createJob(String[] params) {
127         Configuration conf = new Configuration();
128         int status = 1;
129         try {
130             status = ToolRunner.run(conf, new SortApp(), params);
131         } catch (Exception e) {
132             e.printStackTrace();
133             throw new RuntimeException(e);
134         }
135         
136         return status;
137     }
138     
139     public static void main(String[] args) throws Exception {
140         args = new String[]{"/testdata/sortdata", "/job/mapreduce/"+SortApp.class.getSimpleName()+"_"+new SimpleDateFormat("yyyyMMddhhMMss").format(new Date())};
141         if (args.length != 2) {
142             System.err.println("Usage: "+SortApp.class.getSimpleName()+" <in> <out>");
143             System.exit(2);
144         } else {
145             int status = createJob(args);
146             System.exit(status);
147         }
148     }
149 
150 }
原文地址:https://www.cnblogs.com/mengyao/p/4865599.html