ForkJoin(工作窃取)初步使用,计算偶数集合

ForkJoinPool

Java7引入

实现了ExecutorService接口,所以它也是一种线程池,做的工作就是,把一个任务拆分成若干个小任务执行,然后再把小任务执行的结果汇总。

两大核心就是分而治之(Divide and conquer)和工作窃取(Work Stealing)算法

虽说了ForkJoinPool会把大任务拆分成多个子任务,但是ForkJoinPool并不会为每个子任务创建单独的线程。相反,池中每个线程都有自己的双端队列(Deque)用于存储任务。这个双端队列对于工作窃取算法至关重要。

Work Stealing算法是Fork/Join框架的核心思想:

每个线程都有自己的一个WorkQueue,该工作队列是一个双端队列。
队列支持三个功能push、pop、poll
push/pop只能被队列所持有的线程调用,而poll可以被其他线程调用。
划分的子任务调用fork时,都会被push到自己的队列中。
默认情况下,工作线程从自己的双端队列获出任务并执行。
当自己的队列为空时,线程随机从另一个线程的队列末尾调用poll方法窃取任务。
public class ForkJoinWorkerThread extends Thread {
    final ForkJoinPool pool;                // 工作线程所在的线程池
    final ForkJoinPool.WorkQueue workQueue; // 线程的工作队列(这个双端队列是work-stealing机制的核心)
    ...
}

ForkJoinPool静态块中加载的commonPool,使用时ForkJoinPool.commonPool().submit(...)

 private static ForkJoinPool makeCommonPool() {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory factory = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }
        if (factory == null) {
            if (System.getSecurityManager() == null)
                factory = defaultForkJoinWorkerThreadFactory;
            else // use security-managed default
                factory = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        //默认先进后出工作模式 【用于控制WorkQueue的工作模式】
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }

ForkJoinTask有以下三个核心方法:

  • fork():在任务执行过程中将大任务划分为多个小的子任务,调用子任务的fork()方法可以将任务放到线程池中异步调度。

  • join():调用子任务的join()方法等待任务返回的结果。这个方法类似于Thread.join(),区别在于前者不受线程中断机制的影响。

如果子任务中有运行时异常,join()会抛出异常,quietlyJoin()方法不会抛出异常也不会返回结果,需要你调用getException()或getRawResult()自己去处理异常和结果。

  • invoke():在当前线程同步执行该任务。该方法也不受中断机制影响。

如果子任务中有运行时异常,invoke()会抛出异常,quietlyInvoke()方法不会抛出异常也不会返回结果,需要你调用getException()或getRawResult()自己去处理异常和结果。

实例:给定整数数组,返回偶数数组集合

通常我们不会直接使用ForkJoinTask,而是使用它的两个抽象子类:
RecursiveAction:没有返回值的任务
RecursiveTask:有返回值的任务
大多数情况下,我们都是直接提交ForkJoinTask对象到ForkJoinPool中。

 public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(2);//定义并行级别,也就是线程数为2,默认Runtime.getRuntime().availableProcessors()
        List<Integer> integers = Stream.iterate(1,x->x.intValue()+1).limit(10).collect(Collectors.toList());
        System.out.println(1+" "+ integers.size()+" "+ integers);
        MyRecursiveTask myRecursiveTask = new MyRecursiveTask(1, integers.size(), integers);
        ForkJoinTask<List<Integer>> forkJoinTask = forkJoinPool.submit(myRecursiveTask);
        try {
            List<Integer> result = forkJoinTask.get();
            System.out.println(result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        //myRecursiveTask.getException().printStackTrace();
        forkJoinPool.shutdown();
    }

    @Data
    @AllArgsConstructor
    static class MyRecursiveTask extends RecursiveTask<List<Integer>>{

        private int start;
        private int end;
        private List<Integer> integers;

        @Override
        protected List<Integer> compute() {
            if ((end - start)<=2) {//这里处理的阈值是2,根据任务数据衡量
                //判断是否是偶数
                System.out.println("======"+start+"===="+end+"===");
                List<Integer> integers = this.integers.subList(start-1, end);
                System.out.println(integers);
                return integers.stream().filter(i -> i%2 == 0).collect(Collectors.toList());
            } else {
                int middle = (start + end) / 2;
                MyRecursiveTask taskLeft = new MyRecursiveTask(start, middle, integers);
                MyRecursiveTask taskRight = new MyRecursiveTask( middle + 1, end,integers);
                taskLeft.fork();
                taskRight.fork();
                ArrayList<Integer> result = new ArrayList<>();
                List<Integer> leftJoin = taskLeft.join();
                List<Integer> rightJoin = taskRight.join();
                result.addAll(leftJoin);
                result.addAll(rightJoin);
                return result;
            }

        }
    }

结果:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[2, 4, 6, 8, 10]

详细参考
https://blog.csdn.net/Holmofy/article/details/82714665

原文地址:https://www.cnblogs.com/brxHqs/p/13639396.html