《Hadoop实战》之专利统计

数据

专利数据的特性

  • 专利引用数据所构成的关系图与网页链接以及社会网络图大同小异
  • 专利发布以时间为序,特性类似于时间序列
  • 专利关联到一个人和一个位置,可视为个人信息或地理数据

首先拿到专利数据:http://data.nber.org/patents/

本文使用是的cite75-99.txt,该文件涵盖了自1975年到1999年间对美国专利的引用,包含超过1600万条数据,前几行如下图:

其中第一列为专利号、第二列为被第一列引用的专利号

CITING CITED
3858241 956203

需求一

  1. 读取专利引用数据,对于每一个专利找到哪些专列对他进行了引用并进行合并。
  2. 进行倒序排序

思路

  • 针对1,将Mapper的键值互换输出即可
  • 针对2,简单方法是设置reduce=1,直接输出全局排序文件
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;


// 读取专利引用数据,对于每一个专利找到哪些专列对他进行了引用并进行合并。
public class FindCitedPatentsAndOrder extends Configured implements Tool {

    public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text> {

        @Override
        public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            output.collect(value, key);  // 关键点
        }
    }

    public static class ReduceClass extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output,
                           Reporter reporter) throws IOException {
            String csv = "";
            while (values.hasNext()) {
                if (csv.length() > 0) csv += ",";
                csv += values.next().toString();
            }
            output.collect(key, new Text(csv));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        JobConf job = new JobConf(getConf(), getClass());

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("FindCitedPatentsAndOrder");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);

        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.set("key.value.separator.in.input.line", ",");

        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new FindCitedPatentsAndOrder(), args);
        System.exit(exitCode);
    }
}

需求二

  • 计算不同引用次数专利的数目

思路

  • 在FindCitedPatentsAndOrder的基础上修改Reduce函数,统计数目
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

// 计算不同引用次数专利的数目
public class CitedPatentsNumberCounter extends Configured implements Tool {
    public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text> {

        @Override
        public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            output.collect(value, key);  // 关键点
        }
    }

    public static class ReduceClass extends MapReduceBase implements Reducer<Text, Text, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, IntWritable> output,
                           Reporter reporter) throws IOException {
            int count = 0;
            while (values.hasNext()){
                values.next();
                count++;
            }
            output.collect(key, new IntWritable(count));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        JobConf job = new JobConf(getConf(), getClass());

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("CitedPatentsNumberCounter");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);

        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);  // 同时设置K2,V2和K3,V3的类型

        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new CitedPatentsNumberCounter(), args);
        System.exit(exitCode);
    }
}

需求三

  • 在需求二输出的被引用数量数据的基础上,统计被引用数量的频次

思路

  • 将输入数据替换成需求二的输出数据即可
  • 为了重用Mapper里面的OutputKey和OutputValue,将它们在类中初始化
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

public class CitationFrequencyStatistics extends Configured implements Tool {
    public static class MapClass extends MapReduceBase implements Mapper<Text, Text, IntWritable, IntWritable> {
        private final  static IntWritable UNO = new IntWritable(1);  // 单位1
        private IntWritable citationCount = new IntWritable();


        @Override
        public void map(Text key, Text value, OutputCollector<IntWritable, IntWritable> output, Reporter reporter)
                throws IOException {
            citationCount.set(Integer.parseInt(value.toString()));
            output.collect(citationCount, UNO);  // 关键点
        }
    }

    public static class ReduceClass extends MapReduceBase implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        @Override
        public void reduce(IntWritable key, Iterator<IntWritable> values, OutputCollector<IntWritable, IntWritable> output,
                           Reporter reporter) throws IOException {
            int count = 0;
            while (values.hasNext()){
                values.next();
                count++;
            }
            output.collect(key, new IntWritable(count));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        JobConf job = new JobConf(getConf(), getClass());

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("CitationFrequencyStatistics");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);

        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);  // 同时设置K2,V2和K3,V3的类型

        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new CitationFrequencyStatistics(), args);
        System.exit(exitCode);
    }
}

Hadoop的新API

Hadoop最新版本的MapReduce Release 0.20.0的API包括了一个全新的Mapreduce JAVA API,有时候也称为上下文对象。

  新的API类型上不兼容以前的API,所以,以前的应用程序需要重写才能使新的API发挥其作用 。

  新的API和旧的API之间有下面几个明显的区别。

  • 新的API倾向于使用抽象类,而不是接口,因为这更容易扩展。例如,你可以添加一个方法(用默认的实现)到一个抽象类而不需修改类之前的实现方法。在新的API中,Mapper和Reducer是抽象类。
  • 新的API是在org.apache.hadoop.mapreduce包(和子包)中的。之前版本的API则是放在org.apache.hadoop.mapred中的。
  • 新的API广泛使用context object(上下文对象),并允许用户代码与MapReduce系统进行通信。例如,MapContext基本上充当着JobConf的OutputCollector和Reporter的角色。
  • 新的API同时支持"推"和"拉"式的迭代。在这两个新老API中,键/值记录对被推mapper中,但除此之外,新的API允许把记录从map()方法中拉出,这也适用于reducer。"拉"式的一个有用的例子是分批处理记录,而不是一个接一个。
  • 新的API统一了配置。旧的API有一个特殊的JobConf对象用于作业配置,这是一个对于Hadoop通常的Configuration对象的扩展。在新的API中,这种区别没有了,所以作业配置通过Configuration来完成。作业控制的执行由Job类来负责,而不是JobClient,它在新的API中已经荡然无存。

基于新api重写的Hadoop基础程序模板

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;

import java.io.IOException;

public class MyJob extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] citation = value.toString().split(",");
            context.write(new Text(citation[1]), new Text(citation[0]));
        }
    }

    public static class ReduceClass extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String csv = "";
            for (Text val: values) {
                if (csv.length() > 0) csv += ",";
                csv += val.toString();
            }
            context.write(key, new Text(csv));
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();

        Job job = new Job(conf, "Myjob");
        job.setJarByClass(MyJob.class);

        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return 0;
    }
}

原文地址:https://www.cnblogs.com/vvlj/p/14099275.html