ForkJoin、并行流计算、串行流计算对比

ForkJoin


什么是 ForkJoin

ForkJoin 是一个把大任务拆分为多个小任务来分别计算的并行计算框架

ForkJoin 特点:工作窃取

这里面维护的都是双端队列,因此但其中一个线程完成自己的计算任务之后,可以从其他线程任务队列另一端“窃取”任务进行计算,从而提高计算效率!

ForkJoin 执行流程

伪代码:

if(任务数小){
	直接计算
}else{
	将问题划分为独立的部分
	分叉新的子任务来解决每个部分
	加入所有子任务进行计算
	将子结果进行合并
}

ForkJoinPool: 核心

ForkJoinTask:

RecursiveTask:递归任务


package juc.forkJoin;

import java.util.concurrent.RecursiveTask;

/*
求和计算的任务!
普通求和    ForkJoin    Stream并行流
如何使用ForkJoin
    1、ForkJoinPool 通过它来执行
    2、计算任务 ForkJoinPool.execute(ForkJoinTask task)
    3、ForkJoinTask 是一个接口,execute方法传入参数应为 ForkJoinTask 的子类 如 RecursiveTask

 */
public class ForkJoinDemo extends RecursiveTask<Long> {
    private Long start;//开始值
    private Long end;//结束值
    private Long temp=10000L;//阈值,用于区分是否用ForkJoin来进行划分

    public ForkJoinDemo(Long start,Long end){
        this.start=start;
        this.end=end;
    }

    @Override
    protected Long compute() {
        if ((end-start)<=temp){//小于等于阈值,则直接进行计算
            Long sum=0L;
            for (Long i = start; i <= end; i++) {
                sum+=i;
            }
            return sum;
        }else {//大于阈值使用ForkJoin进行划分
            //任务拆分点
            Long middle=(start+end)/2;
            ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
            task1.fork();
            ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end);
            task2.fork();

            return task1.join()+task2.join();

        }
    }
}

测试类:

package juc.forkJoin;

import java.util.OptionalLong;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //计算结果:500000000500000000
        test1();
        test2();
        test3();
        test4();
    }
    //普通方法
    public static void test1(){
        Long startTime=System.currentTimeMillis();
        Long sum=0L;
        for (Long i = 1L; i <= 10_0000_0000L; i++) {
            sum+=i;
        }
        Long endTime=System.currentTimeMillis();
        System.out.println("计算结果:"+sum);
        System.out.println("普通方法耗时:"+(endTime-startTime));
    }
    //ForkJoin方法
    public static void test2() throws ExecutionException, InterruptedException {
        Long startTime=System.currentTimeMillis();

        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Long> task = new ForkJoinDemo(1L, 10_0000_0000L);
        ForkJoinTask<Long> submit = pool.submit(task);
        Long result = submit.get();

        Long endTime=System.currentTimeMillis();
        System.out.println("计算结果:"+result);
        System.out.println("ForkJoin方法耗时:"+(endTime-startTime));
    }
    //Stream并行流方法
    public static void test3(){
        Long startTime=System.currentTimeMillis();

        //Stream并行流       parallel()并行流     sequential()串行流
        OptionalLong reduce = LongStream.rangeClosed(1L, 10_0000_0000L).parallel().reduce(Long::sum);
        Long result = reduce.getAsLong();

        Long endTime=System.currentTimeMillis();
        System.out.println("计算结果:"+result);
        System.out.println("Stream并行流方法耗时:"+(endTime-startTime));
    }
    //Stream串行流方法
    public static void test4(){
        Long startTime=System.currentTimeMillis();

        OptionalLong reduce = LongStream.rangeClosed(1L, 10_0000_0000L).sequential().reduce(Long::sum);
        Long result = reduce.getAsLong();

        Long endTime=System.currentTimeMillis();
        System.out.println("计算结果:"+result);
        System.out.println("Stream串行流方法耗时:"+(endTime-startTime));
    }
}

运行结果:

原文地址:https://www.cnblogs.com/code-xu/p/14304065.html