MapReduce编程:词频统计

首先需要在项目的src目录下加入log4j.properties文件,其内容为:

# Root logger: level INFO, attached only to the "stdout" appender.
log4j.rootLogger=INFO, stdout

# Console appender. Pattern: %d = timestamp, %p = level, %c = logger name,
# %m = message, %n = line separator.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

# File appender targeting target/spring.log, using the same pattern as stdout.
# NOTE(review): "logfile" is not listed on log4j.rootLogger above, so nothing in
# this snippet references it; add it there if file output is actually wanted.
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

代码如下:

 1 package org.apache.hadoop.examples;
 2      
 3     import java.io.IOException;
 4     import java.util.Iterator;
 5     import java.util.StringTokenizer;
 6     import org.apache.hadoop.conf.Configuration;
 7     import org.apache.hadoop.fs.Path;
 8     import org.apache.hadoop.io.IntWritable;
 9     import org.apache.hadoop.io.Text;
10     import org.apache.hadoop.mapreduce.Job;
11     import org.apache.hadoop.mapreduce.Mapper;
12     import org.apache.hadoop.mapreduce.Reducer;
13     import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
14     import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
15     import org.apache.hadoop.util.GenericOptionsParser;
16      
17     public class WordCount {
18         public WordCount() {
19         }
20          
21         //main函数,MapReduce程序运行的入口
22         public static void main(String[] args) throws Exception {
23             Configuration conf = new Configuration();   //指定HDFS相关的参数
24             
25             //String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
26             String[] otherArgs = new String[]{"input","output"};
27             if(otherArgs.length < 2) {
28                 System.err.println("Usage: wordcount <in> [<in>...] <out>");
29                 System.exit(2);
30             }
31          
32             //通过Job类设置Hadoop程序运行时的环境变量
33             Job job = Job.getInstance(conf, "word count");  //设置环境参数 
34             job.setJarByClass(WordCount.class);  //设置整个程序的类名
35             job.setMapperClass(WordCount.TokenizerMapper.class); //添加Mapper类
36             job.setCombinerClass(WordCount.IntSumReducer.class); 
37             job.setReducerClass(WordCount.IntSumReducer.class); //添加Reducer类
38             job.setOutputKeyClass(Text.class);  //设置输出类型,因为输出的形式是<单词,个数>,所以这里用Text,类似于Java的String,但还是有些区别
39             job.setOutputValueClass(IntWritable.class);  //设置输出类型,类似于Java的Int
40      
41             for(int i = 0; i < otherArgs.length - 1; ++i) {  
42                 FileInputFormat.addInputPath(job, new Path(otherArgs[i]));    //设置输入文件
43             }
44      
45             FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));   //设置输出文件
46             System.exit(job.waitForCompletion(true)?0:1);  //提交作业
47         }
48      
49         //Reduce处理逻辑
50         public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
51             private IntWritable result = new IntWritable();
52      
53             public IntSumReducer() {
54             }
55      
56             public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
57                 int sum = 0;
58      
59                 IntWritable val;
60                 for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
61                     val = (IntWritable)i$.next();
62                 }
63      
64                 this.result.set(sum);
65                 context.write(key, this.result);
66             }
67         }
68      
69         
70         //Map处理逻辑
71         public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
72             private static final IntWritable one = new IntWritable(1);
73             private Text word = new Text();
74      
75             public TokenizerMapper() {
76             }
77      
78             public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
79                 StringTokenizer itr = new StringTokenizer(value.toString());   //分词器
80      
81                 while(itr.hasMoreTokens()) {
82                     this.word.set(itr.nextToken());
83                     context.write(this.word, one);  //输出键值对
84                     //这里也可以直接写成context.write(new Text(word), new IntWritable(1));
85                 }
86      
87             }
88         }
89     }    
原文地址:https://www.cnblogs.com/zyb993963526/p/10244721.html