ForkJoinPool 源码

ForkJoinPool----(以下简称FJP)
先看task.fork方法,含义是将当前任务,放到当前线程的工作队列中。但是第一次执行这个方法是在主线程中,主线程是不可能被FJP管理的。那么就进入ForkJoinPool.common.externalPush,在common这个default的线程池里执行这个任务,
externalPush的意思,是把外面的任务,放到当前线程池中执行。刚进入externalPush,会检查随机数不是0,workQueues不是空,这些条件第一次肯定是不满足的,那么进入externalSubmit,先初始化随机数,

ctl是一个volatile long类型的控制变量,从高到低,前16位是(当前活跃线程数-最小并发数),往后16位是(总线程数-最小并发数),再往后16位是栈顶(Treiber stack)等待线程的标志,最后16位是栈顶等待线程在线程池数组中的位置。
runState是一个volatile int类型的控制变量,来标志当前线程池运行状态,有锁住,信号,已启动,停止,终止,关闭几种状态。(当然也可以不从fork方法进来,而是pool.invoke,意味着已经有一个FJP了,直接执行任务)

FJP的逻辑很清晰:externalPush是统一入口(没有FJP,用common,有的话直接进),没初始化的进入externalSubmit,externalSubmit逻辑很清晰:初始化workQueues,初始化当前线程对应的workQueue,hash冲突了重新生成随机数rehash。都
完成之后,提交到自己的workQueue中,然后signalWork。

signalWork:方法的含义是现在新添加了一个task,需要去看下有没有满负荷(判断的标准是活跃线程是不是已经达到了并发数)。有空闲线程,那么唤起(去第三个和第四个16位找空闲线程栈顶信息,unpark方法),如果没有空闲线程但是总线程数不足(c & ADD_WORKER是判断第二个16位是正数还是负数,负数说明总线程数不足)
那么addWorker。

唤起的线程或新增的线程,会在run方法里进行无限循环,现在自己的任务队列里面找任务,找不到去别的线程的任务队列里找(steal),steal和自己从自己队列找是不同方向,所以能减少并发。直到所有的task都被取走,当前线程
挂起休眠。

ForkJoinTask.join,其实fork把任务放进任务队列之后,任务就可以被执行了,但是如果想获取任务执行的结果,需要让task.join,

原文地址:https://www.cnblogs.com/chuliang/p/9851726.html