并发编程

1.并发工具的使用及原理【上】

时长:57min

计划:2020

1.并发工具的使用及原理【下】

时长:1h37min

计划:4/10/2020 12:48 ---13:20

3.1.11.CountDownLatch

6.1.测试代码

package com.alipay.dsg.web;

import java.util.concurrent.CountDownLatch;

/**
 * @ClassName CountDownLatchDemo
 * @Description
它的作用:等待1个或多个线程执行完以后,才进行后续的事情,可以使用它
 
* @Author wf
 * @Date 2020/4/9 12:49
 * @Version 1.0
 */
public class CountDownLatchDemo extends Thread{
    static CountDownLatch countDownLatch = new CountDownLatch(1);
//    public static void main(String[] args) throws InterruptedException {
//        CountDownLatch countDownLatch = new CountDownLatch(3);
//        new Thread(()->{
//            System.out.println("Thread1");
//            countDownLatch.countDown();//
减减操作 3-1 =2
//        }).start();
//        new Thread(()->{
//            System.out.println("Thread2");
//            countDownLatch.countDown();//
减减操作 2-1=1
//        }).start();
//        new Thread(()->{
//            System.out.println("Thread3");
//            countDownLatch.countDown();//
减减操作 1-1=0
//            //
当减到0,释放main线程
//        }).start();
//
//        countDownLatch.await(); //
阻塞main线程
//    }
   
public static void main(String[] args) {
        //阻塞1000个线程
       
for(int i =0; i<1000;i++){
            new CountDownLatchDemo().start();
        }
        countDownLatch.countDown();
    }

    @Override
    public void run() {
        try {
            countDownLatch.await();//阻塞
       
} catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("ThreadName:"+Thread.currentThread().getName());
    }
}

使用场景:如使用缓存,初始化加载缓存,当所有缓存预热完成,才进行后续缓存操作。

         计数器。

6.1.1.实现原理

countdownLatch.await,countdown()

内部使用共享锁。

1.await方法

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

2.AQS.acquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

A.NonFairSync.tryAcquireShared

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

3.AQS.doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);//尝试获取锁
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                   
failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())//阻塞
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

3.1.16.并发编程-线程池原理【上】

时长:1h6min

学习内容:

  》线程池的基本使用

  》线程池的实现原理分析

  》线程池的使用注意事项

  》Callable/Future使用及原理分析

8.1.线程池

需求:  

  我们可以参照,数据库的连接池,进行理解。

  可以理解为一个缓冲区,对象缓存到那里,可以进行获取,使用完后又归还到缓冲区。

它的核心概念:

  》复用已有资源

  》控制资源总数

Thread线程理解:

  它是一个异步处理的api.当我们要使用线程时,使用方式如下:

new Thread().start();

  这里存在的问题,是对象的创建,如果需要创建大量的对象,通过new的方式,

  对资源消耗较大,造成线程资源【线程数】不可控。当线程数超过对应的资源的限制,就会导致大量的上下文切换

比如,8核心的cpu,同一时刻只能运行8个线程,如果创建80个线程,会导致线程不时地上下文切换,反而降低程序执行性能。

  还有一个问题,可能需要频繁创建和销毁线程对象,浪费资源,影响性能。

  针对,线程存在两个问题:

  》线程资源【线程数】不可控

  》需要频繁创建和销毁线程对象

  

  基于数据库连接池的设计,提出线程"池化"的改进。即产生线程池技术

8.1.1.线程池的优势

  》限流----线程数量可控---通过参数设置,允许创建的最大线程数

  》 降低频繁创建和销毁线程对象的开销

  》对于任务的响应速度更快----复用已有线程

8.1.2.java中提供的线程池

8.1.2.1.创建线程池的工厂类Executors

常用如下:

Executors.newSingleThreadExecutor()//只有一个核心线程对象的线程池
Executors.newFixedThreadPool(3);  //固定线程数的线程池
Executors.newCachedThreadPool//可以实现动态调整线程数的线程池,可以无限创建线程对象,每一空闲线程,会在60s之后回收
Executors.newScheduledThreadPool(3)//处理定时任务
Executors.newWorkStealingPool();//fork/join线程池

区分不同场景,应该使用哪一种池?

 1.newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

2.newSingleThreadExecutor()

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

3.newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
A.ThreadPoolExecutor

keepAliveTim是怎么监控线程,进行回收的?

public ThreadPoolExecutor(int corePoolSize,//核心线程数
                          int maximumPoolSize,//最大线程数
                          long keepAliveTime,//超时时间
                          TimeUnit unit,    //超时时间单位
                          BlockingQueue<Runnable> workQueue,//阻塞队列
                          ThreadFactory threadFactory,//线程工厂
                          RejectedExecutionHandler handler) {//拒绝策略
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

8.1.3.线程池的实现原理

8.1.3.1.使用示例如下

package com.wf.concurrent;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @ClassName ThreadPoolDemo
 * @Description TODO
 * @Author wf
 * @Date 2020/4/28 12:22
 * @Version 1.0
 */
public class ThreadPoolDemo implements Runnable{
    static ExecutorService service = Executors.newFixedThreadPool(3);
    public static void main(String[] args) {
        for(int i=0; i<100; i++){
            service.execute(new ThreadPoolDemo());
        }
        service.shutdown();
        //正常情况下,循环100次,会创建100个线程对象,可以发现这里只有3个线程实例
        //一次性,只有3个线程进行任务处理,当处理完成后,线程实例,又归还到池中
        //这里有97个任务,未执行,会先放置到任务队列里面
    }

    @Override
    public void run() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("当前线程的名称:"+Thread.currentThread().getName());
    }
}

8.1.3.2.原理分析

分析入口:execute方法,如下所示:

1.ThreadPoolExecutor#execute方法
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

int c = ctl.get();//默认线程池中线程数量为0,11100000000000000000000000000000
    if (workerCountOf(c) < corePoolSize) {//0小于核心数3【传参】
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//32位

private static int ctlOf(int rs, int wc) { return rs | wc; }

private static final int RUNNING    = -1 << COUNT_BITS;

private static final int COUNT_BITS = Integer.SIZE - 3;//29

public static final int SIZE = 32;

计算:

Rs=-1<<(32-3)=-1<<29= 11100000000000000000000000000000

wc = 0

 

Rs | wc = 11100000000000000000000000000000

 ctl表示含义

高3位代表当前线程状态,低29位代表当前线程池的线程数量。

 

默认情况:线程状态为Running,线程数量为0.

 privatestaticint workerCountOf(int c) { return c & CAPACITY; }

计算:

C=11100000000000000000000000000000

private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

capacity= 1<<29 -1 =00100000000000000000000000000000 -1= 00011111111111111111111111111111

C & capacity=11100000000000000000000000000000 & 00011111111111111111111111111111=00000000000000000000000000000000=0

 A.方法逻辑结构图

 

   【1】addWorker方法

 privateboolean addWorker(Runnable firstTask, boolean core) {

    retry:
    for (;;) { //自旋,增加工作线程数
        int c = ctl.get();//11100000000000000000000000000000
        int rs = runStateOf(c);    //取得运行状态,running=c

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&      //rs>=0
            ! (rs == SHUTDOWN &&
               firstTask == null && //提交任务为null
               ! workQueue.isEmpty()))     //工作队列非空
            return false;

        for (;;) {//内层自旋
            int wc = workerCountOf(c);//获取工作线程数,首次为0
            if (wc >= CAPACITY ||   //数据数超限
                wc >= (core ? corePoolSize : maximumPoolSize))//core传参true/false
                return false;
            if (compareAndIncrementWorkerCount(c))//线程数加1
                break retry;//跳出方法第一行 
            c = ctl.get();  // Re-read ctl 第二次进入,数量为1
            if (runStateOf(c) != rs)       //非运行状态,下一次迭代
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);//把任务放到Worker中
        final Thread t = w.thread; //Worker构造器中new Thread,状态为-1
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;//重入锁
            mainLock.lock();//加锁
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN || //rs<0,表示running状态
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size(); //一个Worker对应一个线程
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;    //Worker保存成功标识
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();   //启动线程,执行runWorker方法
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);//添加失败,创建线程失败,回滚
    }
    return workerStarted;
}

        

  private static int runStateOf(int c) { return c & ~CAPACITY; }

计算:

C=11100000000000000000000000000000

~capacity=~00011111111111111111111111111111=11100000000000000000000000000000

C & ~capacity=11100000000000000000000000000000

 private static final int SHUTDOWN = 0 << COUNT_BITS;

计算:

0<<29=0

 privateboolean compareAndIncrementWorkerCount(int expect) {

   return ctl.compareAndSet(expect, expect + 1);

} //原子操作cas,加1

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);  //移除worker
        decrementWorkerCount();//数量减1
        tryTerminate();     //尝试终止
    } finally {
        mainLock.unlock();
    }
}
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
2.Worker类:
private final class Worker
    extends AbstractQueuedSynchronizer    //继承AQS,为何
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {//获得锁,状态改为1
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);//释放锁,状态改为0
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

为什么Worker要自己去实现AQS,而不使用重入锁?

 

在new Worker时,会设置state默认为-1.

       当要释放锁时,状态改为state=0.

       而获得锁时,状态改为state=1.

因为Worker实现Runnable接口,是一个线程任务。所以当,thread执行run方法时,实际上会执行Worker的run方法,内部调用runWorker方法。

A.runworker方法:
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;//取得第一个任务
    w.firstTask = null;
    w.unlock();//释放锁,允许中断,调用aqs.release(1),设置状态为1
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();//获得锁,防止在shutdown时不终止正在执行的任务。
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();//核心逻辑,task是通过execute传递过来的任务
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

注意

    wc.lock()加锁,一方面是为了支持并发。另一方面,是为了在shutdown时不去终止正在运行的线程。

    我们在使用线程池时,一次执行多个任务,等所有的任务执行完成后,才进行中断线程。

即执行service.shutdown().

    可以由于任务时间较长,某几个任务还在执行,程序就已经执行到shutdown.

    如果没有wc.lock的作用,就会终止正在执行或还未执行的任务中断掉。这是不合理的。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);//移除已经执行的任务
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}
B.shutdown方法
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();//关闭线程安全检查
    advanceRunState(SHUTDOWN);//更新线程状态为shutdown
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

private void checkShutdownAccess() {
    SecurityManager security = System.getSecurityManager();
    if (security != null) {
        security.checkPermission(shutdownPerm);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                security.checkAccess(w.thread);
        } finally {
            mainLock.unlock();
        }
    }
}
private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;//更新线程状态为shutdown

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {//尝试获得锁
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

销毁线程,有两种方式:

shutdownNow()-------强制关闭。

shutdown()----友善关闭。

3.1.17.线程池的实现原理【下】

时长:1h10min

计划:2020/4/28 22:42-23:50

8.2.线程池原理分析【二】

8.2.1.提交任务创建线程执行任务

如果有:corePoolSize=3,maxPoolSize=5.【线程池刚启动,还未被预热】

就会创建:3个核心线程,2个最大线程0

创建线程,首先会创建Worker实例。

创建完成之后,会立马执行我们传入的任务。

如果已经预热。工作线程数小于核心线程数,这个条件不能再满足。这时就需要,把

任务把放到阻塞队列中,然后去从队列中取任务。然后执行。

8.2.1.1.runWorker再分析
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {//核心逻辑
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
 
1.从阻塞队列获取任务
  while (task != null || (task = getTask()) != null) {//很关键

    可以获得当前传入的任务,也可以从阻塞队列中去获取。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //状态不为running,或队列为空,均结束方法
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();//线程数减1
            return null;    //null会终止runWorker循环,回收线程
        }

        int wc = workerCountOf(c);

        //允许回收核心线程标识【可以进行传参设置】,或工作线程大于核心线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {//核心逻辑
            Runnable r = timed ?//timed为true,poll超时处理
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();//不允许超时,take获取
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
2.线程什么时候被回收?

    在Thread的应用中,当run方法执行结束后,线程会自动销毁。

而这里run方法,内部调用runWorker()方法。

    runWorker方法,内部有一个while循环,需要让这个循环结束,才能结束方法。

8.3.线程池的参数

8.3.1.相关问题思考

1.不建议使用Executors.newXXX创建线程池,是为什么?【阿里开发手册中说明】

原因:

    这种方式,使用默认构造,进行创建。它的参数也是使用默认值,这种不清楚设置参数的含义下,使用这种方式,是存在风险的。

    因为线程的使用,会影响cpu的资源及性能的。

2.线程池大小的如何设置【常见面试题】

    取决于硬件环境和软件环境。

    硬件环境:主要cpu核心数

    软件环境:线程的执行情况---

io密集型【线程用于io远程通信,设置更多线程数】,可以设置为cpu核心的2倍。

cpu密集型【线程主要用于计算,执行要快,要求cpu利用率高,以cpu核心数为准】,设置最大线程数=cpu核心数+1

    在并发编程,书中给出公式:

    (线程等待时间 + 线程cpu时间)/cpu时间* cpu核心数。

3.线程池的初始化

    线程池是可以进行预热的。使用方式如下:

4.线程池的关闭

有两种方式:

service.shutdown();
service.shutdownNow();//立马终止

8.4.Callable/FutureTask原理

8.4.1.submit与execute的区别

submit提交任务,可以有两种类型:

Callable类型和Runnable类型。

submit可以实现一个带返回值的线程。

对于异常的处理,execute会抛出异常,而execute不会抛出异常,但会在get时拿到异常。

8.4.1.1.使用示例
 
package com.wf.concurrent;

import javax.sound.midi.Soundbank;
import java.util.concurrent.*;

/**
 * 类名称:FutureDemo
 * 类描述:TODO
 * 创建人:Administrator
 * 创建时间:2020/4/29 0:09
 * Version 1.0
 */
public class FutureDemo implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("execute call");
        Thread.sleep(5000);
        return "Hello call";
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureDemo futureDemo = new FutureDemo();
        FutureTask futureTask = new FutureTask(futureDemo);
        new Thread(futureTask).start();
        System.out.println(futureTask.get());//阻塞获取结果
        
        //线程池的实现方式
        ExecutorService  executorService = Executors.newFixedThreadPool(3);
        Future<String> futureRes = executorService.submit(new FutureDemo());
        System.out.println(futureRes.get());
        

        executorService.shutdown();

    }

}
8.4.2.1.猜想实现原理

1.阻塞:LockSupport.unpark

 2.使用状态机制【这里使用这种方式】

8.4.2.2.源码分析

1.FutureTask类图

2.FutureTask#run方法

FutureTask是一个Runnable实现类,它一定存在run方法,代码如下:

public void run() {
    if (state != NEW ||//设置失败
        !UNSAFE.compareAndSwapObject(this, runnerOffset, 
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;    //通过构造器传过来的task
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();    //调用call
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);    //设置call方法返回结果
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = FutureTask.class;
        stateOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("state"));
        runnerOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("runner"));//runner
        waitersOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("waiters"));
    } catch (Exception e) {
        throw new Error(e);
    }
}
3.java.util.concurrent.FutureTask#get()
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)    //当前任务未完成,
        s = awaitDone(false, 0L);//阻塞
    return report(s);
}

private int awaitDone(boolean timed, long nanos)//传参false,0L
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {//自旋
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)    //第一次进入
            q = new WaitNode();    //创建阻塞队列
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {//等待超时时间处理
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);//阻塞
    }
}
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;    //取得返回值
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
原文地址:https://www.cnblogs.com/wfdespace/p/12659104.html