JUC

一、JUC是什么

java.util.concurrent在并发编程中使用的工具类

二、Lock接口

1、lock是什么?

锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象。

2、Lock接口的实现ReentrantLock可重入锁

class X {
   private final ReentrantLock lock = new ReentrantLock();
   // ...
 
   public void m() {
     lock.lock();  
     try {
       // ... method body
     } finally {
       lock.unlock()
     }
   }
}

3、创建线程方式

  1. 继承Thread,java是单继承,资源宝贵不推荐使用。

  2. 实现runnable方法
    新建类实现runnable接口

    class MyThread implements Runnable//新建类实现runnable接口
     
    new Thread(new MyThread,...)
    

    匿名内部类

    new Thread(new Runnable() {
        @Override
        public void run() {
     
        }
       }, "your thread name").start();
     
     这种方法不需要创建新的类,可以new接口
    

    lambda表达式

    new Thread(() -> {
     
     }, "your thread name").start();
     
      这种方法代码更简洁精炼
    

三、java8特性

1、lambda表达式

Lambda 是一个匿名函数,我们可以把 Lambda表达式理解为是一段可以传递的代码(将代码像数据一样进行传递)。可以写出更简洁、更灵活的代码。作为一种更紧凑的代码风格,使Java的语言表达能力得到了提升。

Lambda 表达式在Java 语言中引入了一个新的语法元素和操作符。这个操作符为 “->” , 该操作符被称
为 Lambda 操作符或剪头操作符。它将 Lambda 分为两个部分:
左侧:指定了 Lambda 表达式需要的所有参数

右侧:指定了 Lambda 体,即 Lambda 表达式要执行的功能

拷贝小括号(),写死右箭头->,落地大括号{...}

lambda表达式,必须是函数式接口,必须只有一个方法。如果接口只有一个方法java默认它为函数式接口。
为了正确使用Lambda表达式,需要给接口加个注解:@FunctionalInterface,如有两个方法,立刻报错。

2、接口里是否能有实现方法?

接口里在java8后容许有接口的实现,default方法默认实现

default int div(int x,int y) {
  return x/y;
 }

接口里default方法可以有几个?

多个

3、 静态方法实现:接口新增

public static int sub(int x,int y){
  return x-y;
}

四、线程间通信

1、线程间通信:1、生产者+消费者2、通知等待唤醒机制

package com.atguigu.thread;
 
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
 
import org.omg.IOP.Codec;
 
 
class ShareDataOne//资源类
{
  private int number = 0;//初始值为零的一个变量
 
  public synchronized void increment() throws InterruptedException 
  {
     //1判断
     if(number !=0 ) {
       this.wait();
     }
     //2干活
     ++number;
     System.out.println(Thread.currentThread().getName()+"	"+number);
     //3通知
     this.notifyAll();
  }
  
  public synchronized void decrement() throws InterruptedException 
  {
     // 1判断
     if (number == 0) {
       this.wait();
     }
     // 2干活
     --number;
     System.out.println(Thread.currentThread().getName() + "	" + number);
     // 3通知
     this.notifyAll();
  }
}
 
/**
 * 
 * @Description:
 *现在两个线程,
 * 可以操作初始值为零的一个变量,
 * 实现一个线程对该变量加1,一个线程对该变量减1,
 * 交替,来10轮。 
 * @author xialei
 *
 *  * 笔记:Java里面如何进行工程级别的多线程编写
 * 1 多线程变成模板(套路)-----上
 *     1.1  线程    操作    资源类  
 *     1.2  高内聚  低耦合
 * 2 多线程变成模板(套路)-----下
 *     2.1  判断
 *     2.2  干活
 *     2.3  通知
 
 */
public class NotifyWaitDemoOne
{
  public static void main(String[] args)
  {
     ShareDataOne sd = new ShareDataOne();
     new Thread(() -> {
       for (int i = 1; i < 10; i++) {
          try {
            sd.increment();
          } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
          }
       }
     }, "A").start();
     new Thread(() -> {
       for (int i = 1; i < 10; i++) {
          try {
            sd.decrement();
          } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
          }
       }
     }, "B").start();
  }
}
/*
 * * 
 * 2 多线程变成模板(套路)-----下
 *     2.1  判断
 *     2.2  干活
 *     2.3  通知
 * 3 防止虚假唤醒用while
 * */

对标实现

五、NotSafeDemo

ArrayList,HashMap,HashSet集合类是不安全的。

解决方案:

  1. Vector

  2. Collections

  3. 写时复制

    List<String> list = new CopyOnWriteArrayList<>();
    

    CopyOnWriteArrayList是arraylist的一种线程安全变体,其中所有可变操作(add、set等)都是通过生成底层数组的新副本来实现的。

    CopyOnWrite容器即写时复制的容器。往一个容器添加元素的时候,不直接往当前容器Object[]添加,
    而是先将当前容器Object[]进行Copy,复制出一个新的容器Object[] newElements,然后向新的容器Object[] newElements里添加元素。
    添加元素后,再将原容器的引用指向新的容器setArray(newElements)。
    这样做的好处是可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。
    所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

    public boolean add(E e) {    //底层原理
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }
    

扩展类比

HashSet

Set<String> set = new HashSet<>();//线程不安全
Set<String> set = new CopyOnWriteArraySet<>();//线程安全

HashSet底层数据结构是什么?
HashMap

但HashSet的add是放一个值,而HashMap是放K、V键值对。

HashMap

Map<String,String> map = new HashMap<>();//线程不安全
Map<String,String> map = new ConcurrentHashMap<>();//线程安全

六、多线程锁

1、锁的8个问题

1 标准访问,先打印短信还是邮件

短信

2 停4秒在短信方法内,先打印短信还是邮件

短信

3 普通的hello方法,是先打短信还是hello

hello

4 现在有两部手机,先打印短信还是邮件

邮件

5 两个静态同步方法,1部手机,先打印短信还是邮件

短信

6 两个静态同步方法,2部手机,先打印短信还是邮件

短信

7 1个静态同步方法,1个普通同步方法,1部手机,先打印短信还是邮件

邮件

8 1个静态同步方法,1个普通同步方法,2部手机,先打印短信还是邮件

邮件

class Phone
{
 
 public  synchronized void sendSMS() throws Exception
 {
   
   System.out.println("------sendSMS");
 }
 public synchronized void sendEmail() throws Exception
 {
   System.out.println("------sendEmail");
 }
 
 public void getHello() 
 {
   System.out.println("------getHello");
 }
 
}
 public static void main(String[] args) throws Exception
 {
   Phone phone = new Phone();
   Phone phone2 = new Phone();
   
   new Thread(() -> {
    try {
     phone.sendSMS();
    } catch (Exception e) {
     e.printStackTrace();
    }
   }, "AA").start();
   
   Thread.sleep(100);
   
   new Thread(() -> {
    try {
     phone.sendEmail();
     //phone.getHello();
     //phone2.sendEmail();
    } catch (Exception e) {
     e.printStackTrace();
    }
   }, "BB").start();
 }
}

2、8锁分析

  • 一个对象里面如果有多个synchronized方法,某一个时刻内,只要一个线程去调用其中的一个synchronized方法了,其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些synchronized方法

  • 锁的是当前对象this,被锁定后,其它的线程都不能进入到当前对象的其它的synchronized方法,加个普通方法后发现和同步锁无关,换成两个对象后,不是同一把锁了,情况立刻变化。

  • synchronized实现同步的基础:Java中的每一个对象都可以作为锁。
    具体表现为以下3种形式。
    对于普通同步方法,锁是当前实例对象。
    对于静态同步方法,锁是当前类的Class对象。
    对于同步方法块,锁是Synchonized括号里配置的对象

  • 当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。

  • 所有的静态同步方法用的也是同一把锁——类对象本身,这两把锁是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有竞态条件的。但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁,而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同步方法之间,只要它们同一个类的实例对象!

七、Callable接口

面试题:获得多线程的方法几种?

四种。继承Thread ,实现Runnable,以及jdk1.5出现的 实现callable接口 和 线程池。

1、Callable接口与runnable对比

 //创建新类MyThread实现runnable接口
class MyThread implements Runnable{
 @Override
 public void run() {
 
 }
}
//新类MyThread2实现callable接口
class MyThread2 implements Callable<Integer>{
 @Override
 public Integer call() throws Exception {
  return 200;
 } 
}

答:(1)是否有返回值
(2)是否抛异常
(3)落地方法不一样,一个是run,一个是call

2、Callable接口使用

FutureTask<Integer> ft = new FutureTask<Integer>(new MyThread());
new Thread(ft, "AA").start();
//MyThread是实现Callable接口的对象。

3、FutureTask

  • 在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,
    当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。

  • 一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。

  • 仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法。 一旦计算完成,就不能再重新开始或取消计算。get方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。

  • 只计算一次
    get方法放到最后

class MyThread implements Runnable{

    @Override
    public void run() {
    }
}
class MyThread2 implements Callable<Integer>{

    @Override
    public Integer call() throws Exception {
        System.out.println(Thread.currentThread().getName()+"come in callable");
        return 200;
    }
}
public class CallableDemo {
    public static void main(String[] args) throws Exception {

        //FutureTask<Integer> futureTask = new FutureTask(new MyThread2());
        FutureTask<Integer> futureTask = new FutureTask(()->{
            System.out.println(Thread.currentThread().getName()+"  come in callable");
            TimeUnit.SECONDS.sleep(4);
            return 1024;
        });
        FutureTask<Integer> futureTask2 = new FutureTask(()->{
            System.out.println(Thread.currentThread().getName()+"  come in callable");
            TimeUnit.SECONDS.sleep(4);
            return 2048;
        });

        new Thread(futureTask,"zhang3").start();
        new Thread(futureTask2,"li4").start();

        //System.out.println(futureTask.get());
        //System.out.println(futureTask2.get());
        //1、一般放在程序后面,直接获取结果
        //2、只会计算结果一次

        while(!futureTask.isDone()){
            System.out.println("***wait");
        }
        System.out.println(futureTask.get());
        System.out.println(Thread.currentThread().getName()+" come over");
    }
}

八、JUC强大的辅助类

1、CountDownLatch减少计数

引入:main主线程必须要等前面6个线程完成全部工作后,自己才能结束。

例子:6个同学陆续离开教室后值班同学才可以关门。错误的实现:

/**
 * 
 * @Description:
 *  *让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒。
 * 
 * CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。
 * 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),
 * 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
 */
public class CountDownLatchDemo
{
   public static void main(String[] args) throws InterruptedException
   {
         CountDownLatch countDownLatch = new CountDownLatch(6);
       
       for (int i = 1; i <=6; i++) //6个上自习的同学,各自离开教室的时间不一致
       {
          new Thread(() -> {
              System.out.println(Thread.currentThread().getName()+"	 号同学离开教室");
              countDownLatch.countDown();
          }, String.valueOf(i)).start();
       }
       countDownLatch.await();
       System.out.println(Thread.currentThread().getName()+"	****** 班长关门走人,main线程是班长");
          
   }
}

2、CyclicBarrier循环栅栏

/**
 * CyclicBarrier
 * 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,
 * 让一组线程到达一个屏障(也可以叫同步点)时被阻塞,
 * 直到最后一个线程到达屏障时,屏障才会开门,所有
 * 被屏障拦截的线程才会继续干活。
 * 线程进入屏障通过CyclicBarrier的await()方法。
 * 
 * 集齐7颗龙珠就可以召唤神龙
 */
public class CyclicBarrierDemo
{
  private static final int NUMBER = 7;
  
  public static void main(String[] args)
  {
     //CyclicBarrier(int parties, Runnable barrierAction) 
     
     CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, ()->{System.out.println("*****集齐7颗龙珠就可以召唤神龙");}) ;
     
     for (int i = 1; i <= 7; i++) {
       new Thread(() -> {
          try {
            System.out.println(Thread.currentThread().getName()+"	 星龙珠被收集 ");
            cyclicBarrier.await();
          } catch (InterruptedException | BrokenBarrierException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
          }
       
       }, String.valueOf(i)).start();
     }
  }
}

3、Semaphore信号灯

/**
 * 
 * 在信号量上我们定义两种操作:
 * acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),
 *             要么一直等下去,直到有线程释放信号量,或超时。
 * release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
 * 
 * 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
 */
public class SemaphoreDemo
{
  public static void main(String[] args)
  {
     Semaphore semaphore = new Semaphore(3);//模拟3个停车位
     
     for (int i = 1; i <=6; i++) //模拟6部汽车
     {
       new Thread(() -> {
          try 
          {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName()+"	 抢到了车位");
            TimeUnit.SECONDS.sleep(new Random().nextInt(5));
            System.out.println(Thread.currentThread().getName()+"	------- 离开");
          } catch (InterruptedException e) {
            e.printStackTrace();
          }finally {
            semaphore.release();
          }
       }, String.valueOf(i)).start();
     }     
  }
}

九、ReentrantReadWriteLock读写锁

类似案例:

红蜘蛛软件、缓存;只能一个写,可以多个读。

讲课:只能老师操作ppt,学生只能读。

class MyCache {
    private volatile Map<String, Object> map = new HashMap<>();
    private ReadWriteLock rwLock = new ReentrantReadWriteLock();

    public void put(String key, Object value) {
        rwLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "	 正在写" + key);
            //暂停一会儿线程
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            map.put(key, value);
            System.out.println(Thread.currentThread().getName() + "	 写完了" + key);
            System.out.println();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            rwLock.writeLock().unlock();
        }

    }

    public Object get(String key) {
        rwLock.readLock().lock();
        Object result = null;
        try {
            System.out.println(Thread.currentThread().getName() + "	 正在读" + key);
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            result = map.get(key);
            System.out.println(Thread.currentThread().getName() + "	 读完了" + result);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            rwLock.readLock().unlock();
        }
        return result;
    }
}

public class ReadWriteLockDemo {
    public static void main(String[] args) {
        MyCache myCache = new MyCache();

        for (int i = 1; i <= 5; i++) {
            final int num = i;
            new Thread(() -> {
                myCache.put(num + "", num + "");
            }, String.valueOf(i)).start();
        }
        for (int i = 1; i <= 5; i++) {
            final int num = i;
            new Thread(() -> {
                myCache.get(num + "");
            }, String.valueOf(i)).start();
        }
    }
}

十、BlockingQueueDemo阻塞队列

栈与队列:

  • 栈:先进后出,后进先出
  • 队列:先进先出

当队列是空的,从队列中获取元素的操作将会被阻塞

当队列是满的,从队列中添加元素的操作将会被阻塞

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起

为什么需要BlockingQueue?
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了

在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

1、BlockingQueue架构图

  • ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
  • DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
  • SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。
  • LinkedTransferQueue:由链表组成的无界阻塞队列。
  • LinkedBlockingDeque:由链表组成的双向阻塞队列。

2、BlockingQueue核心方法

十一、ThreadPool线程池

1、为什么用线程池

线程池的优势:

线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果

线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

它的主要特点为:线程复用;控制最大并发数;管理线程。

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的销耗。

第二:提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。

第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定

性,使用线程池可以进行统一的分配,调优和监控。

2、架构说明

Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类。

3、编码实现

3.1、Executors.newFixedThreadPool(int)

执行长期任务性能好,创建一个线程池,一池有N个固定的线程,有固定线程数的线程。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
 
//newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的是LinkedBlockingQueue

3.2、Executors.newSingleThreadExecutor()

一个任务一个任务的执行,一池一线程

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
 
//newSingleThreadExecutor 创建的线程池corePoolSize和maximumPoolSize值都是1,它使用的是LinkedBlockingQueue

3.3、Executors.newCachedThreadPool()

执行很多短期异步任务,线程池根据需要创建新线程。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
//newCachedThreadPool创建的线程池将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,它使用的是SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

4、线程池几个重要参数(7大参数)

1、corePoolSize:线程池中的常驻核心线程数
2、maximumPoolSize:线程池中能够容纳同时 执行的最大线程数,此值必须大于等于1
3、keepAliveTime:多余的空闲线程的存活时间 当前池中线程数量超过corePoolSize时,当空闲时间 达到keepAliveTime时,多余线程会被销毁直到 只剩下corePoolSize个线程为止
4、unit:keepAliveTime的单位
5、workQueue:任务队列,被提交但尚未被执行的任务
6、threadFactory:表示生成线程池中工作线程的线程工厂, 用于创建线程,一般默认的即可
7、handler:拒绝策略,表示当队列满了,并且工作线程大于 等于线程池的最大线程数(maximumPoolSize)时如何来拒绝 请求执行的runnable的策略

5、线程池底层工作原理

1、在创建了线程池后,线程池中的线程数为零。(理解为懒加载)

2、当调用execute()方法添加一个请求任务时,线程池会做出如下判断:

2.1如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;

2.2如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;

2.3如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;(去运行才到达的任务,并不是去执行阻塞队列的任务。阻塞队列的任务需要其他线程空闲后重新唤醒)

2.4如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执

行。

3、当一个线程完成任务时,它会从队列中取下一个任务来执行。

4、当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:

​ 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。

​ 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。

6、线程池用哪个?生产中如设置合理参数

一个都不用,我们工作中只能使用自定义的

为什么不用?

实际开发中使用:ThreadPoolExecutor

6.1、线程池的拒绝策略

等待队列已经排满了,再也塞不下新任务了同时,线程池中的max线程也达到了,无法继续为新任务服务。

这个是时候我们就需要拒绝策略机制合理的处理这个问题。

JDK内置的拒绝策略

  • AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行

  • CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不 会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。

  • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中 尝试再次提交当前任务。

  • DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。 如果允许任务丢失,这是最好的一种策略。

7、手写线程池

/**
 * 线程池
 * Arrays
 * Collections
 * Executors
 */
public class MyThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                2L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(3),
                Executors.defaultThreadFactory(),
                //new ThreadPoolExecutor.AbortPolicy()
                //new ThreadPoolExecutor.CallerRunsPolicy()
                //new ThreadPoolExecutor.DiscardOldestPolicy()
                new ThreadPoolExecutor.DiscardOldestPolicy()
        );
        //10个顾客请求
        try {
            for (int i = 1; i <= 10; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "	 办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }  
}

十二、java8回顾

1、函数式接口

java.util.function

java内置核心四大函数式接口

实例

//R apply(T t);函数型接口,一个参数,一个返回值
Function<String,Integer> function = t ->{return t.length();};
System.out.println(function.apply("abcd"));

//boolean test(T t);断定型接口,一个参数,返回boolean
Predicate<String> predicate = t->{return t.startsWith("a");};
System.out.println(predicate.test("a"));

// void accept(T t);消费型接口,一个参数,没有返回值
Consumer<String> consumer = t->{
    System.out.println(t);
};
consumer.accept("javaXXXX");

//T get(); 供给型接口,无参数,有返回值
Supplier<String> supplier =()->{return UUID.randomUUID().toString();};
System.out.println(supplier.get());

2、Stream流

流(Stream) 到底是什么呢?
是数据渠道,用于操作数据源(集合、数组等)所生成的元素序列。“集合讲的是数据,流讲的是计算!”类似linux 的管道符。

特点

Stream 自己不会存储元素
Stream 不会改变源对象。相反, 他们会返回一个持有结果的新Stream。
Stream 操作是延迟执行的。这意味着 他们会等到需要结果的时候才执行。

执行流程

创建一个Stream:一个数据源(数组、集合)
中间操作:一个中间操作,处理数据源数据
终止操作:一个终止操作,执行中间操作链,产生结果

代码示例

//Stream流写法,把list转化为stream流
//Stream<User>
/**问题:
获取id为2的用户
年龄大于24
用户名全部转为大写
按照从大到小排序
输出第一个
*/

class user{
    private Integer id;
    private Integer age;
    private String username;
    
    //setter getter....
}

list.stream().filter(p -> {
    return p.getId() % 2 == 0;
}).filter(p -> {
    return p.getAge() > 24;
}).map(f -> {
    return f.getUserName().toUpperCase();
}).sorted((o1, o2) -> {
    return o2.compareTo(o1);
}).limit(1).forEach(System.out::println);

十三、分支合并框架

1、原理

Fork:把一个复杂任务进行分拆,大事化小
Join:把分拆任务的结果进行合并

2、相关类

ForkJoinPool 类比=> 线程池

ForkJoinTask 类比=> FutureTask

RecursiveTask 递归任务:继承后可以实现递归(自己调自己)调用的任务

3、实例代码

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class MyTask extends RecursiveTask<Integer>{
    private static final Integer ADJUST_VALUE = 10;
    private int begin;
    private int end;
    private int result;

    public MyTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if((end - begin)<=ADJUST_VALUE){
           for(int i =begin;i <= end;i++){
                result = result + i;
           }
        }else{
            int middle = (begin + end)/2;
            MyTask task01 = new MyTask(begin,middle);
            MyTask task02 = new MyTask(middle+1,end);
            task01.fork();
            task02.fork();
            result =  task01.join() + task02.join();
        }
        return result;
    }
}
/**
 * 分支合并例子
 * ForkJoinPool
 * ForkJoinTask
 * RecursiveTask
 */
public class ForkJoinDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        MyTask myTask = new MyTask(0,100);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);

        System.out.println(forkJoinTask.get());

        forkJoinPool.shutdown();
    }
}

十四、异步回调(略)

import java.util.concurrent.CompletableFuture;

public class CompletableFutureDemo {

    public static void main(String[] args) throws Exception {
        //同步,异步,异步回调

        //同步
//        CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(()->{
//            System.out.println(Thread.currentThread().getName()+"	 completableFuture1");
//        });
//        completableFuture1.get();

        //异步回调
        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"	 completableFuture2");
            int i = 10/0;
            return 1024;
        });

        completableFuture2.whenComplete((t,u)->{
            System.out.println("-------t="+t);
            System.out.println("-------u="+u);
        }).exceptionally(f->{
            System.out.println("-----exception:"+f.getMessage());
            return 444;
        }).get();

    }
}
原文地址:https://www.cnblogs.com/xiaolaodi1999/p/13773448.html