ForkJoin

累加器--RecursiveTask有返回值

package com.dwz.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.IntStream;
/**
 * Accumulator demo built on {@link RecursiveTask}, i.e. a fork/join task that
 * returns a value.
 *
 * <p>The inclusive range {@code [start, end]} is split in half recursively until
 * a sub-range spans at most {@code MAX_THRESHOLD} steps; each small sub-range is
 * summed directly and the partial sums are combined on the way back up.
 */
public class ForkJoinRecursiveTask {

    /** Largest value of {@code end - start} that is summed directly instead of being split. */
    private static final int MAX_THRESHOLD = 3;

    /** Task that returns the sum of all integers in the inclusive range {@code [start, end]}. */
    private static class CalculatedRecursiveTask extends RecursiveTask<Integer> {
        private final int start;

        private final int end;

        public CalculatedRecursiveTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            if (end - start <= MAX_THRESHOLD) {
                return IntStream.rangeClosed(start, end).sum();
            }

            // start + (end - start) / 2 cannot overflow the way (start + end) / 2 can.
            int middle = start + (end - start) / 2;
            CalculatedRecursiveTask leftTask = new CalculatedRecursiveTask(start, middle);
            CalculatedRecursiveTask rightTask = new CalculatedRecursiveTask(middle + 1, end);

            // Fork only one half and compute the other on the current worker thread,
            // so this thread does useful work instead of just waiting in join().
            leftTask.fork();
            int rightSum = rightTask.compute();
            return leftTask.join() + rightSum;
        }
    }

    /**
     * Sums the integers in the inclusive range {@code [start, end]} on a dedicated
     * {@link ForkJoinPool}.
     *
     * @param start first value of the range (inclusive)
     * @param end   last value of the range (inclusive); a value below {@code start}
     *              denotes an empty range
     * @return the sum of the range, or {@code 0} when the range is empty
     */
    static int sum(int start, int end) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        try {
            // invoke() blocks until the task finishes and rethrows any failure as an
            // unchecked exception, so nothing is silently swallowed.
            return forkJoinPool.invoke(new CalculatedRecursiveTask(start, end));
        } finally {
            // Release the worker threads; the pool is not reused.
            forkJoinPool.shutdown();
        }
    }

    /**
     * Prints the sum of the integers from 0 to 12.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        System.out.println(sum(0, 12));
    }
}

累加器--RecursiveAction无返回值

package com.dwz.forkjoin;

import java.util.Optional;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
/**
 * Accumulator demo built on {@link RecursiveAction}, i.e. a fork/join task that
 * returns no value.
 *
 * <p>Because the action has no result, each leaf adds its partial sum to a shared
 * {@link AtomicInteger}. The inclusive range {@code [start, end]} is split in half
 * recursively until a sub-range spans at most {@code MAX_THRESHOLD} steps.
 */
public class ForkJoinRecursiveAction {

    /** Largest value of {@code end - start} that is summed directly instead of being split. */
    private static final int MAX_THRESHOLD = 3;

    /** Action that adds the sum of the inclusive range {@code [start, end]} to an accumulator. */
    private static class CalculateRecursiveAction extends RecursiveAction {
        private final int start;
        private final int end;
        /** Accumulator shared by every sub-action of a single run. */
        private final AtomicInteger total;

        private CalculateRecursiveAction(int start, int end, AtomicInteger total) {
            this.start = start;
            this.end = end;
            this.total = total;
        }

        @Override
        protected void compute() {
            // The range size is end - start; testing start - end would never split.
            if (end - start <= MAX_THRESHOLD) {
                total.addAndGet(IntStream.rangeClosed(start, end).sum());
                return;
            }

            // start + (end - start) / 2 cannot overflow the way (start + end) / 2 can.
            int middle = start + (end - start) / 2;

            // invokeAll runs both halves and returns only after both have completed,
            // so this action is not done until all of its descendants are done.
            invokeAll(
                    new CalculateRecursiveAction(start, middle, total),
                    new CalculateRecursiveAction(middle + 1, end, total));
        }
    }

    /**
     * Sums the integers in the inclusive range {@code [start, end]} on a dedicated
     * {@link ForkJoinPool}. Every call uses its own accumulator, so repeated calls
     * do not influence one another.
     *
     * @param start first value of the range (inclusive)
     * @param end   last value of the range (inclusive); a value below {@code start}
     *              denotes an empty range
     * @return the sum of the range, or {@code 0} when the range is empty
     */
    static int sum(int start, int end) {
        AtomicInteger total = new AtomicInteger(0);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        try {
            // invoke() blocks until the whole task tree has finished, so no timed
            // wait is needed before reading the accumulator.
            forkJoinPool.invoke(new CalculateRecursiveAction(start, end, total));
        } finally {
            // Release the worker threads; the pool is not reused.
            forkJoinPool.shutdown();
        }
        return total.get();
    }

    /**
     * Prints the sum of the integers from 0 to 10.
     *
     * @param args ignored
     * @throws InterruptedException declared for signature compatibility with earlier
     *                              versions of this method
     */
    public static void main(String[] args) throws InterruptedException {
        System.out.println(sum(0, 10));
    }
}
原文地址:https://www.cnblogs.com/zheaven/p/13398208.html