JUC基础

JUC基础

多线程编程基础

多线程编程步骤(上)

  • 第一步 创建资源类,在资源类创建属性和操作方法
  • 第二步 创建多个线程,调用资源类的操作方法

多线成编程步骤(中) //线程间通信

  • 第一步 创建资源类,在资源类创建属性和操作方法
  • 第二步 操作方法中分为 判断 干活 通知
  • 第三步 创建多个线程,调用资源类的操作方法

多线成编程步骤(下) //防止虚假唤醒

  • 第一步 创建资源类,在资源类创建属性和操作方法
  • 第二步 操作方法中分为 判断 干活 通知
  • 第三步 创建多个线程,调用资源类的操作方法
  • 第四步 需要看情况将判断放入while循环中,为了防止虚假唤醒

1.JUC是什么

java.util.concurrent包

2.Lock接口

2.1 复习Synchronized

Synchronized卖票的例子

3个售票员(线程) 卖出30张票(资源类)

//资源类
class SaleTicket{

    private int number = 30;

    public synchronized void sale(){
        if (number >0){
            System.out.println(Thread.currentThread().getName() + ": 卖出:" + (number-- )+ "还剩下:" + number);
        }
    }
}

public class Test01 {
    public static void main(String[] args) {
        //创建资源对象
        final SaleTicket saleTicket = new SaleTicket();

        //创建多个线程,并调用资源类的方法
        Thread t1 = new Thread( ()->{
            for (int i = 0; i < 40; i++) {
                saleTicket.sale();
            }
        },"售票员1");


        Thread t2 = new Thread( ()->{
            for (int i = 0; i < 40; i++) {
                saleTicket.sale();
            }
        },"售票员2");


        Thread t3 = new Thread( ()->{
            for (int i = 0; i < 40; i++) {
                saleTicket.sale();
            }
        },"售票员3");

        t1.start();
        t2.start();
        t3.start();
    }
}

2.2 什么是Lock接口

java.util.concurrent.locks
为锁和等待条件提供一个框架的接口和类,它不同于内置同步和监视器。
Lock 比 synchronized 功能更强大。

接口Lock
所有已知实现类:
ReentrantLock
ReentrantReadWriteLock.ReadLock
ReentrantReadWriteLock.WriteLock

2.2.1 Lock 和 synchronized 的比较

  1. Lock 是一个接口,而synchronized是Java中的关键字,synchronized是内置的语言实现。
  2. synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁。而Lock发生异常时,需要通过 try- catch-finally 来 主动 lock.unLock() 释放锁。
  3. Lock可以让等待锁的线程响应中断(使用lock.lockInterruptibly() 请求lock并被阻塞时可中断),而synchronized却不行,使用synchronized时,等待的线程会以只等待下去,不能够响应中断。
  4. 通过Lock可以知道有没有成功获取锁,而synchronized却无法办到。
  5. Lock可以提高多个线程进行读操作的效率。

2.3 创建线程的多种方式

java.util.Thread 类 因为它实现了Runnable接口 ,且构造器中也可以传入Runnable实现,因此有有 2.3.1 和 2.3.2两种创建线程的方式

//实现了Runnable类
public class Thread implements Runnable  {
    
    //构造器可传入Runnable的实现
    public Thread(Runnable target) {
        init(null, target, "Thread-" + nextThreadNum(), 0);
   }
}   

2.3.1 继承Thread类

class Thread1 extends Thread{
    @Override
    public void run() {
        System.out.println(getName());
    }
}

public class ThreadTest {
    public static void main(String[] args) {
        Thread1 thread1 = new Thread1();
        thread1.start();
    }
}

2.3.2 实现Runnable接口

public class RunnableTest {
    public static void main(String[] args) {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
				 System.out.println(Thread.currentThread().getName());
            }
        });
        thread.start();
    }
}

2.3.3 使用Callable接口

7

2.3.4 使用线程池

12

2.4 使用Lock实现卖票的例子

class LTicket{

    private int number = 30;

    private final Lock lock = new ReentrantLock();

    public void  sale(){
        lock.lock();
        try {
            if (number > 0){
                System.out.println(Thread.currentThread().getName() + ": 卖出:" + (number-- )+ "还剩下:" + number);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }
}


public class Test02 {

    public static void main(String[] args) {
        LTicket lTicket = new LTicket();

        new Thread(()-> {
            for (int j = 0; j < 40; j++) {
                lTicket.sale();
            }
        },"售票员1").start();

        new Thread(()-> {
            for (int j = 0; j < 40; j++) {
                lTicket.sale();
            }
        },"售票员2").start();

        new Thread(()-> {
            for (int j = 0; j < 40; j++) {
                lTicket.sale();
            }
        },"售票员3").start();
        
    }
}

3.线程间通信

3.1 Synchroinzed 线程间通信

wait() notify() notifyAll()

虚假唤醒问题案例

问题: 有4个线程,一个初始变量值为0, 2个线程对这个变量进行+1,2个线程对这个变量进行-1. 要求加一减一操作交替执行。

错误代码:

//定义资源类
class Share{
    private int num = 0;

    public synchronized void incr() throws InterruptedException {
        //判断
        if (num != 0){           //错误之处  需要使用循环判断  while(num != 0)
            this.wait();  //调用wait()线程等待释放锁,等待被其它线程notify()唤醒,若唤醒则仍从此处开始执行,因此需要加上while循环判断,否则会造成虚假唤醒现象
        }
        //干活
        num++;
        System.out.println(Thread.currentThread().getName() + " :: " + num);
        //通知
        this.notifyAll();
    }

    public synchronized void decr() throws InterruptedException {
        //判断
        if (num != 1){    //错误之处  需要使用循环判断  while(num != 0)
            this.wait();
        }
        //干活
        num--;
        System.out.println(Thread.currentThread().getName() + " :: " + num);
        //通知
        this.notifyAll();
    }

}

public class Test03 {
    public static void main(String[] args) {
        Share share = new Share();

        //创建多线程
        new Thread(()->{
            for (int i = 1; i <= 10 ; i++) {
                try {
                    share.incr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"AA").start();

        new Thread(()->{
            for (int i = 1; i <= 10 ; i++) {
                try {
                    share.decr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"BB").start();

        new Thread(()->{
            for (int i = 1; i <= 10 ; i++) {
                try {
                    share.incr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"CC").start();

        new Thread(()->{
            for (int i = 1; i <= 10 ; i++) {
                try {
                    share.decr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"DD").start();
    }
}

3.2 Lock 线程间通信

await() signal() signalAll()

//创建资源类
class Share1{
    private  int number = 0;

    //创建Lock
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    //+1
    public void incr() throws InterruptedException {
        lock.lock();
        try {
            //判断
            while (number != 0){
                condition.await();
            }
            //干活
            number++;
            System.out.println(Thread.currentThread().getName() + " :: " + number);

            //通知
            condition.signalAll();
        }  finally {
            lock.unlock();
        }
    }

    //-1
    public void decr() throws InterruptedException {
        lock.lock();
        try {
            //判断
            while (number != 1){
                condition.await();
            }
            //干活
            number--;
            System.out.println(Thread.currentThread().getName() + " :: " + number);

            //通知
            condition.signalAll();
        }  finally {
            lock.unlock();
        }
    }

}



public class Test04 {
    public static void main(String[] args) {

        Share1 share1 = new Share1();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    share1.incr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"AA").start();


        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    share1.decr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"BB").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    share1.incr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"CC").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    share1.decr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"DD").start();
    }
}

4.线程间定制化调用通信

要求: 创建三个线程, 打印AA 5次,打印 BB 10次, 打印CC 15次 按照 AA -> BB -> CC 顺序执行 且循环10次。

//资源类
class ThreadPrint{
	//创建锁
    private Lock lock = new ReentrantLock();
    
    //创建三个条件,相当于三个队列,分别对应三个线程
    private Condition c1 = lock.newCondition();
    private Condition c2 = lock.newCondition();
    private Condition c3 = lock.newCondition();
    private int flag = 1;

	
    public void printAA() throws InterruptedException {
        lock.lock();
        try {
            while (flag != 1){
                c1.await();
            }
            for (int i = 0; i < 5; i++) {
                System.out.println("AA");
            }
            flag = 2;
            c2.signal();
        } finally {
            lock.unlock();
        }
    }

    public void printBB() throws InterruptedException {
        lock.lock();
        try {
            while (flag != 2){
                c2.await();
            }
            for (int i = 0; i < 10; i++) {
                System.out.println("BB");
            }
            flag = 3;
            c3.signal();
        } finally {
            lock.unlock();
        }
    }

    public void printCC() throws InterruptedException {
        lock.lock();
        try {
            while (flag != 3){
                c3.await();
            }
            for (int i = 0; i < 15; i++) {
                System.out.println("CC");
            }
            flag = 1;
            c1.signal();
        } finally {
            lock.unlock();
        }
    }

}

public class Test05 {
    public static void main(String[] args) {
        ThreadPrint threadPrint = new ThreadPrint();

		//创建三个线程,分别负责打印AA ,BB ,CC
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    threadPrint.printAA();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    threadPrint.printBB();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();


        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    threadPrint.printCC();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

    }
}

5.集合的线程安全

5.1 ArrayList集合线程不安全演示

报错异常错误 为: java.util.ConcurrentModificationException

ArrayList是线程不安全的

public class Test06 {
    public static void main(String[] args) {

        ArrayList<String> list = new ArrayList<>();

        for (int i = 0; i < 30; i++) {
            new Thread(()->{
                //向集合添加内容
                list.add(UUID.randomUUID().toString().substring(0,8));

                //从集合获取内容
                System.out.println(list);
            },String.valueOf(i)).start();
        }
    }
}

5.1.1 解决方案-Vector

因为在方法上都加上了 synchronized关键字, 所以是线程安全的

public class Test06 {
    public static void main(String[] args) {

        Vector<String> list = new Vector<>();

        for (int i = 0; i < 30; i++) {
            new Thread(()->{
                //向集合添加内容
                list.add(UUID.randomUUID().toString().substring(0,8));

                //从集合获取内容
                System.out.println(list);
            },String.valueOf(i)).start();
        }
    }
}

5.1.2 解决方案- Collections

public class Test06 {
    public static void main(String[] args) {

        List<String> list = Collections.synchronizedList(new ArrayList<>());

        for (int i = 0; i < 30; i++) {
            new Thread(()->{
                //向集合添加内容
                list.add(UUID.randomUUID().toString().substring(0,8));

                //从集合获取内容
                System.out.println(list);
            },String.valueOf(i)).start();
        }
    }
}

5.1.3 解决方案- CopyOnWriteArrayList

java.util.concurrent 包下的 CopyOnWriteArrayList

public class Test06 {
    public static void main(String[] args) {

        List<String> list = new CopyOnWriteArrayList();

        for (int i = 0; i < 30; i++) {
            new Thread(()->{
                //向集合添加内容
                list.add(UUID.randomUUID().toString().substring(0,8));

                //从集合获取内容
                System.out.println(list);
            },String.valueOf(i)).start();
        }
    }
}

5.2 HashSet线程不安全演示

public class Test06 {
    public static void main(String[] args) {

        Set<String> set = new HashSet<>();

        for (int i = 0; i < 30; i++) {
            new Thread(()->{
                //向集合添加内容
                set.add(UUID.randomUUID().toString().substring(0,8));

                //从集合获取内容
                System.out.println(set);
            },String.valueOf(i)).start();
        }
    }
}

5.2.1 解决方案- CopyOnWriteArraySet

public class Test06 {
    public static void main(String[] args) {

        Set<String> set = new CopyOnWriteArraySet<>();

        for (int i = 0; i < 30; i++) {
            new Thread(()->{
                //向集合添加内容
                set.add(UUID.randomUUID().toString().substring(0,8));

                //从集合获取内容
                System.out.println(set);
            },String.valueOf(i)).start();
        }
    }
}

5.3 HashMap 线程不安全演示

public class Test06 {
    public static void main(String[] args) {

        Map<String,String> map = new HashMap<>();

        for (int i = 0; i < 30; i++) {
            String key = String.valueOf(i);
            new Thread(()->{
                //向集合添加内容
                map.put(key,UUID.randomUUID().toString().substring(0,8));

                //从集合获取内容
                System.out.println(map);
            },String.valueOf(i)).start();
        }
    }
}

5.3.1 解决方案 - ConcurrentHashMap

public class Test06 {
    public static void main(String[] args) {

        Map<String,String> map = new ConcurrentHashMap<>();

        for (int i = 0; i < 30; i++) {
            String key = String.valueOf(i);
            new Thread(()->{
                //向集合添加内容
                map.put(key,UUID.randomUUID().toString().substring(0,8));

                //从集合获取内容
                System.out.println(map);
            },String.valueOf(i)).start();
        }
    }
}

6.多线程锁

6.1 Synchronized 8锁

synchronized 实现同步的基础: Java中的每一个对象都可以作为锁

具体表现为以下3中形式:

  • 对于普通同步方法,锁是当前实例对象
  • 对于静态同步方法,锁是当前类的Class对象
  • 对于同步方法块,锁是Synchronized括号里配置的对象

6.2 公平锁与非公平锁

公平锁 效率低

非公平锁 效率高

6.3 可重入锁

可重入锁也叫递归锁 synchronized (隐式) 和 lock(显式)两个都是可重入锁

详见 JUC进阶 1

6.4 死锁

产生死锁的原因:

  • 系统资源不足
  • 进程运行推进顺序不合适
  • 资源分配不当

验证是否是死锁:

  1. jps -l 查看进程号
  2. jstack 根据进程号查看错误

7.Callable接口

Runnable接口有实现类 FutrueTask

FutrueTask 的构造方法可以传入 Callable 实现

//实现Runnable接口
class MyThread1 implements Runnable{

    @Override
    public void run() {
        System.out.println("Runnable方式");
    }
}


//实现Callable接口

class MyThread2 implements Callable{

    @Override
    public Object call() throws Exception {
        System.out.println("Callable方式");
        return 200;
    }
}


public class Test08 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //Runnable接口创建线程
        new Thread(new MyThread1(),"AA").start();


        //Callable接口创建线程

        FutureTask futureTask = new FutureTask<>(new MyThread2());

        new Thread(futureTask,"BB").start();
        System.out.println(futureTask.get());
    }
}

8.JUC强大的辅助类

8.1 减少计数CountDownLatch

CountDownLatch类可以设置一个计数器,然后通过countDown方法来进行减1的操作
使用await方法等待计数器不大于0,然后继续执行await方法之后的语句。

  • CountDownLatch 主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞.
  • 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程会阻塞)
  • 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。

案例演示:

public class Test09 {
    //6个同学陆续离开之后,班长才能锁门
    public static void main(String[] args) throws InterruptedException {

        //创建计数器  设置初始值
        CountDownLatch countDownLatch = new CountDownLatch(6);


        for (int i = 0; i < 6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() + "号同学离开了");
                //计数器减1
                countDownLatch.countDown();
                },String.valueOf(i+1)).start();
        }

        //等待
        countDownLatch.await();

        System.out.println("班长锁门");
    }
}

8.2 循环栅栏CyclicBarrier

CyclicBarrier(int parties, Runnable barrierAction)

每当一个线程执行 CyclicBarrier.await() 实例方法后
值会加1 直到该值为parties时,则回调执行barrierAction

案例演示: 集齐7颗龙珠可以召唤神龙

//集齐7颗龙珠可以召唤神龙
public class Test10 {

    //创建固定值
    private  static  final int number = 7;

    public static void main(String[] args) {

        //创建CyclicBarrier
        CyclicBarrier cyclicBarrier = new CyclicBarrier(number,()->{
            System.out.println("集齐7颗龙珠可以召唤神龙!!");
        });

        for (int i = 1; i <= 7; i++) {
            new Thread(()->{
                try {
                    System.out.println(Thread.currentThread().getName() + "星龙珠");
                   //等待  达到number为7,就会调用 CyclicBarrier构造器中的Runnable实现方法
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }
}

8.3 信号灯 Semaphore

Semaphore 的构造方法中传入的第一个参数是最大信号量(可看成最大线程池)。使用acquire方法获得许可证,release方法释放许可。 当Semaphore(1) 时可做成一把同步锁

案例演示: 抢车位,6部汽车 3个停车位

//抢车位,6部汽车 3个停车位
public class Test11 {
    public static void main(String[] args) {

        //创建Semaphore,设置许可数量  模拟3个停车位
        Semaphore semaphore = new Semaphore(3);

        //模拟6辆汽车
        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                try {
                    // 抢占
                    semaphore.acquire();

                    System.out.println(Thread.currentThread().getName() + " 抢到了车位");

                    //设置随机停车时间
                    TimeUnit.SECONDS.sleep(new Random().nextInt(5));

                    System.out.println(Thread.currentThread().getName() + "----离开了车位");


                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    //释放
                    semaphore.release();
                }

            },String.valueOf(i)).start();
        }

    }
}

9. ReetrantReadWriteLock读写锁

9.1 读写锁概述及案例

读锁: 共享锁 所谓独占即为独自占有,别的线程既不能获取到该锁的写锁,也不能获取到对应的读锁。

写锁: 独占锁 所谓共享即是所有线程都可以共同持有该读锁。

两个重点:

  1. 当一个线程获取了写锁(未释放),其他线程既不可以获取到写锁,也不能获取到读锁。
  2. 当一个线程获取到了读锁(未释放),其他线程可以获取到读锁,但是不能获取到写锁

读写操作 和 写写操作需要互相等待 但是读读操作则可以并行执行

优点:
在读操作次数远大于写操作次数的情况下, 读写锁可以发挥出最大的功效提升性能。

读写锁的访问约束情况。

非阻塞 阻塞
阻塞 阻塞

案例: 读多写少的情况下,比较可重入锁与读写锁的性能

//模拟ReentrantLock  与  ReentrantReadWriteLock  在读多写少情况下  两者之间性能上的差距
public class ReadWriteLockDemo {
    
    //创建普通锁
    private static Lock lock = new ReentrantLock();
    
    //创建读写锁
    private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private static Lock readLock = readWriteLock.readLock();
    private static Lock writeLock = readWriteLock.writeLock();
    
    //创建一个需要读写的变量
    private int value;


    //实现读操作
    public Object handleRead(Lock lock) throws InterruptedException {
        try {
            lock.lock();

            //模拟读操作,读操作的耗时越多,读写锁的优势会越明显
            Thread.sleep(1000);
            return value;
        } finally {
            lock.unlock();
        }
    }

    //实现写操作
    public void handleWrite(Lock lock,int index) throws InterruptedException {
        try {
            lock.lock();
            //模拟写操作
            Thread.sleep(1000);
            value=index;
        } finally {
            lock.unlock();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        
        //创建计数器 用来最终统计多线程的执行时间
        CountDownLatch countDownLatch = new CountDownLatch(20);

        ReadWriteLockDemo demo = new ReadWriteLockDemo();
        
        //记录多线程执行的起始时间点
        long start = System.currentTimeMillis();

        //创建18个线程 进行读
        for (int i = 0; i < 18; i++) {
            new Thread(()->{

                try {
                    //使用读锁    
                    demo.handleRead(readLock);           
//  使用普通锁       demo.handleRead(lock);
                    
                    //计数器-1
					countDownLatch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }).start();
        }


        //创建2个线程  进行写
        for (int i = 0; i < 2; i++) {
            new Thread(()->{
                try {
                     //使用写锁
                    demo.handleWrite(readLock,new Random().nextInt());
//  使用普通锁       demo.handleWrite(lock,new Random().nextInt());
                     //计数器-1
                    countDownLatch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }).start();
        }
        
        //main线程阻塞,等待coutDownLatch减完成
        countDownLatch.await();
		
        //打印多线程执行 总耗时:
        System.out.println(System.currentTimeMillis() - start);
        
    }
}

最终程序执行结果:

普通锁(可重入锁): 由于所有的读写线程之间必须相互等待,因此整个程序执行时间长达20多秒

读写分离锁: 由于读线程是完全并行,而写线程会阻塞读读线程,写线程之间也会互相阻塞。因此整个程序执行时间大约2秒钟

9.2 读写锁降级

前提说明: 写权限 要大于 读权限 (举例: 在linux中 w>r)

锁降级概念: 所谓锁降级是指写锁降级为读锁, 能够保证修改完的数据立即可见性。

锁降级 :如果当前线程拥有某个对象的写锁,再获取该对象的读锁,随后释放写锁的过程 。
(获取写锁 --> 获取读锁 ---> 释放写锁 )

不是锁降级: 如果当前线程拥有某个对象的写锁,先释放该写锁后,再去获取该对象的读锁。该过程不是锁降级,这种情况可以理解为锁竞争,当释放了写锁后可能会有别的线程抢到该写锁并对数据做修改,而之后再获取读锁读的时候并不是第一次修改后的值。
(获取写锁---> 释放写锁 ----> 获取读锁)

错误的锁降级案例:
// (获取写锁---> 释放写锁 ----> 获取读锁)
public class Test12 {

    private static int value = 0;

    public static void main(String[] args) {

        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        Lock readLock  = readWriteLock.readLock();
        Lock writeLock  = readWriteLock.writeLock();


        //创建线程A
        new Thread(()->{
            writeLock.lock();
            value = 1;
            System.out.println(Thread.currentThread().getName() + "修改值为: " + 1);
            writeLock.unlock();
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            readLock.lock();
            System.out.println(Thread.currentThread().getName() + "读取值为: "+ value);
            readLock.unlock();
        },"A").start();


        new Thread(()->{
            writeLock.lock();
            value = 2;
            System.out.println(Thread.currentThread().getName() + "修改值为: " + 2);
            writeLock.unlock();
        },"B").start();
    }
}
/** console:
    A修改值为: 1
    B修改值为: 2
    A读取值为: 2
**/

正确的锁降级案例:

//(获取写锁  -->  获取读锁 ---> 释放写锁)
public class Test12 {

    private static int value = 0;

    public static void main(String[] args) {

        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        Lock readLock  = readWriteLock.readLock();
        Lock writeLock  = readWriteLock.writeLock();


        //锁降级

        //创建线程A
        new Thread(()->{
            writeLock.lock();
            value = 1;
            System.out.println(Thread.currentThread().getName() + "修改值为: " + 1);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            readLock.lock();
            System.out.println(Thread.currentThread().getName() + "读取值为: "+ value);

            writeLock.unlock();

            readLock.unlock();
        },"A").start();


        new Thread(()->{
            writeLock.lock();
            value = 2;
            System.out.println(Thread.currentThread().getName() + "修改值为: " + 2);
            writeLock.unlock();
        },"B").start();
    }
}

/**console:
        A修改值为: 1
        A读取值为: 1
        B修改值为: 2
**/

10. BlockingQueue阻塞队列

10.1 阻塞队列概述

阻塞队列 特点:

  • 线程间共享的
  • 队列空时 获取元素的线程会阻塞 直到队列中有元素 该线程会被唤醒
  • 队列满时 添加元素的线程会阻塞 直到队列中出现空位时 该线程会被唤醒

10.2 阻塞队列的架构

接口: BlockingQueue implements Collections , Queue

实现类:ArrayBlockingQueue,DelayQueue,LinkedBlockingDeque,LinkedBlockingQeque,
PriorityBlockingQueue,SynchronousQueue

10.3 阻塞队列分类

  • ArrayBlockingQueue(常用) 由数组结构组成的有界阻塞队列
  • LinkedBlockingQeque(常用) 由链表结构组成的有界(但大小默认为 integer.MAX_VALUE) 阻塞队列。
  • DelayQueue 使用优先级队列实现的延迟无界阻塞队列
  • PriorityBlockingQueue 支持优先级排序的无界阻塞队列
  • SynchronousQueue 不存储元素的阻塞队列,也即单个元素的队列
  • LinkedBlockingDeque 链表组成的双向队列

10.4 阻塞队列核心方法

  • add(e) remove element 会抛出异常组
        //创建阻塞队列
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);


        //add(e)  remove  element     会抛出异常组
// System.out.println(blockingQueue.element());   //java.util.NoSuchElementException
        System.out.println(blockingQueue.add("a"));  //true
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));
        System.out.println(blockingQueue.element()); //a  查看第一个元素

//        System.out.println(blockingQueue.add("d"));  //Exception Queue full

        System.out.println(blockingQueue.remove());  //a 取第一个元素
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
//        System.out.println(blockingQueue.remove());  //java.util.NoSuchElementException
  • offer(e) poll peek() 特殊值

    //        //创建阻塞队列
            BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
    
    
            System.out.println(blockingQueue.peek());  //null
            System.out.println(blockingQueue.offer("a"));   //true
            System.out.println(blockingQueue.offer("b"));   //true
            System.out.println(blockingQueue.offer("c"));   //true
            System.out.println(blockingQueue.offer("d"));   //false
    
            System.out.println(blockingQueue.poll());   //a
            System.out.println(blockingQueue.poll());   //b
            System.out.println(blockingQueue.poll());   //c
            System.out.println(blockingQueue.poll());   //null
    
  • put(e) take() 阻塞

    //        //创建阻塞队列
            BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
    
    
            blockingQueue.put("a");
            blockingQueue.put("b");
            blockingQueue.put("c");
    //        blockingQueue.put("d");  //第四个 会阻塞
    
            System.out.println(blockingQueue.take());       //a
            System.out.println(blockingQueue.take());       //b
            System.out.println(blockingQueue.take());       //c
    //        System.out.println(blockingQueue.take());       //会阻塞
    
  • offer(e,time,unit) poll(time,unit) 超时

    //        //创建阻塞队列
            BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
    
            System.out.println(blockingQueue.offer("a"));
            System.out.println(blockingQueue.offer("b"));
            System.out.println(blockingQueue.offer("c"));
            System.out.println(blockingQueue.offer("d",3L, TimeUnit.SECONDS)); //阻塞3s 后返回false
    
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll(3L, TimeUnit.SECONDS)); //阻塞3s 后返回null
    

11. ThreadPool线程池

Java中的线程池 通过Executor框架实现的。该框架中用到了(I)Executor, (C)Executors工具类,(I)ExecutorService,(C)ThreadPooleExecutor

11.1 Executors工具类创建线程池

  • ExecutorService newFixedThreadPool(int nThreads) 一池N线程 固定长度的线程池
//一池5线程
ExecutorService threadPool = Executors.newFixedThreadPool(5);

//10个请求
try{
    for (int i = 1; i <= 10; i++) {
        //执行  传入Runnable实现
        threadPool.execute(()->{
            System.out.println(Thread.currentThread().getName() + "处理请求");
        });
    }
}catch (Exception e){
    e.printStackTrace();
}finally {
    //关闭
    threadPool.shutdown();
}

/**
pool-1-thread-1处理请求
pool-1-thread-4处理请求
pool-1-thread-1处理请求
pool-1-thread-3处理请求
pool-1-thread-1处理请求
pool-1-thread-1处理请求
pool-1-thread-2处理请求
pool-1-thread-3处理请求
pool-1-thread-4处理请求
pool-1-thread-5处理请求
**/
  • ExecutorService newSingleThreadExecutor() 一池一线程

    //1池1线程
    ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
    
    
    //10个请求
    try{
        for (int i = 1; i <= 10; i++) {
            //执行
           threadPool2.execute(()->{
                System.out.println(Thread.currentThread().getName() + "处理请求");
            });
        }
    }catch (Exception e){
        e.printStackTrace();
    }finally {
        //关闭
        threadPool2.shutdown();
    }
    /**
    pool-2-thread-1处理请求
    pool-2-thread-1处理请求
    pool-2-thread-1处理请求
    pool-2-thread-1处理请求
    pool-2-thread-1处理请求
    pool-2-thread-1处理请求
    pool-2-thread-1处理请求
    pool-2-thread-1处理请求
    pool-2-thread-1处理请求
    pool-2-thread-1处理请求
    **/
    
  • ExecutorService newCachedThreadPool() 可扩容,遇强则强

    //1池可扩容线程
    ExecutorService threadPool3 = Executors.newCachedThreadPool();
    
    //10个请求
    try{
        for (int i = 1; i <= 10; i++) {
            //执行  传入Runnable实现
            threadPool3.execute(()->{
                System.out.println(Thread.currentThread().getName() + "处理请求");
            });
        }
    }catch (Exception e){
        e.printStackTrace();
    }finally {
        //关闭
        threadPool3.shutdown();
    }
    
    /**
    pool-3-thread-1处理请求
    pool-3-thread-5处理请求
    pool-3-thread-4处理请求
    pool-3-thread-3处理请求
    pool-3-thread-2处理请求
    pool-3-thread-8处理请求
    pool-3-thread-7处理请求
    pool-3-thread-6处理请求
    pool-3-thread-10处理请求
    pool-3-thread-9处理请求
    
    **/
    

11.2 ThreadPoolExecutor参数说明

以上三种创建线程池的方式底层都是 new ThreadPoolExecutor()

ThreadPoolExecutor 构造中参数说明:

    public ThreadPoolExecutor(
        					  int corePoolSize,   //常驻(核心)线程数量
                             
                              int maximumPoolSize, //最大支持线程数量
        
                              long keepAliveTime, //线程存活时间(超过常驻线程数量的线程存活时间)
        
                              TimeUnit unit, //规定时间的单位
        
                              BlockingQueue<Runnable> workQueue,//阻塞队列  请求会放到该队列中
        
                              ThreadFactory threadFactory,//线程工厂  用于创建线程
        
                              RejectedExecutionHandler handler//拒绝策略 
                             )

拒绝策略

当任务请求数量超出 (maximumPoolSize + workQueue) 会有以下4种处理方式:

  • AbortPolicy(默认) 直接抛出异常
  • CallerRunsPolicy: 该策略既不抛弃任务也不抛出异常,而是回退给调用者,从而降低新任务的流量。
  • DiscardOldestPolicy :抛弃队列中等待最久的任务,然后把新来的任务加入到队列中并优先执行(加塞)
  • DiscardPolicy: 该策略默默丢弃无法处理的任务,不做任何处理不抛异常,如果允许任务丢失,这是最好的一种策略。

11.3 自定线程池

实际开发中 阿里强制不允许使用Executors去创建线程池 而是通过new ThreadPoolExecutor() 的方式。

说明:Executors返回的线程池对象的弊端如下:

  • FixedThreadPoolSingleThreadPool : 允许的请求队列长度为Integer.MAX_VALUE, 可能会堆积大量请求,从而导致OOM
  • CachedThreadPoolScheduledThreadPool: 允许创建的线程数量为Integer.MAX_VALUE, 可能会创建大量的线程,从而导致OOM

自定义案例:

public class Test14 {
    public static void main(String[] args) {
		
        //创建线程池
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                2L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );

        //10个请求
        try{
            for (int i = 1; i <= 10; i++) {
                //执行
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName() + "处理请求");
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //关闭
            threadPool.shutdown();
        }

    }
}

12. Fork/Join分支合并框架

目标:会使用就行

案例:计算 0+1+2+....+100 的结果

class MyTask extends RecursiveTask<Integer>{

    //拆分差值不能超过10
    private static final Integer VALUE = 10;

    private int begin; //拆分开始值
    private int end;  //拆分结束值
    private int result; //返回结果


    //创建有参构造
    public MyTask(int begin,int end){
        this.begin = begin;
        this.end = end;
    }

    //拆分合并的逻辑过程
    @Override
    protected Integer compute() {
        //判断
        if((end - begin) <= VALUE){
            //相加操作
            for (int i = begin; i <= end; i++) {
                result = result +i;
            }
        }else {
            //做拆分
            int middle = (end + begin)/2;

            //拆分左边
            MyTask task01 = new MyTask(begin,middle);
            task01.fork();

            //拆分右边
            MyTask task02 = new MyTask(middle+1,end);
            task02.fork();

            //合并结果
            result = task01.join() + task02.join();
        }
        return result;
    }
}

public class Test15 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //创建任务对象
        MyTask myTask = new MyTask(0,100);

        //创建分支合并池对象
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);

        //获取最终合并之后的结果
        Integer res = forkJoinTask.get();
        System.out.println(res);


        //关闭
        forkJoinPool.shutdown();
    }
}
万般皆下品,唯有读书高!
原文地址:https://www.cnblogs.com/s686zhou/p/15026986.html