对于一个web server来说,简单的单线程程序或者每任务每线程的程序都有自身的局限性或缺点。而线程池为线程管理提供了一个良好的实现。在java1.5后,java.util.concurrent提供了一个灵活的线程池实现,而其也是作为Executor框架的一部分。在现在的java类库中,任务执行的首要选择不是Thread,而是Executor。
Executor接口###
public interface Executor {
void execute(Runnable command);
}
虽然Executor只是一个简单的接口,但却为整个框架创造了基础。这个框架可以用于异步任务执行,而且支持多类型的任务执行策略。且为任务提交和任务执行之间的解耦提供了标准的方法。
Executor基于生产者-消费者模式。提交任务的执行者是生产者,执行任务的线程是消费者。
执行策略###
将任务的提交与任务的执行体进行解耦,让我们可以简单地为一个类给定的任务制定执行策略。一个执行策略指明任务执行的“what,where,when,how”几个因素。
- 任务在什么(what)线程中执行
- 任务以什么(what)顺序执行
- 可以有多少个(how many)任务并发执行
- 可以有多少个(how many)任务进入等待执行队列
- 如果系统过载,需要放弃任务,应该挑选哪一个(which)任务?以及如何如何(how)通知应用程序知道?
- 在一个任务的执行前后,应该做什么(what)处理
任务执行的最佳策略取决于可用的计算资源和你对服务质量的需求。
使用Executors创建一个使用线程池的Web Server###
public class TaskExecutorWebServer {
private static int threadNum = 100;
private static Executor executor =
Executors.newFixedThreadPool(threadNum);
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while(true){
final Socket connection = socket.accept();
Runnable task = new Runnable() {
@Override
public void run() {
//handleRequest(connection);
}
};
executor.execute(task);
}
}
}
线程池###
线程池的好处:
- 重用存在的线程,而不是创建新的线程
- 在请求到达时,工作者线程通常已经存在,不需要重新创建线程,减少等待时间
通过适当地调整线程池的大小,你可以得到足够多的线程以保持处理器忙碌,同时可以防止过多的线程相互竞争资源,导致应用程序耗尽内存或者失败。
目前类库中提供了一个工具类Executors通过静态工厂方法来创建一个线程池。
- newFixedThreadPool创建一个定长的线程池,每提交一个任务就创建一个线程,直到达到池的最大长度。
- newCachedThreadPool创建一个可缓存的线程池,如果当前线程池的长度超过了处理的需要时,可以灵活地回收空闲的线程。池的长度没有限制。
- newSingleThreadExecutor创建一个单线程化的executor,只创建唯一的工作者线程执行任务,如果此线程异常结束,会有另一个取代它。
- newScheduledThreadPool创建一个定长的线程池,支持定时和周期性任务执行。
线程的生命周期###
由于Executor是异步执行任务,之前提交的任务状态都不是立即可见的,因而当我们需要取消或者停止线程时,我们就需要对其的生命周期进行管理。
ExecutorService接口扩展了Executor,添加了一些用于生命周期管理的方法(同时还有一些任务提交的便利方法)
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService暗示了生命周期有3种状态:运行(running)、关闭(shutting down)和终止(terminated)。
shutdown方法会启动一个平缓的关闭过程,停止接受新的任务,同时等待已经提交的任务完成,包括尚未开始执行的任务。
shutdownNow方法会启动一个强制的关闭过程,尝试取消所有运行中的任务和排在队列中尚未开始的任务。
调用awaitTermination等待ExecutorService到达终止状态,也可以轮询检查isTermianted判断ExecutorService是否已经终止。
可携带结果的任务:Callable和Future###
很多任务都会引起严重的计算延迟,比如执行数据库查询,从网络获取资源。对于这些任务,Callable是更佳的抽象,它调用call方法执行任务,然后等待返回值,并为可能抛出的异常预先做好了准备。
public interface Callable<V> {
V call() throws Exception;
}
Future描述了任务的生命周期,并提供相关方法来获得任务的结果、取消任务以及检验任务是否已经完成还是被取消。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
任务的状态决定了get方法的行为。如果任务已完成,get会立即返回,或者抛出一个Exception,如果任务没有完成,get会阻塞到它完成。如果任务抛出了异常,get会将该异常封装成ExecutionException,然后重新抛出;如果任务被取消,get会抛出CancellationException。当抛出了ExecutionException时,可以用getCause重新获得被封装的原始异常。
创建一个描述任务的Future有两种方法:
- ExecutorService中的所有submit方法返回一个Future
- 显示地为给定的Runnable或Callable实例化一个FutureTask,将它提交给Executor来执行或直接调用run方法
将Runnable或Callable提交给Executor,从提交线程暴露到最终执行任务的线程的过程是线程安全的。类似地,设置Future结果值的行为,也可以建立一个安全发布,以保证这个结果从计算它的线程暴露到通过get重获它的线程的过程是线程安全的。
CompletionService###
CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似队列中的take和poll方法,在结果完整可用时获得这个结果。