Advanced Java ----> Threads: Using CompletionService

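CompletionService lets you keep submitting new tasks asynchronously while processing the results of tasks that have already completed, which separates executing tasks from handling their results. In this post we learn how to use CompletionService through examples.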

Basic usage of CompletionService

Use submit() to execute tasks and take() to retrieve completed tasks, then process their results in the order in which the tasks finished.

1. The submit method of CompletionService

import java.util.concurrent.*;

public class CompletionServiceTest {
    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newFixedThreadPool(5);
        CompletionService<String> completionService = new ExecutorCompletionService<String>(service);
        for (int i = 0; i < 5; i++) {
            completionService.submit(new ReturnAfterSleepCallable(i));
        }
        System.out.println("after submit");
        for (int i = 0; i < 5; i++) {
            System.out.println("result: " + completionService.take().get()); // take() blocks until a task has completed
        }
        System.out.println("after get");
        service.shutdown();
    }

    private static class ReturnAfterSleepCallable implements Callable<String> {
        int sleep;

        public ReturnAfterSleepCallable(int sleep) {
            this.sleep = sleep;
        }

        @Override
        public String call() throws Exception {
            TimeUnit.SECONDS.sleep(sleep);
            return System.currentTimeMillis() + ",sleep=" + String.valueOf(sleep);
        }
    }
}

The output is as follows:

after submit
result: 1501052486631,sleep=0
result: 1501052487632,sleep=1
result: 1501052488632,sleep=2
result: 1501052489632,sleep=3
result: 1501052490633,sleep=4
after get

From the official documentation:

Submits a value-returning task for execution and returns a Future representing the pending results of the task. Upon completion, this task may be taken or polled. 
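
In the example above the tasks happen to be submitted in the same order in which they finish, so the effect of completion ordering is easy to miss. The following is a minimal sketch, not part of the original example, that submits the slowest task first. The class name CompletionOrderDemo, the helper completionOrder, and the delay values are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CompletionOrderDemo {

    // Hypothetical helper: submits one task per delay, in the given order,
    // and returns the delays in the order in which the tasks completed.
    // Expects at least one delay, because the pool size equals the task count.
    static List<Long> completionOrder(long... delaysMillis) throws Exception {
        // One thread per task, so every task starts right away.
        ExecutorService pool = Executors.newFixedThreadPool(delaysMillis.length);
        try {
            CompletionService<Long> cs = new ExecutorCompletionService<>(pool);
            for (long delay : delaysMillis) {
                cs.submit(() -> {
                    TimeUnit.MILLISECONDS.sleep(delay);
                    return delay;
                });
            }
            List<Long> finished = new ArrayList<>();
            for (int i = 0; i < delaysMillis.length; i++) {
                // take() hands back Futures in completion order, not submission order.
                finished.add(cs.take().get());
            }
            return finished;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Submitted slowest first; results should come back fastest first.
        System.out.println(completionOrder(600, 300, 0));
    }
}
```

With delays this far apart the results should come back as 0, 300, 600, the reverse of the submission order.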

2. Using the take method of CompletionService

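take() retrieves the Future of the task that completes first: whichever task finishes earliest is returned first, regardless of the order in which the tasks were submitted.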

package com.linux.thread;

import java.util.Random;
import java.util.concurrent.*;

public class RunMain1 {
    public static void main(String[] args) {
        try {
            ExecutorService executorService = Executors.newCachedThreadPool();
            ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
            for (int i = 0; i < 10; i++) {
                completionService.submit(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        int sleepValue = new Random().nextInt(5);
                        System.out.println("sleep = " + sleepValue + ", name: " + Thread.currentThread().getName());
                        TimeUnit.SECONDS.sleep(sleepValue);
                        return "huhx: " + sleepValue + ", " + Thread.currentThread().getName();
                    }
                });
            }
            for (int i = 0; i < 10; i++) {
                System.out.println(completionService.take().get());
            }
            executorService.shutdown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

The output of one run is as follows:

sleep = 4, name: pool-1-thread-1
sleep = 0, name: pool-1-thread-2
sleep = 0, name: pool-1-thread-3
huhx: 0, pool-1-thread-3
huhx: 0, pool-1-thread-2
sleep = 1, name: pool-1-thread-4
sleep = 4, name: pool-1-thread-5
sleep = 3, name: pool-1-thread-6
sleep = 4, name: pool-1-thread-7
sleep = 0, name: pool-1-thread-8
huhx: 0, pool-1-thread-8
sleep = 4, name: pool-1-thread-9
sleep = 3, name: pool-1-thread-10
huhx: 1, pool-1-thread-4
huhx: 3, pool-1-thread-6
huhx: 3, pool-1-thread-10
huhx: 4, pool-1-thread-1
huhx: 4, pool-1-thread-5
huhx: 4, pool-1-thread-7
huhx: 4, pool-1-thread-9

From the official documentation:

Retrieves and removes the Future representing the next completed task, waiting if none are yet present. 
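
To make the "waiting if none are yet present" part concrete, here is a small sketch that measures how long take() blocks while a single task is still running. The class name TakeBlocksDemo, the helper millisBlockedInTake, and the 300 ms delay are assumptions made for this illustration.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class TakeBlocksDemo {

    // Hypothetical helper: submits one task that sleeps for taskMillis and
    // returns roughly how many milliseconds the caller spent inside take().
    static long millisBlockedInTake(long taskMillis) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletionService<String> cs = new ExecutorCompletionService<>(pool);
            cs.submit(() -> {
                TimeUnit.MILLISECONDS.sleep(taskMillis);
                return "done";
            });

            long start = System.nanoTime();
            Future<String> future = cs.take(); // blocks until the task has completed
            long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

            // A Future returned by take() is already complete,
            // so a following get() does not have to wait.
            if (!future.isDone()) {
                throw new IllegalStateException("take() returned an unfinished Future");
            }
            return elapsed;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("take() blocked for about "
                + millisBlockedInTake(300) + " ms");
    }
}
```

The measured time should be close to the task's sleep time, which shows that the waiting happens in take() rather than in the get() call that follows it.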

3. Using the poll method of CompletionService

poll() retrieves and removes the Future representing the next completed task, or returns null if no task has completed yet. Unlike take(), poll() does not block.

package com.linux.thread;

import java.util.concurrent.*;

public class RunMain2 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CompletionService<String> service = new ExecutorCompletionService<String>(executorService);
        service.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                TimeUnit.SECONDS.sleep(3);
                System.out.println("3 seconds pass.");
                return "3 seconds";
            }
        });
        System.out.println(service.poll());
        executorService.shutdown();
    }
}

The output is as follows:

null
3 seconds pass.

From the official documentation:

Retrieves and removes the Future representing the next completed task or null if none are present. 
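
The example above only shows the null case. CompletionService also declares a timed overload, poll(long timeout, TimeUnit unit), which waits up to the given time for a task to complete. The sketch below contrasts the two calls; the class name PollDemo, the helper pollTwice, and the chosen delay and timeout are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PollDemo {

    // Hypothetical helper: polls once without waiting and once with a timeout,
    // and returns a short description of what each call produced.
    static String[] pollTwice() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletionService<String> cs = new ExecutorCompletionService<>(pool);
            cs.submit(() -> {
                TimeUnit.MILLISECONDS.sleep(300);
                return "finished";
            });

            // Non-blocking: the task is still sleeping, so nothing has completed yet.
            Future<String> immediate = cs.poll();

            // Timed variant: waits up to 5 seconds for a completed task.
            Future<String> timed = cs.poll(5, TimeUnit.SECONDS);

            return new String[] {
                immediate == null ? "null" : "a Future",
                timed == null ? "null" : timed.get()
            };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(Arrays.toString(pollTwice()));
    }
}
```

The first call should report null because the task is still sleeping, while the timed call returns as soon as the task completes instead of waiting for the whole timeout.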

Original article: https://www.cnblogs.com/huhx/p/baseusejavaCompletionService.html