Apache Nutch 1.3 学习笔记三(Inject)

1. Inject是干嘛的?

NutchInject是用来把文本格式的url列表注入到抓取数据库中,一般是用来引导系统的初始化。
这里的文本格式如下:

 

  1. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 \t userType=open_source  

这里的url与其元数据之间用Tab隔开,这里有两个保留的元数据,如下
nutch.score :
表示特定url的分数
nutch.fetchInterval :
表示特定url的抓取间隔,单位为毫秒
Inject
注入后生成的数据库为二进制结构,是HadoopMapSequenceFileOutputFormat格式

2. Inject运行命令

 

  1. bin/nutch inject <url_dir> <crawl_db>  


在本地运行后的输出结果如下:

 

  1. Injector: starting at 2011-08-23 10:14:10  
  2. Injector: crawlDb: db/crawldb  
  3. Injector: urlDir: urls  
  4. Injector: Converting injected urls to crawl db entries.  
  5. Injector: Merging injected urls into crawl db.  
  6. Injector: finished at 2011-08-23 10:14:12, elapsed: 00:00:02  


你可以用如下命令来查看其数据库内容

 

  1. bin/nutch readdb <crawl_db> -stats -sort  


在本机的输出如下:

 

  1. rawlDb statistics start: db/crawldb  
  2. Statistics for CrawlDb: db/crawldb  
  3. TOTAL urls: 1  
  4. retry 0:    1  
  5. min score:  1.0  
  6. avg score:  1.0  
  7. max score:  1.0  
  8. status 1 (db_unfetched):    1  
  9.    www.baidu.com :  1  
  10. CrawlDb statistics: done  

3. Inject源代码分析

   我们知道Injector.javaNutch源代码中的位置为org.apache.nutch.crawl.Injector.java
   
其中有一个main入口函数,使用Hadoop的工具类ToolRunner来运行其实例
   
但其是终入口函数还是void inject(Path crawlDb, Path urlDir)
   
其中有两个MP任务,第一个主要是把文件格式的输入转换成<url,CrawlDatum>格式的输出,这里的
   CrawlDatum
Nutch对于单个抓取url对象的一个抽象,其中有很多url的相关信息
   
第二个MP主要是把上面新生成的输出与旧的CrawlDb数据进行合并,生成一个新的CrawlDb

3.1 对于Inject中第一个MP任务的分析

   第一个MP任务主要代码如下:
    

 

  1. JobConf sortJob = new NutchJob(getConf()); // 生成一个Nutch的配置抽象  
  2.     sortJob.setJobName("inject " + urlDir);  
  3.     FileInputFormat.addInputPath(sortJob, urlDir); // 设置InputFormat,这里为FileInputFormat,这里要注意的是可以调用多次addInputPath这个方法,效果是会有多个输入源  
  4.     sortJob.setMapperClass(InjectMapper.class);    // 这里设置了Mapper方法,主要是用于解析、过滤和规格化url文本,把其转换成<url,CrawlDatum>格式  
  5.     
  6.     
  7.     FileOutputFormat.setOutputPath(sortJob, tempDir); // 这里定义了一个输出路径,这里的tempDir=mapred.temp.dir/inject-temp-Random()  
  8.     sortJob.setOutputFormat(SequenceFileOutputFormat.class); // 这里配置了输出格式,这里为SequenceFileOutputFormat,这是MP的一种二进制输出结构  
  9.     sortJob.setOutputKeyClass(Text.class);         // 这里配置了MP的输出<key,value>的类型,这里为<Text,CrawlDatum>  
  10.     sortJob.setOutputValueClass(CrawlDatum.class);  
  11.     sortJob.setLong("injector.current.time", System.currentTimeMillis());  
  12.     JobClient.runJob(sortJob);                     // 这里用于提交任务到JobTracker,让其运行任务  


这里对InjectMapper中的主要代码进行分析:
这个类主要用于对url进行解析、过滤和规格化

 

  1. public void map(WritableComparable key, Text value,  
  2.                    OutputCollector<Text, CrawlDatum> output, Reporter reporter)  
  3.      throws IOException {  
  4.      String url = value.toString();              // value is line of text  
  5.     
  6.     
  7.      if (url != null && url.trim().startsWith("#")) {   // 这里以#号开头的文本就过滤  
  8.          /* Ignore line that start with # */  
  9.          return;  
  10.      }  
  11.     
  12.     
  13.      // if tabs : metadata that could be stored  
  14.      // must be name=value and separated by \t  
  15.      float customScore = -1f;  
  16.      int customInterval = interval;  
  17.      Map<String,String> metadata = new TreeMap<String,String>();  // 设置属性的一个容器  
  18.      if (url.indexOf("\t")!=-1){  
  19.       String[] splits = url.split("\t");    // 对一行文本进行切分  
  20.       url = splits[0];  
  21.       for (int s=1;s<splits.length;s++){  
  22.           // find separation between name and value  
  23.           int indexEquals = splits[s].indexOf("=");  
  24.           if (indexEquals==-1) {  
  25.               // skip anything without a =  
  26.               continue;           
  27.           }  
  28.           String metaname = splits[s].substring(0, indexEquals);   // 得到元数据的名字  
  29.           String metavalue = splits[s].substring(indexEquals+1);   // 得到元数据的值  
  30.           if (metaname.equals(nutchScoreMDName)) {         // 看是不是保留的元数据  
  31.               try {  
  32.               customScore = Float.parseFloat(metavalue);}  
  33.               catch (NumberFormatException nfe){}  
  34.           }  
  35.           else if (metaname.equals(nutchFetchIntervalMDName)) {  
  36.               try {  
  37.                   customInterval = Integer.parseInt(metavalue);}  
  38.               catch (NumberFormatException nfe){}  
  39.           }  
  40.           else metadata.put(metaname,metavalue);   // 如果这个元数据不是保留的元数据,就放到容器中  
  41.       }  
  42.      }  
  43.      try {  
  44.        url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);   // url进行规格化,这里调用的是plugins中的插件  
  45.        url = filters.filter(url);             // filter the url            // url进行过滤  
  46.      } catch (Exception e) {  
  47.        if (LOG.isWarnEnabled()) { LOG.warn("Skipping " +url+":"+e); }  
  48.        url = null;  
  49.      }  
  50.      if (url != null) {                          // if it passes  
  51.        value.set(url);                           // collect it  
  52.     // 这里生成一个CrawlDatum对象,设置一些url的初始化数据  
  53.        CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, customInterval);  
  54.        datum.setFetchTime(curTime);    // 设置当前url的抓取时间  
  55.        // now add the metadata  
  56.        Iterator<String> keysIter = metadata.keySet().iterator();  
  57.        while (keysIter.hasNext()){     // 配置其元数据  
  58.         String keymd = keysIter.next();  
  59.         String valuemd = metadata.get(keymd);  
  60.         datum.getMetaData().put(new Text(keymd), new Text(valuemd));  
  61.        }  
  62.     // 设置初始化分数  
  63.        if (customScore != -1) datum.setScore(customScore);  
  64.        else datum.setScore(scoreInjected);  
  65.        try {  
  66.         // 这里对url的分数进行初始化  
  67.         scfilters.injectedScore(value, datum);  
  68.        } catch (ScoringFilterException e) {  
  69.         if (LOG.isWarnEnabled()) {  
  70.             LOG.warn("Cannot filter injected score for url " + url  
  71.                     + ", using default (" + e.getMessage() + ")");  
  72.         }  
  73.        }  
  74.     // Map 收集相应的数据,类型为<Text,CrawlDatum>  
  75.        output.collect(value, datum);  
  76.      }  
  77.    }  
  78.  }  

   3.2 第二个MP任务的分析

第二个MP任务主要是对crawlDb进行合并,源代码如下:

 

  1. // merge with existing crawl db  
  2.         JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb); // 这里对Job进行相应的配置  
  3.         FileInputFormat.addInputPath(mergeJob, tempDir);          // 这里配置了输入的文本数据,就是上面第一个MP任务的输出  
  4.         mergeJob.setReducerClass(InjectReducer.class);            // 这里配置了Reduce的抽象类,这里会覆盖上面createJob设置的Reduce  
  5.         JobClient.runJob(mergeJob);                               // 提交运行任务  
  6.         CrawlDb.install(mergeJob, crawlDb);                       // 把上面新生成的目录重命名为crawlDb的标准文件夹名,然后再删除老的目录  
  7.     
  8.     
  9.         // clean up  
  10.         FileSystem fs = FileSystem.get(getConf());  
  11.         fs.delete(tempDir, true);                                 // 把第一个MP任务的输出目录删除  


下面是createJob的源代码说明:

 

  1. public static JobConf createJob(Configuration config, Path crawlDb) throws IOException {  
  2.             // 生成新的CrawlDb文件名  
  3.             Path newnewCrawlDb =  new Path(crawlDb,Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));  
  4.     
  5.     
  6.             JobConf job = new NutchJob(config);   // 生成相应的Job配置抽象  
  7.             job.setJobName("crawldb " + crawlDb);  
  8.     
  9.     
  10.             Path current = new Path(crawlDb, CURRENT_NAME);  
  11.             if (FileSystem.get(job).exists(current)) {   // 如果存在老的CrawlDb目录,将其加入InputPath路径中,和上面的tempDir一起进行合并  
  12.                 FileInputFormat.addInputPath(job, current);    
  13.             }  
  14.             // NOTE:有没有注意到这里如果有老的CrawlDb目录的话,那它的文件格式是MapFileOutputFormat,而下面对其读取用了SequenceFileInputFormat来读,因为这两个类底层都是调用了SequenceFileReaderWriter来读写的,所以可以通用。  
  15.             job.setInputFormat(SequenceFileInputFormat.class);  // 设置CrawlDb目录文件的格式为SequenceFileInputFormat  
  16.     
  17.     
  18.             job.setMapperClass(CrawlDbFilter.class);     // 设置相应的Map操作,主要是过滤和规格化url  
  19.             job.setReducerClass(CrawlDbReducer.class);   // 设置相应的Reduce操作,主要是对url进行聚合  
  20.     
  21.     
  22.             FileOutputFormat.setOutputPath(job, newCrawlDb);  // 设置新的输出路径  
  23.             job.setOutputFormat(MapFileOutputFormat.class);   // 设置输出的格式,这里是MapFileOutputFormat  
  24.             // 这里设置了输出的类型<Text,CrawlDatum>  
  25.             job.setOutputKeyClass(Text.class);  
  26.             job.setOutputValueClass(CrawlDatum.class);  
  27.     
  28.     
  29.     return job;  
  30.   }  



下面来看看覆盖的InjectReducer都干了些什么,部分源代码如下:

 

  1. public void reduce(Text key, Iterator<CrawlDatum> values,OutputCollector<Text, CrawlDatum> output, Reporter reporter)  
  2.           throws IOException {  
  3.                 boolean oldSet = false;  
  4.             // 把相同url聚合后的结果进行处理,这个循环主要是新注入的url与老的url有没有相同的,  
  5.             // 如果有相同的话就不设置其状态,支持collect出去了  
  6.                 while (values.hasNext()) {  
  7.             CrawlDatum val = values.next();  
  8.             if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {  
  9.                     injected.set(val);  
  10.                     injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);  
  11.             } else {  
  12.                     old.set(val);  
  13.                     oldSet = true;  
  14.             }  
  15.             }     
  16.                 CrawlDatum res = null;  
  17.                 if (oldSet) res = old; // don't overwrite existing value  
  18.                 else res = injected;  
  19.     
  20.     
  21.                 output.collect(key, res);  
  22.         }  
  23.             }  


最后来看一下CrawlDb.install方法都干了些什么,其源代码如下:

 

  1. public static void install(JobConf job, Path crawlDb) throws IOException {  
  2.     Path newCrawlDb = FileOutputFormat.getOutputPath(job);  // 得到第二个MP任务的输出目录  
  3.     FileSystem fs = new JobClient(job).getFs();  
  4.     Path old = new Path(crawlDb, "old");  
  5.     Path current = new Path(crawlDb, CURRENT_NAME);         // 得到CrawlDb的正规目录名,也就是有没有老的CrawlDb  
  6.     if (fs.exists(current)) {  
  7.     // 如果有老的CrawlDb目录,把老的目录名生命名为old这个名字  
  8.             if (fs.exists(old)) fs.delete(old, true);   // 这里判断old这个目录是不是已经存在,如果存在就删除之  
  9.             fs.rename(current, old);  
  10.     }  
  11.     fs.mkdirs(crawlDb);  
  12.     fs.rename(newCrawlDb, current);                 // 这里是把第二个MP任务的输出目录重命名为current目录,也就是正规目录名  
  13.     if (fs.exists(old)) fs.delete(old, true);       // 删除重使名后的老的CrawlDb目录  
  14.     Path lock = new Path(crawlDb, LOCK_NAME);  
  15.     LockUtil.removeLockFile(fs, lock);              // 目录解锁  
  16.     }  

4. 总结

Inject主要是从文本文件中注入新的url,使其与老的crawlDb中的url进行合并,然后把老的CrawlDb目录删除,现把新生成的CrawlDb临时目录重命名为CrawlDb目录名。
流程如下:
url_dir -> MapReduce1(inject new urls) -> MapReduece2(merge new urls with old crawlDb) -> install new CrawlDb -> clean up

作者:http://blog.csdn.net/amuseme_lu


相关文章阅读及免费下载:

Apache Nutch 1.3 学习笔记目录

Apache Nutch 1.3 学习笔记一

Apache Nutch 1.3 学习笔记二

Apache Nutch 1.3 学习笔记三(Inject)

Apache Nutch 1.3 学习笔记三(Inject CrawlDB Reader)

Apache Nutch 1.3 学习笔记四(Generate)

Apache Nutch 1.3 学习笔记四(SegmentReader分析)

Apache Nutch 1.3 学习笔记五(FetchThread)

Apache Nutch 1.3 学习笔记五(Fetcher流程)

Apache Nutch 1.3 学习笔记六(ParseSegment)

Apache Nutch 1.3 学习笔记七(CrawlDb - updatedb)

Apache Nutch 1.3 学习笔记八(LinkDb)

Apache Nutch 1.3 学习笔记九(SolrIndexer)

Apache Nutch 1.3 学习笔记十(Ntuch 插件机制简单介绍)

Apache Nutch 1.3 学习笔记十(插件扩展)

Apache Nutch 1.3 学习笔记十(插件机制分析)

Apache Nutch 1.3 学习笔记十一(页面评分机制 OPIC)

Apache Nutch 1.3 学习笔记十一(页面评分机制 LinkRank 介绍)

Apache Nutch 1.3 学习笔记十二(Nutch 2.0 的主要变化)

更多《Apache Nutch文档》,尽在开卷有益360 http://www.docin.com/book_360

原文地址:https://www.cnblogs.com/ibook360/p/2221470.html