Hadoop 1.x: MapReduce code to find the largest 100 of one million numbers

/***********************************************
The one million numbers here are only simulated on a personal machine;
real data could reach one hundred million, which has not been tested.
The input file stores the numbers in the following format:
4566
1321634
132132
165446
i.e. one number per line.

The MapReduce implementation follows; a sketch for generating the
simulated input appears after the listing.

***********************************************/

import java.io.IOException;
import java.net.URI;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Job: find the largest 100 values
 *
 */
public class Top100App {

    // HDFS paths; adjust the NameNode address to your own cluster.
    static final String INPUT = "hdfs://192.168.56.100:9000/input";
    static final String OUT = "hdfs://192.168.56.100:9000/out";

    static final Path INPUT_PATH = new Path(INPUT);
    static final Path OUT_PATH = new Path(OUT);

    static final int topNum = 100;

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        // Remove any previous output directory, otherwise the job fails on startup.
        FileSystem fileSystem = FileSystem.get(new URI(OUT), conf);
        if (fileSystem.exists(OUT_PATH)) {
            fileSystem.delete(OUT_PATH, true);
        }

        Job job = new Job(conf, Top100App.class.getSimpleName());
        job.setJarByClass(Top100App.class); // required when running from a jar on a cluster
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // The top-100 state lives inside a single reducer instance, so the
        // job is only correct with exactly one reducer (Hadoop's default).
        job.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(job, OUT_PATH);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
        @Override
        protected void map(
                LongWritable key,
                Text value,
                Mapper<LongWritable, Text, LongWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // One number per line; emit it as the key so the framework sorts
            // the numbers, with NullWritable as a placeholder value.
            context.write(new LongWritable(Long.parseLong(value.toString().trim())), NullWritable.get());
        }
    }
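
    // Scale note: as written, every input number is shuffled to the single
    // reducer. For much larger inputs, a common refinement (not in the
    // original post) is to keep a per-mapper top-100 and emit it from the
    // mapper's cleanup(), so each map task contributes at most 100 numbers.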

    static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
        long[] topArray = new long[topNum];
        int count = 0;

        // Note: reduce() is called once per DISTINCT key, so duplicates in
        // the input collapse here and the job returns the 100 largest
        // distinct values.
        @Override
        protected void reduce(
                LongWritable k2,
                Iterable<NullWritable> v2s,
                Reducer<LongWritable, NullWritable, LongWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            long num = k2.get();
            if (count < topNum) {
                // Fill the buffer with the first 100 numbers seen.
                topArray[count] = num;
                count++;
            } else {
                // After sorting, topArray[0] is the smallest of the kept 100;
                // replace it if the new number is larger.
                Arrays.sort(topArray);
                if (num > topArray[0]) {
                    topArray[0] = num;
                }
            }
        }
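
        // Performance note: sorting topArray on every call is O(N log N)
        // per distinct key. A java.util.PriorityQueue used as a min-heap
        // (peek the minimum, poll/offer on replacement) would do the same
        // in O(log N); the cross-check sketch after the listing uses one.
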
        @Override
        protected void cleanup(
                Reducer<LongWritable, NullWritable, LongWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // Emit the kept 100 numbers in descending order.
            Arrays.sort(topArray);
            for (int i = topArray.length - 1; i > -1; i--) {
                context.write(new LongWritable(topArray[i]), NullWritable.get());
            }
        }
    }

}
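
Two small sketches to go with the job (not from the original post; the file name numbers.txt and the value range are arbitrary assumptions).

First, generating the simulated input file locally; it can then be uploaded with hadoop fs -put numbers.txt /input:

import java.io.PrintWriter;
import java.util.Random;

public class GenNumbers {
    public static void main(String[] args) throws Exception {
        Random random = new Random();
        PrintWriter writer = new PrintWriter("numbers.txt");
        // One million random numbers, one per line, matching the input
        // format described at the top of the listing.
        for (int i = 0; i < 1000000; i++) {
            writer.println(random.nextInt(10000000));
        }
        writer.close();
    }
}

Second, a local cross-check of the job's output. It deduplicates first, because reduce() runs once per distinct key, then keeps a top-100 with the min-heap approach mentioned in the reducer's performance note:

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashSet;
import java.util.PriorityQueue;

public class Top100Check {
    public static void main(String[] args) throws Exception {
        // Collapse duplicates to match the reducer's semantics.
        HashSet<Long> distinct = new HashSet<Long>();
        BufferedReader reader = new BufferedReader(new FileReader("numbers.txt"));
        String line;
        while ((line = reader.readLine()) != null) {
            distinct.add(Long.parseLong(line.trim()));
        }
        reader.close();

        PriorityQueue<Long> heap = new PriorityQueue<Long>(); // min-heap
        for (long num : distinct) {
            if (heap.size() < 100) {
                heap.offer(num);
            } else if (num > heap.peek()) {
                heap.poll();   // evict the current minimum of the kept 100
                heap.offer(num);
            }
        }
        // Prints ascending; the MapReduce output is the same 100 values,
        // descending.
        while (!heap.isEmpty()) {
            System.out.println(heap.poll());
        }
    }
}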
Original post (Chinese): https://www.cnblogs.com/litaiqing/p/4539078.html