Fork/Join 框架

Fork/Join 很像 MapReduce 的处理过程:先将任务切割成若干小任务分别计算,然后再将各个小任务的计算结果合并起来。

package forkJoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;

/**
 * Fork/join task that sums every integer in the closed range {@code [start, end]}.
 *
 * <p>Ranges whose width ({@code end - start}) is below {@code LIMIT} are summed
 * sequentially; wider ranges are split at the midpoint into two subtasks whose
 * results are added together.
 *
 * <p>The result type is {@code Integer}, so the sum silently wraps around on
 * overflow (the true sum of {@code 1..1_000_000}, for instance, does not fit in an
 * {@code int}). NOTE(review): a {@code RecursiveTask<Long>} would avoid that, but it
 * changes the type every caller sees.
 *
 * <p>Created by luozhitao on 2017/9/29.
 */
public class forkTask extends RecursiveTask<Integer> {
    /** Ranges with {@code end - start} below this value are summed without forking. */
    private static final int LIMIT = 10;

    /** Inclusive lower bound of the range. */
    int start;
    /** Inclusive upper bound of the range. */
    int end;

    /** Result of the most recent {@link #compute()} call. */
    int sum;

    /**
     * @param start inclusive lower bound
     * @param end   inclusive upper bound; if smaller than {@code start} the sum is 0
     */
    public forkTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Sums the range, forking two half-range subtasks when it is too wide.
     *
     * @return the (possibly wrapped) {@code int} sum of {@code start..end}
     */
    @Override
    protected Integer compute() {
        // Widen to long so extreme bounds cannot overflow the width check or the midpoint.
        long span = (long) end - start;

        // Accumulate locally so that re-running the task does not add to a stale field value.
        int total = 0;

        if (span < LIMIT) {
            // long index: with end == Integer.MAX_VALUE an int index would never exceed end.
            for (long i = start; i <= end; i++) {
                total += (int) i;
            }
        } else {
            int mid = (int) (((long) start + end) / 2);

            forkTask left = new forkTask(start, mid);
            forkTask right = new forkTask(mid + 1, end);

            // invokeAll returns only after both halves are done (or rethrows their failure),
            // so join() just fetches the results and needs no checked-exception handling.
            invokeAll(left, right);
            total = left.join() + right.join();
        }

        sum = total;
        return total;
    }
}

main

package forkJoin;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

/**
 * Demo entry point: submits a {@code forkTask} over the range 1..1,000,000 to a
 * {@link ForkJoinPool}, logs pool statistics while the task runs, then logs the result.
 *
 * <p>Created by luozhitao on 2017/9/29.
 */
public class Taskapp {

    private static final Logger logger = LoggerFactory.getLogger(Taskapp.class);

    /** Pause between two statistics log lines, so the monitor loop does not spin a core. */
    private static final long POLL_INTERVAL_MILLIS = 10L;

    /**
     * Runs the demo.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        try {
            forkTask forktask = new forkTask(1, 1000000);
            Future<Integer> future = forkJoinPool.submit(forktask);

            // Report pool activity at least once, then until the task has completed.
            do {
                logger.info("activeThreadCount {},stealCount {},parallelisem {},QueueCount {}",
                        forkJoinPool.getActiveThreadCount(),
                        forkJoinPool.getStealCount(),
                        forkJoinPool.getParallelism(),
                        forkJoinPool.getQueuedTaskCount());
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } while (!forktask.isDone());

            logger.info("results is {}", future.get());
        } catch (ExecutionException e) {
            // The task itself threw; surface it instead of hiding the failure.
            logger.error("fork/join computation failed", e);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still see the interrupt.
            Thread.currentThread().interrupt();
            logger.warn("interrupted while waiting for the fork/join result", e);
        } finally {
            // Always release the pool's worker threads, even on failure.
            forkJoinPool.shutdown();
        }
    }
}
原文地址:https://www.cnblogs.com/luo-mao/p/7610422.html