线程池2



ExecutorCompletionService有三个成员变量:


  executor:执行task的线程池,创建CompletionService必须指定;


  aes:主要用于创建待执行task;


  completionQueue:存储已完成状态的task,默认是基于链表结构的阻塞队列LinkedBlockingQueue。



  • take在获取并移除已完成状态的task时,如果目前暂时不存在这样的task,等待,直到存在这样的task;

  • poll在获取并移除已完成状态的task时,如果目前暂时不存在这样的task,不等待,直接返回null。


ExecutorCompletionService

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * FutureTask extension to enqueue upon completion.
     */
    private static class QueueingFuture<V> extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task,
                       BlockingQueue<Future<V>> completionQueue) {
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;
        protected void done() { completionQueue.add(task); }
    }

    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and a
     * {@link LinkedBlockingQueue} as a completion queue.
     *
     * @param executor the executor to use
     * @throws NullPointerException if executor is {@code null}
     */
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and the supplied queue as its
     * completion queue.
     *
     * @param executor the executor to use
     * @param completionQueue the queue to use as the completion queue
     *        normally one dedicated for use by this service. This
     *        queue is treated as unbounded -- failed attempted
     *        {@code Queue.add} operations for completed tasks cause
     *        them not to be retrievable.
     * @throws NullPointerException if executor or completionQueue are {@code null}
     */
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}
实例:
package com._ThreadPool._2;

import java.math.BigInteger;
import java.util.concurrent.*;

public class MyCompletionService {

    public static void main(String[] args) {

        //创建一个线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        //创建一个CompletionService
        CompletionService<Long> completionService = new ExecutorCompletionService<Long>(executorService);

        for(int i = 1 ; i < 9 ; i ++){
            long start = i * 1000000;
            long end = start + 1000000;
            completionService.submit(new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    System.out.println("开始计算" + start + "" + end);
                    long sum = 0;
                    for(long j = start ; j <=end ; j++){
                        sum += j;
                    }
                    System.out.println(start + "" + end + "结果是" + sum);
                    return sum;
                }
            });


        }
        long result = 0;
        for (int i = 1 ; i < 9 ;i ++){
            try {
                result += completionService.take().get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        System.out.println(result);

    }
}

结果:

开始计算3000000到4000000
开始计算2000000到3000000
开始计算1000000到2000000
开始计算5000000到6000000
开始计算4000000到5000000
3000000到4000000结果是3500003500000
4000000到5000000结果是4500004500000
2000000到3000000结果是2500002500000
开始计算6000000到7000000
5000000到6000000结果是5500005500000
开始计算7000000到8000000
1000000到2000000结果是1500001500000
开始计算8000000到9000000
7000000到8000000结果是7500007500000
8000000到9000000结果是8500008500000
6000000到7000000结果是6500006500000
40000040000000
 
原文地址:https://www.cnblogs.com/da-peng/p/9822014.html