MapReduce实现计数

对于非计算机专人士,大多数统计就是计数,而且许多基础的Hadoop作业都包含了计数过程。我们希望从专利引用数据集中得到专利被引用的次数。这同样是计数。期望的输出结果如下:


专利号 出现次数
10000 1
100000 1
1000006 1
1000007 1
1000011 1
1000017 1
1000026 1
1000033 2
1000043 1
1000044 1
1000045 1
1000046 2
1000049 1
1000051 1
1000054 1
1000065 1
1000067 3
在每个记录中,专利号与引用次数相关联。我们可以为此写一个MapReduce程序。如果有一个类似方式处理数据的MapReduce程序,你就可以复制并修改它使之符合你的要求。
我们已经有了一个获得反向引用索引的程序。可以修改这个程序来输出技术结果,而不是一个专利引用列表。我们只需修改Reducer。如果我们选择让技术结果输出类型为IntWritable,就需要在Reducer代码的3个地方进行声明。在以前的注视着我们称它们为V3

public void reduce(IntWritable key, Iterator<IntWritable> values,
                   OutputCollector<IntWritable, IntWritable> output,
                   Reporter reporter) throws IOException
{
    int count = 0;
    while (values.hasNext())
    {
        count += values.next().get();
    }
    output.collect(key, new IntWritable(count));
}
}

通过修改几行代码并匹配好类型,我们就有了一个新的MapReuce程序。这个程序看上去做的修改很少。让我们再看一个修改较多的例程,不过它依然保留了MapReduce基础程序的结构。
运行前面的例子后,我们现在得到了一个数据集,包含了对每个专利的引用次数的统计。要想做好一个很好地练习,可以对这些统计值再做统计。让我们生成一个引用计数的直方图。我们预计会看到一个有趣的引用计数分布,即大多数专利仅被引用一次,而少部分被引用上百次。
编写MapReduce程序第一步就是了解数据流。当mapper读取一个记录时,它忽略专利号并输出一个键值对

public class CitationHistogram extends Configured implements Tool
{
    public static class MapClass extends MapReduceBase implements
        Mapper<Text, Text, IntWritable, IntWritable>
    {
        private final static IntWritable uno = new IntWritable(1);

        private IntWritable citationCount = new IntWritable();

        public void map(Text key, Text value,
                        OutputCollector<IntWritable, IntWritable> output,
                        Reporter reporter) throws IOException
        {
            citationCount.set(Integer.parseInt(value.toString()));
            output.collect(citationCount, uno);

        }
    }


    public static class Reduce extends MapReduceBase implements
        Reducer<IntWritable, IntWritable, IntWritable, IntWritable>
    {

        @Override
        public void reduce(IntWritable key, Iterator<IntWritable> values,
                           OutputCollector<IntWritable, IntWritable> output,
                           Reporter reporter) throws IOException
        {
            int count = 0;
            while (values.hasNext())
            {
                count += values.next().get();
            }
            output.collect(key, new IntWritable(count));
        }
    }

    @Override
    public int run(String[] args) throws Exception
    {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, CitationHistogram.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("CitationHistogram");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception
    {
        int res = ToolRunner.run(new Configuration(), new CitationHistogram(),
                                 args);
        System.out.println(res);
    }

}

现在类名为CitationHistogram,新的名字将替换程序中所有Myjob相关的部分。Main()方法基本相同。Driver几乎没有改变。输入格式和输出格式任然分别为KeyValueTextInputFormat和TextOutPutFormat。主要的变化在输出的键和值的类,它们现在改为IntWritable来匹配K2和V2的新类型。我们还去掉了这一行:
job.set(“key.value.separator.in.input.line”,”,”);
它用于设置KeyValueTextInputFormat所采用的分隔符,把每个输入行划分为一个键值对。以前使用逗号来处理原始的专利引用数据。这里不对这个属性进行设置,分隔符默认采用制表符,它适合于分隔引用技术数据。

public class MapClass  extends MapReduceBase implements Mapper<Text, Text, IntWritable, IntWritable>
{
    private final static IntWritable uno = new IntWritable();
    private IntWritable citationCount = new IntWritable();
    @Override
    public void map(Text key, Text value,
                    OutputCollector<IntWritable, IntWritable> output, Reporter reporter)
    throws IOException
    {
        citationCount.set(Integer.parseInt(value.toString()));
        output.collect(citationCount, uno);

    }

}

Map()方法中多出了一行,用于设置citationCount做类型转换。出于对效率的考虑,citationCount和uno的定义被放在类中而不是方法中。有多少记录,map()方法就会被调用多少次(对每个JVM而言,就是一个分片中的记录数)。减少在map()方法中生成对象的个数可以提高性能,并减少垃圾回收。由于citationCount和uno被传递给output.collect(),我们依赖output.collect()方法的约定不会修改这两个对象。

原文地址:https://www.cnblogs.com/ainima/p/6331860.html