使用hadoop统计多个文本中每个单词数目

程序源码

 1 import java.io.IOException;
 2 import java.util.StringTokenizer;
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.Path;
 5 import org.apache.hadoop.io.IntWritable;
 6 import org.apache.hadoop.io.LongWritable;
 7 import org.apache.hadoop.io.Text;
 8 import org.apache.hadoop.mapreduce.Job;
 9 import org.apache.hadoop.mapreduce.Mapper;
10 import org.apache.hadoop.mapreduce.Reducer;
11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
14 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
15 
16 /**
17  * MapReduce job that counts how often each whitespace-separated token occurs
18  * in the text files under the input path.
19  */
20 public class WordCount {
21     /** Emits {@code <token, 1>} for every token found in an input line. */
22     public static class WordCountMap extends
23             Mapper<LongWritable, Text, Text, IntWritable> {
24         private final IntWritable one = new IntWritable(1);// emitted value: always 1
25         private Text word = new Text();// emitted key: the current token
26 
27         @Override
28         public void map(LongWritable key, Text value, Context context)
29                 throws IOException, InterruptedException {// turns the <k1,v1> pairs produced by TextInputFormat into <k2,v2> pairs
30             String line = value.toString();// one record of input text
31             StringTokenizer token = new StringTokenizer(line);// default delimiters: space, tab, newline, CR, form feed
32             while (token.hasMoreTokens()) {
33                 word.set(token.nextToken());// the token becomes the output key
34                 context.write(word, one);// hand <token, 1> on to the reduce phase
35             }
36         }
37     }
38 
39     /** Adds up the counts received for one token and emits {@code <token, sum>}. */
40     public static class WordCountReduce extends
41             Reducer<Text, IntWritable, Text, IntWritable> {
42         @Override
43         public void reduce(Text key, Iterable<IntWritable> values,
44                 Context context) throws IOException, InterruptedException {
45             int sum = 0;
46             for (IntWritable val : values) {
47                 sum += val.get();
48             }
49             context.write(key, new IntWritable(sum));
50         }
51     }
52 
53     /**
54      * Configures and runs the job, then exits with 0 on success, 1 if the job
55      * failed, or 2 if fewer than two arguments were supplied.
56      *
57      * @param args {@code args[0]} input path, {@code args[1]} output path
58      *             (the output path must not exist yet)
59      * @throws Exception if setting up or waiting for the job fails
60      */
61     public static void main(String[] args) throws Exception {
62         if (args.length < 2) {
63             // Fail with a usage hint instead of an ArrayIndexOutOfBoundsException below.
64             System.err.println("Usage: WordCount <input path> <output path>");
65             System.exit(2);
66         }
67         Configuration conf = new Configuration();
68         Job job = new Job(conf);
69         job.setJarByClass(WordCount.class);
70         job.setJobName("wordcount");
71         job.setOutputKeyClass(Text.class);
72         job.setOutputValueClass(IntWritable.class);
73         job.setMapperClass(WordCountMap.class);
74         job.setReducerClass(WordCountReduce.class);
75         job.setInputFormatClass(TextInputFormat.class);// produces the key/value pairs consumed by the mapper
76         job.setOutputFormatClass(TextOutputFormat.class);
77         FileInputFormat.addInputPath(job, new Path(args[0]));
78         FileOutputFormat.setOutputPath(job, new Path(args[1]));
79         System.exit(job.waitForCompletion(true) ? 0 : 1);// report job success or failure to the calling shell
80     }
81 }

1 编译源码

javac -classpath /opt/hadoop-1.2.1/hadoop-core-1.2.1.jar:/opt/hadoop-1.2.1/lib/commons-cli-1.2.jar -d ./word_count_class/ WordCount.java
将源码编译成class文件并放在当前文件夹下的word_count_class目录,当然,首先需要创建该目录

2 将编译得到的class文件打成jar包

进入存放class文件的word_count_class目录

jar -cvf wordcount.jar  *

3 上传输入文件

先在hadoop中为本次任务创建一个输入文件存放目录

hadoop fs -mkdir input_wordcount

将input目录下的所有文本文件上传到hadoop中的input_wordcount目录下

hadoop fs -put input/* input_wordcount/

注意:不能在运行前创建输出文件夹,否则作业会因输出目录已存在而失败

4 上传jar并执行

hadoop jar word_count_class/wordcount.jar WordCount input_wordcount output_wordcount

5 查看计算结果

程序输出目录

 hadoop fs -ls output_wordcount

程序输出内容

hadoop fs -cat output_wordcount/part-r-00000



版本二:自己实际操作中的程序

Map程序

 1 package com.zln.chapter03;
 2 
 3 import org.apache.hadoop.io.IntWritable;
 4 import org.apache.hadoop.io.LongWritable;
 5 import org.apache.hadoop.io.Text;
 6 import org.apache.hadoop.mapred.MapReduceBase;
 7 import org.apache.hadoop.mapred.Mapper;
 8 import org.apache.hadoop.mapred.OutputCollector;
 9 import org.apache.hadoop.mapred.Reporter;
10 
11 import java.io.IOException;
12 import java.util.StringTokenizer;
13 
14 /**
15  * Created by sherry on 15-7-12.
16  */
17 public class WordCountMap extends MapReduceBase implements Mapper<LongWritable,Text,Text,IntWritable> {
18     /** Constant count of 1 collected for every token. */
19     private final static IntWritable ONE = new IntWritable(1);
20     /** Reusable output key holding the current token. */
21     private Text token = new Text();
22 
23     /**
24      * Splits the content of {@code text} with a default {@link StringTokenizer} and collects
25      * {@code (token, 1)} for each piece; {@code longWritable} and {@code reporter} are not used.
26      */
27     @Override
28     public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
29         for (StringTokenizer pieces = new StringTokenizer(text.toString()); pieces.hasMoreTokens(); ) {
30             token.set(pieces.nextToken());
31             outputCollector.collect(token, ONE);
32         }
33     }
34 }

Reduce程序

 1 package com.zln.chapter03;
 2 
 3 import org.apache.hadoop.io.IntWritable;
 4 import org.apache.hadoop.io.Text;
 5 import org.apache.hadoop.mapred.MapReduceBase;
 6 import org.apache.hadoop.mapred.OutputCollector;
 7 import org.apache.hadoop.mapred.Reducer;
 8 import org.apache.hadoop.mapred.Reporter;
 9 
10 import java.io.IOException;
11 import java.util.Iterator;
12 
13 /**
14  * Created by sherry on 15-7-12.
15  */
16 public class WordCountReduce extends MapReduceBase implements Reducer<Text,IntWritable,Text,IntWritable> {
17     /**
18      * Adds up every count supplied by {@code iterator} and collects {@code (text, total)};
19      * {@code reporter} is not used.
20      */
21     @Override
22     public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
23         int total = 0;
24         for (; iterator.hasNext(); ) {
25             IntWritable count = iterator.next();
26             total += count.get();
27         }
28         IntWritable result = new IntWritable(total);
29         outputCollector.collect(text, result);
30     }
31 }


主函数

 1 package com.zln.chapter03;
 2 
 3 import org.apache.hadoop.fs.Path;
 4 import org.apache.hadoop.io.IntWritable;
 5 import org.apache.hadoop.io.Text;
 6 import org.apache.hadoop.mapred.*;
 7 
 8 import java.io.IOException;
 9 
10 
11 /**
12  * Created by sherry on 15-7-12.
13  */
14 public class WordCount {
15     /**
16      * Fills in a {@link JobConf} for the word-count job and submits it through {@code JobClient.runJob}.
17      *
18      * @param args {@code args[0]} is used as the input path, {@code args[1]} as the output path
19      * @throws IOException if a call made while configuring or running the job throws it
20      */
21     public static void main(String[] args) throws IOException {
22         JobConf jobConf = new JobConf(WordCount.class);
23         jobConf.setJobName("wordCount");
24 
25         // Mapper and reducer implementations
26         jobConf.setMapperClass(WordCountMap.class);
27         jobConf.setReducerClass(WordCountReduce.class);
28 
29         // Types of the keys and values the job writes
30         jobConf.setOutputKeyClass(Text.class);
31         jobConf.setOutputValueClass(IntWritable.class);
32 
33         // Input and output formats
34         jobConf.setInputFormat(TextInputFormat.class);
35         jobConf.setOutputFormat(TextOutputFormat.class);
36 
37         // Input and output locations come from the command line
38         Path inputPath = new Path(args[0]);
39         FileInputFormat.setInputPaths(jobConf, inputPath);
40         Path outputPath = new Path(args[1]);
41         FileOutputFormat.setOutputPath(jobConf, outputPath);
42 
43         JobClient.runJob(jobConf);
44     }
45 }

准备输入文件

file1

Hello Word By Word
Hello Word By zln

file2

Hello Hadoop
Hello GoodBye

放在同一个目录下:/home/sherry/IdeaProjects/Hadoop/WordCount/输入文件准备

编译class打成一个jar包

我使用IDEA进行编译。注意不要忘记指定main函数

上传输入文件

root@sherry:/opt/hadoop-1.2.1# hadoop fs -mkdir /user/root/zln/WordCount/InputFiles
root@sherry:/opt/hadoop-1.2.1# hadoop fs -put /home/sherry/IdeaProjects/Hadoop/WordCount/输入文件准备/* /user/root/zln/WordCount/InputFiles

上传jar并执行

root@sherry:/opt/hadoop-1.2.1# hadoop jar /home/sherry/IdeaProjects/Hadoop/out/artifacts/WordCount_jar/WordCount.jar /user/root/zln/WordCount/InputFiles /user/root/zln/WordCount/OutputFiles

查看执行结果

root@sherry:/opt/hadoop-1.2.1# hadoop fs -ls /user/root/zln/WordCount/OutputFiles
root@sherry:/opt/hadoop-1.2.1# hadoop fs -text /user/root/zln/WordCount/OutputFiles/part-00000


版本三:使用新版本的API对Map、Reduce和main函数进行重写

Map

 1 package com.zln.chapter03;
 2 
 3 import org.apache.hadoop.io.IntWritable;
 4 import org.apache.hadoop.io.LongWritable;
 5 import org.apache.hadoop.io.Text;
 6 import org.apache.hadoop.mapreduce.Mapper;
 7 
 8 import java.io.IOException;
 9 import java.util.StringTokenizer;
10 
11 /**
12  * Created by sherry on 15-7-12.
13  */
14 public class WordCountMap extends Mapper<LongWritable,Text,Text,IntWritable> {
15     /** Constant count of 1 written for every token. */
16     private final static IntWritable ONE = new IntWritable(1);
17     /** Reusable output key holding the current token. */
18     private Text token = new Text();
19 
20     /**
21      * Splits the content of {@code value} with a default {@link StringTokenizer} and writes
22      * {@code (token, 1)} to {@code context} for each piece; {@code key} is not used.
23      */
24     @Override
25     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
26         for (StringTokenizer pieces = new StringTokenizer(value.toString()); pieces.hasMoreTokens(); ) {
27             token.set(pieces.nextToken());
28             context.write(token, ONE);
29         }
30     }
31 
32 }

Reduce

 1 package com.zln.chapter03;
 2 
 3 import org.apache.hadoop.io.IntWritable;
 4 import org.apache.hadoop.io.Text;
 5 import org.apache.hadoop.mapreduce.Reducer;
 6 
 7 import java.io.IOException;
 8 
 9 /**
10  * Created by sherry on 15-7-12.
11  */
12 public class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
13 
14     /**
15      * Adds up every count in {@code values} and writes {@code (key, total)} to {@code context}.
16      */
17     @Override
18     protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
19         int total = 0;
20         for (IntWritable count : values) {
21             total = total + count.get();
22         }
23         IntWritable result = new IntWritable(total);
24         context.write(key, result);
25     }
26 }

Main

 1 package com.zln.chapter03;
 2 
 3 
 4 import org.apache.hadoop.conf.Configured;
 5 import org.apache.hadoop.fs.Path;
 6 import org.apache.hadoop.io.IntWritable;
 7 import org.apache.hadoop.io.Text;
 8 import org.apache.hadoop.mapreduce.Job;
 9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
10 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
12 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
13 import org.apache.hadoop.util.Tool;
14 import org.apache.hadoop.util.ToolRunner;
15 
16 
17 
18 /**
19  * Created by sherry on 15-7-12.
20  */
21 public class WordCount extends Configured implements Tool{
22 
23     /**
24      * Configures the word-count job from {@code getConf()} and blocks until it finishes.
25      *
26      * @param args {@code args[0]} input path, {@code args[1]} output path
27      * @return 0 if the job succeeded, 1 if it failed, 2 if fewer than two arguments were given
28      * @throws Exception if setting up or waiting for the job fails
29      */
30     @Override
31     public int run(String[] args) throws Exception {
32         if (args.length < 2) {
33             // Fail with a usage hint instead of an ArrayIndexOutOfBoundsException below.
34             System.err.println("Usage: WordCount <input path> <output path>");
35             return 2;
36         }
37 
38         Job job = new Job(getConf());
39         job.setJarByClass(WordCount.class);
40         job.setJobName("WordCount");
41 
42         job.setOutputKeyClass(Text.class);
43         job.setOutputValueClass(IntWritable.class);
44 
45         job.setMapperClass(WordCountMap.class);
46         job.setReducerClass(WordCountReduce.class);
47 
48         job.setInputFormatClass(TextInputFormat.class);
49         job.setOutputFormatClass(TextOutputFormat.class);
50 
51         FileInputFormat.setInputPaths(job,new Path(args[0]));
52         FileOutputFormat.setOutputPath(job,new Path(args[1]));
53 
54         boolean success = job.waitForCompletion(true);
55         return success?0:1;
56     }
57 
58     /** Runs this tool through {@link ToolRunner} and exits with the code returned by {@code run}. */
59     public static void main(String[] args) throws Exception {
60         int ret = ToolRunner.run(new WordCount(),args);
61         System.exit(ret);
62     }
63 }
原文地址:https://www.cnblogs.com/sherrykid/p/4604717.html