线程池

线程池的作用:
1、减少创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务;
2、可以根据系统的承受能力,调整线程池中工作线程的数据,防止因为消耗过多的内存导致服务器奔溃。
使用线程池,要根据系统的环境情况,手动或自动设置线程数目。少了系统运行效率不高,多了系统拥挤、占用内存多。用线程池控制数量,其他线程排队等候。一个任务执行完毕,再从队列中取最前面的任务开始执行。若任务重没有等待任务,线程池浙一资源处于等待。当一个新任务需要运行,如果线程池中有等待的工作线程,就可以开始运行,否则进入等待队列。
ThreadPoolExecutor

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, 
    ThreadFactory threadFactory, RejectedExecutionHandler handler)

ThreadPoolExecutor类的另外三个构造器都是调用上面这个构造器进行初始化工作的。
int corePoolSize:核心池的大小。创建线程池后,默认情况下,线程池中没有任何线程,而是等待有任务到来才创建线程去执行任务。默认情况下,在创建线程池后,线程池钟的线程数为0,当有任务到来后就会创建一个线程去执行任务。
maximumPoolSize:池中允许的最大线程数,这个参数表示了线程池中最多能创建的线程数量,当任务数量比corePoolSize大时,任务添加到workQueue,当workQueue满了,将继续创建线程以处理任务,maxmumPoolSize表示的就是workQueue满了,线程池中最多可以创建的线程数量。
keepAliveTime:表示线程没有任务执行时最多保持多久时间终止。默认情况下,只有当线程池中的数量大于corePoolSize时,才会起作用,直到线程池中的线程数不大于corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0。
unit:参数keepAliveTime的时间单位,有七种取值:

TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLSECONDS; //毫秒
TimeOut.MICROSECONDS; //微秒
TimeOut.NANOSECONDS; //纳秒

workQueue:一个阻塞队列,用来存储等待执行的任务。一般来说,这里的阻塞队列有一下几个选择:

ArrayBlockingQueue; //基于数组的先进先出队列,此队列创建时必须指定大小
LinkedBlockingQueue; //基于链表的先进先出队列,如果没有指定队列大小,默认Integer.MAX_VALUE
SynchronousQueue; //不会保存提交的任务,直接创建一个线程来执行新来的任务
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关

ThreadFactory:线程工厂,主要用来创建线程

class SimpleThreadFactory implements ThreadFactory {
   public Thread newThread(Runnable r) {
     return new Thread(r);
   }

handler:当拒绝处理任务时的策略,有四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常(默认这种拒绝策略)。 
ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 

Executor
线程池的重点不是类怎么用,而是在合适的场景下使用合适的线程池,所谓合适的线程池就是ThreadPoolExecutor的构造方法传入不同的参数,构造出不同的线程池,以满足实际需求。
单线程线程池:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

示例:

//单线程线程池
/*	核心池大小--1 线程池容量--1 workqueue--LinkedBlockingQueue
 *     public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
 */
public class ThreadPoolExectorJava {
	public static void main(String[] args) {
		//注意:这里返回的是ExecutorService接口,不是ThreadPoolExecutor
		//从Executors得到的是ExecutorService或ScheduledExecutorService接口,而不是这两个接口的实现类
		ExecutorService tpe = Executors.newSingleThreadExecutor();
		tpe.execute(new ThreadPoolRunnable());
	}
}

单线程线程池运行的线程数是1,选择无界的LinkedBlockingQueue。不管传入多少任务都排序,前面一个任务执行完毕,再执行队列中的线程。这里第二个参数maximumPoolSize是没有意义的,因为maximumPoolSize描述的排队的任务多过workQueue的容量,线程池中对多只能容纳maximumPoolSize个任务,现在workQueue是无界的,也就是说排队的任务永远不会多过workQueue的容量,maximumPoolSize设置成多少都没有意义。
固定大小线程池:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

程序示例:

//固定大小线程池
public class ThreadPoolExectorJava {
	public static void main(String[] args) {
		ExecutorService tpe = Executors.newFixedThreadPool(3);
		tpe.execute(new ThreadPoolRunnable());
		tpe.execute(new ThreadPoolRunnable());
		tpe.execute(new ThreadPoolRunnable());
	}
}

固定大小的线程池和单线程的线程池差不多,无非是让线程池中能运行的线程手动制定了nThread。选择的LinkedBlockingQueue,所以maximumPoolSize是无意义的
无界线程池:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

程序示例:

//无界线程池
public class ThreadPoolExectorJava {
	public static void main(String[] args) {
		ExecutorService tpe = Executors.newCachedThreadPool();
		tpe.execute(new ThreadPoolRunnable());
		tpe.execute(new ThreadPoolRunnable());
		tpe.execute(new ThreadPoolRunnable());
		tpe.execute(new ThreadPoolRunnable());
		tpe.execute(new ThreadPoolRunnable());
		tpe.execute(new ThreadPoolRunnable());
	}
}

无界线程池,就是不管多少任务提交进行,都直接运行。无界线程池采用SynchronousQueue,采用这个线程池就没有workQueue容量一说了,只要添加进去的线程就会被拿去用。无界线程池的线程数是没有上限的,所以以maximumPoolSize为主了,设置为一个近似无限大的Integer.MAX_VALUE。另,单线程线程池和固定大小线程池线程是不会自动回收的,也即是说保证提交进来的任务最后会被处理,但不保证什么时候处理。无界线程池是设置了回收时间的,由于corePoolSize为0,所以只要60秒没有被用到的线程都会被直接移除。
workQueue:
  workQueue就是排队策略。排队策略描述的是当前线程大于corePoolSize时,线程以什么样的方式等待被运行。
  排队有三种策略:直接提交、有界队列、无界队列。JDK使用了无界队列LinkedBlockingQueue作为workQueue而不是有界队列ArrayBlockingQueue,尽管后者可以对资源进行控制,但是有界队列相比无界队列有三个缺点:
  1、使用有界队列,corePoolSize、maximumPoolSize两个参数势必要根据实际场景不断调整以求达到一个最佳,这会给开发带来极大的麻烦,必须经过大量的性能测试。所以干脆用无界,永远添加到队列中,不会溢出,自然maximumPoolSize就没啥用了,只需要根据系统处理能出调整corePoolSize就可以;
  2、放置业务突刺,尤其是在Web应用中,某些时候突然大量请求的到来。使用无界队列,不管什么时候,至少保证所有任务能够被处理,有界的超过maximumPoolSize的任务就可能被丢弃掉。
  3、有界队列大小和maximumPoolSize也需要相互折中,这又是一个难搞的地方。
线程池容量的动态调整

setCorePoolSize:设置核心池大小
setMaximumPoolSize:设置线程池最大能创建的线程数目大小

线程池的关闭

shutdown(): 不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但不再接受新任务
shutdownNow(): 立即终止线程池,并尝试打算正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务


线程工厂接口ThreadFactory

import java.util.concurrent.ThreadFactory;

public class ThreadFactorJava {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		//工厂类对象
		MyThreadFactory mtf = new MyThreadFactory();
		//线程类对象
		FactoryRunnable tpr = new FactoryRunnable();
		//创建线程
		Thread t = mtf.newThread(tpr);
		t.setName("线程一");
		Thread t2 = mtf.newThread(tpr);
		t2.setName("线程二");
		//执行线程
		t.start();
		t2.start();
	}

}
//创建类实现接口
class MyThreadFactory implements ThreadFactory{	
	public Thread newThread(Runnable r) {
		int i = 0;
		Thread  t = new Thread(r, "线程工厂创建" );
		return t;
	}	
}

//线程类
class FactoryRunnable implements Runnable{

	@Override
	public void run() {
		while(true) {
			System.out.println(Thread.currentThread().getName() + "执行");
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		
	}
	
}

线程池的使用

public class ThreadPoolRunnable implements Runnable{
	private static int i= 20;
	@Override
	public void run() {
		// TODO Auto-generated method stub
		while(true) {
			if(i > 0) {
				System.out.println(Thread.currentThread().getName() + "------" +(i--));
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}

}

public class ThreadPoolExectorJava {
	public static void main(String[] args) {
		//SynchronousQueue
		BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
		ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, workQueue);
		tpe.execute(new ThreadPoolRunnable());
		tpe.execute(new ThreadPoolRunnable());

	}
}

自定义线程池

package newJava.newRunnable;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExectorJava {
	public static void main(String[] args) {
		// 注意:最大线程数 >= worjQueue + 核心线程数
		BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(3);
		ThreadPoolFactor tpf = new ThreadPoolFactor();
		// ThreadPoolFactor tpf = Executors.defaultThreadFactory();
		// 当线程装满等待列表,抛出异常
		// ThreadPoolExecutor.AbortPolicy handler = new
		// ThreadPoolExecutor.AbortPolicy();
		// 不抛出异常
		// RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardPolicy();
		RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
		ThreadPoolExecutor tpe = new ThreadPoolExecutor(3, 6, 1, TimeUnit.SECONDS, workQueue, tpf, handler);
		ThreadPoolRunnable tpr = new ThreadPoolRunnable();
		ThreadPoolRunnable2 tpr2 = new ThreadPoolRunnable2();
		// 设置了等待列表装满就抛出异常,这里会抛出异常
		// for(int i = 0; i < 10; i++) {
		// tpe.execute(tpf.newThread(tpr));
		// }
		// 这里不会抛出异常
		tpe.execute(tpf.newThread(tpr));
		tpe.execute(tpf.newThread(tpr));
		tpe.execute(tpf.newThread(tpr));
		tpe.execute(tpf.newThread(tpr));
		tpe.execute(tpf.newThread(tpr2));
		tpe.execute(tpf.newThread(tpr2));
		tpe.execute(tpf.newThread(tpr2));
		tpe.execute(tpf.newThread(tpr2));
		tpe.execute(tpf.newThread(tpr2));
		// 关闭线程
		tpe.shutdown();
	}
}

class ThreadPoolRunnable2 implements Runnable {
	private int i = 0;

	@Override
	public void run() {
		// TODO Auto-generated method stub
		b: while (true) {
			if (i < 5) {
				System.out.println("ThreadPoolRunnable2正在执行--------" + Thread.currentThread().getName());
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			} else {
				break b;
			}
		}

	}

}

原文地址:https://www.cnblogs.com/changzuidaerguai/p/9315800.html