关于Java线程池(二)

一.为什么要用线程池

(1)减少资源的开销 ;

(2)减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。

(3)提高响应速度 ,每次请求到来时,由于线程的创建已经完成,故可以直接执行任务,因此提高了响应速度。

提高线程的可管理性 ,线程是一种稀缺资源,若不加以限制,不仅会占用大量资源,而且会影响系统的稳定性。 

因此,线程池可以对线程的创建与停止、线程数量等等因素加以控制,使得线程在一种可控的范围内运行,不仅能保证系统稳定运行,而且方便性能调优。

线程池作用:

限制系统中执行线程的数量。根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;

少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排队等候。

一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。

当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。

使用线程池的理由是减少并发线程的数量。

二.Executor框架中的所有类可以分成三类:

(1)任务:任务有两种类型:Runnable和Callable。

(2)任务执行器: 

Executor框架最核心的接口是Executor,它表示任务的执行器。 

Executor的子接口为ExecutorService。 

ExecutorService有两大实现类:ThreadPoolExecutor和ScheduledThreadPoolExecutor。

(3)执行结果: 

Future接口表示异步的执行结果,它的实现类为FutureTask。

ExecutorService接口:

提供了管理终止线程池的方法。线程池的创建都是工厂方法。不要直接去new线程池,因为线程池的创建还要做很多的准备工作。

Executors.newCachedThreadPool():可根据任务需要动态创建线程,来执行任务。若线程池中有空闲的线程将重用该线程来执行任务。没有空闲的则创建新线程来完成任务。

理论上池子里可以放int最大值个线程。

Executors.newFixedThreadPool(int threads):创建固定大小的线程池。池中的线程数是固定的。若所有线程处于饱和状态,新任务将排队等待。

Executors.newScheduledThreadPool():创建具有延迟效果的线程池。可将带运行的任务延迟指定时长后在运行。

Executors.newSingleThreadExecutor():创建单线程的线程池。池中仅有一个线程。所有未运行任务排队等待。

不可变的:这个类的实例是不变的,所以,不需要外部的同步,这样的例子包括String,Long和BigInteger。

无条件的线程安全:这个类的实例是可变的,但是这个类有着足够的内部同步,所以,它的实例可以被并发使用,无需任何外部同步。其例子包括Random和ConcurrentHashMap。

非线程安全:这个类的实例是可变的。为了并发使用它们,客户必须利用自己选择的外部同步包围每个方法调用。

这样的例子包括通用的集合实现,例如ArrayList和HashMap。

ScheduledExecutorService:能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。

ThreadPoolExecutor:ExecutorService的默认实现。

ScheduledThreadPoolExecutor:继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。

Executor:基于生产者—消费者,提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程相当于消费者(执行完这些工作单元)

Executor执行的任务有4个生命周期阶段:创建,提交,开始和完成。

ExecutorService接口:生命周期,运行,关闭(shutdown)和已终止。

Executor框架使用Runnable作为其基本的任务表示形式。

三.5种线程池,4种拒绝策略,3种阻塞队列

1、FixedThreadPool 定长线程池:

它是一种固定大小的线程池;

corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;

keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;但这里keepAliveTime无效;

阻塞队列采用了LinkedBlockingQueue,它是一个无界队列;

由于阻塞队列是一个无界队列,因此永远不可能拒绝任务;

由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。

2、CachedThreadPool 可缓存线程池:

它是一个可以无限扩大的线程池;

它比较适合处理执行时间比较小的任务;

corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;

keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;

采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。

3、SingleThreadExecutor  单一线程池:

它只会创建一条工作线程处理任务;

采用的阻塞队列为LinkedBlockingQueue;

4、ScheduledThreadPool  可调度的线程池

例子:

三种阻塞队列:

BlockingQueue<Runnable> workQueue = null;

workQueue = new ArrayBlockingQueue<>(5);//基于数组的先进先出队列,有界

workQueue = new LinkedBlockingQueue<>();//基于链表的先进先出队列,无界

workQueue = new SynchronousQueue<>();//无缓冲的等待队列,无界

四种拒绝策略:

RejectedExecutionHandler rejected = null;

rejected = new ThreadPoolExecutor.AbortPolicy();//默认,队列满了丢任务抛出异常

rejected = new ThreadPoolExecutor.DiscardPolicy();//队列满了丢任务不异常

rejected = new ThreadPoolExecutor.DiscardOldestPolicy();//将最早进入队列的任务删,之后再尝试加入队列

rejected = new ThreadPoolExecutor.CallerRunsPolicy();//如果添加到线程池失败,那么主线程会自己去执行该任务

五种线程池:

ExecutorService threadPool = null;

threadPool = Executors.newCachedThreadPool();//有缓冲的线程池,线程数 JVM 控制

threadPool = Executors.newFixedThreadPool(3);//固定大小的线程池

threadPool = Executors.newScheduledThreadPool(2);

threadPool = Executors.newSingleThreadExecutor();//单线程的线程池,只有一个线程在工作

threadPool = new ThreadPoolExecutor();//默认线程池,可控制参数比较多   

public static void main (String[] args) throws Exception {
    testThreadPoolExecutor();
}

public static void testThreadPoolExecutor() throws Exception {
    //基础参数
    int corePoolSize=2;//最小活跃线程数
    int maximumPoolSize=5;//最大活跃线程数
    int keepAliveTime=5;//指定线程池中线程空闲超过 5s 后将被回收
    TimeUnit unit = TimeUnit.SECONDS;//keepAliveTime 单位

    //阻塞队列
    BlockingQueue<Runnable> workQueue = null;
    workQueue = new ArrayBlockingQueue<>(5);//基于数组的先进先出队列,有界
    workQueue = new LinkedBlockingQueue<>();//基于链表的先进先出队列,无界
    workQueue = new SynchronousQueue<>();//无缓冲的等待队列,无界

    //拒绝策略
    RejectedExecutionHandler rejected = null;
    rejected = new ThreadPoolExecutor.AbortPolicy();//默认,队列满了丢任务抛出异常
    rejected = new ThreadPoolExecutor.DiscardPolicy();//队列满了丢任务不异常
    rejected = new ThreadPoolExecutor.DiscardOldestPolicy();//将最早进入队列的任务删,之后再尝试加入队列
    rejected = new ThreadPoolExecutor.CallerRunsPolicy();//如果添加到线程池失败,那么主线程会自己去执行该任务

    //使用的线程池
    ExecutorService threadPool = null;
    threadPool = Executors.newCachedThreadPool();//有缓冲的线程池,线程数 JVM 控制
    threadPool = Executors.newFixedThreadPool(3);//固定大小的线程池
    threadPool = Executors.newScheduledThreadPool(2);
    threadPool = Executors.newSingleThreadExecutor();//单线程的线程池,只有一个线程在工作
    threadPool = new ThreadPoolExecutor(
            corePoolSize,
            maximumPoolSize,
            keepAliveTime,
            unit,
            workQueue,
            rejected);//默认线程池,可控制参数比较多
    //执行无返回值线程
    TaskRunnable taskRunnable = new TaskRunnable();
    threadPool.execute(taskRunnable);
    List<Future<String>> futres = new ArrayList<>();
    for(int i=0;i<10;i++) {
        //执行有返回值线程
        TaskCallable taskCallable = new TaskCallable(i);
        Future<String> future = threadPool.submit(taskCallable);
        futres.add(future);
    }
    for(int i=0;i<futres.size();i++){
        String result = futres.get(i).get();
        System.out.println(i+" result = "+result);
    }
}

/**
 * 返回值的线程,使用 threadpool.execut() 执行
 */
public static class TaskRunnable implements Runnable{
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " runnable result!");
    }
}

/**
 * 有返回值的线程,使用 threadpool.submit() 执行
 */
public static class TaskCallable implements Callable<String>{
    public TaskCallable(int index){
        this.i=index;
    }
    private int i;
    @Override
    public String call() throws Exception {
        int r = new Random().nextInt(5);
        try {
            Thread.sleep(r);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //System.out.println("callable result!");
        return Thread.currentThread().getName()+" callable index="+i +",sleep="+r;
    }
}

四. 线程池的处理流程

 

1、一个线程从被提交(submit)到执行共经历以下流程:

(1)线程池判断核心线程池里是的线程是否都在执行任务,如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下一个流程;

(2)线程池判断工作队列是否已满。如果工作队列没有满,则将新提交的任务储存在这个工作队列里。如果工作队列满了,则进入下一个流程。

(3)线程池判断其内部线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已满了,则交给饱和策略来处理这个任务。

2、线程池在执行execute方法时,主要有以下四种情况:

(1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(需要获得全局锁)

(2)如果运行的线程等于或多于corePoolSize ,则将任务加入BlockingQueue

(3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(需要获得全局锁)

(4)如果创建新线程将使当前运行的线程超出maxiumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

(5)线程池采取上述的流程进行设计是为了减少获取全局锁的次数。在线程池完成预热(当前运行的线程数大于或等于corePoolSize)之后,几乎所有的excute方法调用都执行步骤2。

五.关于死锁

当一个对象被一个线程修改的时候,可以阻止另一个线程观察到对象内部不一致的状态。

对象被创建的时候处于一致的状态,当有方法访问它的时候,它被锁定了,这些方法观察到对象的状态,并且可能会引起状态转变,即把对象从一种一致的状态转换到另一种一致状态。

正确地使用同步可以保证没有任何方法会看到对象处于不一致的状态中,

还可以保证进入同步方法或者同步代码块的每个线程,都看到由同一个锁保护的之前所有的修改效果。

当多个线程共享可变数据的时候,每个读或者写数据的线程都必须执行同步。

如果没有同步,就无法保证一个线程所做的修改可以被另一个线程获知。

为了避免死锁和数据破坏,千万不要从同步区域内部调用外来方法。

闭锁是一种同步工具类。CountDownLatch,FutureTask也可以用做闭锁。

栅栏:它能阻塞一组线程直到某个事件发生。

栅栏与闭锁的关键区别在于,所有线程必须同时达到栅栏位置,才能继续执行。

闭锁用于等到事件,而栅栏用于等待其他线程。

(1)可变状态是至关重要的,所有的并发问题都可以归结为如何协调对并发状态的访问。可变状态越少,就越容易确保线程安全性。

(2)不可变对象一定是线程安全的;

(3)用锁来保护每个可变变量;

(4)当保护同一个不变性条件中的所有变量时,要使用同一个锁。

(5)如果从多个线程中访问一个可变变量时没有同步机制,那么程序会出现问题。

围绕任务执行来设计应用程序。

Java中没有一种安全的抢占方法来停止线程,只有一些协作的机制。

线程死锁是两个以上的线程互相都要求对方已经占有的资源而导致无法继续运行。

死锁必须满足4个条件:

(1)互斥条件。线程使用的资源必须至少有一个是不能共享的。

(2)请求与保持条件。至少有一个线程必须持有一个资源并且正在等待获取一个当前被其他线程持有的资源。

(3)非剥夺条件。分配的资源不能从相应的线程中被强制剥夺。

(4)循环等待条件。第一个线程等待其他线程,后者又在等待第一个线程。

原文地址:https://www.cnblogs.com/ZJOE80/p/12562201.html