多线程批量执行等待全部结果

来自:http://blog.csdn.net/wxwzy738/article/details/8497853

       http://blog.csdn.net/cutesource/article/details/6061229

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class Test17 {
    public static void main(String[] args) throws Exception {
        Test17 t = new Test17();
        t.count1();
        t.count2();
    }
//使用阻塞容器保存每次Executor处理的结果,在后面进行统一处理
    public void count1() throws Exception{
        ExecutorService exec = Executors.newCachedThreadPool();
        BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();
        for(int i=0; i<10; i++){
            Future<Integer> future =exec.submit(getTask());
            queue.add(future);
        }
        int sum = 0;
        int queueSize = queue.size();
        for(int i=0; i<queueSize; i++){
            sum += queue.take().get();
        }
        System.out.println("总数为:"+sum);
        exec.shutdown();
    }
//使用CompletionService(完成服务)保持Executor处理的结果
    public void count2() throws InterruptedException, ExecutionException{
        ExecutorService exec = Executors.newCachedThreadPool();
        CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec);
        for(int i=0; i<10; i++){
            execcomp.submit(getTask());
        }
        int sum = 0;
        for(int i=0; i<10; i++){
//检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。
            Future<Integer> future = execcomp.take();
            sum += future.get();
        }
        System.out.println("总数为:"+sum);
        exec.shutdown();
    }
    //得到一个任务
    public Callable<Integer> getTask(){
        final Random rand = new Random();
        Callable<Integer> task = new Callable<Integer>(){
            @Override
            public Integer call() throws Exception {
                int i = rand.nextInt(10);
                int j = rand.nextInt(10);
                int sum = i*j;
                System.out.print(sum+"	");
                return sum;
            }
        };
        return task;
    }
    /**
     * 执行结果:
        6    6    14    40    40    0    4    7    0    0    总数为:106
        12    6    12    54    81    18    14    35    45    35    总数为:312
     */
}

先看一下新建一个ThreadPoolExecutor的构建参数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

看这个参数很容易让人以为是线程池里保持corePoolSize个线程,如果不够用,就加线程入池直至maximumPoolSize大小,如果 还不够就往workQueue里加,如果workQueue也不够就用RejectedExecutionHandler来做拒绝处理。

但实际情况不是这样,具体流程如下:

1)当池子大小小于corePoolSize就新建线程,并处理请求

2)当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理

3)当workQueue放不下新入的任务时,新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize就用RejectedExecutionHandler来做拒绝处理

4)另外,当池子的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁

内部结构如下所示:

从中可以发现ThreadPoolExecutor就是依靠BlockingQueue的阻塞机制来维持线程池,当池子里的线程无事可干的时候就通过workQueue.take()阻塞住。

其实可以通过Executes来学学几种特殊的ThreadPoolExecutor是如何构建的。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

newFixedThreadPool就是一个固定大小的ThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

newCachedThreadPool比较适合没有固定大小并且比较快速就能完成的小任务,没必要维持一个Pool,这比直接new Thread来处理的好处是能在60秒内重用已创建的线程。

其他类型的ThreadPool看看构建参数再结合上面所说的特性就大致知道它的特性

原文地址:https://www.cnblogs.com/sunxucool/p/4562842.html