JobContainer

 

 
真正的执行都是在这里完成,start代码如下:
View Code

主要执行流程为:

1、preHandle():job前置操作
2、init():初始化reader和writer
3、prepare():执行插件的prepare操作
4、split():切分任务
5、schedule():执行任务
6、post():执行插件的post操作
7、postHandle():job后置操作
8、invokeHooks():调用hook
9、输出统计结果

上述任务流是顺序执行的,第5步会将切分的task分配到多个taskGroup中并发执行,

其中preHandle、postHandle、invokeHooks不影响整体执行,可以先忽略,

下面主要介绍关键步骤


1)init()

关键代码主要是初始化reader和writer插件

 

reader插件初始化的时候使用了自定义classLoader,这样可以做到插件级别的隔离,插件初始化完成之后调用了job的init函数,用于初始化插件的Job,writer插件初始化同理


2)prepare():

prepare操作比较简单,分别执行reader和writer插件Job中的prepare函数即可,同样,每次执行前都会先加载对应的classLoader用于隔离

private void prepare() {
    this.prepareJobReader();
    this.prepareJobWriter();
}

3)split():切分任务task

 

上面函数主要分为3步:

1、计算限速和并发,即实际的channel数和每个channel的限速,主要在adjustChannelNumber()中,这里不做过多说明
2、根据实际的channel数,切分reader端,具体的切分逻辑reader插件可以自行实现
3、根据reader端切分的数目切分writer端,达到reader:writer=1:1,这样每个task中都包含一个reader和一个writer

4)schedule():执行切分出来的task

 

schedule执行过程主要分为以下几步:

1、计算taskGroup个数
2、将切分的task分配到taskGroup中
3、启动线程池执行taskGroup,具体代码流程为scheduler.schedule(taskGroupConfigs) -> AbstractScheduler.schedule -> startAllTaskGroup -> ProcessInnerScheduler.startAllTaskGroup -> this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner) -> TaskGroupContainerRunner.run() -> this.taskGroupContainer.start()
4、收集taskGroup汇报的信息

5)post():执行插件的post操作

private void post() {
    this.postJobWriter();
    this.postJobReader();
}
原文地址:https://www.cnblogs.com/muzhongjiang/p/13158986.html