代码如下（保留作后备参考）：
package com.bigdata.hadoop.hdfs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Classic MapReduce word-count job: reads text from args[0], writes
 * per-word occurrence counts to args[1].
 */
public class WordCountTest {

    /** Step 1 — Mapper: emits (word, 1) for each token of an input line. */
    public static class WordCountMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final Text mapOutputKey = new Text();
        // Constant count of 1, allocated once instead of per record.
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split on runs of whitespace; the original split(" ") produced
            // empty tokens for consecutive spaces, inflating the counts.
            String[] words = value.toString().split("\\s+");
            for (String word : words) {
                if (word.isEmpty()) {
                    continue; // leading whitespace yields one empty token
                }
                mapOutputKey.set(word);
                context.write(mapOutputKey, ONE);
            }
        }
    }

    /** Step 2 — Reducer: sums the counts emitted for each word. */
    public static class WordCountReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private final IntWritable outputValue = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            outputValue.set(sum);
            context.write(key, outputValue);
        }
    }

    /**
     * Step 3 — Driver: configures and submits the job, blocking until done.
     *
     * @param args args[0] = input path, args[1] = output path
     * @return 0 on success, 1 on failure
     * @throws Exception if job setup or submission fails
     */
    public int run(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration, this.getClass().getName());
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        // BUG FIX: original called setOutputValueClass here; the map output
        // value type must be set with setMapOutputValueClass.
        job.setMapOutputValueClass(IntWritable.class);

        // Reducer
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Submit and wait; verbose=true prints progress to the client.
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // BUG FIX: the original unconditionally overwrote args, silently
        // ignoring command-line paths. Fall back to the hard-coded HDFS
        // paths only when the caller supplies none.
        if (args.length < 2) {
            args = new String[] {
                "hdfs://linux-66-64.liuwl.com:8020/user/liuwl/tmp/input",
                "hdfs://linux-66-64.liuwl.com:8020/user/liuwl/tmp/output"
            };
        }
        int status = new WordCountTest().run(args);
        System.exit(status);
    }
}