6.跑步者--并行编程框架 ForkJoin

本文如果您已经了解一般并行编程知识。了解Java concurrent部分如ExecutorService等相关内容。

虽说是JavaForkJoin并行框架。但不要太在意Java,当中的思想在其他语言环境也是相同适用的。由于并发编程在本质上是一样的。就好像怎样找到优秀的Ruby程序猿?事实上要找的仅仅是一个优秀的程序猿。

当然,假设语言层面直接支持相关的语义会更好。

 

引言

Java 语言从一開始就支持线程和并发性语义。

Java5添加的并发工具又攻克了一般应用程序的并发需求。Java6Java7又进一步补充了一些内容。原来的工具主要是粗粒度的并发。

比方每一个web请求由一个工作线程处理,在线程池分配任务。

而Java7中新引入的ForkJoin能够处理更细粒度的并行计算。

 

早期的时候都是单核cpu环境。假设不是多核环境下。线程/进程并非真正的并行运行,主要用来表示异步运行效果。

单核cpu上,假如每一个任务全然是cpu密集的(没有等待),那么这样的伪并发并不会使计算变快。

仅仅有在真正的多核环境才干起到加速作用,而如今多核已经普及。甚至已经到了手机上!


介绍

ForkJoin是适用于多核环境的轻量级并行框架。目标是在多核系统下,通过并行运算。充分利用多处理器。提高效率与加速执行。


ForkJoin编程范式:将问题递归地分解为较小的子问题,并行处理这些子问题,然后合并结果,如:

if (my portion of the work is small enough)   

     do the work directly 

else   

     split my work into two pieces   

     invoke the two pieces and wait for the results

 

ForkJoin由一组工作线程组成。用来运行任务。核心是work-stealing算法。能够有大量任务,但实际仅仅有少量真正的物理线程。默认是机器的cpu数量,也可指定。非常多其他工作的算法都能够在此基础之上进行。


尽管起初直接使用它的人可能不多,但将来会被非常多框架在底层使用,由于是如此基础,所以终于ForkJoin可能会无处不在。

一般而言,使用者仅仅须要关心两个方法fork()  join()。它们分别表示:子任务的异步运行和堵塞等待结果完毕。 


ForkJoin框架的核心是ForkJoinPool类,实现了work-stealing算法,用于运行ForkJoinTask类型的任务(也就是依照该算法调度线程与任务,当然还负责解决好相关的一些其他问题)。

 

work-stealing算法

work-stealing 是一种任务调度方法,由多个工作线程组成。每一个工作线程用一个双端队列维护一组任务。

Fork的时候是把任务加到队列的头部。而不像一般的线程池那样是加到任务队列末尾。工作线程选择头部最新的任务来运行。当工作线程没有任务可运行时,它会尝试从其他线程的任务队列尾部窃取一个任务运行。假设没有任务运行了而且窃取其他任务失败,那么工作线程停止。


这样的方法的长处是降低了争用,由于工作线程从头获取任务,而窃取线程从尾部窃取任务。还有一个长处是递归的分治法使得早期产生的是较大的任务单元。而窃取到较大任务会进一步递归分解。因此也降低了尾部窃取的次数。

另外,父任务非常可能要等待子任务(join)。所以从队列头部子任务開始运行也是一种优化。


总之。它会使用有限的线程运行大量任务,同一时候保持各线程的任务都处于繁忙的运行状态。而尽量不让线程处于等待状态。

为了做到这点可能会从其他线程的任务队列中窃取任务来运行,所以叫work-stealing


就像前面所说物理线程不能太多,过多的话切换管理开销就会较大,还会消耗很多其它的内存等资源,而且没有带来不论什么优点。默认是用cpu数量的线程数,普通情况都比較合适(比方Runtime.getRuntime().availableProcessors()返回处理器的数量),但详细的数值还和任务自身的特点有关,能够通过不同參数測试比較一下。而任务能够是大量的,由每一个线程的工作队列维护。


ForkJoin是简化了一些开发人员的工作,假设不用ForkJoin。最原始的方式是自己手工切分任务并分别创建线程运行。

 

分治、并行、可伸缩的思考:

这三者关系非常亲热。分治思想(divide-and-conquer)是一种简单朴素的思想。非常多问题都能够这样解决。ForkJoin就相当于分治法的并行版本号。 分治本身仅仅是解决这个问题的思想,既能够顺序运行也能够并行运行,可是在并行环境中更加有效。由于能够并行处理子问题。

而在并行方面,可并行处理问题要么是彼此全然独立的问题,要么是可分解单独处理的问题。可伸缩性又和是否能并行处理紧密相关。由于假设不能并行处理就要受到单机处理能力的限制,也就难以伸缩了。


ForkJoinMapReduce两个并行计算框架的差别 

MapReduce是把大数据集切分成小数据集,并行分布计算后再合并。

ForkJoin是将一个问题递归分解成子问题。再将子问题并行运算后合并结果。

二者共同点:都是用于运行并行任务的。基本思想都是把问题分解为一个个子问题分别计算,再合并结果。应该说并行计算都是这样的思想,彼此独立的或可分解的。从名字上看ForkMap都有切分的意思。JoinReduce都有合并的意思。比較类似。

差别:

1)环境差异。分布式 vs 单机多核:ForkJoin设计初衷针对单机多核(处理器数量非常多的情况)。MapReduce一開始就明白是针对非常多机器组成的集群环境的。也就是说一个是想充分利用多处理器,而还有一个是想充分利用非常多机器做分布式计算。这是两种不同的的应用场景。有非常多差异,因此在细的编程模式方面有非常多不同。

2)编程差异:MapReduce通常是:做较大粒度的切分,一開始就先切分好任务然后再运行,而且彼此间在最后合并之前不须要通信。这样可伸缩性更好,适合解决巨大的问题,但限制也很多其它。ForkJoin能够是较小粒度的切分,任务自己知道该怎样切分自己,递归地切分到一组合适大小的子任务来运行。由于是一个JVM内,所以彼此间通信是非常easy的。更像是传统编程方式。


ForkJoin框架基本结构

ForkJoinPool本身实现了ExecutorService接口,负责调度运行ForkJoinTask

ForkJoinTask是提交给ForkJoinPool 运行的任务,本身也实现了Future 接口。

 

ForkJoinTask有两个子类RecursiveActionRecursiveTask

 RecursiveAction 没有返回值(仅仅需fork);RecursiveTask有返回值(须要合并)。

类似于Runnable Callable一样。没有返回值一般意味着全部子任务都运行完了就可以,中间的子任务不须要join了。事实上要不要返回值都能够实现。有返回值能够直接合并。没有返回值能够把结果保存在共享的数据上。

 

而我们要做的是实现自己要完毕的任务,仅仅须要继承其一,并覆盖抽象方法compute()。在这种方法中实现自己的任务,递归分解任务。

 

ForkJoinPool与一般的ExecutorService实现的区别

ForkJoin实现了ExecutorService接口,这个接口就是用来把任务交给线程池中的工作线程去运行。ForkJoin也是一个ExecutorService,但差别在于ForkJoin使用了work-stealing算法,见前面的介绍。

普通的线程池是按FIFO的方式运行,而ForkJoin优先运行(由其他任务)后创建子任务。

对于大部分产生子任务的任务模式,ForkJoin的处理实现会非常高效。假设设置了异步模式, ForkJoin也可能适合运行事件类型(不须要join)的任务。


影响ForkJoin加速效果的因素

理想效果是核越多加速效果越好。

可是并行不一定更快,參数不正确还可能更慢:

1)  并发数,即线程数。通常是可用的cpu数,默认就是这个,一般表现非常好。

2)  任务切分的粒度。假设切分粒度等于总任务量,一个任务运行,就相当于单线程顺序运行。每一个任务运行的计算量。太大的话加速效果有限。不能发挥到最好。相反,太小的话,消耗在任务管理的成本占了主要部分。导致还不如顺序运行的快。 

须要适当平衡二者。由于还和任务本身的特定有关。所以能够做个基准測试比較一下。

而总的运行时间还与任务的规模有关。


任务粒度应该适中,多大合适?好像在什么地方上看到说:经验上单个任务为100-10000个基本指令,当然还和任务本身的特定有关。

 

个人感觉多核cpu仅仅适用于解决计算密集型应用,由于实际问题可能IO等其它方面的瓶颈,多核也还是无法充分利用的。

  

使用ForkJoin的步骤:

ForkJoin框架替我们完毕了一些工作,那么我们使用时还要完毕哪些工作:

1)  怎样运行单个任务。假设仅仅切分出一个任务运行,就相当于单线程顺序运行。

2)  怎样递归地切分任务(以及任务切分后是否须要合并结果)

3)  切分粒度多少合适(最小任务单元)

这些详细表如今:继承ForkJoinTask的一个子类。并实现抽象方法compute()

在这种方法中实现自己的任务,递归分解任务。

 

这些准备好之后就能够启动了:创建一个表示所有任务的ForkJoinTask对象,创建一个ForkJoinPool的实例。把task作为參数运行ForkJoinPoolinvoke方法。

 

ForkJoin任务外部运行总任务:execute异步运行任务。没有返回结果voidinvoke运行任务并等待返回结果。结果是特定类型。submit运行一个任务,返回ForkJoinTask(实际上是作为Future对象返回)。一般应该在外部使用invoke调用运行总任务。

executesubmit仅仅是为了实现ExecutorService规定的相关语义,invokeForkJoin中特有的。

 

ForkJoinTask内部递归运行的过程中:fork是异步运行。invoke是等待任务运行完毕。


 

详细实例:

多看看详细演示样例比較好。

1)  合并排序演示样例:

合并排序是常见的排序算法之中的一个。

演示样例实现了对一个整数数组的合并排序。同一时候还演示了不同并发数(线程数)与不同数组大小的组合測试。

代码在<jdk_ home>/sample/ForkJoin/ 中。

 

2)  把图片模糊处理演示样例:

一个图片能够被表示为一个m*n大小的整数数组,当中每一个整数表示一个像素(的颜色)。模糊处理之后的图像还是一个相同大小的整数数组。处理过程是把原来的每一个像素与周围像素的颜色求平均值就可以。

假设顺序运行就是从头到尾对每一个像素运行一次计算得到目标像素,由于每一个像素的计算是独立的,所以能够把这个整数数组切分成一块一块的子数组(即子任务)分别运行。任务不适合切分的过小。所以设定了一个常数阈值10000,大小小于10000的子数组就直接运行,否则对半切分为两个子数组的任务分别运行。文章 源码

      在我自己的机器上i3处理器 (i3,4cpu),并行确实快了不少。

 

其他的比如:求最大值max、平均值avg、求和sum等聚合函数都是能够分解计算的。

演示样例中都是对数组的处理。比較常见的是对数组、集合进行并行地处理操作。但也不限于此。  

网上有一些Fibonacci 的演示样例。但这些演示样例并不适合展示ForkJoin


 

Doug LeaJSR-166

说到Java并发编程,就不能不说到Doug LeaJSR-166Doug Lea是并发编程方面的专家。纽约州立大学奥斯威戈分校的计算机教授。曾是JCP运行委员。是JSR-166leader

JSR-166就是负责向Java语言中加入并发编程工具的,即我们见到的java.util.concurrent包(及子包)。还是《 Concurrent Programming in Java Design Principles and Patterns》一书的作者,是这方面最早的书。

他还是知名的内存分配方法dlmalloc的作者,这是C语言中的动态内存分配函数malloc的一种普遍使用的实现。

  

參考资料:

 其他并行框架:

版权声明:本文博客原创文章,博客,未经同意,不得转载。

原文地址:https://www.cnblogs.com/zfyouxi/p/4650218.html