Java中的Fork /Join框架指南

1.概述

fork / join框架在Java 7中提供。它提供了一些工具,通过尝试使用所有可用的处理器内核来帮助加速并行处理 - 这是通过分而治之的方法实现的

实际上,这意味着框架首先“forks”,递归地将任务分解为较小的独立子任务,直到它们足够简单以便异步执行。

之后,“join”部分开始,其中所有子任务的结果以递归方式连接到单个结果中,或者在返回void的任务的情况下,程序只是等待直到执行完每个子任务。

为了提供有效的并行执行,fork / join框架使用一个名为ForkJoinPool的线程池,它管理ForkJoinWorkerThread类型的工作线程

2. ForkJoinPool

ForkJoinPool是框架的心脏。它是ExecutorService一个实现,它管理工作线程(ForkJoinWorkerThread)并为我们提供工具来获取有关线程池状态和性能的信息。

一个工作线程同一时间只能执行一个任务,但是ForkJoinPool不会为所有子任务创建单独的线程。

相反,池中的每个线程都有自己的双端队列(deque),用于存储任务。

这种架构对于在工作窃取算法的帮助下平衡线程的工作负载至关重要

大致描述:

  就是建立一个线程池,每个线程有一个任务队列,

  提交的任务被算法(自己代码定义的)拆分成很多个小任务(任务实现这两个接口之一:RecursiveAction或者RecursiveTask),放到各个线程的双端队列中,

  每个线程从自己的队列中取任务执行,自己的队列执行完毕则会去同线程池的其它线程队列中取任务来自行。

 

图片来源:https://www.cnblogs.com/cjsblog/p/9078341.html

2.1.工作窃取算法

简单地说 - 空闲线程试图从繁忙线程的deques中“窃取”工作。

默认情况下,工作线程从其自己的双端队列中获取任务。

当它为空时,线程从另一个忙线程的双端队列尾部或全局入口队列中获取任务,因为这是待完成任务可能位于的位置。

这种方法最大限度地减少了线程竞争任务的可能性。它还减少了线程必须寻找工作的次数,因为它首先在最大可用工作块上工作。

2.2.ForkJoinPool实例化

在Java 8中,访问ForkJoinPool实例的最方便方法是使用其静态方法commonPool()顾名思义,这将提供对公共池的引用,公共池是每个ForkJoinTask的默认线程池

根据Oracle的文档,使用预定义的公共池可以减少资源消耗,因为这会阻止为每个任务创建单独的线程池。

 
ForkJoinPool commonPool = ForkJoinPool.commonPool();

通过创建ForkJoinPool并将其分配给实用程序类公共静态字段,可以在Java 7中实现相同的行为

public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);

使用ForkJoinPool的构造函数,可以创建具有特定级别的并行性,线程工厂和异常处理程序的自定义线程池。在上面的示例中,池的并行度为2.这意味着池将使用2个处理器核心。

3. ForkJoinTask <V>

ForkJoinTaskForkJoinPool中执行的任务的基本类型

在实践中,它的两个子类之一应该被继承:RecursiveAction返回值为空,RecursiveTask <V>有返回值。 

它们都有一个抽象方法compute(),其中定义了任务的逻辑,也就是要覆写的方法。

3.1.RecursiveAction - 一个例子

在下面的示例中,要处理的工作单元由称为工作负载String表示出于演示目的,该任务是一个荒谬的任务:它只是j简单的转为大写并打印它。

为了演示框架的分支行为,如果workload.length() 大于指定的阈值则使用createSubtask()方法该示例将分割任务

String被递归地划分为子串,创建基于这些子串的CustomRecursiveTask实例。

因此,该方法返回List <CustomRecursiveAction>。

使用invokeAll()方法将列表提交给ForkJoinPool

 
public class CustomRecursiveAction extends RecursiveAction {
 
    private String workload = "";
    private static final int THRESHOLD = 4;
 
    private static Logger logger = 
      Logger.getAnonymousLogger();
 
    public CustomRecursiveAction(String workload) {
        this.workload = workload;
    }
 
    @Override
    protected void compute() {
        if (workload.length() > THRESHOLD) {
            ForkJoinTask.invokeAll(createSubtasks());
        } else {
           processing(workload);
        }
    }
 
    private List<CustomRecursiveAction> createSubtasks() {
        List<CustomRecursiveAction> subtasks = new ArrayList<>();
 
        String partOne = workload.substring(0, workload.length() / 2);
        String partTwo = workload.substring(workload.length() / 2, workload.length());
 
        subtasks.add(new CustomRecursiveAction(partOne));
        subtasks.add(new CustomRecursiveAction(partTwo));
 
        return subtasks;
    }
 
    private void processing(String work) {
        String result = work.toUpperCase();
        logger.info("This result - (" + result + ") - was processed by "
          + Thread.currentThread().getName());
    }
}

此模式可用于开发自己的RecursiveAction要执行此操作,请创建一个表示工作总量的对象,选择合适的阈值,定义分割工作的方法,并定义执行工作的方法。

3.2.RecursiveTask <V>

对于返回值的任务,此处的逻辑类似,除了每个子任务的结果合并在一个结果中:

 
public class CustomRecursiveTask extends RecursiveTask<Integer> {
    private int[] arr;
 
    private static final int THRESHOLD = 20;
 
    public CustomRecursiveTask(int[] arr) {
        this.arr = arr;
    }
 
    @Override
    protected Integer compute() {
        if (arr.length > THRESHOLD) {
            return ForkJoinTask.invokeAll(createSubtasks())
              .stream()
              .mapToInt(ForkJoinTask::join)
              .sum();
        } else {
            return processing(arr);
        }
    }
 
    private Collection<CustomRecursiveTask> createSubtasks() {
        List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
        dividedTasks.add(new CustomRecursiveTask(
          Arrays.copyOfRange(arr, 0, arr.length / 2)));
        dividedTasks.add(new CustomRecursiveTask(
          Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
        return dividedTasks;
    }
 
    private Integer processing(int[] arr) {
        return Arrays.stream(arr)
          .filter(a -> a > 10 && a < 27)
          .map(a -> a * 10)
          .sum();
    }
}

在此示例中,工作由存储在CustomRecursiveTaskarr数组中

createSubtask() 方法递归地将所述任务分成小块任务,直到每个任务是小于阈值

然后,invokeAll() 方法将子任务提交给公共线程池并返回Future列表

要触发执行,为每个子任务调用join()方法。

4.将任务提交给ForkJoinPool

要将任务提交到线程池,可以使用很少的方法。

submit()execute() 方法(它们的使用情况是一样的):

forkJoinPool.execute(customRecursiveTask);
int result = customRecursiveTask.join();

invoke()方法拆分任务并等待结果,并且不需要任何手动join:

int result = forkJoinPool.invoke(customRecursiveTask);

invokeAll()方法是提交多个orkJoinTasksForkJoinPool的便捷方式。

它将任务作为参数,forks它们将按照生成它们的顺序返回Future对象的集合

或者,您可以使用单独的fork()join()方法。

fork()方法将任务提交到一个线程池中,但它不会触发它的执行。

join()方法被用于触发执行。RecursiveAction的情况下join()只返回null ; 对于RecursiveTask <V>,它返回任务执行的结果:

customRecursiveTaskFirst.fork();
result = customRecursiveTaskLast.join();

在我们的RecursiveTask <V>示例中,我们使用invokeAll()方法向池提交一系列子任务。使用fork()join()可以完成相同的工作,但这会对结果的排序产生影响。

为避免混淆,使用invokeAll()方法向ForkJoinPool提交多个任务通常是个好主意

5.结论

使用fork / join框架可以加速处理大型任务,但要实现这一结果,应遵循一些指导原则:

  • 使用尽可能少的线程池 - 在大多数情况下,最好的决定是为每个应用程序或系统使用一个线程池
  • 如果不需要特定调整,请使用默认的公共线程池
  • 使用合理的阈值ForkJoingTask拆分为子任务
  • 避免在 ForkJoingTasks中出现任何阻塞
原文地址:https://www.cnblogs.com/gc65/p/10652300.html