JUC并发编程

JDK并发包

1.JUC简介

在jdk1.5提供了java.util.concurrent包,简称JUC,在此包中增加了在并发编程中很常用的工具类,用于定义类似于线程的自定义子系统,包括线程池,异步 IO 和轻量任务框架;还提供了设计用于多线程上下文中的 Collection 实现等;

2.概述

从JDK官方文档可以看到JUC主要包括3部分组成

  • java.util.concurrent:包下主要是包括并发相关的接口与类,阻塞队列、线程池等,里面包含 59 个类或者接口

  • java.util.concurrent.atomic: 该包下主要包括原子性操作相关的类,比如常用的AtomicInteger、AtomicBoolean、AtomicIntegerArry等,里面包含18个类或者接口

  • java.util.concurrent.locks:该包下主要包括同步控制的锁相关的类,比如ReentrantLock、ReadWriteLock、ReentranReadWriteLock等,里面包含12个类或者接口

image-20210624114532631

3. 同步控制(java.util.concurrent.locks)

同步控制是并发程序必不可少的重要手段,之前学习的synchronized关键字就是一种最简单的控制方法,他决定了一个线程是否可以访问临界区资源。同时object.wait()和object.notify()方法起到了线程的等待和通知的作用。

下面具体聊聊JUC工具包的使用,首先进入java.util.concurrent.locks包下,主要包括3个接口和9个类

image-20210624122325131

1. Lock 接口

Lock锁接口里面定义了6个方法,3个类实现了Lock接口,分别是可重入锁ReentrantLock、读锁ReadLock、写锁writeLock

image-20210624140034337

image-20210624140315249

1. 可重入锁ReentrantLock

jdk1.5中新增了ReetrantLock类,重入锁ReentrantLock可以完全替代synchronized关键字,并且在扩展功能上也更加强大,而且在使用上也更加灵活。

重入锁使用java.util.concurrent.locks.ReentrantLock类来实现,ReentrantLock类的构造方法如下,ReentrantLock默认是非公平锁。

image-20210624141447496

从源码中我们也可以发现ReentrantLockb本身没有任何代码逻辑,实现都在其内部类Sync中。那么NonfairSync和FariSync又是什么呢?

image-20210624144700920

从源代码中发现类ReentrantLock中有3个内部类,sync、NonfairSync和FariSync,而NonfairSync和FairSync有继承了抽象类Sync,分别对应非公平锁和公平锁。

image-20210624145231473

image-20210624152849793 image-20210624152916949

2.锁实现的基本原理:

Sync的父类AbstractQueuedSynchrnized经常被称作队列同步器(AQS),该类的父类是AbstractOwnableSynchronizer。为了实现一把具有阻塞或唤醒功能的锁,需要几个核心要素:

  • 需要一个state变量,标记锁的状态。state变量至少有两个值:0、1。对state变量的操作,需要确保线程安全,也就是会用到CAS

  • 需要记录当前是哪个线程持有锁

  • 需要底层支持对一个线程进行阻塞或唤醒操作

  • 需要有一个队列维护所有阻塞的线程。这个队列也必须是线程安全的无锁队列,也需要用到CAS

3. 中断响应

对于synchronized来说,如果一个线程在等待锁,那么结果就只有两种情况,要么获得锁继续执行,要么继续等待。而使用重入锁,则提供了另外一种可能,那就是线程可以被中断。如果一个线程在等待锁,那么它依然可以接收到一个通知,被告知无需再等待,可以停止工作了,这种情况对于处理死锁有一定帮助的。提供了lockInterruptibly()方法来实现。

image-20210625154308286

2. Condition接口

如果理解了object.wait()和object.notify()方法的话,那么也就很容易理解condition对象了。wait、notify是和关键字synchronized合作使用的,而condition是和重入锁使用的。condition主要方法如下

image-20210624150712348

和Object.wait()和notify()方法一样,当线程使用Condition.await()时,要求线程持有相关的重入锁,在Condition.await调用后,这个线程会释放这把锁。同样调用Condition.signal()方法时,也需要线程获取到重入锁。

public class ConditionDemo {
​
    public static void main(String[] args) {
​
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
​
        new Thread(()->{
            lock.lock();
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName() + "-->开始执行");
​
                condition.await();//让出线程,等待主线程调用signalAll()唤醒
                System.out.println(Thread.currentThread().getName() + "-->被唤醒又开始执行");
​
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        },"子线程").start();
​
        //主线程
        System.out.println(Thread.currentThread().getName() + "-->开始执行");
        try {
            TimeUnit.SECONDS.sleep(2);
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + "-->子线程执行await后,主线程执行");
                condition.signalAll();
​
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
​
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3.ReadWriteLock接口

ReadWriteLock是JDK5中提供的读写分离锁,读写分离锁可以有效的帮助减少锁竞争,以提升系统性能。ReadWriteLock接口中只有如下两个方法,分别是读锁readLock和writeLock写锁。

image-20210624150745352

读写锁允许多个线程同时读,但是考虑到数据的完整性,写写操作和读写操作之间依然是需要互相等待和持有锁的。总的来说,读写锁的访问约束如下

 
非阻塞 阻塞
阻塞 阻塞
    • 读-读不互斥:读读之间不阻塞

    • 读-写互斥:读阻塞写,写也会阻塞读

    • 写-写互斥:写写阻塞

public class ReadWriteLockDemo {
​
    /**
     * 测试读写锁
     */
    public static void main(String[] args) {
        ReadWriteNumber readWriteNumber = new ReadWriteNumber();
  //读线程     
        for (int i = 0; i < 10; i++) {
            new  Thread(()->{
                readWriteNumber.setNumber(new Random().nextInt(100));
            },"写线程"+String.valueOf(i)).start();
        }
//写线程
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                readWriteNumber.getNumber();
            },"读线程"+ String.valueOf(i)).start();
        }
​
    }
}
​
class ReadWriteNumber{
    private volatile  int number = 0;
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
​
    //写方法
    public void setNumber(int number)
    {
        readWriteLock.writeLock().lock();
        try {
            TimeUnit.SECONDS.sleep(1);
            this.number = number;
            System.out.println(Thread.currentThread().getName() + "-->写入后的数字是: " + this.number);
        } catch (Exception e)
        {
            e.printStackTrace();
        }   finally {
            readWriteLock.writeLock().unlock();
        }
    }
​
    //读方法
    public void getNumber()
    {
        readWriteLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "-->读取到的数字是:" +  number);
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
}

4. 原子操作(java.util.concurrent.atomic)

 

5.并发工具包(java.util.concurrent)

1. Semaphore:允许多个线程同时访问

无论是内部锁synchronized还是重入锁ReentrantLock,一次都只允许一个线程访问一个资源,而信号量semaphore却可以指定多个线程,同时访问某一个资源。

image-20210624174047354 image-20210624174121780

/**
* 模拟高并发抢车位,模拟车库里面有3个车位,现在有6个车子抢占车位.同一时刻最多只能有3辆车同时抢到车位,一个车出停车场后,另外一辆车才能进来
*
*/
public class SemaphoreDemo1 {
​
    public static void main(String[] args) {
​
        //只允许有3个线程能同时操作统一资源
        Semaphore semaphore = new Semaphore(3);
​
        //创建一个6条线程的线程池
​
        ThreadPoolExecutor threadPool =   new ThreadPoolExecutor(3,6,3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
​
        for (int i = 1; i <= 6; i++) {
            
            threadPool.execute(() -> {
                try {
                    int ParkTime = new Random().nextInt(10);
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "抢到了车位");
                    TimeUnit.SECONDS.sleep(ParkTime);
                    System.out.println(Thread.currentThread().getName() + "停车" + ParkTime + "秒后离开了停车场");
​
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            });
        }
        threadPool.shutdown();
    }
}

image-20210624173756975

2. CountDownLatch:倒计时器

    • countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。

    • 是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。

      主要方法如下:

      image-20210624164344461

      最常见的场景就是模拟火箭发射,在火箭发射前需要对各个设备、仪器的检查,只有各个检查都没有问题以后才能发射

      public class CountDownLacthDemo implements Runnable{
      ​
          static CountDownLatch downLatch = new CountDownLatch(10);
      ​
          @Override
          public void run() {
              //模拟检测任务
              try {
                  Thread.sleep(new Random().nextInt(10)*1000);
                  System.out.println(Thread.currentThread().getName() + "检查完毕");
      ​
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }finally {
                  downLatch.countDown();
              }
          }
      ​
          public static void main(String[] args) throws InterruptedException {
              ExecutorService exec = Executors.newFixedThreadPool(10);
      ​
              CountDownLacthDemo demo = new CountDownLacthDemo();
      ​
              for (int i = 0; i < 10; i++) {
                  exec.submit(demo);
              }
              //等到检查
              downLatch.await();
              System.out.println("发射");
              exec.shutdown();//关闭线程池
      ​
          }
      }

3.CyclicBarrier: 循环栅栏

CyclicBarrier 字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时候,屏障才会开门。所有被屏障拦截的线程才会运行。

image-20210625140957795

1.主要方法
//设置parties、count及barrierCommand属性。   
CyclicBarrier(int):   
  
//当await的数量到达了设定的数量后,首先执行该Runnable对象。   
CyclicBarrier(int,Runnable):   
  
//通知barrier已完成线程   
await():
2. 使用场景
    1. 10个工程师一起来公司应聘,招聘方式分为笔试和面试,首先要等人到齐后,开始笔试;笔试结束后,再一起参加面试。

public class CyclicBarrierDemo {
​
    /**
     * 场景:
     * 10个工程师一起来公司应聘,招聘方式分为笔试和面试,首先要等人到齐后,开始笔试;笔试结束后,再一起参加面试。
     */
    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10,()->{
            System.out.println("***********下面进行面试测试**************");
​
        });
        CyclicBarrier interviewCycle = new CyclicBarrier(10,()->{
            System.out.println("*********谢谢大家参加公司的招聘工作*********");
        });
​
        for (int i = 1; i <= 10; i++) {
            new Thread(()->{
                try {
                    TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                    System.out.println(Thread.currentThread().getName() + "笔试结束");
                    cyclicBarrier.await();
​
                    TimeUnit.SECONDS.sleep(new Random().nextInt(10));
                    System.out.println(Thread.currentThread().getName() + "面试结束");
                    interviewCycle.await();
​
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, "工程师"+String.valueOf(i)).start();
        }
        System.out.println("***********下面进行笔试测试**************");
    }
}

image-20210625145633103

以上代码使用了2个CyclicBarrier来表笔试和面试的同步点,只有所有人的笔试完成(cyclicBarrier这是一个同步点)后才能进行面试工作,所有人的面试完成(interviewCycle这是一个同步点点)后才能宣布结束。同时使用构造函数CyclicBarrier(int parties,Runnable barrierAction)

image-20210625144401754

该构造函数表示线程等待的数量等于parties时,才会执行barrierAction这个方法。以上例子,当调用10次cyclicBarrier.awati()后,即第10个线程的打印出“笔试结束”这一句后就会执行cyclicBarrier.barrierAction方法里面的打印语句“下面进行面试测试”,同理调用10次interviewCycle.await()方法后,第10个线程打印出“面试结束”这一句后就会执行interviewCycle.barrierAction方法里面的打印语句“谢谢大家参加公司的招聘工作”。

image-20210625145834329

以上例子创建了2各不同的CyclicBarrier对象来实现,不能体现出CyclicBarrier可循环重复使用的性质,是否可以只创建一个cyclicBarrier对象就可以完成以上实例呢?

public class CyclicBarrierDemo3 {
​
    /**
     * 场景:
     * 10个工程师一起来公司应聘,招聘方式分为笔试和面试,首先要等人到齐后,开始笔试;笔试结束后,再一起参加面试。
     */
    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(11);
​
        for (int i = 1; i <= 10; i++) {
            new Thread(()->{
                try {
                    TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                    System.out.println(Thread.currentThread().getName() + "笔试结束");
                    cyclicBarrier.await();
​
                    TimeUnit.SECONDS.sleep(new Random().nextInt(10));
                    System.out.println(Thread.currentThread().getName() + "面试结束");
                    cyclicBarrier.await();
​
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, "工程师"+String.valueOf(i)).start();
        }
        System.out.println("***********下面进行笔试测试**************");
        cyclicBarrier.await();
        System.out.println("***********下面进行面试测试**************");
        cyclicBarrier.await();
        System.out.println("***********谢谢大家参加公司的招聘工作**************");
    }
}

image-20210625145558073

特别注意的 CyclicBarrier cyclicBarrier = new CyclicBarrier(11);为什么是11而不是10呢?因为main主线程也有调用cyclicBarrier.await();如果是10的话会造成程序一直处于运行状态,CyclicBarrier一直处于阻塞状态。

    1. CountDownLatch和CyclicBarrier综合应用

      场景:模拟10个考生参加考试的过程,具体流程如下

      • 1.首先考生在考场外等待考试开始的铃声响起

      • 2.考场main考试开始铃声响起

      • 3.考生听到铃声进入考场开始答题

      • 4.考生考试结束交卷

      • 5.等待全部考生交卷后考场考试结束

public class CyclicBarrierDemo2 {
    
    public static void main(String[] args) {
​
        CyclicBarrier studentCyclicBarrier = new CyclicBarrier(11);
​
        CountDownLatch examCoundownLatch = new CountDownLatch(1);
​
        for (int i = 1; i <= 10; i++) {
            new Thread(()->{
                try {
​
                    System.out.println(Thread.currentThread().getName() + "在等待考试开始的铃声响起");
                    examCoundownLatch.await();//等待考场主线程执行完毕
​
                    System.out.println("考生听到铃声" + Thread.currentThread().getName() + "开始答题");
                    TimeUnit.SECONDS.sleep(new Random().nextInt(10));
​
                    System.out.println("考生" +Thread.currentThread().getName()+ "答题完毕,开始交卷");
                    studentCyclicBarrier.await();
​
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            },"考生" + String.valueOf(i)).start();
        }
​
        //main-->考场主线程
        try {
​
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println("考场" + Thread.currentThread().getName() +"铃声即将响起");
                System.out.println("考场" + Thread.currentThread().getName() + "铃声响起");
            } finally {
                examCoundownLatch.countDown();
            }
            studentCyclicBarrier.await();
​
            System.out.println("考生都已经交卷,"+Thread.currentThread().getName()+"考试结束");
​
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

image-20210625150915665

4.线程池

为了避免系统频繁的创建和销毁线程,可以让创建的线程进行复用。简而言之,在使用线程池后,创建线程变成了从线程池获取空闲线程,关闭线程变成了向线程池归还线程。线程池主要掌握3大方法、7大参数、4种拒绝策略

image-20210625174757806

1. 7大参数

image-20210625174648714

在这7个参数中,大多数都很简单,只有workQueue和handler需要进行详细的说明

    • workQueue: 是指被提交但未执行的任务队列,它是一个BlockingQueue接口的对象,仅用于Runnable对象。

      • SynchronousQueue: 直接提交队列

        image-20210625235603023

        SynchronousQueue没有容量,提交的任务不会被真实的保存,而总是将新任务提交给线程执行。如果没有空闲的进程(即线程数大于corePoolSize),则尝试创建新的线程,如果线程数量已经大于最大值maxmumPoolSize,则执行拒绝策略。

        线程数<= corePoolSize

        image-20210625223149385 线程数 <= maxPoolSize

        image-20210625223650002 线程数 > maxPoolSize

        image-20210625223835956

      • ArrayBlockingQueue: 有边界阻塞队列

        ArrayBlockingQueue的构造函数必须带一个参数,表示该队列的最大容量。

        image-20210625224246744

        线程数小于等于corePoolSize时,优先创建新的线程

        corePoolSize < 线程数 时,会将线程放进队列中

        如果队列已满无法,无法加入,而且线程数 <= maxPoolSize时,创建新的线程

        线程数 > maxPoolSize + capacity(队列容量), 则执行拒绝策略

        image-20210625230039963 image-20210625230112357

      • LinkedBlockingQueue: 无边界阻塞队列

        与ArryBlockingQueue队列相比,除非系统资源耗尽,否则无界队列不存在任务入队失败的情况。

        image-20210625234945271

        如果不指定阻塞队列的容量的前提下,默认容量为Integer.MAX_VALUE.

        • 线程数 <= corePoolSize时,线程池生成新的线程执行任务

        • 线程数 > corePoolSize时,任务直接进入队列,如果任务创建速度 > 处理的速度,无界队列会快速增长,直到资源耗尽

        image-20210625231047546

        如果指定了阻塞队列的长度,线程数 > maxPoolSize + capacity的话,会执行拒绝策略

        image-20210625235241666

      • PriorityBlockingQueue: 优先任务队列

        它是一个特殊的无界队列,可以根据任务自身的优先级顺序先后执行。ArrayBlockingQueue和LinkedBlockingQueue队列都是按照先进先出算法处理任务的。无参构造的话初始容量为11

        image-20210625232650445

      注意报错

      java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable
          at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:357)
          at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:489)
          at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1361)
          at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
          at com.sean.thread.pool.TestPool.main(TestPool.java:27)
2. 3大方法

《Java开发手册》中明确提出不建议使用Exectors去创建线程池,而是通过ThreadPoolExecutor去创建

image-20210628101937962

image-20210628101412410

image-20210628101509867

image-20210628101553306

image-20210628102117874

3. 4种拒绝策略

JDK内置提供了四种拒绝策略:

    • AbortPolicy: 该策略会直接抛出异常,阻止系统正常工作

    • CallerRunsPolicy: 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务

    • DiscardOledestPolicy: 该策略丢弃最老的一个请求,也就是将被执行的一个任务,并尝试再次提交当前任务

    • DiscardPolicy:该策略默默的丢弃无法处理的任务,不予处理。

4. 自定义创建线程:ThreadFactory

线程池的主要作用是为了线程复用,也就避免了线程的频繁创建。但是最开始的那些线程从何而来?答案就是ThreadFactory.

ThreadFactory是一个接口,里面就只有一个方法,用来创建线程

image-20210628140810725

自定义线程池可以自定义线程的名称、组以及优先级等信息

/**
 * 自定义线程工厂
 */
class NamedThreadFactory implements ThreadFactory{
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    NamedThreadFactory(String name) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        if (null == name || name.isEmpty()) {
            name = "pool";
        }
        namePrefix = name + "-窗口-";
    }
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement());
        return thread;
    }

}

线程池的execute()和submit()方法的区别 (https://www.jianshu.com/p/6bcb61fd801b)

    • 接收参数不一样

    • submit有返回值,返回一个Future对象,而execute没有返回值

    • submit方便Exception处理

public class RunnableTestMain {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        
        /**
         * execute(Runnable x) 没有返回值。可以执行任务,但无法判断任务是否成功完成。
         */
        pool.execute(new RunnableTest("Task1")); 
        
        /**
         * submit(Runnable x) 返回一个future。可以用这个future来判断任务是否成功完成。请看下面:
         */
        Future future = pool.submit(new RunnableTest("Task2"));
        
        try {
            if(future.get()==null){//如果Future's get返回null,任务完成
                System.out.println("任务完成");
            }
        } catch (InterruptedException e) {
        } catch (ExecutionException e) {
            //否则我们可以看看任务失败的原因是什么
            System.out.println(e.getCause().getMessage());
        }

    }

}

 class RunnableTest implements Runnable {
    
    private String taskName;
    
    public RunnableTest(final String taskName) {
        this.taskName = taskName;
    }

    @Override
    public void run() {
        System.out.println("Inside "+taskName);
        throw new RuntimeException("RuntimeException from inside " + taskName);
    }

}

 

 

 

原文地址:https://www.cnblogs.com/seanRay/p/14944707.html