Executors、ExecutorService、ThreadPoolExecutor

创建ExecutorService的四种方式:

使用Executors.newCachedThreadPool()的方式来创建ExecutorService

    /**
     * Demonstrates {@link Executors#newCachedThreadPool()} by printing the pool's active
     * thread count at three points: before any work is submitted, right after one short
     * task is submitted, and one second after submitting 100 tasks that each sleep for
     * 10 seconds.
     *
     * <p>A cached pool creates threads on demand and reclaims threads that have been idle
     * for 60 seconds. Per the JDK documentation, such pools "will typically improve the
     * performance of programs that execute many short-lived asynchronous tasks".
     *
     * <p>The pool is shut down before returning; tasks that were already submitted still
     * run to completion.
     *
     * @throws InterruptedException if the calling thread is interrupted during the
     *         one-second pause
     */
    private static void userCachedThreadPool() throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // The cast is only needed to reach getActiveCount().
        ThreadPoolExecutor pool = (ThreadPoolExecutor) executorService;
        try {
            System.err.println(pool.getActiveCount());

            executorService.execute(() -> System.out.println("==============="));
            System.err.println(pool.getActiveCount());

            IntStream.range(0, 100).forEach(i -> executorService.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    // Restore the flag so the pool's worker thread can observe the interrupt.
                    Thread.currentThread().interrupt();
                }
                System.out.println(Thread.currentThread().getName() + " [" + i + "]");
            }));

            // Give the pool a moment to start the workers before sampling the count.
            TimeUnit.SECONDS.sleep(1);
            System.err.println(pool.getActiveCount());
        } finally {
            // Orderly shutdown: no new tasks are accepted, submitted ones still finish.
            executorService.shutdown();
        }
    }

使用Executors.newFixedThreadPool(10)的方式来创建ExecutorService

    /**
     * Demonstrates {@link Executors#newFixedThreadPool(int)} with 10 threads: submits 100
     * tasks that each sleep for 10 seconds, then prints the active thread count after one
     * second.
     *
     * <p>The factory is equivalent to
     * <pre>
     * new ThreadPoolExecutor(nThreads, nThreads,
     *                        0L, TimeUnit.MILLISECONDS,
     *                        new LinkedBlockingQueue&lt;Runnable&gt;());
     * </pre>
     * so the pool always keeps {@code nThreads} workers. A fixed pool never terminates on
     * its own; {@code shutdown()} must be called explicitly, which this method does before
     * returning. Tasks that were already submitted still run to completion.
     *
     * @throws InterruptedException if the calling thread is interrupted during the
     *         one-second pause
     */
    private static void useFixedSizePool() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        try {
            IntStream.range(0, 100).forEach(i -> executorService.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    // Restore the flag so the pool's worker thread can observe the interrupt.
                    Thread.currentThread().interrupt();
                }
                System.out.println(Thread.currentThread().getName() + " [" + i + "]");
            }));

            // Give the pool a moment to start the workers before sampling the count.
            TimeUnit.SECONDS.sleep(1);
            System.err.println(((ThreadPoolExecutor) executorService).getActiveCount());
        } finally {
            // Without this the non-daemon workers would keep the JVM alive indefinitely.
            executorService.shutdown();
        }
    }

使用Executors.newSingleThreadExecutor()的方式来创建ExecutorService

    /**
     * Demonstrates {@link Executors#newSingleThreadExecutor()}: submits 100 tasks that
     * each sleep for 10 seconds, then pauses the caller for one second.
     *
     * <p>Differences between a single-thread executor and a plain {@link Thread}:
     * <ol>
     *   <li>A thread dies once its work is finished, whereas the executor's worker stays
     *       alive waiting for more tasks.</li>
     *   <li>A thread cannot queue submitted runnables, whereas the executor buffers them
     *       in its work queue.</li>
     * </ol>
     *
     * <p>The factory is implemented as
     * <pre>
     * new FinalizableDelegatedExecutorService
     *     (new ThreadPoolExecutor(1, 1,
     *                             0L, TimeUnit.MILLISECONDS,
     *                             new LinkedBlockingQueue&lt;Runnable&gt;()));
     * </pre>
     *
     * <p>The executor is shut down before returning; tasks that were already submitted
     * still run to completion.
     *
     * @throws InterruptedException if the calling thread is interrupted during the
     *         one-second pause
     */
    private static void useSinglePool() throws InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            IntStream.range(0, 100).forEach(i -> executorService.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    // Restore the flag so the executor's worker thread can observe the interrupt.
                    Thread.currentThread().interrupt();
                }
                System.out.println(Thread.currentThread().getName() + " [" + i + "]");
            }));

            TimeUnit.SECONDS.sleep(1);
        } finally {
            // Orderly shutdown: no new tasks are accepted, queued ones still finish.
            executorService.shutdown();
        }
    }

注意:

 Executors.newSingleThreadExecutor()返回的对象不能强制转换成ThreadPoolExecutor(强转会抛出ClassCastException),因为它返回的是包装了ThreadPoolExecutor的FinalizableDelegatedExecutorService,而不是ThreadPoolExecutor本身

使用Executors.newWorkStealingPool()的方式来创建ExecutorService

package com.dwz.executors;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
/**
 * Demonstrates {@link Executors#newWorkStealingPool()} (a work-stealing pool).
 *
 * <p>Tip: on Windows, running {@code dxdiag} from the command prompt shows detailed
 * information about the machine.
 */
public class ExecutorsExample2 {
    /**
     * Prints the number of available processors, runs 20 callables on a work-stealing
     * pool, and prints each task's result in submission order.
     *
     * @param args unused
     * @throws InterruptedException if the calling thread is interrupted while waiting
     *         for the tasks to finish
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newWorkStealingPool();
        try {
            // Number of processors available to the JVM.
            System.out.println(Runtime.getRuntime().availableProcessors());

            List<Callable<String>> callableList = IntStream.range(0, 20)
                .mapToObj(i -> (Callable<String>) () -> {
                    System.out.println("Thread -> " + Thread.currentThread().getName());
                    sleep(2);
                    return "Task-" + i;
                })
                .collect(Collectors.toList());

            // invokeAll blocks until every task has completed, so each get() below
            // returns without further waiting.
            for (Future<String> future : executorService.invokeAll(callableList)) {
                try {
                    System.out.println(future.get());
                } catch (ExecutionException e) {
                    // Keep the cause so the failing task's stack trace is not lost.
                    throw new RuntimeException(e);
                }
            }
        } finally {
            executorService.shutdown();
        }
    }

    /**
     * Sleeps for the given number of seconds. If the thread is interrupted, the sleep
     * ends early and the interrupt flag is restored for the caller to observe.
     *
     * @param seconds how long to sleep, in seconds
     */
    private static void sleep(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
原文地址:https://www.cnblogs.com/zheaven/p/13444495.html