多线程-java.util.concurrent-ForkJoinPool

     Java7引入了ForkJoinPool框架。这个框架的主要应用场景是把大任务拆解成小的任务并行执行。先看看ForkJoinPool的类结构和RecursiveTask的类结构:

        

     再看一下API文档

     我们主要关注文档中的几个方法,也是我们经常会用到的几个方法。(ps:ForkJoinPool适用于CPU计算密集型的任务)

     execute()、invoke()、submit()  这三个方法都是执行任务的方法。其中,要想获取返回值,我们就调用invoke()和submit()方法。

     我们先看一个例子,看看ForkJoinPool是怎么使用的。我们有一个业务场景,并发采集用户信息和配偶信息。先写一个类,这个类要继承RecursiveTask。然后实例化之后才能交给ForkJoinPool去执行。     

class TaskTest extends RecursiveTask<ResultBean> {
    @Override
    protected ResultBean compute() {

        RecursiveTask<String> userInfoTask = new RecursiveTask<String>() {
            @Override
            protected String compute() {
                System.out.println("开始采集用户信息");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //模拟获取用户信息
                return "{'username':'zs', 'gender': 'men'}";
            }
        };

        RecursiveTask<String> spouseInfoTask = new RecursiveTask<String>() {
            @Override
            protected String compute() {
                System.out.println("开始采集配偶信息");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "{'spouseName':'ch', 'gender':'women'}";
            }
        };
//执行任务用invokeAll()或者fork()
invokeAll(userInfoTask, spouseInfoTask);
//userInfoTask.fork();
//spouseInfoTask.fork(); String userInfoJoin
= userInfoTask.join(); String spouseInfoJoin = spouseInfoTask.join(); return new ResultBean(userInfoJoin, spouseInfoJoin); } }

 fork()方法,开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。

 join()方法,等待该任务线程处理完毕,获取返回结果。

ResultBean类。省略的setter和getter方法。

class ResultBean {
    /** 用户信息 */
    private String userInfo;
    /** 配偶信息 */
    private String spouseInfo;

    public ResultBean() {
    }

    public ResultBean(String userInfo, String spouseInfo) {
        this.userInfo = userInfo;
        this.spouseInfo = spouseInfo;
    }
}

 定义好了,写个main方法测试一下

public class ForkJoinTest {
    public static void main(String[] args) {
        TaskTest taskTest = new TaskTest();
        //1、invoke方法
//        ResultBean bean = ForkJoinPool.commonPool().invoke(taskTest);
//        System.out.println("结果:" + bean);

        //2、submit方法
        ForkJoinTask<ResultBean> submitResult = ForkJoinPool.commonPool().submit(taskTest);
        try {
            //直接调用getRawResult方法返回的值是null
            submitResult.getRawResult();
            //1、没有等待超时时间的
            ResultBean bean = submitResult.get();
            System.out.println("get:" + bean);
            //2、有等待超时时间的
            //submitResult.get(1000, TimeUnit.SECONDS);

        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

    }
}

参考:

  【1】http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/

  【2】并发编程网,http://ifeve.com/tag/forkjoinpool/

      【3】廖雪峰官方网站,https://www.liaoxuefeng.com/wiki/1252599548343744/1306581226487842

原文地址:https://www.cnblogs.com/happyflyingpig/p/10144571.html