深入并发之(五)线程池中run方法解析

线程池中run方法解析

概览

上篇我们说到线程池中从队列中去任务的地方时在Worker类中的方法,这篇我们就来分析一下,这个方法。

public  void  run() {
    runWorker(this);
}
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //如果线程池是stop状态,确保线程被中断,如果线程池不是,确保线程池没有被中断。
                //当我们清空中断标志时,第二种情况需要需要有一个recheck来应对shutdownNow方法。
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

这部分代码实际是十分容易理解,但是,其中关于中断的处理是我们需要注意的,也就是有注释的那一段,我们需要详细理解一下。

理解这部分,首先我们要对线程的中断有详细的了解。

线程的中断

首先我们要了解stop方法已经是一个过时的方法了,我们不应该再使用这种方式来中断线程。

然后,我们需要了解三个方法,这三个方法看起来像是英语词汇辨析,(⊙o⊙)…

  • isInterrupted方法
  • interrupted方法

这里我们先来分析前两个方法,这两个方法都是用来验证线程是否被中断的,那么,这两个方法有什么区别呢?

首先isInterrupted方法是Thread类的普通方法,会返回调用方法的类的状态,而interrupted方法是Thread的静态方法,返回的是调用方法的线程的状态。

另外,还有一个区别就是interrupted方法会清除线程的中断状态,也就是说,如果线程已经是中断状态,那么第一次调用,返回值为真,第二次调用,返回值就会为假。

还用一个方法interrupt是用来中断线程的,也就是将中断标志设置为true的方法。

了解了这三个方法,我们就会注意到,所谓的中断只是将中断标志设置一下,并没有真正的中断线程的运行,所以,一般来说,我们需要自己来检查线程的中断状态,并设计如何应对中断,也就是如何真正的结束线程。

需要注意的是,方法sleepwait以及join 会对中断标志有所处理,当线程中断标志为true时,将会抛出异常。这也并不难理解,当我们的线程进入这三种状态的时候,除了等待我们并没有任何方式让线程跳出,那么中断线程就是唯一的后悔药。

好了,已经了解了以上知识,我们可以回归正题了。

if ((runStateAtLeast(ctl.get(), STOP) ||
	(Thread.interrupted() &&
     runStateAtLeast(ctl.get(), STOP))) &&
    !wt.isInterrupted())
    wt.interrupt();

由于,这里实际是处理shutdownNow方法,所以,我们先插入一段关于线程池关闭方法的分析。

我们知道,关闭线程池的方法有两个,一个是shutdown,一个是shutdownNow

shutdown方法会告诉线程池拒绝接受新的任务,但是,已经开始执行的以及进入队列中的任务将会完成执行。

shutdownNow方法也同样会告诉线程池拒绝接受新的任务,但是不同的是,他会试图将已经开始的任务以及队列中的任务取消。这种取消是通过中断线程来实现的,也就是说,如果我们的任务中没有针对线程中断作处理,那么,在实际的使用体验上,shutdownNowshutdown是相同的

当我们调用shutdownNow方法时,线程池将会变为stop状态。那么运行过程中,将会执行到上面那段代码。

我们需要确保当线程池状态是stop时,线程应该是中断状态的,同样的,如果线程池状态不是,那么线程的状态也不应该是中断的。

对于第二种情况,我们先清空中断状态,然后recheck,以防中间调用了shutdownNow方法,这样来确保满足上面的情况。

取任务部分代码

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

首先我们要明确,当我们需要到线程池中取任务的时候,当前Worker中的任务肯定是null

那么,我们接着来看这段代码,首先还是老样子,需要检查线程池的状态。如果状态大于shutdown,那么我就需要区别对待,如果是shutdown状态,根据前面我们谈到的线程池的关闭,如果队列为空了,那么当前线程实际就可以减掉了,如果状态已经大于stop,那么肯定会减掉。所以这里我们看到有一个decrementWorkerCount方法,实际上是CAS方法。

这里解释一下allowCoreThreadTimeOut变量的使用,这个变量默认值是false,也就是说无论你是否设置超时时间,核心线程是不会过期的;如果这个变量设置为true,那么核心线程也会因为空闲超时。这里的是否超时,实际是受keepAliveTime控制。

这样,我们就不难理解timed变量了,这个变量实际上是用来判定当前线程是否会超时的一个变量。

如果获取任务成功,那么就可以直接返回,否则,我们将会再次进入循环。

原文地址:https://www.cnblogs.com/qmlingxin/p/9763556.html