5 物理执行图(Task 的调度和执行)
从 1.3.6 接着进行调度的源码分析:
JobMaster.java
DefaultScheduler.java
PipelinedRegionSchedulingStrategy.java
DefaultScheduler.java
DefaultExecutionVertexOperations.java
ExecutionVertex.java
Execution.java
RpcTaskManagerGateway.java
TaskExecutor.java
Task.java
接下来启动 Task 执行线程,调用 Task.run() -> doRun()
这里的 invokable 即为 operator 对象实例,通过反射创建,比如 StreamTask。
nameOfInvokableClass 在生成 StreamGraph 的时候,就已经确定了,见 3.1.2 中的StreamGraph.addOperator 方法:
这里的 OneInputStreamTask.class 即为生成的 StreamNode 的 vertexClass。这个值会一直传递,当 StreamGraph 被转化成 JobGraph 的时候,这个值会被传递到 JobVertex 的invokableClass。然后当 JobGraph 被转成 ExecutionGraph 的时候,这个值被传入到ExecutionJobVertex.TaskInformation.invokableClassName 中,一直传到 Task 中。
继续看 invokable.invoke():
StreamTask.java
MailboxProcessor.java
runDefaultAction()执行默认操作,通过 Control+h 查找具体实现,为 StreamTask.java 中第 292 行
StreamTask.java
MailboxProcessor.java 查看构造器
所以执行的默认操作就是 processInput():
StreamTask.java
StreamOneInputProcessor.java
StreamTaskNetworkInput.java