A simple hands-on with Java fork/join

As we know, Java 8 has parallel streams, and behind the scenes parallel streams are implemented on top of a fork/join pool. For example:

List<Integer> a = buildList();

List<Integer> b = buildList();

a.parallelStream().forEach(System.out::println);

b.parallelStream().forEach(System.out::println);

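If you look at a thread dump, you will find that these two parallel streams share the same thread pool (the common pool, ForkJoinPool.commonPool()). This can be a problem for I/O-bound tasks, because the two parallel streams end up waiting unnecessarily whenever tasks block. After reading some related articles, I found that this can be improved by using Java's fork/join framework directly. Here is a brief introduction: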
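One commonly cited workaround, sketched here under assumptions, is to start each parallel stream from inside its own ForkJoinPool so that the stream's work runs on that pool's workers instead of the common pool. This relies on how current OpenJDK implementations behave rather than on a documented guarantee. The class name DedicatedPoolDemo, the helper threadNamesFor, and the pool size of 2 are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class DedicatedPoolDemo {

    // Runs a parallel stream from inside the given pool and returns the
    // names of the threads that processed the elements.
    public static Set<String> threadNamesFor(ForkJoinPool pool, List<Integer> data)
            throws Exception {
        Callable<Set<String>> job = () -> data.parallelStream()
                .map(x -> Thread.currentThread().getName())
                .collect(Collectors.toSet());
        return pool.submit(job).get();
    }

    public static void main(String[] args) throws Exception {
        List<Integer> a = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        List<Integer> b = Arrays.asList(9, 10, 11, 12, 13, 14, 15, 16);
        ForkJoinPool poolA = new ForkJoinPool(2); // hypothetical size
        ForkJoinPool poolB = new ForkJoinPool(2); // hypothetical size
        try {
            // Each stream should report worker names from its own pool.
            System.out.println(threadNamesFor(poolA, a));
            System.out.println(threadNamesFor(poolB, b));
        } finally {
            poolA.shutdown();
            poolB.shutdown();
        }
    }
}
```

With this arrangement a blocked task in one stream should only tie up workers of its own pool, at the cost of managing (and shutting down) the extra pools yourself.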

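To use the fork/join framework you extend either RecursiveTask or RecursiveAction. The difference is that RecursiveTask returns a value and RecursiveAction does not. As the names suggest, both are recursive tasks.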
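To make the difference concrete, here is a minimal sketch with one task of each kind. The class names (TaskVsActionDemo, SumTask, DoubleAction) and the cutoff of 2 elements are illustrative assumptions, not part of the original example.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public class TaskVsActionDemo {

    // RecursiveTask: compute() returns a value (the sum of data[from, to)).
    public static class SumTask extends RecursiveTask<Long> {
        private final int[] data;
        private final int from;
        private final int to; // exclusive

        public SumTask(int[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= 2) { // small range: sum directly
                long sum = 0;
                for (int i = from; i < to; i++) sum += data[i];
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(data, from, mid);
            SumTask right = new SumTask(data, mid, to);
            invokeAll(left, right);            // run both halves to completion
            return left.join() + right.join(); // both are done, so join() just returns
        }
    }

    // RecursiveAction: compute() returns nothing and works by side effect
    // (here, doubling every element of data[from, to) in place).
    public static class DoubleAction extends RecursiveAction {
        private final int[] data;
        private final int from;
        private final int to; // exclusive

        public DoubleAction(int[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected void compute() {
            if (to - from <= 2) {
                for (int i = from; i < to; i++) data[i] *= 2;
                return;
            }
            int mid = (from + to) >>> 1;
            invokeAll(new DoubleAction(data, from, mid), new DoubleAction(data, mid, to));
        }
    }

    public static void main(String[] args) {
        int[] data = {3, 9, 4, 3, 1, 8, 5};
        ForkJoinPool pool = new ForkJoinPool();
        long sum = pool.invoke(new SumTask(data, 0, data.length));
        pool.invoke(new DoubleAction(data, 0, data.length));
        System.out.println(sum);                   // 33
        System.out.println(Arrays.toString(data)); // [6, 18, 8, 6, 2, 16, 10]
        pool.shutdown();
    }
}
```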

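These tasks have a fork() method, which splits work off by scheduling the task to run asynchronously, and a join() method, which waits for a forked task to finish and returns its result so that the partial results can be merged.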
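A common way to combine the two calls is to fork one half, compute the other half in the current thread, and then join the forked half. The sketch below finds the maximum of a non-empty array this way. ForkJoinIdiomDemo and MaxTask are hypothetical names, and the task assumes from <= to.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinIdiomDemo {

    // Finds the maximum of data[from..to] (both bounds inclusive, range non-empty).
    public static class MaxTask extends RecursiveTask<Integer> {
        private final int[] data;
        private final int from;
        private final int to;

        public MaxTask(int[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Integer compute() {
            if (from == to) {
                return data[from]; // a single element is its own maximum
            }
            int mid = (from + to) >>> 1;
            MaxTask left = new MaxTask(data, from, mid);
            MaxTask right = new MaxTask(data, mid + 1, to);

            left.fork();                    // schedule the left half asynchronously
            int rightMax = right.compute(); // work on the right half in this thread
            int leftMax = left.join();      // wait for the left half and take its result
            return Math.max(leftMax, rightMax);
        }
    }

    public static void main(String[] args) {
        int[] data = {3, 9, 4, 3, 1, 8, 5};
        ForkJoinPool pool = new ForkJoinPool();
        int max = pool.invoke(new MaxTask(data, 0, data.length - 1));
        System.out.println(max); // 9
        pool.shutdown();
    }
}
```

Computing one half directly keeps the current worker busy instead of having it fork two tasks and then sit waiting for both.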

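These tasks are submitted to a ForkJoinPool for execution. The size of the pool (its parallelism) can be specified when the ForkJoinPool is created.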
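As a small illustration of choosing the pool size, the sketch below creates a pool with an explicit parallelism and, from a job running inside it, reads back the parallelism of the pool hosting the current worker. PoolSizeDemo and parallelismSeenInside are hypothetical names, and the size 4 is arbitrary.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class PoolSizeDemo {

    // Creates a pool with the requested parallelism, runs one job in it, and
    // returns the parallelism of the pool as seen from inside that job.
    public static int parallelismSeenInside(int parallelism) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // ForkJoinTask.getPool() returns the pool hosting the current worker thread.
            Callable<Integer> job = () -> ForkJoinTask.getPool().getParallelism();
            return pool.submit(job).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // The no-argument constructor uses the number of available processors.
        ForkJoinPool defaultPool = new ForkJoinPool();
        System.out.println(defaultPool.getParallelism());
        defaultPool.shutdown();

        System.out.println(parallelismSeenInside(4)); // 4
    }
}
```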

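Since the idea behind fork/join is really divide and conquer, which is also the idea behind merge sort, I wrote a merge sort implemented with the fork/join framework for your reference: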

package com.wl.test2;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;

/**
 * Created by wally on 9/14/17.
 */
public class MergeSortByRecursiveTask extends RecursiveTask<List<Integer>> {

    private int start;

    private int end;

    private List<Integer> numbers;


    public MergeSortByRecursiveTask(int start, int end, List<Integer> numbers) {
        this.start = start;
        this.end = end;
        this.numbers = numbers;
    }

    @Override
    protected List<Integer> compute() {
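        // Base case: a range of zero or one element is already sorted.
        // (>= rather than ==, so that an empty list does not recurse forever.)
        if (start >= end) {
            return numbers;
        }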
        int mid = (start + end) / 2;
        int aa = mid+1;
        MergeSortByRecursiveTask mergeSortByRecursiveTask = new MergeSortByRecursiveTask(start, mid, numbers);
        MergeSortByRecursiveTask mergeSortByRecursiveTask1 = new MergeSortByRecursiveTask(aa, end, numbers);

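        // Schedule both halves to run asynchronously in the pool.
        mergeSortByRecursiveTask.fork();
        mergeSortByRecursiveTask1.fork();

        // Wait for both halves, joining in the reverse order of the forks
        // (innermost first), as the ForkJoinTask documentation recommends.
        mergeSortByRecursiveTask1.join();
        mergeSortByRecursiveTask.join();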

        merge();
        return numbers;
    }


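    // Merges the two sorted halves [start, mid] and [mid + 1, end] of numbers.
    private void merge() {

        // Scratch copy of the whole list (simple, but only [start, end] is used).
        List<Integer> newNumbers = new ArrayList<>(numbers);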
        int mid = (end + start) / 2;

        int tempMid = mid + 1;
        int i = start;
        int k = start;
        int m = start;
        while (m <= mid && tempMid <= end) {
            if (numbers.get(m) <= numbers.get(tempMid)) {
                newNumbers.set(k, numbers.get(m));
                m++;
            }else{
                // Leftover debug output; it produces the "here" line in the output.
                if(tempMid==2){
                    System.out.println("here");
                }
                newNumbers.set(k,numbers.get(tempMid));
                tempMid++;
            }
            k++;
        }

        while(m<=mid){
            newNumbers.set(k,numbers.get(m));
            m++;
            k++;
        }
        while(tempMid<=end){
            newNumbers.set(k,numbers.get(tempMid));
            tempMid++;
            k++;
        }
        System.out.println("start is :"+ start+", mid is " + mid+ ", end is " + end);
        while(i<=end){

            numbers.set(i,newNumbers.get(i));
            System.out.print(newNumbers.get(i)+",");
            i++;
        }
        System.out.println(";");
        System.out.println(Thread.currentThread().getName());

    }
}

The main function:

package com.wl.test2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

/**
 * Created by wally on 3/27/17.
 */
public class TestDIr {

    public static void main(String[] args) throws InterruptedException {
        List<Integer> temp = new ArrayList<>();
        temp.add(3);
        temp.add(9);
        temp.add(4);
        temp.add(3);
        temp.add(1);
        temp.add(8);
        temp.add(5);
        MergeSortByRecursiveTask mergeSortByRecursiveTask =
                new MergeSortByRecursiveTask(0, temp.size() - 1, temp);
        ForkJoinPool pool = new ForkJoinPool();
        Future<List<Integer>> sortResult = pool.submit(mergeSortByRecursiveTask);

        try{
            System.out.println(Arrays.toString(sortResult.get().toArray()));
         
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }
}

Output (thread names and the order of the intermediate lines may vary between runs):

start is :4, mid is 4, end is 5
1,8,;
ForkJoinPool-1-worker-3
start is :4, mid is 5, end is 6
1,5,8,;
ForkJoinPool-1-worker-3
start is :0, mid is 0, end is 1
3,9,;
ForkJoinPool-1-worker-2
start is :2, mid is 2, end is 3
3,4,;
ForkJoinPool-1-worker-2
here
start is :0, mid is 1, end is 3
3,3,4,9,;
ForkJoinPool-1-worker-2
start is :0, mid is 3, end is 6
1,3,3,4,5,8,9,;
ForkJoinPool-1-worker-1
[1, 3, 3, 4, 5, 8, 9]
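As a possible variation on the same idea, and not the implementation above, the sketch below sorts an int[] with a RecursiveAction, uses invokeAll for the two halves, and falls back to Arrays.sort for small ranges. The class name MergeSortAction, the helper sort, and the THRESHOLD value are assumptions chosen for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class MergeSortAction extends RecursiveAction {

    private static final int THRESHOLD = 4; // hypothetical sequential cutoff

    private final int[] data;
    private final int start; // inclusive
    private final int end;   // inclusive

    public MergeSortAction(int[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start < THRESHOLD) {
            Arrays.sort(data, start, end + 1); // toIndex is exclusive
            return;
        }
        int mid = (start + end) >>> 1;
        // Sort both halves in parallel and wait for both to finish.
        invokeAll(new MergeSortAction(data, start, mid),
                  new MergeSortAction(data, mid + 1, end));
        merge(mid);
    }

    // Merges the sorted halves [start, mid] and [mid + 1, end] in place,
    // copying only the left half to scratch space.
    private void merge(int mid) {
        int[] left = Arrays.copyOfRange(data, start, mid + 1);
        int i = 0, j = mid + 1, k = start;
        while (i < left.length && j <= end) {
            data[k++] = (left[i] <= data[j]) ? left[i++] : data[j++];
        }
        while (i < left.length) {
            data[k++] = left[i++];
        }
        // Any remaining right-half elements are already in their final place.
    }

    // Convenience wrapper: sorts the whole array in a fresh pool.
    public static void sort(int[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new MergeSortAction(data, 0, data.length - 1));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = {3, 9, 4, 3, 1, 8, 5};
        sort(data);
        System.out.println(Arrays.toString(data)); // [1, 3, 3, 4, 5, 8, 9]
    }
}
```

Working on a primitive array avoids boxing, and the cutoff keeps very small ranges from paying task-scheduling overhead. A sensible threshold for real data would be much larger and is best found by measurement.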

Finally, a few recommended articles:

http://www.infoq.com/cn/articles/fork-join-introduction

http://www.importnew.com/16801.html

Original article: https://www.cnblogs.com/foreveravalon/p/7542071.html