Heritrix 3.1.0 源码解析(三十六)

接下来本文还要继续分析Heritrix3.1.0系统中的CrawlController类及BdbFrontier类,因为本人觉得前面部分对相关逻辑还没用理清头绪,更重要的原因是由于每篇文章的关注点不同,本人不能在同一篇文章将相关类的所有关注点一一道来

本文要分析的是,Heritrix3.1.0系统是怎样控制采集任务的启动、暂停及停止等相关状态(如果具备状态机的背景知识,也许会更容易理解相关逻辑)

CrawlController对象的状态为枚举类型,共有如下状态

public static enum State {
        NASCENT, RUNNING, EMPTY, PAUSED, PAUSING, 
        STOPPING, FINISHED, PREPARING 
    }

    transient private State state = State.NASCENT;

其初始状态为State.NASCENT

(细心的读者会发现,CrawlController对象里面还有另外一个枚举类型CrawlStatus 

/**
     * Crawl exit status.
     */
    private transient CrawlStatus sExit = CrawlStatus.CREATED;

由于该枚举类型仅用于描述状态,不参与执行逻辑的处理,所以本文对该成员不予深究)

当我们执行采集任务的相关命令时,CrawlController对象的状态同步改变,事件发布方法如下 

/**
     * 改变状态   发布事件
     * Send crawl change event to all listeners.
     * @param newState State change we're to tell listeners' about.
     * @param message Message on state change.
     */
    protected void sendCrawlStateChangeEvent(State newState, 
            CrawlStatus status) {
        if(this.state == newState) {
            // suppress duplicate state-reports
            return;
        }
        this.state = newState; 
        CrawlStateEvent event = new CrawlStateEvent(this,newState,status.getDescription());
        appCtx.publishEvent(event); 
    }

在上面方法中,除了设置CrawlController对象的状态,同时发布事件,事件类型为CrawlStateEvent(这里的CrawlController对象作为observer模式的subject角色)

CrawlJob类实现了ApplicationListener接口,作为该事件监听者之一(相当于observer模式的observer角色),事件监听方法如下

 /** 
     * 事件监听程序
     * Log note of all ApplicationEvents.
     * 
     * @see org.springframework.context.ApplicationListener#onApplicationEvent(org.springframework.context.ApplicationEvent)
     */
    public void onApplicationEvent(ApplicationEvent event) {
        if(event instanceof CrawlStateEvent) {
            getJobLogger().log(Level.INFO, ((CrawlStateEvent)event).getState() + 
                    (ac.getCurrentLaunchId() != null ? " " + ac.getCurrentLaunchId() : ""));
        }
        //event.getSource();
        synchronized (this) {
            //当正在执行synchronized boolean teardown()方法(里面调用CrawlController对象的void requestCrawlStop()方法)
//并且还没有执行完成时 needTeardown=true
//doTeardown()方法后,needTeardown重置为false,保证方法只执行一次
if (needTeardown && event instanceof StopCompleteEvent) { doTeardown(); } } if(event instanceof CheckpointSuccessEvent) { getJobLogger().log(Level.INFO, "CHECKPOINTED "+((CheckpointSuccessEvent)event).getCheckpoint().getName()); } }

Heritrix3.1.0系统里面,有多种事件监听者,它们分别监听不同的事件,当相关事件发生时,执行自身的事件处理方法,我们用eclipse开发工具可以看到,有如下事件监听类(实现了ApplicationListener接口)

那么在Heritrix3.1.0系统里面又有哪些可发布的事件类型呢

在Heritrix3.1.0系统里面又有哪些类发布了事件呢

某个问题如果我们穷追不舍的话,总是会越来越多,这里就此打住,有兴趣的读者可以去深究这些事件的来龙去脉(本人预计,事件监听者大多是打印日志功能)

现在回到主题,当我们执行采集任务的 launch命令时(CrawlJob对象的void launch()方法),CrawlController对象的状态开始变为State.PREPARING,再改变为State.PAUSED

State.NASCENT——>State.PREPARING——>State.PAUSED

/** 
     * CrawlJob对象的void launch()方法 launch命令
     * Operator requested crawl begin
     */
    public void requestCrawlStart() {
        hasStarted = true; 
        //改变状态为State.PREPARING
        sendCrawlStateChangeEvent(State.PREPARING, CrawlStatus.PREPARING);
        
        if(recoveryCheckpoint==null) {
            // only announce (trigger scheduling of) seeds
            // when doing a cold (non-recovery) start
            getSeeds().announceSeeds();
        }
        
        setupToePool();

        // A proper exit will change this value.
        this.sExit = CrawlStatus.FINISHED_ABNORMAL;
        
        if (getPauseAtStart()) {
            // frontier is already paused unless started, so just 
            // 'complete'/ack pause
            //改变状态为State.PAUSED
            completePause();
        } else {
            getFrontier().run();
        }
    }

上面方法中,后面部分的判断为在执行采集任务的 launch命令时,初始状态是否启动BdbFrontier对象的void run()方法(启动采集任务)

当我们执行采集任务的 Unpause命令时,CrawlController对象的状态变为State.RUNNING(预先状态为state== State.PAUSING || state == State.PAUSED)

/**
     * Unpause命令
     * Resume crawl from paused state
     */
    public void requestCrawlResume() {
        if (state != State.PAUSING && state != State.PAUSED) {
            // Can't resume if not been told to pause
            return;
        }
        
        assert toePool != null;
        
        Frontier f = getFrontier();
        f.unpause();
        //改变状态
        sendCrawlStateChangeEvent(State.RUNNING, CrawlStatus.RUNNING);
    }

上面方法中,首先执行BdbFrontier对象的void unpause()方法,然后改变CrawlController对象的状态

当我们执行采集任务的 pause命令时,CrawlController对象的状态变为State.PAUSING(预先状态与上面相反,state != State.PAUSING && state != State.PAUSED)

/**
     * pause命令
     * Stop the crawl temporarly.
     */
    public synchronized void requestCrawlPause() {
        if (state == State.PAUSING || state == State.PAUSED) {
            // Already about to pause
            return;
        }
        sExit = CrawlStatus.WAITING_FOR_PAUSE;
        getFrontier().pause();
        //改变状态为State.PAUSING
        sendCrawlStateChangeEvent(State.PAUSING, this.sExit);
        // wait for pause to come via frontier changes
    }

上面方法中,首先执行BdbFrontier对象的void pause()方法,然后改变CrawlController对象的状态

当我们执行采集任务的terminate命令时,CrawlController对象的状态变为State.STOPPING(预先状态为state != State.STOPPING && state != State.FINISHED)

 /**
     * terminate命令/teardown命令
     * Operator requested for crawl to stop.
     */
    public synchronized void requestCrawlStop() {
        if(state == State.STOPPING) {
            // second stop request; nudge the threads with interrupts
            getToePool().cleanup();
        }
        requestCrawlStop(CrawlStatus.ABORTED);
    }

进一步调用synchronized void requestCrawlStop(CrawlStatus message)方法

/**
     * Operator requested for crawl to stop.
     * @param message 
     */
    public synchronized void requestCrawlStop(CrawlStatus message) {
        if (state == State.NASCENT) {
            this.sExit = message;
            this.state = State.FINISHED;
            this.isStopComplete = true;
        }
        if (state == State.STOPPING || state == State.FINISHED ) {
            return;
        }
        if (message == null) {
            throw new IllegalArgumentException("Message cannot be null.");
        }
        if(this.sExit != CrawlStatus.FINISHED) {
            // don't clobber an already-FINISHED with alternate status
            this.sExit = message;
        }
        beginCrawlStop();
    }

进一步调用void beginCrawlStop()方法

/**
     * Start the process of stopping the crawl. 
     */
    public void beginCrawlStop() {
        LOGGER.fine("Started.");
        //改变状态为State.STOPPING
        sendCrawlStateChangeEvent(State.STOPPING, this.sExit);
        Frontier frontier = getFrontier();
        if (frontier != null) {
            frontier.terminate();
        }
        LOGGER.fine("Finished."); 
    }

上面方法中,首先改变CrawlController对象的状态,然后执行BdbFrontier对象的void terminate()方法

当我们执行采集任务的teardown命令时,我们先从CrawlJob对象的synchronized boolean teardown()方法开始分析

/**
     * Ensure a fresh start for any configuration changes or relaunches,
     * by stopping and discarding an existing ApplicationContext.
     * 
     * @return true if teardown is complete when method returns, false if still in progress
     */
    public synchronized boolean teardown() {
        CrawlController cc = getCrawlController();
        if (cc != null) {
            cc.requestCrawlStop();
            needTeardown = true;
            
            // wait up to 3 seconds for stop
            for(int i = 0; i < 11; i++) {
                if(cc.isStopComplete()) {
                    break;
                }
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
            //判断CrawlController cc的成员属性(boolean isStopComplete=true)
            //设置needTeardown = false
            //关闭PathSharingContext ac对象
            if (cc.isStopComplete()) {
                doTeardown();
            }
        }
        
        assert needTeardown == (ac != null);
        return !needTeardown; 
    }

同样首先调用CrawlController对象的synchronized void requestCrawlStop()方法(二次调用会强制中断ToePool toePool中的线程)

/**
     * terminate命令/teardown命令
     * Operator requested for crawl to stop.
     */
    public synchronized void requestCrawlStop() {
        if(state == State.STOPPING) {
            // second stop request; nudge the threads with interrupts
            getToePool().cleanup();
        }
        requestCrawlStop(CrawlStatus.ABORTED);
    }

我们不能孤立的来分析CrawlController对象的状态,因为在执行命令调用的相关方法里面,都会执行相应的BdbFrontier对象的相关方法,而在BdbFrontier对象里面同时会回调CrawlController对象的方法,从而使CrawlController对象的状态改变

BdbFrontier对象同样也维持自身的状态,这个状态成员同样也是枚举类型,不过跟CrawlController对象的状态成员不是同一个

BdbFrontier对象的状态成员如下(State targetState = State.PAUSE为自身的状态,State lastReachedState = null为最后发送到CrawlController对象的状态记录)

 /** last Frontier.State reached; used to suppress duplicate notifications */
    State lastReachedState = null;
    /** Frontier.state that manager thread should seek to reach */
    volatile State targetState = State.PAUSE;

这里的State成员定义如下(在Frontier接口里面)

/**
     * Enumeration of possible target states. 
     */
    public enum State { 
        RUN,  // juggle/prioritize/emit; usual state
        EMPTY, // running/ready but no URIs queued/scheduled
        HOLD, // NOT YET USED enter a consistent, stable, checkpointable state ASAP
        PAUSE, // enter a stable state where no URIs are in-progress; unlike
               // HOLD requires all in-process URIs to complete
        FINISH  // end and cleanup; may not return to any other state after
                  // this state is requested/reached
    }

当CrawlController对象调用的方法分别如下,调用这些方法同时会改变BdbFrontier对象的状态(State targetState)

public void run() {
        requestState(State.RUN);
    }
    
    /* (non-Javadoc)
     * @see org.archive.crawler.framework.Frontier#requestState(org.archive.crawler.framework.Frontier.State)
     */
    public void requestState(State target) {
        targetState = target;
    }
    
    public void pause() {
        requestState(State.PAUSE);
    }

    public void unpause() {
        requestState(State.RUN);
    }

    public void terminate() {
        requestState(State.FINISH);
    }

 在CrawlController对象初始化时,会在一个线程里面调用自身的void managementTasks()方法,该方法不断的根据CrawlController对象状态(State targetState成员属性)来修改ReentrantReadWriteLock outboundLock = new ReentrantReadWriteLock(true)成员的锁定属性,并且调用void reachedState(State justReached)方法(在该方法里面回调CrawlController对象的方法从而使其状态作相应的变化)

/**
     * Main loop of frontier's managerThread. Only exits when State.FINISH 
     * is requested (perhaps automatically at URI exhaustion) and reached. 
     * 
     * General strategy is to try to fill outbound queue, then process an
     * item from inbound queue, and repeat. A HOLD (to be implemented) or 
     * PAUSE puts frontier into a stable state that won't be changed
     * asynchronously by worker thread activity. 
     */
    protected void managementTasks() {
        assert Thread.currentThread() == managerThread;
        try {
            loop: while (true) {
                try {
                    State reachedState = null; 
                    switch (targetState) {
                    case EMPTY:
                        reachedState = State.EMPTY; 
                    case RUN:
                        // enable outbound takes if previously locked
                        while(outboundLock.isWriteLockedByCurrentThread()) {
                            outboundLock.writeLock().unlock();
                        }
                        if(reachedState==null) {
                            reachedState = State.RUN; 
                        }
                        reachedState(reachedState);
                        
                        Thread.sleep(1000);
                        
                        if(isEmpty()&&targetState==State.RUN) {
                            requestState(State.EMPTY); 
                        } else if (!isEmpty()&&targetState==State.EMPTY) {
                            requestState(State.RUN); 
                        }
                        break;
                    case HOLD:
                        // TODO; for now treat same as PAUSE
                    case PAUSE:
                        // pausing
                        // prevent all outbound takes
                        outboundLock.writeLock().lock();
                        // process all inbound
                        while (targetState == State.PAUSE) {
                            if (getInProcessCount()==0) {
                                reachedState(State.PAUSE);
                            }
                            
                            Thread.sleep(1000);
                        }
                        break;
                    case FINISH:
                        // prevent all outbound takes
                        outboundLock.writeLock().lock();
                        // process all inbound
                        while (getInProcessCount()>0) {
                            Thread.sleep(1000);
                        }

                        finalTasks(); 
                        // TODO: more cleanup?
                        reachedState(State.FINISH);
                        break loop;
                    }
                } catch (RuntimeException e) {
                    // log, try to pause, continue
                    logger.log(Level.SEVERE,"",e);
                    if(targetState!=State.PAUSE && targetState!=State.FINISH) {
                        requestState(State.PAUSE);
                    }
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } 
        
        // try to leave in safely restartable state: 
        targetState = State.PAUSE;
        while(outboundLock.isWriteLockedByCurrentThread()) {
            outboundLock.writeLock().unlock();
        }
        //TODO: ensure all other structures are cleanly reset on restart
        
        logger.log(Level.FINE,"ending frontier mgr thread");
    }

 void reachedState(State justReached)方法如下,回调CrawlController对象的方法

/**
     * The given state has been reached; if it is a new state, generate
     * a notification to the CrawlController. 
     * 
     * TODO: evaluate making this a generic notification others can sign up for
     */
    protected void reachedState(State justReached) {
        if(justReached != lastReachedState) {
            controller.noteFrontierState(justReached);
            lastReachedState = justReached;
        }
    }

CrawlController对象的void noteFrontierState(Frontier.State reachedState) 方法(我们可以看到,参数为Frontier.State类型)

/**
     * Receive notification from the frontier, in the frontier's own 
     * manager thread, that the frontier has reached a new state. 
     * 
     * @param reachedState the state the frontier has reached
     */
    public void noteFrontierState(Frontier.State reachedState) {
        switch (reachedState) {
        case RUN: 
            LOGGER.info("Crawl running.");
            sendCrawlStateChangeEvent(State.RUNNING, CrawlStatus.RUNNING);
            break;
        case EMPTY: 
            LOGGER.info("Crawl empty.");
            if(!getRunWhileEmpty()) {
                this.sExit = CrawlStatus.FINISHED;
                beginCrawlStop();
            }
            sendCrawlStateChangeEvent(State.EMPTY, CrawlStatus.RUNNING);
            break; 
        case PAUSE:
            if (state == State.PAUSING) {
                completePause();
            }
            break;
        case FINISH:
            completeStop();
            break;
        default:
            // do nothing
        }
    }

如果发送过来的状态为Frontier.State.FINISH,则进一步调用void completeStop()方法

/**
     * Called when the last toethread exits.
     */
    protected void completeStop() {
        LOGGER.fine("Entered complete stop.");

        statisticsTracker.getSnapshot(); // ???
        
        this.reserveMemory = null;
        if (this.toePool != null) {
            this.toePool.cleanup();
        }
        this.toePool = null;

        LOGGER.fine("Finished crawl.");

        try {
            appCtx.stop(); 
        } catch (RuntimeException re) {
            LOGGER.log(Level.SEVERE,re.getMessage(),re);
        }
        
        sendCrawlStateChangeEvent(State.FINISHED, this.sExit);

        // CrawlJob needs to be sure all beans have received FINISHED signal before teardown
        this.isStopComplete = true;
        appCtx.publishEvent(new StopCompleteEvent(this)); 
    }

该方法里面修改CrawlController对象的状态,中断ToePool toePool中的线程,并且发布StopCompleteEvent事件

这里我们还可以看到this.isStopComplete = true属性的修改,表示在CrawlJob对象在执行doTeardown()方法前保证this.isStopComplete = true

这样我们就不难理解前面的synchronized boolean teardown()方法和事处理件方法void onApplicationEvent(ApplicationEvent event)里面的判断了

doTeardown()方法为同步方法,在synchronized boolean teardown()方法和事件处理方法void onApplicationEvent(ApplicationEvent event)只能同时一个调用该方法 

// ac guaranteed to be null after this method is called
    protected synchronized void doTeardown() {
        needTeardown = false;

        try {
            if (ac != null) { 
                ac.close();
            }
        } finally {
            // all this stuff should happen even in case ac.close() bugs out
            ac = null;
            
            xmlOkAt = new DateTime(0);
            
            if (currentLaunchJobLogHandler != null) {
                getJobLogger().removeHandler(currentLaunchJobLogHandler);
                currentLaunchJobLogHandler.close();
                currentLaunchJobLogHandler = null;
            }

            getJobLogger().log(Level.INFO,"Job instance discarded");
        }
    }

在上面方法里面设置同时设置needTeardown = false,则事件处理方法接收到StopCompleteEvent事件时不会再调用该方法

(思考:假如synchronized boolean teardown()方法处于阻塞,如果事件处理方法void onApplicationEvent(ApplicationEvent event)接收到StopCompleteEvent事件先执行doTeardown()方法,synchronized boolean teardown()方法还会不会再次执行该方法) 

--------------------------------------------------------------------------- 

本系列Heritrix 3.1.0 源码解析系本人原创 

转载请注明出处 博客园 刺猬的温驯 

本文链接 http://www.cnblogs.com/chenying99/archive/2013/05/12/3073654.html

原文地址:https://www.cnblogs.com/chenying99/p/3073654.html