forkJoin

Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。Fork/Join框架要完成两件事情:

  1.任务分割:首先Fork/Join框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割

  2.执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据

1.ForkJoinTask:我们要使用Fork/Join框架,首先需要创建一个ForkJoin任务。该类提供了在任务中执行fork和join的机制。通常情况下我们不需要直接集成ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了两个子类:

    a.RecursiveAction:用于没有返回结果的任务

    b.RecursiveTask:用于有返回结果的任务

  2.ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行

  任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务(工作窃取算法)。

计算和

package testThread.zm.fork;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class ForkServer extends RecursiveTask<Long> {
	private Integer start;//任务初始值
	private Integer end;//任务结束值
	private static final Integer THREAD_HOLD=20;//阈值
	private int count=0;
	public ForkServer(Integer start,Integer end){
		this.start=start;
		this.end=end;
	}
	@Override
	protected Long compute() {
		count++;
		Long sum=0L;
		boolean swatch=end-start<=THREAD_HOLD;
		if(swatch){
			for(int i=start;i<=end;i++){
				sum+=i;
			}
		}else{
			Integer middle = (end+start)/2;
			ForkServer fork1 = new ForkServer(start, middle);//拆分头
			ForkJoinTask<Long> task1=fork1.fork();
			ForkServer fork2 = new ForkServer(middle+1, end);
			ForkJoinTask<Long> task2= fork2.fork();
			System.out.println(task1.join()+"===="+task2.join()+"=="+count+"=="+Thread.currentThread().getName());
			return fork1.join()+fork2.join();
		}
		return sum;
	}
	
	public static void main(String[] args) {
		ForkJoinPool pool = new ForkJoinPool();
		Long begin = System.currentTimeMillis();
		//Long sum = pool.invoke(new ForkServer(1, 1000000)); 
		Future<Long> sum = pool.submit(new ForkServer(1, 1000000));
		try {
			System.out.println("sum="+sum.get()+"耗时:"+(System.currentTimeMillis()-begin));
		} catch (InterruptedException | ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
}

  计算文件大小

package testThread.zm.fork;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class FileSize {
    private final static ForkJoinPool forkJoinPool = new ForkJoinPool();
    
    private static class FileSizeFinder extends RecursiveTask<Long>{
        private final File file;
        public FileSizeFinder(File file){
            this.file=file;
        }
        
        @Override
        protected Long compute() {
            Long sum=0L;
            if(file.isFile()){
                sum= file.length();
            }else{
                File[] files = file.listFiles();
                if(null!=files){
                    List<ForkJoinTask<Long>> tasks = new ArrayList<ForkJoinTask<Long>>();
                    for(File file:files){
                        if(file.isFile()){
                            sum+=file.length();
                        }else{
                            tasks.add(new FileSizeFinder(file));
                        }
                    }
                    
                    for(ForkJoinTask<Long> task:tasks){
                        task.fork();
                        sum+=task.join();
                    }
                }
                
            }
            return sum;
        }
    }
    
    public static void main(String[] args) {
        File file = new File("D:/ftp");
        Long size=forkJoinPool.invoke(new FileSizeFinder(file));
        System.out.println("文件大小:"+size);
    }
}
原文地址:https://www.cnblogs.com/zmblog/p/9681404.html