谷粒商城分布式高级(十)—— 商城业务异步 & 线程池

一、线程回顾

1、初始化线程的4种方式



注意:
(1)我们以后在业务代码里面,以上三种启动线程的方式都不用。【将所有的多线程异步任务都交给线程池执行】

测试代码
package com.atguigu.gulimall.search.thread;

import java.util.concurrent.*;

public class ThreadTest {

public static ExecutorService service = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main....start....");
/**
* (1)继承Thread
* (2)实现Runnable接口
* (3)实现Callable接口 + FutureTask(可以拿到返回结果,可以处理异常)
* (4)线程池
*/

/*Thread01 thread01 = new Thread01();
thread01.start();//启动线程*/

/*Runnable01 runnable01 = new Runnable01();
new Thread(runnable01).start();*/

/*FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
new Thread(futureTask).start();
//阻塞等待整个线程执行完成,获取返回结果
Integer integer = futureTask.get();
System.out.println("返回结果...."+integer);*/

//我们以后在业务代码里面,以上三种启动线程的方式都不用。【将所有的多线程异步任务都交给线程池执行】
//new Thread()->System.out.println("hello").start();

//当前系统中池只有一两个,每个异步任务,提交给线程池让他自己去执行
service.execute(new Runnable01());

System.out.println("main....end....");
}


/**
* 继承Thread
*/
public static class Thread01 extends Thread{
@Override
public void run(){
System.out.println("当前线程:"+Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:"+i);
}
}

/**
* 实现Runnable接口
*/
public static class Runnable01 implements Runnable{
@Override
public void run(){
System.out.println("当前线程:"+Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:"+i);
}
}

/**
* 实现Callable接口 + FutureTask(可以拿到返回结果,可以处理异常)
*/
public static class Callable01 implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("当前线程:"+Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:"+i);
return i;
}
}
}

2、线程池的七大参数

(1)corePoolSize:[5] —— 核心线程数【一直存在除非(allowCoreThreadTimeOut)】;线程池,创建好以后就准备就绪的线程数量,就等待来接受异步任务去执行。
           5 个 Thread thread = new Thread(); thread.start();
(2)maximumPoolSize —— 最大线程数量; 控制资源
(3)keepAliveTime —— 存活时间。当线程数大于核心线程数的时候,线程在最大多长时间没有接到新任务就会终止释放,最终线程池维持在 corePoolSize 大小
(4)unit —— 时间单位
(5)BlockingQueue<Runnable> workQueue —— 阻塞队列。用来存储等待执行的任务,如果当前对线程的需求超过了corePoolSize 大小,就会放在这里等待空闲线程执行。
                       只要有线程空闲,就会去队列里面取出新的任务继续执行。
(6)threadFactory —— 创建线程的工厂,比如指定线程名等
(7)handler —— 拒绝策略,如果线程满了,线程池就会使用拒绝策略

  
  运行流程:
  1、线程池创建,准备好 core 数量的核心线程,准备接受任务
  2、新的任务进来,用 core 准备好的空闲线程执行。
    (1)core 满了,就将再进来的任务放入阻塞队列中。空闲的 core 就会自己去阻塞队列获取任务执行
    (2)阻塞队列满了,就直接开新线程执行,最大只能开到 max 指定的数量
    (3)max 都执行好了。Max-core 数量空闲的线程会在 keepAliveTime 指定的时间后自动销毁。最终保持到 core 大小
    (4)如果线程数开到了 max 的数量,还有新任务进来,就会使用 reject 指定的拒绝策略进行处理
  3、所有的线程创建都是由指定的 factory 创建的
 
  面试举例:
    一个线程池 core 7; max 20; queue 50; 100并发进来怎么分配的
    答:7个会立即得到执行,50个会进入队列,再开13个进行执行。剩下的30个就是用拒绝策略
      如果不想抛弃还要执行。CallerRunsPolicy;

3、常见的4种线程池

1)newCachedThreadPool
      —— 创建一个可缓存线程池,如果线程池长度超过处理需要,可以灵活回收空闲线程。若无可回收,则新建线程
(2)
newFixedThreadPool
     —— 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
(3)newScheduledThreadPool
     —— 创建一个定长线程池,支持定时及周期性任务执行
(4)newSingleThreadExecutor
     —— 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO、LIFO、优先级)执行

4、开发中为什么使用线程池

(1)降低资源的消耗
    通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
(2)提高响应速度
    因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务 的状态,当任务来时无需创建新的线程就能执行
(3)提高线程的可管理性
    线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来 的系统开销。
    无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配

二、CompletableFuture 异步编排

 

1、创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。
(1)runAsync

 (2)supplyAsync

2、计算完成时回调方法

3、handle方法

4、线程串行化方法

  

5、两任务组合 —— 都要完成

(1)runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后,处理该任务。

(2)thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。

 

(3)thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值

6、两任务组合 —— 一个完成

(1)runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值。

(2)acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。

(3)applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。

7、多任务组合



(1)allOf:等待所有任务完成

  (2)anyoff:只要有有一个完成

原文地址:https://www.cnblogs.com/javahr/p/15743307.html