13-JUC(下)

1. 同步器

1.1 CountDownLatch

  • 当一个或多个线程调用 await() 时,这些线程会阻塞;
  • 其它线程调用 countDown() 会将计数器减 1(调用该方法的线程不会阻塞);
  • 当计数器的值变为 0 时(减少计数),因 await() 阻塞的线程会被唤醒,继续执行。
// 案例:秦灭六国
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        // [减少计数] 让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒
        CountDownLatch cd = new CountDownLatch(6);

        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() + "被灭了 ...");
                cd.countDown();
            }, String.valueOf(i)).start();
        }

        cd.await(); // 等待计数器归零,然后再向下执行。
        System.out.println(Thread.currentThread().getName() + "一统天下 ...");
    }
}

1.2 CyclicBarrier

CyclicBarrier 的字面意思是可循环(Cyclic) 使用的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫“同步点”)时被阻塞(线程进入屏障是通过 await() 方法),直到最后一个线程到达屏障时,屏障才会开门。只有等屏障开了,所有被屏障拦截的线程才会继续干活。

示例代码:

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
            System.out.println("七龙珠集齐!召唤神龙!");
        });

        for (int i = 1; i <= 7; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"星龙珠被收集");
                try {
                    cyclicBarrier.await(); // 线程阻塞
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"星龙珠合成...");
            }, String.valueOf(i)).start();
        }
    }
}

打印结果:

2星龙珠被收集
5星龙珠被收集
4星龙珠被收集
1星龙珠被收集
3星龙珠被收集
7星龙珠被收集
6星龙珠被收集
七龙珠集齐!召唤神龙!
6星龙珠合成...
4星龙珠合成...
5星龙珠合成...
2星龙珠合成...
7星龙珠合成...
3星龙珠合成...
1星龙珠合成...

1.3 Semaphore

信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。在信号量上我们定义 2 种操作:

  • acquire(获取):当一个线程调用 acquire 操作时,它要么通过成功获取信号量(信号量减 1),要么一直等下去,直到有线程释放信号量或超时。
  • release(释放):实际上会将信号量的值加 1,然后唤醒等待的线程。

示例代码:

public class SemaphoreDemo {
    public static void main(String[] args) {
        // 3 个停车位
        Semaphore sp = new Semaphore(3);
        // 6 辆汽车
        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                try {
                    sp.acquire();
                    System.out.println(Thread.currentThread().getName() + "号车驶入停车位");
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName() + "号车驶出停车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    sp.release();
                }
            }, String.valueOf(i)).start();
        }
    }
}

打印结果:

1号车驶入停车位
3号车驶入停车位
2号车驶入停车位
1号车驶出停车位
3号车驶出停车位
4号车驶入停车位
2号车驶出停车位
6号车驶入停车位
5号车驶入停车位
6号车驶出停车位
5号车驶出停车位
4号车驶出停车位

2. 读写锁

class MyCache {
    private volatile Map<String, Object> map = new HashMap<>();
    
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    
    public void put(String key, Object value) {
        lock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "开始写入 ...");
            map.put(key, value);
            System.out.println(Thread.currentThread().getName() + "写入完毕 ...");
        } finally {
            lock.writeLock().unlock();
        }
    }
    
    public Object get(String key) {
        lock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "开始读入 ...");
            Object value = map.get(key);
            System.out.println(Thread.currentThread().getName() + "读入完毕 ...");
            return value;
        } finally {
            lock.readLock().unlock();
        }
        
    }
}

3. 阻塞队列

3.1 概念

  • 试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素。
  • 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增。

3.2 用处

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起。

为什么需要 BlockingQueue?

好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了。

在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

3.3 架构&种类

  • ArrayBlockingQueue:由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为 integer.MAX_VALUE)阻塞队列
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列
  • DelayQueue:使用优先级队列实现的延迟无界阻塞队列
  • SynchronousQueue:同步队列,不存储元素的阻塞队列,也即单个元素的队列
  • LinkedTransferQueue:由链表组成的无界阻塞队列
  • LinkedBlockingDeque:由链表组成的双向阻塞队列

3.4 示例代码

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // 1. 抛出异常
        // throwExTest();
        // 2. 特殊值
        // retBoolTest();
        // 3. 阻塞
        // blockingTest();
        // 4. 超时退出
        timeoutTest();
    }

    private static void timeoutTest() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        System.out.println(queue.offer("a")); // true
        System.out.println(queue.offer("b")); // true
        System.out.println(queue.offer("c")); // true
        System.out.println(queue.offer("d", 3, TimeUnit.SECONDS)); // false
        System.out.println(queue.poll()); // a
        System.out.println(queue.poll()); // b
        System.out.println(queue.poll()); // c
        System.out.println(queue.poll(4, TimeUnit.SECONDS)); // null
    }

    private static void blockingTest() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        queue.put("a");
        queue.put("b");
        queue.put("c");
        // queue.put("d"); -> blocking ...
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
        // System.out.println(queue.take()); -> blocking ...
    }


    private static void retBoolTest() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        System.out.println(queue.offer("a")); // true
        System.out.println(queue.offer("b")); // true
        System.out.println(queue.offer("c")); // true
        System.out.println(queue.peek()); // a
        System.out.println(queue.offer("d")); // false
        System.out.println(queue.poll()); // a
        System.out.println(queue.poll()); // b
        System.out.println(queue.poll()); // c
        System.out.println(queue.poll()); // null
    }

    private static void throwExTest() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        // 1. 添加
        System.out.println(queue.add("a")); // true
        System.out.println(queue.add("b")); // true
        System.out.println(queue.add("c")); // true
        System.out.println(queue.element()); // a
        // System.out.println(queue.add("d")); -> IllegalStateException: Queue full

        // 2. 移除
        System.out.println(queue.remove());
        System.out.println(queue.remove());
        System.out.println(queue.remove());
        // System.out.println(queue.remove()); -> NoSuchElementException
    }
}

4. AQS

5. 线程池

5.1 引入

10 年前单核 CPU 电脑,假的多线程,CPU 需要来回切换;现在是多核电脑,多个线程各自跑在独立的 CPU 上,不用切换效率高。

线程池的优势:线程池做的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果任务数量超过了线程最大数量,超出数量的任务排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

线程池的主要特点:线程复用;控制最大并发数;管理线程

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控。

5.2 使用

5.2.1 架构说明

Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors,ExecutorService,ThreadPoolExecutor 这几个类。

5.2.2 有关 API

  • Executors.newSingleThreadExecutor():一个任务一个任务的执行,一池一线程。
  • Executors.newFixedThreadPool(N):执行长期任务性能好,创建一个有 N 个固定的线程的线程池。
  • Executors.newCachedThreadPool():执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们。可扩容,遇强则强。

示例代码:

public class ThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(3); // 固定个数
        ExecutorService threadPool2 = Executors.newSingleThreadExecutor(); // 单个
        ExecutorService threadPool3 = Executors.newCachedThreadPool(); // 可扩展
        for (int i = 1; i <= 500; i++) {
            threadPool3.execute(()->{
                System.out.println(Thread.currentThread().getName() + "处理任务");
            });
        }
        threadPool.shutdown();
        threadPool2.shutdown();
        threadPool3.shutdown();
    }
}

5.2.3 底层源码

ThreadPoolExecutor

public ThreadPoolExecutor(
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {

    if (corePoolSize < 0 || maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();

    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  1. corePoolSize:线程池中的常驻核心线程数(惰性加载)
  2. maximumPoolSize:线程池中能够容纳同时执行的最大线程数,此值必须 >= 1。
  3. keepAliveTime:多余的空闲线程的存活时间。当池中线程数量超过 corePoolSize 并且线程空闲时间达到 keepAliveTime 时,多余线程会被销毁直到只剩下 corePoolSize 个线程为止。
  4. unit:keepAliveTime 的单位
  5. workQueue:任务队列,被提交但尚未被执行的任务。
  6. threadFactory:表示生成线程池中工作线程的线程工厂。用于创建线程,一般默认的即可。
  7. handler:拒绝策略。表示当队列满了,并且工作线程大于等于线程池的 maximumPoolSize 时如何来拒绝请求执行的 Runnable 的策略。

在工作中单一的/固定数的/可变的 3 种创建线程池的方法哪个用的多?注意,哪个都不用!

5.3 工作原理

1. 在创建了线程池后,线程池中的线程数为 0。

2. 当调用 execute() 方法添加一个请求任务时,线程池会做出如下判断:

  • 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
  • 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
  • 如果这个时候队列满了且正在运行的线程数量还小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个新来的任务
  • 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。

3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。

4. 当一个线程无事可做超过一定的时间(keepAliveTime) 时,线程会判断:如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以,当线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

5.4 拒绝策略

等待队列已经排满了,再也塞不下新任务了。同时,线程池中的 max 线程也达到了,无法继续为新任务服务。这个时候我们就需要拒绝策略机制合理的处理这个问题。

示例代码:20 个任务

public class ThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                3L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        try {
            for (int i = 0; i < 20; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}

JDK内置的拒绝策略(以下均实现了 RejectedExecutionHandle<I>

  • AbortPolicy(默认):直接抛出 RejectedExecutionException 阻止系统正常运行。
    pool-1-thread-1
    pool-1-thread-4
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-3
    pool-1-thread-2
    pool-1-thread-4
    pool-1-thread-5
    java.util.concurrent.RejectedExecutionException: Task cn.edu.nuist.threadpool
        .ThreadPoolTest$$Lambda$1/821270929@85ede7b rejected from java.util.concurrent
        .ThreadPoolExecutor@5674cd4d
        [Running, pool size = 5, active threads = 0, queued tasks = 0, completed tasks = 8]
    
  • CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
    pool-1-thread-1
    pool-1-thread-3
    pool-1-thread-2
    main <- 将某些任务回退到调用者
    pool-1-thread-3
    pool-1-thread-1
    pool-1-thread-3
    pool-1-thread-5
    main <- 将某些任务回退到调用者
    pool-1-thread-4
    pool-1-thread-2
    pool-1-thread-3
    pool-1-thread-1
    pool-1-thread-2
    pool-1-thread-4
    pool-1-thread-1
    pool-1-thread-2
    pool-1-thread-3
    pool-1-thread-1
    pool-1-thread-4
    
  • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中尝试再次提交当前任务。
    pool-1-thread-1
    pool-1-thread-4
    pool-1-thread-3
    pool-1-thread-3
    pool-1-thread-2
    pool-1-thread-1
    pool-1-thread-4
    pool-1-thread-5
    
  • DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。
    pool-1-thread-1
    pool-1-thread-3
    pool-1-thread-2
    pool-1-thread-1
    pool-1-thread-4
    pool-1-thread-4
    pool-1-thread-4
    pool-1-thread-4
    pool-1-thread-5
    pool-1-thread-4
    pool-1-thread-3
    pool-1-thread-4
    pool-1-thread-5
    pool-1-thread-3
    

5.5 如何设计线程池

5.5.1 回答思路

摘自公众号:yes的练级攻略

这种设计类问题还是一样,先说下理解,表明你是知道这个东西的用处和原理的,然后开始 BB。基本上就是按照现有的设计来说,再添加一些个人见解。

线程池讲白了就是存储线程的一个容器,池内保存之前建立过的线程来重复执行任务,减少创建和销毁线程的开销,提高任务的响应速度,并便于线程的管理。

我个人觉得如果要设计一个线程池的话得考虑池内工作线程的管理、任务编排执行、线程池超负荷处理方案、监控等方面。

要将初始化线程数、核心线程数、最大线程池都暴露出来可配置,包括超过核心线程数的线程空闲消亡相关配置。

然后任务的存储结构也得可配置,可以是无界队列也可以是有界队列,也可以根据配置,分多个队列来分配不同优先级的任务,也可以采用 stealing 的机制来提高线程的利用率。

再提供配置来表明此线程池是 IO 密集型还是 CPU 密集型来改变任务的执行策略。

超负荷的方案可以有多种,包括丢弃任务、拒绝任务并抛出异常、丢弃最旧的任务或自定义等等。

至于监控的话,线程池设计要埋好点,暴露出用于监控的接口,如已处理任务数、待处理任务数、正在运行的线程数、拒绝的任务数等等信息。

我觉得基本上这样答就差不多了,等着面试官的追问就好。注意不需要跟面试官解释什么叫核心线程数之类的,都懂的没必要。

当然这种开放型问题还是仁者见仁智者见智,我这个不是标准答案,仅供参考。建议把线程池相关的关键字都要说出来,表面你对线程池的内部原理的理解是透彻的。

5.5.2 IO/CPU 密集型

https://blog.csdn.net/youanyyou/article/details/78990156

  • CPU 密集型(CPU-bound)
    • CPU 密集型也叫计算密集型,指的是系统的硬盘、内存性能相对 CPU 要好很多,此时,系统运作大部分的状况是 CPU Loading 100%,CPU 要读/写 I/O(硬盘/内存),I/O 在很短的时间就可以完成,而 CPU 还有许多运算要处理,CPU Loading 很高。
    • 在多重程序系统中,大部份时间用来做计算、逻辑判断等 CPU 动作的程序称之 CPU bound。例如一个计算圆周率至小数点一千位以下的程序,在执行的过程当中绝大部份时间用在三角函数和开根号的计算,便是属于 CPU bound 的程序。CPU bound 的程序一般而言 CPU 占用率相当高。这可能是因为任务本身不太需要访问 I/O 设备,也可能是因为程序是多线程实现因此屏蔽掉了等待 I/O 的时间。
  • IO 密集型(I/O bound)
    • IO 密集型指的是系统的 CPU 性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是 CPU 在等 I/O (硬盘/内存) 的读/写操作,此时 CPU Loading 并不高。
    • I/O bound 的程序一般在达到性能极限时,CPU 占用率仍然较低。这可能是因为任务本身需要大量 I/O 操作,而 pipeline 做得不是很好,没有充分利用处理器能力。

CPU密集型 vs IO密集型

我们可以把任务分为计算密集型和 IO 密集型。

计算密集型任务的特点是要进行大量的计算,消耗 CPU 资源,比如计算圆周率、对视频进行高清解码等等,全靠 CPU 的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU 执行任务的效率就越低,所以,要最高效地利用 CPU,计算密集型任务同时进行的数量应当等于 CPU 的核心数。计算密集型任务由于主要消耗 CPU 资源,因此,代码运行效率至关重要。Python 这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用 C 语言编写。

涉及到网络、磁盘 IO 的任务都是 IO 密集型任务,这类任务的特点是 CPU 消耗很少,任务的大部分时间都在等待 IO 操作完成(因为 IO 的速度远远低于 CPU 和内存的速度)。对于 IO 密集型任务,任务越多,CPU 效率越高,但也有一个限度。常见的大部分任务都是 IO 密集型任务,比如 Web 应用。IO 密集型任务执行期间,99% 的时间都花在 IO 上,花在 CPU 上的时间很少,因此,用运行速度极快的 C 语言替换用 Python 这样运行速度极低的脚本语言,完全无法提升运行效率。对于 IO 密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C 语言最差。

总之,计算密集型程序适合 C 语言多线程,I/O 密集型适合脚本语言开发的多线程。

6. 分支合并框架

Fork:把一个复杂任务进行分拆,大事化小;Join:把分拆任务的结果进行合并。

  • ForkJoinPool:分支合并池
  • ForkJoinTask:类比“FutureTask”
  • RecursiveTask:递归任务,继承后可以实现递归调用的任务

public class ForkJoinDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyTask myTask = new MyTask(0, 100);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
        System.out.println(forkJoinTask.get());
    }
}

class MyTask extends RecursiveTask<Integer> {
    public static final int CALCULATE_RANGE = 10;
    private int begin;
    private  int end;
    private int result;

    public MyTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if ((end - begin) <= CALCULATE_RANGE) {
            for (int i = begin; i <= end; i++) {
                result += i;
            }
        } else {
            int mid = (begin + end) >> 1;
            MyTask task1 = new MyTask(begin, mid);
            MyTask task2 = new MyTask(mid+1, end);
            task1.fork();
            task2.fork();
            result = task1.join() + task2.join();
        }
        return result;
    }
}

7. 异步回调

public class CompletableFutureDemo {

    public static void main(String[] args) throws Exception {

        CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(()->{
            System.out.println(Thread.currentThread().getName()+"	 runAsync");
        });
        completableFuture1.get(); // 阻塞直至获取结果

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"	 supplyAsync");
            int i = 10/0;
            return 1024;
        });

        // 异步回调
        completableFuture2.whenComplete((correctRet, exceptionInfo) -> {
            System.out.println("correctRet=" + correctRet);
            System.out.println("exceptionInfo=" + exceptionInfo);
        }).exceptionally(e -> {
            System.out.println("exception:" + e.getMessage());
            return 1101;
        }).get();
    }
}
原文地址:https://www.cnblogs.com/liujiaqi1101/p/14793222.html