fork/join并发编程

Fork & Join 的具体含义

Fork 一词的原始含义是吃饭用的叉子,也有分叉的意思。在Linux 平台中,函数 fork()用来创建子进程,使得系统进程可以多一个执行分支。在 Java 中也沿用了类似的命名方式。

而 Join() 的含义和 Thread 类的 join 类似,表示等待。也就是使用 fork() 后系统多了一个执行分支(线程),所以需要等待这个执行分支执行完毕,才有可能得到最终的结果,因此 join 就是表示等待。

在实际使用中,如果毫无顾忌的使用 fork 开启线程进行处理,那么很有可能导致系统开启过多的线程而严重影响性能。所以,在JDK中,给出一个 ForkJoinPool 线程池,对于 fork() 方法并不急着开启线程,而是提交给 ForkJoiinPool 线程池进行处理,以节省系统资源。

由于线程池的优化,提交的任务和线程数量并不是一对一的关系。在绝大多数情况下,一个物理线程实际上是需要处理多个逻辑任务的。因此,每个线程必然需要拥有一个任务队列。因此,在实际执行过程中,可能遇到这么一种情况:线程A已经把自己的任务都处理完了,而线程B还有一堆任务等着处理,此时,线程A就会“帮助” 线程B,从线程 B的任务队列中拿一个任务来处理,尽可能的达到平衡。值得注意的是:当线程试图帮助别人时,总是从任务队列的底部开始拿数据,而线程试图执行自己的任务时,则从相反的顶部开始拿。因此这种行为也十分有利于避免数据竞争。

我们看看线程池 ForkJoinPool 的一个接口:

 /**
     * Submits a ForkJoinTask for execution.
     *
     * @param task the task to submit
     * @param <T> the type of the task's result
     * @return the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task;
    }

你可以向 ForkJoinPool 线程池提交一个 ForkJoinTask 任务。所谓 ForkJoinTask 任务就是支持 fork () 分解以及 join()等待的任务。 ForkJoinTask 有两个重要的子类,RecursiveAction 和 RecursiveTask。他们分别表示没有返回值的任务和可以携带返回值的任务。有点像 Rannable 和 Callable。

下面来要给简单的例子展示 Fork/Join 框架的使用。这里查找给定的文件夹中,找出最大的文件

1、创建继承RecursiveTask的类

public class ReckonTask extends RecursiveTask<File> {


    private static final long serialVersionUID = 3932273269900797021L;
    File path;
    Filter filter;

    public ReckonTask(File path, Filter filter) {
        this.path = path;
        this.filter = filter;
    }

    @Override
    protected File compute() {
        File maxFile = null;

        File[] files = path.listFiles();
        for (File file : files) {
            if (file.isDirectory()){
                ReckonTask reckonTask = new ReckonTask(file,filter);
                reckonTask.fork();
                maxFile = filter.addFilter(maxFile,reckonTask.join());
            }else{
                maxFile = filter.addFilter(maxFile,file);
            }
        }
        return maxFile;
    }
}

2、创建过滤类Filter

public class Filter {
    public File addFilter(File f1, File f2) {
        if (null==f1){
            return f2;
        }else if (null==f2){
            return f1;
        }
        return f1.length()>f2.length()?f1:f2;
    }
}

3、调用

public static void main(String[] args) throws ExecutionException, InterruptedException {
        long begin = System.currentTimeMillis();
        File file = new File("D:\mavenDB");
        ForkJoinPool pool = new ForkJoinPool();
        ReckonTask reckonTask = new ReckonTask(file,new MyFilter());
        ForkJoinTask<File> submit = pool.submit(reckonTask);
        File file1 = submit.get();
        long end = System.currentTimeMillis();
        System.out.println("路径:"+file1.getPath()+",大小:"+file1.length()+",耗时:"+(end-begin));
        //关闭线程池
        pool.shutdown();
    }
原文地址:https://www.cnblogs.com/cq-yangzhou/p/11326191.html