MapReduce的WordCount

  1 package com.mengyao.hadoop.mapreduce;
  2 
  3 import java.io.IOException;
  4 
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.conf.Configured;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.LongWritable;
  9 import org.apache.hadoop.io.Text;
 10 import org.apache.hadoop.mapreduce.Job;
 11 import org.apache.hadoop.mapreduce.Mapper;
 12 import org.apache.hadoop.mapreduce.Reducer;
 13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 14 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 16 import org.apache.hadoop.util.Tool;
 17 import org.apache.hadoop.util.ToolRunner;
 18 
 19 /**
 20  * 输入文件目录为HDFS上的/mapreduces/word.txt,内容如下:
 21  *         hadoop    zookeeper    hbase    hive
 22  *         flume    sqoop    pig    mahout
 23  *         hadoop    spark    mllib    hive    zookeeper
 24  *         hadoop    storm    kafka    redis    zookeeper
 25  *
 26  * 输出目录为HDFS上的/mapreduces/wordcount/
 27  *     _SUCCESS空文件表示作业执行成功(如果是_FAILD文件则失败)
 28  *  part-r-00000文件表示作业的结果,内容如下:
 29  *      flume    1
 30  *      hadoop    3
 31  *      hbase    1
 32  *      hive    2
 33  *      kafka    1
 34  *      mahout    1
 35  *      mllib    1
 36  *      pig    1
 37  *      redis    1
 38  *      spark    1
 39  *      sqoop    1
 40  *      storm    1
 41  *      zookeeper    3
 42  *  
 43  * @author mengyao
 44  *
 45  */
 46 public class WordCount extends Configured implements Tool {
 47 
 48     static class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
 49         
 50         private Text outputKey;
 51         private LongWritable outputValue;
 52         
 53         @Override
 54         protected void setup(Context context)
 55                 throws IOException, InterruptedException {
 56             this.outputKey = new Text();
 57             this.outputValue = new LongWritable(1L);
 58         }
 59         
 60         @Override
 61         protected void map(LongWritable key, Text value, Context context)
 62                 throws IOException, InterruptedException {
 63             final String[] words = value.toString().split("\t");
 64             for (String word : words) {
 65                 this.outputKey.set(word);
 66                 context.write(this.outputKey, this.outputValue);            
 67             }
 68         }
 69     }
 70     
 71     static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
 72         
 73         private Text outputKey;
 74         private LongWritable outputValue;
 75         
 76         @Override
 77         protected void setup(Context context)
 78                 throws IOException, InterruptedException {
 79             this.outputKey = new Text();
 80             this.outputValue = new LongWritable();
 81         }
 82         
 83         @Override
 84         protected void reduce(Text key, Iterable<LongWritable> value, Context context)
 85                 throws IOException, InterruptedException {
 86             long count = 0L;
 87             for (LongWritable item : value) {
 88                 count += item.get();
 89             }
 90             this.outputKey.set(key);
 91             this.outputValue.set(count);
 92             context.write(this.outputKey, this.outputValue);
 93         }
 94     }
 95     
 96     @Override
 97     public int run(String[] args) throws Exception {
 98         Job job = Job.getInstance(getConf(), WordCount.class.getSimpleName());
 99         job.setJarByClass(WordCount.class);
100         
101         job.setInputFormatClass(TextInputFormat.class);
102         FileInputFormat.addInputPath(job, new Path(args[0]));
103         FileOutputFormat.setOutputPath(job, new Path(args[1]));
104         
105         job.setMapperClass(WordCountMapper.class);
106         job.setMapOutputKeyClass(Text.class);
107         job.setMapOutputValueClass(LongWritable.class);
108         
109         job.setCombinerClass(WordCountReducer.class);
110         
111         job.setReducerClass(WordCountReducer.class);
112         job.setOutputKeyClass(Text.class);
113         job.setOutputValueClass(LongWritable.class);
114         
115         return job.waitForCompletion(true)?0:1;
116     }
117 
118     public static int createJob(String[] args) {
119         Configuration conf = new Configuration();
120         conf.set("dfs.datanode.socket.write.timeout", "7200000");
121         conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
122         conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");
123         int status = 0;
124         
125         try {
126             status = ToolRunner.run(conf, new WordCount(), args);
127         } catch (Exception e) {
128             e.printStackTrace();
129         }
130         
131         return status;
132     }
133     
134     public static void main(String[] args) {
135         args = new String[]{"/mapreduces/word.txt", "/mapreduces/wordcount"};
136         if (args.length!=2) {
137             System.out.println("Usage: "+WordCount.class.getName()+" Input paramters <INPUT_PATH> <OUTPUT_PATH>");
138         } else {
139             int status = createJob(args);
140             System.exit(status);
141         }
142     }
143 
144 }
原文地址:https://www.cnblogs.com/mengyao/p/4865561.html