Heritrix 3.1.0 Source Code Analysis (Part 35)

This article first analyzes the CandidatesProcessor, which we may call the candidate processor. Its job is to filter the outlinks extracted from a page; links that pass the filter are added to the Frontier's BdbWorkQueue work queues. CandidatesProcessor filters each CrawlURI candidate object by running it through the CandidateChain, which consists of two processors: org.archive.crawler.prefetch.CandidateScoper and org.archive.crawler.prefetch.FrontierPreparer.

The processing method of CandidatesProcessor is as follows:

 /* (non-Javadoc)
     * @see org.archive.modules.Processor#innerProcess(org.archive.modules.CrawlURI)
     */
    @Override
    protected void innerProcess(final CrawlURI curi) throws InterruptedException {
        // Handle any prerequisites when S_DEFERRED for prereqs
        if (curi.hasPrerequisiteUri() && curi.getFetchStatus() == S_DEFERRED) {
            CrawlURI prereq = curi.getPrerequisiteUri();
            prereq.setFullVia(curi); 
            sheetOverlaysManager.applyOverlaysTo(prereq);
            try {
                KeyedProperties.clearOverridesFrom(curi); 
                KeyedProperties.loadOverridesFrom(prereq);
                // run the prerequisite through the candidate chain
                getCandidateChain().process(prereq, null);
                
                if(prereq.getFetchStatus()>=0) {
                    //System.out.println("prereq:"+prereq.toString());
                    frontier.schedule(prereq);
                } else {
                    curi.setFetchStatus(S_PREREQUISITE_UNSCHEDULABLE_FAILURE);
                }
            } finally {
                KeyedProperties.clearOverridesFrom(prereq); 
                KeyedProperties.loadOverridesFrom(curi);
            }
            return;
        }

        // Don't consider candidate links of error pages
        // (statuses below 200 are typically prerequisites)
        if (curi.getFetchStatus() < 200 || curi.getFetchStatus() >= 400) {
            curi.getOutLinks().clear();
            return;
        }
        // iterate over the extracted outlinks
        for (Link wref: curi.getOutLinks()) {
            
            CrawlURI candidate;
            try {
                // construct a CrawlURI from the outlink
                candidate = curi.createCrawlURI(curi.getBaseURI(),wref);
                // at least for duration of candidatechain, offer
                // access to full CrawlURI of via
                candidate.setFullVia(curi); 
            } catch (URIException e) {
                loggerModule.logUriError(e, curi.getUURI(), 
                        wref.getDestination().toString());
                continue;
            }

            sheetOverlaysManager.applyOverlaysTo(candidate);
            try {
                KeyedProperties.clearOverridesFrom(curi); 
                KeyedProperties.loadOverridesFrom(candidate);
                // a candidate reached via redirect from a seed curi is itself marked as a seed
                if(getSeedsRedirectNewSeeds() && curi.isSeed() 
                        && wref.getHopType() == Hop.REFER
                        && candidate.getHopCount() < SEEDS_REDIRECT_NEW_SEEDS_MAX_HOPS) {
                    candidate.setSeed(true);                     
                }
                getCandidateChain().process(candidate, null); 
                if(candidate.getFetchStatus()>=0) {
                    //seed
                    if(checkForSeedPromotion(candidate)) {
                        /*
                         * We want to guarantee crawling of seed version of
                         * CrawlURI even if same url has already been enqueued,
                         * see https://webarchive.jira.com/browse/HER-1891
                         */
                        candidate.setForceFetch(true);
                        //System.out.println("candidate addSeed:"+candidate.toString());
                        getSeeds().addSeed(candidate);
                    } else {
                        //System.out.println("candidate:"+candidate.toString());
                        frontier.schedule(candidate);
                    }
                    // record the candidate on the parent URI
                    curi.getOutCandidates().add(candidate);
                }
                
            } finally {
                KeyedProperties.clearOverridesFrom(candidate); 
                KeyedProperties.loadOverridesFrom(curi);
            }
        }
        curi.getOutLinks().clear();
    }

I have added comments to the code above. The method first checks whether the current CrawlURI curi carries a prerequisite. If it does, the prerequisite is run through the CandidateChain; if it passes (prereq.getFetchStatus() >= 0), it is scheduled into the Frontier's BdbWorkQueue work queue, otherwise curi is marked S_PREREQUISITE_UNSCHEDULABLE_FAILURE.

The second part iterates over curi's outlinks. For each outlink, a CrawlURI candidate is constructed from the current curi and the link, then passed through the same CandidateChain; candidates that pass the filter are likewise scheduled into the Frontier's BdbWorkQueue work queue (or, if promoted to seeds, added via the seeds module). Finally the outlinks are cleared.

How is the CrawlURI candidate created? By calling the current curi's CrawlURI createCrawlURI(UURI baseUURI, Link link) method:

 /**
     * Utility method for creation of CandidateURIs found extracting
     * links from this CrawlURI.
     * @param baseUURI BaseUURI for <code>link</code>.
     * @param link Link to wrap CandidateURI in.
     * @return New candidateURI wrapper around <code>link</code>.
     * @throws URIException
     */
    public CrawlURI createCrawlURI(UURI baseUURI, Link link)
    throws URIException {
        UURI u = (link.getDestination() instanceof UURI)?
            (UURI)link.getDestination():
            UURIFactory.getInstance(baseUURI,
                link.getDestination().toString());
        CrawlURI newCaURI = new CrawlURI(u, 
                extendHopsPath(getPathFromSeed(),link.getHopType().getHopChar()),
                getUURI(), link.getContext());
        newCaURI.inheritFrom(this);
        return newCaURI;
    }

The new candidate's pathFromSeed string is built from its parent curi's pathFromSeed and the Hop type of the current link:

/**
     * Extend a 'hopsPath' (pathFromSeed string of single-character hop-type symbols),
     * keeping the number of displayed hop-types under MAX_HOPS_DISPLAYED. For longer
     * hops paths, precede the string with an integer and '+', then the displayed 
     * hops. 
     * 
     * @param pathFromSeed
     * @param hopChar
     * @return
     */
    public static String extendHopsPath(String pathFromSeed, char hopChar) {
        if(pathFromSeed.length()<MAX_HOPS_DISPLAYED) {
            return pathFromSeed + hopChar;
        }
        int plusIndex = pathFromSeed.indexOf('+');
        int prevOverflow = (plusIndex<0) ? 0 : Integer.parseInt(pathFromSeed.substring(0,plusIndex));
        return (prevOverflow+1)+"+"+pathFromSeed.substring(plusIndex+2)+hopChar; 
    }
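The overflow behavior above can be sketched with a standalone replica. MAX_HOPS_DISPLAYED is assumed to be 5 here purely for illustration; the real constant in CrawlURI is larger.

```java
public class HopsPathDemo {
    // illustrative stand-in for CrawlURI.MAX_HOPS_DISPLAYED (assumed value)
    static final int MAX_HOPS_DISPLAYED = 5;

    // replica of CrawlURI.extendHopsPath for demonstration
    static String extendHopsPath(String pathFromSeed, char hopChar) {
        if (pathFromSeed.length() < MAX_HOPS_DISPLAYED) {
            return pathFromSeed + hopChar;
        }
        int plusIndex = pathFromSeed.indexOf('+');
        int prevOverflow = (plusIndex < 0) ? 0
                : Integer.parseInt(pathFromSeed.substring(0, plusIndex));
        // drop the oldest displayed hop and bump the overflow counter
        return (prevOverflow + 1) + "+" + pathFromSeed.substring(plusIndex + 2) + hopChar;
    }

    public static void main(String[] args) {
        String path = "";
        for (char hop : "LLLLLLL".toCharArray()) {
            path = extendHopsPath(path, hop);
            System.out.println(path);
        }
        // last line printed: 2+LLLLL (two hops folded into the overflow counter)
    }
}
```

Once the path reaches the display limit, each new hop character pushes the oldest one out and increments the leading counter, so the string length stays bounded no matter how deep the crawl goes.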

The boolean checkForSeedPromotion(CrawlURI curi) method seen above checks whether a URI needs "discovered seed" treatment, i.e. whether it is a seed that was reached via redirect from a seed URL:

/**
     * Check if the URI needs special 'discovered seed' treatment.
     * 
     * @param curi
     */
    protected boolean checkForSeedPromotion(CrawlURI curi) {
        if (curi.isSeed() && curi.getVia() != null
                && curi.flattenVia().length() > 0) {
            // The only way a seed can have a non-empty via is if it is the
            // result of a seed redirect. Returning true here schedules it 
            // via the seeds module, so it may affect scope and be logged 
            // as 'discovered' seed.
            //
            // This is a feature. This is handling for case where a seed
            // gets immediately redirected to another page. What we're doing is
            // treating the immediate redirect target as a seed.
            
            // And it needs rapid scheduling.
            // raise the scheduling priority
            if (curi.getSchedulingDirective() == SchedulingConstants.NORMAL) {
                curi.setSchedulingDirective(SchedulingConstants.MEDIUM);
            }
            return true; 
        }
        return false;
    }
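The promotion logic can be sketched with a minimal model of the fields it consults (the SchedulingConstants values and the Uri class here are stand-ins for illustration, not the real Heritrix types):

```java
public class SeedPromotionDemo {
    // simplified stand-ins for SchedulingConstants (real values may differ)
    static final int NORMAL = 2;
    static final int MEDIUM = 1;

    // minimal model of the fields checkForSeedPromotion consults
    static class Uri {
        boolean seed;
        String via;            // flattened via; null/empty for directly configured seeds
        int schedulingDirective = NORMAL;
    }

    // replica of the promotion check: a seed with a non-empty via
    // can only be the target of a seed redirect
    static boolean checkForSeedPromotion(Uri u) {
        if (u.seed && u.via != null && u.via.length() > 0) {
            if (u.schedulingDirective == NORMAL) {
                u.schedulingDirective = MEDIUM; // rapid scheduling
            }
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Uri redirected = new Uri();
        redirected.seed = true;
        redirected.via = "http://example.com/";  // hypothetical redirecting seed
        System.out.println(checkForSeedPromotion(redirected)); // true

        Uri plainSeed = new Uri();
        plainSeed.seed = true;                   // no via: a directly configured seed
        System.out.println(checkForSeedPromotion(plainSeed));  // false
    }
}
```

A directly configured seed has no via, so it is never "promoted"; only the immediate target of a seed redirect takes this path and gets the faster MEDIUM directive.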

The first processor in the CandidateChain is CandidateScoper, which extends the Scoper class. It decides whether the current CrawlURI caUri is in scope by examining the DecideResult returned from the scope member's DecideResult decisionFor(CrawlURI uri) method. The code is straightforward (I analyzed the DecideRule class in an earlier article; it iterates over a collection of DecideRule members, so I won't repeat that here):

/**
     * Check whether the given {@link CrawlURI CrawlURI} is within the crawl scope.
     * @param caUri The CrawlURI to check.
     * @return true if CrawlURI was accepted by crawl scope, false
     * otherwise.
     */
    protected boolean isInScope(CrawlURI caUri) {
        boolean result = false;
        //System.out.println(this.getClass().getName()+":"+"scope name:"+scope.getClass().getName());
        DecideResult dr = scope.decisionFor(caUri);
        if (dr == DecideResult.ACCEPT) {
            result = true;
            if (fileLogger != null) {
                fileLogger.info("ACCEPT " + caUri); 
            }
        } else {
            outOfScope(caUri);
        }
        return result;
    }
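The rule iteration referred to above can be sketched roughly as follows. This is a simplified stand-in, not the actual org.archive.modules.deciderules API: in real Heritrix a DecideRuleSequence walks its rule list and the last non-NONE answer wins, which is the behavior modeled here.

```java
import java.util.List;
import java.util.function.Function;

public class ScopeDemo {
    enum DecideResult { ACCEPT, REJECT, NONE }

    // simplified stand-in for a DecideRuleSequence:
    // the last non-NONE answer from the rule list wins
    static DecideResult decisionFor(List<Function<String, DecideResult>> rules, String uri) {
        DecideResult result = DecideResult.NONE;
        for (Function<String, DecideResult> rule : rules) {
            DecideResult d = rule.apply(uri);
            if (d != DecideResult.NONE) {
                result = d;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Function<String, DecideResult>> rules = List.of(
            u -> DecideResult.REJECT,                        // baseline: reject everything
            u -> u.startsWith("http://example.com/")         // hypothetical on-domain rule
                    ? DecideResult.ACCEPT : DecideResult.NONE,
            u -> u.endsWith(".exe")                          // hypothetical extension filter
                    ? DecideResult.REJECT : DecideResult.NONE
        );
        System.out.println(decisionFor(rules, "http://example.com/page.html")); // ACCEPT
        System.out.println(decisionFor(rules, "http://other.org/"));            // REJECT
    }
}
```

Because later rules override earlier ones, the conventional scope configuration starts with a reject-all rule and then layers accept and reject rules on top of it.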

The second processor in the CandidateChain is FrontierPreparer, which applies the relevant policies to the current CrawlURI before it enters the Frontier (this processor was analyzed in an earlier article, so I won't repeat it here):

/**
     * Apply all configured policies to CrawlURI
     * 
     * @param curi CrawlURI
     */
    public void prepare(CrawlURI curi) {
        
        // set schedulingDirective
        curi.setSchedulingDirective(getSchedulingDirective(curi));
            
        // set canonicalized version
        curi.setCanonicalString(canonicalize(curi));
        
        // set queue key
        curi.setClassKey(getClassKey(curi));
        
        // set cost
        curi.setHolderCost(getCost(curi));
        
        // set URI precedence
        getUriPrecedencePolicy().uriScheduled(curi);


    }

The next processor to analyze is DispositionProcessor, which we may call the post-processor. Its main tasks are updating server information and setting the queue politeness delay:

 @Override
    protected void innerProcess(CrawlURI puri) {
        CrawlURI curi = (CrawlURI)puri;
        
        // Tally per-server, per-host, per-frontier-class running totals
        CrawlServer server = serverCache.getServerFor(curi.getUURI());

        String scheme = curi.getUURI().getScheme().toLowerCase();
        if ((scheme.equals("http") || scheme.equals("https"))
                && server != null) {
            // Update connection problems counter
            if(curi.getFetchStatus() == S_CONNECT_FAILED || curi.getFetchStatus() == S_CONNECT_LOST ) {
                server.incrementConsecutiveConnectionErrors();
            } else if (curi.getFetchStatus() > 0){
                server.resetConsecutiveConnectionErrors();
            }

            // Update robots info
            try {
                if ("/robots.txt".equals(curi.getUURI().getPath()) && curi.getFetchStatus() != S_DEFERRED) {
                    // shortcut retries  w/ DEEMED when ignore-all
                    if (metadata.getRobotsPolicy() instanceof IgnoreRobotsPolicy) {
                        if(curi.getFetchStatus() < 0 && curi.getFetchStatus()!=S_DEFERRED) {
                            // prevent the rest of the usual retries
                            curi.setFetchStatus(S_DEEMED_NOT_FOUND);
                        }
                    }
                    
                    // Update server with robots info
                    // NOTE: in some cases the curi's status can be changed here
                    server.updateRobots(curi);
                }
            }
            catch (URIException e) {
                logger.severe("Failed get path on " + curi.getUURI());
            }
        }
        
        // set politeness delay
        curi.setPolitenessDelay(politenessDelayFor(curi));
        
        // consider operator-set force-retire
        if (getForceRetire()) {
            curi.setForceRetire(true);
        }
        
        // TODO: set other disposition decisions
        // success, failure, retry(retry-delay)
    }

The method that computes the queue politeness delay is as follows:

/**
     * Update any scheduling structures with the new information in this
     * CrawlURI. Chiefly means make necessary arrangements for no other URIs at
     * the same host to be visited within the appropriate politeness window.
     * 
     * @param curi
     *            The CrawlURI
     * @return millisecond politeness delay
     */
    protected long politenessDelayFor(CrawlURI curi) {
        long durationToWait = 0;
        Map<String,Object> cdata = curi.getData();
        if (cdata.containsKey(A_FETCH_BEGAN_TIME)
                && cdata.containsKey(A_FETCH_COMPLETED_TIME)) {

            long completeTime = curi.getFetchCompletedTime();
            long durationTaken = (completeTime - curi.getFetchBeginTime());
            durationToWait = (long)(getDelayFactor() * durationTaken);

            long minDelay = getMinDelayMs();
            if (minDelay > durationToWait) {
                // wait at least the minimum
                durationToWait = minDelay;
            }

            long maxDelay = getMaxDelayMs();
            if (durationToWait > maxDelay) {
                // wait no more than the maximum
                durationToWait = maxDelay;
            }
            
            long respectThreshold = getRespectCrawlDelayUpToSeconds() * 1000;
            if (durationToWait<respectThreshold) {
                // may need to extend wait
                CrawlServer s = getServerCache().getServerFor(curi.getUURI());
                String ua = curi.getUserAgent();
                if (ua == null) {
                    ua = metadata.getUserAgent();
                }
                Robotstxt rep = s.getRobotstxt();
                if (rep != null) {
                    long crawlDelay = (long)(1000 * rep.getDirectivesFor(ua).getCrawlDelay());
                    crawlDelay = 
                        (crawlDelay > respectThreshold) 
                            ? respectThreshold 
                            : crawlDelay;
                    if (crawlDelay > durationToWait) {
                        // wait at least the directive crawl-delay
                        durationToWait = crawlDelay;
                    }
                }
            }
            
            long now = System.currentTimeMillis();
            int maxBandwidthKB = getMaxPerHostBandwidthUsageKbSec();
            if (maxBandwidthKB > 0) {
                // Enforce bandwidth limit
                ServerCache cache = this.getServerCache();
                CrawlHost host = cache.getHostFor(curi.getUURI());
                long minDurationToWait = host.getEarliestNextURIEmitTime()
                        - now;
                float maxBandwidth = maxBandwidthKB * 1.024F; // kilo factor
                long processedBytes = curi.getContentSize();
                host
                        .setEarliestNextURIEmitTime((long)(processedBytes / maxBandwidth)
                                + now);

                if (minDurationToWait > durationToWait) {
                    durationToWait = minDurationToWait;
                }
            }
        }
        return durationToWait;
    }
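The core clamping arithmetic above can be illustrated with the three operator-configurable knobs isolated (the values here are assumptions chosen for illustration; the robots crawl-delay and bandwidth branches are omitted):

```java
public class PolitenessDemo {
    // illustrative knob values (configured in crawler-beans.cxml)
    static final double DELAY_FACTOR = 5.0;
    static final long MIN_DELAY_MS = 3000;
    static final long MAX_DELAY_MS = 30000;

    // core clamping logic of politenessDelayFor, without the
    // robots crawl-delay and per-host bandwidth branches
    static long basicDelay(long fetchDurationMs) {
        long durationToWait = (long) (DELAY_FACTOR * fetchDurationMs);
        if (durationToWait < MIN_DELAY_MS) {
            durationToWait = MIN_DELAY_MS;   // wait at least the minimum
        }
        if (durationToWait > MAX_DELAY_MS) {
            durationToWait = MAX_DELAY_MS;   // wait no more than the maximum
        }
        return durationToWait;
    }

    public static void main(String[] args) {
        System.out.println(basicDelay(100));   // 5.0 * 100 = 500, clamped up to 3000
        System.out.println(basicDelay(1200));  // 5.0 * 1200 = 6000, within bounds
        System.out.println(basicDelay(20000)); // 5.0 * 20000 = 100000, clamped down to 30000
    }
}
```

In other words, slow servers are automatically given a proportionally longer politeness window (delayFactor times the fetch duration), bounded below and above by the configured min/max.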

If we need to change the queue delay, the relevant parameters can be set in the crawler-beans.cxml configuration file.
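For example, a fragment along these lines could be used. The property names follow DispositionProcessor's setters seen above; the bean id and the values are assumptions modeled on a stock crawler-beans.cxml, not taken from this article.

```xml
<!-- sketch of a crawler-beans.cxml fragment; values are illustrative -->
<bean id="disposition"
      class="org.archive.crawler.postprocessor.DispositionProcessor">
  <property name="delayFactor" value="5.0"/>
  <property name="minDelayMs" value="3000"/>
  <property name="maxDelayMs" value="30000"/>
  <property name="respectCrawlDelayUpToSeconds" value="300"/>
  <property name="maxPerHostBandwidthUsageKbSec" value="0"/>
</bean>
```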

---------------------------------------------------------------------------

This Heritrix 3.1.0 source-code analysis series is my original work.

When reposting, please credit the source: 刺猬的温驯 on 博客园 (cnblogs).

Link to this article: http://www.cnblogs.com/chenying99/archive/2013/05/07/3065205.html
