mapreduce (五) MapReduce实现倒排索引 修改版 combiner是把同一个机器上的多个map的结果先聚合一次

(总感觉上一篇的实现有问题)http://www.cnblogs.com/i80386/p/3444726.html   combiner是把同一个机器上的多个map的结果先聚合一次
现重新实现一个:
思路:
第一个mapreduce仅仅做 <word_docid,count>的统计,即某个单词在某一篇文章里出现的次数。(原理跟wordcount一样,只是word变成了word_docid)
第二个mapreduce将word_docid在map阶段拆开,重新组合为<word,docid_count> 然后在combine和reduce阶段(combine和reduce是同一个函数)组合为 <word,doc1:count1,doc2:count2,doc3:count3>这种格式
import java.io.IOException;
1 思路:
0.txt MapReduce is simple
1.txt MapReduce is powerfull is simple
2.txt Hello MapReduce bye MapReduce

采用两个JOB的形式实现
一:第一个JOB(跟wordcount一致,只是wordcount中的word换做了word:dicid)
1 map函数:context.write(word:docid, 1) 即将word:docid作为map函数的输出
输出key 输出value
MapReduce:0.txt 1
is:0.txt 1
simple:0.txt 1
Mapreduce:1.txt 1
is:1.txt 1
powerfull:1.txt 1
is:1.txt 1
simple:1.txt 1
Hello:2.txt 1
MapReduce:2.txt 1
bye:2.txt 1
MapReduce:2.txt 1
2 Partitioner函数:HashPartitioner
略,根据map函数的输出key(word:docid)进行分区

3 reduce函数:累加输入values
输出key 输出value
MapReduce:0.txt 1 => MapReduce 0.txt:1

is:0.txt 1 => is 0.txt:1
simple:0.txt 1 => simple 0.txt:1
Mapreduce:1.txt 1 => Mapreduce 1.txt:1
is:1.txt 2 => is 1.txt:2
powerfull:1.txt 1 => powerfull 1.txt:1
simple:1.txt 1 => simple 1.txt:1
Hello:2.txt 1 => Hello 2.txt:1
MapReduce:2.txt 2 => MapReduce 2.txt:2
bye:2.txt 1 => bye 2.txt:1
二:第二个JOB
1 map函数:

输入key 输入value 输出key 输出value
MapReduce:0.txt 1 => MapReduce 0.txt:1

is:0.txt 1 => is 0.txt:1
simple:0.txt 1 => simple 0.txt:1
Mapreduce:1.txt 1 => Mapreduce 1.txt:1
is:1.txt 2 => is 1.txt:2
powerfull:1.txt 1 => powerfull 1.txt:1
simple:1.txt 1 => simple 1.txt:1
Hello:2.txt 1 => Hello 2.txt:1
MapReduce:2.txt 2 => MapReduce 2
2 reduce函数 (组合values)
输出key 输出value
MapReduce 0.txt:1,1.txt:1 2.txt:2
is 0.txt:1,is 1.txt:2
simple 0.txt:1,1.txt:1
powerfull 1.txt:1
Hello 2.txt:1
bye 2.txt:1

import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer; public class MyInvertIndex { public static class SplitMapper extends Mapper<Object, Text, Text, IntWritable> { public void map(Object key, Text value, Context context) throws IOException, InterruptedException { FileSplit split = (FileSplit) context.getInputSplit(); //String pth = split.getPath().toString(); String name = split.getPath().getName(); String[] tokens = value.toString().split("\s"); for (String token : tokens) { context.write(new Text(token + ":" + name), new IntWritable(1)); } } } public static class CombineMapper extends Mapper<Text, IntWritable, Text, Text> { public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException { int splitIndex = key.toString().indexOf(":"); context.write(new Text(key.toString().substring(0, splitIndex)), new Text(key.toString().substring(splitIndex + 1) + ":" + value.toString())); } } public static class CombineReducer extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { StringBuffer buff = new StringBuffer(); for (Text val : values) { buff.append(val.toString() + ","); } context.write(key, new Text(buff.toString())); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { String dir_in = "hdfs://localhost:9000/in_invertedindex"; String dir_out = "hdfs://localhost:9000/out_invertedindex"; Path in = new Path(dir_in); Path out = new Path(dir_out); Path path_tmp = new Path("word_docid" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); Configuration conf = new Configuration(); try { Job countJob = new Job(conf, "invertedindex_count"); countJob.setJarByClass(MyInvertIndex.class); countJob.setInputFormatClass(TextInputFormat.class); countJob.setMapperClass(SplitMapper.class); countJob.setCombinerClass(IntSumReducer.class); countJob.setPartitionerClass(HashPartitioner.class); countJob.setMapOutputKeyClass(Text.class); countJob.setMapOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(countJob, in); countJob.setReducerClass(IntSumReducer.class); // countJob.setNumReduceTasks(1); countJob.setOutputKeyClass(Text.class); countJob.setOutputValueClass(IntWritable.class); countJob.setOutputFormatClass(SequenceFileOutputFormat.class); FileOutputFormat.setOutputPath(countJob, path_tmp); countJob.waitForCompletion(true); Job combineJob = new Job(conf, "invertedindex_combine"); combineJob.setJarByClass(MyInvertIndex.class); combineJob.setInputFormatClass(SequenceFileInputFormat.class); combineJob.setMapperClass(CombineMapper.class); combineJob.setCombinerClass(CombineReducer.class); combineJob.setPartitionerClass(HashPartitioner.class); combineJob.setMapOutputKeyClass(Text.class); combineJob.setMapOutputValueClass(Text.class); FileInputFormat.addInputPath(combineJob, path_tmp); combineJob.setReducerClass(CombineReducer.class); // combineJob.setNumReduceTasks(1); combineJob.setOutputKeyClass(Text.class); combineJob.setOutputValueClass(Text.class); combineJob.setOutputFormatClass(TextOutputFormat.class); FileOutputFormat.setOutputPath(combineJob, out); combineJob.waitForCompletion(true); } finally { FileSystem.get(conf).delete(path_tmp, true); } } }

运行结果:
Hello    2.txt:1,,
MapReduce    2.txt:2,1.txt:1,0.txt:1,,
bye    2.txt:1,,
is    1.txt:2,0.txt:1,,
powerfull    1.txt:1,,
simple    1.txt:1,0.txt:1,,




原文地址:https://www.cnblogs.com/i80386/p/3600174.html