DolphinScheduler Activity Diagram Walkthrough

I have recently been reading the DolphinScheduler source code, and it is a classic piece of design.

1. Architecture Diagram

2. Activity Diagram

3. Execution Flow

3.1. The master assigns tasks to workers

3.1.1. Main line

MasterServer starts NettyRemotingServer, MasterSchedulerService, QuartzExecutors, and ZKMasterClient.

MasterSchedulerService creates a ThreadPoolExecutor thread pool.

The ThreadPoolExecutor runs MasterExecThread instances, one per workflow.

MasterExecThread builds a DAG of tasks from the workflow and writes the tasks into a PriorityQueue.
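
A minimal Java sketch of this step, assuming each task carries a single numeric priority and a list of upstream task names. `TaskNode`, `WorkflowSketch`, and `submitReadyTasks` are hypothetical names, not DolphinScheduler classes, and the real task priority queue compares more fields than one number. What it illustrates is the scheduling rule: a pool thread walks the DAG and only puts a task into the priority queue once every upstream task has finished.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical task node: lower priority value = dispatched earlier; upstream = names it depends on.
record TaskNode(String name, int priority, List<String> upstream) {}

// Hypothetical illustration of the MasterExecThread idea, not DolphinScheduler code.
class WorkflowSketch {
    // Queue every task not yet submitted whose upstream tasks have all finished.
    static void submitReadyTasks(Map<String, TaskNode> dag, Set<String> finished,
                                 Set<String> submitted, PriorityBlockingQueue<TaskNode> queue) {
        for (TaskNode t : dag.values()) {
            if (!submitted.contains(t.name()) && finished.containsAll(t.upstream())) {
                submitted.add(t.name());
                queue.put(t);
            }
        }
    }

    // Runs one workflow on a pool thread and returns the order in which tasks left the queue.
    static List<String> runWorkflow(List<TaskNode> tasks) throws Exception {
        Map<String, TaskNode> dag = new LinkedHashMap<>();
        for (TaskNode t : tasks) dag.put(t.name(), t);
        PriorityBlockingQueue<TaskNode> queue = new PriorityBlockingQueue<>(11,
                Comparator.comparingInt(TaskNode::priority).thenComparing(TaskNode::name));
        List<String> order = new ArrayList<>();

        ExecutorService pool = Executors.newFixedThreadPool(2); // one thread per running workflow
        try {
            pool.submit(() -> {
                Set<String> finished = new HashSet<>();
                Set<String> submitted = new HashSet<>();
                submitReadyTasks(dag, finished, submitted, queue);
                while (finished.size() < dag.size()) {
                    // Simplification: a task counts as finished as soon as it is taken off the queue.
                    TaskNode next = queue.poll();
                    if (next == null) throw new IllegalStateException("cycle or unknown upstream task");
                    order.add(next.name());
                    finished.add(next.name());
                    submitReadyTasks(dag, finished, submitted, queue);
                }
            }).get();
        } finally {
            pool.shutdown();
        }
        return order;
    }
}
```

With tasks `A` (priority 2), `B` (priority 1, after `A`), `C` (priority 0, after `A`) and `D` (priority 1, after `B` and `C`), `runWorkflow` returns `[A, C, B, D]`: `C` overtakes `B` on priority, and `D` waits for both.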

3.1.2. Consumer branch: starts after object initialization

TaskPriorityQueueConsumer takes tasks from the PriorityQueue and calls ExecutorDispatcher.

ExecutorDispatcher calls NettyExecutorManager.

NettyExecutorManager calls NettyRemotingClient.send() to send the task to a worker.
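
The consumer branch can be sketched as a daemon thread that drains a blocking priority queue and hands each task to a dispatcher. `Dispatcher` is a hypothetical one-method interface standing in for the ExecutorDispatcher, NettyExecutorManager, NettyRemotingClient.send() chain, and `QueueConsumerSketch` is likewise illustrative. Putting a task back after a failed dispatch is a choice made for this sketch, not a claim about DolphinScheduler's retry policy.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical stand-in for the chain ExecutorDispatcher -> NettyExecutorManager -> NettyRemotingClient.send().
interface Dispatcher {
    boolean dispatch(String taskId);
}

// Hypothetical consumer thread, loosely modeled on the role of TaskPriorityQueueConsumer.
class QueueConsumerSketch extends Thread {
    private final BlockingQueue<String> queue;
    private final Dispatcher dispatcher;
    private volatile boolean running = true;

    QueueConsumerSketch(BlockingQueue<String> queue, Dispatcher dispatcher) {
        this.queue = queue;
        this.dispatcher = dispatcher;
        setDaemon(true);
    }

    @Override
    public void run() {
        while (running) {
            try {
                // Wait briefly so the loop can notice shutdown() even when the queue is empty.
                String task = queue.poll(100, TimeUnit.MILLISECONDS);
                if (task != null && !dispatcher.dispatch(task)) {
                    queue.put(task); // dispatch failed: put the task back so it is tried again
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void shutdown() {
        running = false;
    }

    // Queues three tasks before the consumer starts and returns the order in which they were dispatched.
    static List<String> demo() throws InterruptedException {
        BlockingQueue<String> queue = new PriorityBlockingQueue<>(); // natural String order acts as priority
        List<String> sent = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(3);
        queue.put("t2");
        queue.put("t3");
        queue.put("t1");
        QueueConsumerSketch consumer = new QueueConsumerSketch(queue, task -> {
            sent.add(task);
            done.countDown();
            return true;
        });
        consumer.start();
        done.await(5, TimeUnit.SECONDS);
        consumer.shutdown();
        consumer.join();
        return new ArrayList<>(sent);
    }
}
```

`QueueConsumerSketch.demo()` queues `t2`, `t3`, `t1` before starting the consumer and returns `[t1, t2, t3]`, because the priority queue always hands out its smallest element first.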

3.1.3. Scheduling branch

QuartzExecutors starts the Scheduler.

The Scheduler receives scheduled jobs submitted through the API.

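3.1.4. DB branch

After initialization, TaskResponseService starts a TaskResponseWorker thread.

TaskResponseWorker persists the events in eventQueue and sends a message back over the channel. There are two cases:

ACK: sends a DBTaskAckCommand

Response: sends a DBTaskResponseCommand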
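
A hedged sketch of the TaskResponseWorker loop: take an event, persist it, then tell the worker which kind of event was stored. The `EventType` enum, the `TaskEvent` record, the in-memory `db` map and the `sentToWorker` list are hypothetical stand-ins for the real event objects, the database, and the Netty channel; only the two reply command names come from the text above.

```java
import java.util.*;
import java.util.concurrent.*;

enum EventType { ACK, RESULT }

// Hypothetical event reported by a worker for one task instance.
record TaskEvent(EventType type, int taskInstanceId, String state) {}

// Hypothetical illustration of the TaskResponseWorker idea, not DolphinScheduler code.
class ResponseWorkerSketch implements Runnable {
    final BlockingQueue<TaskEvent> eventQueue = new LinkedBlockingQueue<>();
    final Map<Integer, String> db = new ConcurrentHashMap<>();       // stands in for the task instance table
    final List<String> sentToWorker = new CopyOnWriteArrayList<>(); // stands in for writes on the worker channel

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                TaskEvent e = eventQueue.take();
                db.put(e.taskInstanceId(), e.state()); // 1. persist the event
                String reply = e.type() == EventType.ACK ? "DBTaskAckCommand" : "DBTaskResponseCommand";
                sentToWorker.add(reply + ":" + e.taskInstanceId()); // 2. tell the worker it was persisted
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // restore the flag so the loop exits
            }
        }
    }

    // Feeds one ACK and one result through the worker thread, then stops it.
    static ResponseWorkerSketch demo() throws InterruptedException {
        ResponseWorkerSketch w = new ResponseWorkerSketch();
        Thread t = new Thread(w);
        t.start();
        w.eventQueue.put(new TaskEvent(EventType.ACK, 7, "RUNNING"));
        w.eventQueue.put(new TaskEvent(EventType.RESULT, 7, "SUCCESS"));
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (w.sentToWorker.size() < 2 && System.nanoTime() < deadline) Thread.sleep(10);
        t.interrupt();
        t.join();
        return w;
    }
}
```

After `demo()`, `sentToWorker` holds `DBTaskAckCommand:7` followed by `DBTaskResponseCommand:7`, and `db` maps 7 to `SUCCESS`. Persisting before replying matters here: the worker only drops its cached copy once it receives this confirmation, so a reply should never promise a write that has not happened yet.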

3.1.5. Processors

The master's NettyRemotingServer has three processors: task response, task ACK, and task-kill response.

TaskResponseProcessor: adds the message to eventQueue
TaskAckProcessor: adds the message to eventQueue
TaskKillResponseProcessor: writes a log entry
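
The three master-side processors amount to a lookup table from command type to handler. This sketch models that with an `EnumMap`; the `CommandType` constants, the `Command` record and `MasterProcessorsSketch` are assumptions modeled loosely on the processor names above, not copied from the DolphinScheduler source.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;

// Hypothetical command types, named after the three master-side processors.
enum CommandType { TASK_EXECUTE_RESPONSE, TASK_EXECUTE_ACK, TASK_KILL_RESPONSE }

record Command(CommandType type, String body) {}

// Hypothetical illustration of per-type processor registration, not DolphinScheduler code.
class MasterProcessorsSketch {
    final Map<CommandType, Consumer<Command>> processors = new EnumMap<>(CommandType.class);
    final Queue<String> eventQueue = new ConcurrentLinkedQueue<>();
    final List<String> log = new ArrayList<>();

    MasterProcessorsSketch() {
        // Both task processors only hand the message to the event queue; persistence happens elsewhere.
        processors.put(CommandType.TASK_EXECUTE_RESPONSE, c -> eventQueue.add("response:" + c.body()));
        processors.put(CommandType.TASK_EXECUTE_ACK, c -> eventQueue.add("ack:" + c.body()));
        // The kill-response processor just records a log line.
        processors.put(CommandType.TASK_KILL_RESPONSE, c -> log.add("kill response: " + c.body()));
    }

    // Look up the processor registered for the command type and run it.
    void onMessage(Command c) {
        Consumer<Command> p = processors.get(c.type());
        if (p == null) throw new IllegalArgumentException("no processor for " + c.type());
        p.accept(c);
    }
}
```

Registering one handler per type keeps the receive path generic: supporting a new message kind means adding one map entry rather than another branch in the network handler.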

3.1.6. Master election

This is handled by ZKMasterClient.

3.2. The worker receives tasks

3.2.1. Main line

WorkerServer.run() starts NettyRemotingServer, WorkerManagerThread, RetryReportTaskStatusThread, and WorkerRegistry (which registers the worker with ZooKeeper).

WorkerManagerThread takes TaskExecuteThread instances from workerExecuteQueue and executes them.

TaskExecuteThread.run() calls AbstractTask.handle(). When the task finishes, it calls TaskCallbackService.sendResult(), which uses a NettyRemoteChannel to send a responseCommand to the master.
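
The worker main line is a producer/consumer hand-off: one thread blocks on the execution queue and submits each task to a pool, and each task reports its own result when it finishes. `WorkerManagerSketch`, `TaskExecuteThreadSketch` and the pool size of 4 are assumptions for illustration; `handle()` always returns `SUCCESS`, and the `results` list replaces the network call back to the master.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical stand-in for TaskExecuteThread: run the task, then report the outcome.
class TaskExecuteThreadSketch implements Runnable {
    private final int taskInstanceId;
    private final List<String> results; // stands in for TaskCallbackService.sendResult(...)

    TaskExecuteThreadSketch(int taskInstanceId, List<String> results) {
        this.taskInstanceId = taskInstanceId;
        this.results = results;
    }

    @Override
    public void run() {
        String status = handle();                   // plays the role of AbstractTask.handle()
        results.add(taskInstanceId + ":" + status); // report back once the task has finished
    }

    private String handle() {
        return "SUCCESS";
    }
}

// Hypothetical stand-in for WorkerManagerThread: take tasks from the queue and run them on a pool.
class WorkerManagerSketch extends Thread {
    final BlockingQueue<TaskExecuteThreadSketch> workerExecuteQueue = new LinkedBlockingQueue<>();
    private final ExecutorService workerExecService = Executors.newFixedThreadPool(4);

    @Override
    public void run() {
        try {
            while (true) {
                workerExecService.submit(workerExecuteQueue.take());
            }
        } catch (InterruptedException e) {
            workerExecService.shutdown(); // stop accepting work; already submitted tasks still run
        }
    }

    static List<String> demo() throws InterruptedException {
        List<String> results = Collections.synchronizedList(new ArrayList<>());
        WorkerManagerSketch manager = new WorkerManagerSketch();
        manager.start();
        for (int id = 1; id <= 3; id++) manager.workerExecuteQueue.put(new TaskExecuteThreadSketch(id, results));
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (results.size() < 3 && System.nanoTime() < deadline) Thread.sleep(10);
        manager.interrupt();
        manager.join();
        List<String> sorted = new ArrayList<>(results);
        Collections.sort(sorted); // pool threads may finish in any order
        return sorted;
    }
}
```

Because pool threads finish in any order, `demo()` sorts the reports before returning `[1:SUCCESS, 2:SUCCESS, 3:SUCCESS]`.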

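3.2.2. Task execution branch

NettyRemotingServer registers NettyServerHandler.

NettyServerHandler.processReceived() handles each command received from the master and selects a processor according to the command type.

TaskExecuteProcessor runs its process() method, which calls TaskCallbackService.

TaskCallbackService uses a NettyRemoteChannel to send a task ACK message to the master.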

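3.2.3. Processors

The worker's NettyRemotingServer has four processors: task-execute request, task-kill request, DB ACK, and DB response.

TaskExecuteProcessor: initializes the taskExecutionContext, creates a new channel, sends an ACK to the master, and adds the command to workerExecuteQueue
TaskKillProcessor: kills the process and sends a taskKillResponseCommand to the master
DBTaskAckProcessor: if the taskAckCommand status is success, removes the corresponding taskId from the cache
DBTaskResponseProcessor: if the taskResponseCommand status is success, removes the taskId from the cache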
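
The ACK and confirmation cycle on the worker can be sketched with two caches keyed by task instance id. In this sketch an entry is added when the ACK is sent and removed only when the master's DB confirmation reports success. The enum, record, class and cache names are hypothetical, the kill processor is left out, and `responseCache` would be filled when a task reports its result, which is not modeled here.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;

// Hypothetical command types for three of the four worker-side processors.
enum WorkerCommandType { TASK_EXECUTE_REQUEST, DB_TASK_ACK, DB_TASK_RESPONSE }

// Hypothetical message from the master; success only matters for the DB_* confirmations.
record WorkerCommand(WorkerCommandType type, int taskInstanceId, boolean success) {}

// Hypothetical illustration of the worker processors and their cache, not DolphinScheduler code.
class WorkerProcessorsSketch {
    final Map<Integer, String> ackCache = new ConcurrentHashMap<>();      // ACKs the master has not confirmed
    final Map<Integer, String> responseCache = new ConcurrentHashMap<>(); // results the master has not confirmed
    final BlockingQueue<Integer> workerExecuteQueue = new LinkedBlockingQueue<>();
    final List<String> sentToMaster = new ArrayList<>();
    private final Map<WorkerCommandType, Consumer<WorkerCommand>> processors = new EnumMap<>(WorkerCommandType.class);

    WorkerProcessorsSketch() {
        processors.put(WorkerCommandType.TASK_EXECUTE_REQUEST, c -> {
            ackCache.put(c.taskInstanceId(), "ack");       // keep it until the master confirms
            sentToMaster.add("ack:" + c.taskInstanceId()); // send the ACK
            workerExecuteQueue.add(c.taskInstanceId());    // hand the task to the execution queue
        });
        processors.put(WorkerCommandType.DB_TASK_ACK, c -> {
            if (c.success()) ackCache.remove(c.taskInstanceId());
        });
        processors.put(WorkerCommandType.DB_TASK_RESPONSE, c -> {
            if (c.success()) responseCache.remove(c.taskInstanceId());
        });
    }

    void onMessage(WorkerCommand c) {
        processors.get(c.type()).accept(c);
    }
}
```

A failed confirmation leaves the entry in place, which is what gives the retry thread in 3.2.6 something to resend.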

3.2.4. Kill-task request

TaskKillProcessor.process() calls WorkerManagerThread.killTaskBeforeExecuteByInstanceId() and sends a responseCommand to the master.
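
A minimal sketch of the kill path for a task that is still waiting in the worker queue. `QueuedTask`, `KillBeforeExecuteSketch`, `processKill` and the response strings are hypothetical; the method named after `killTaskBeforeExecuteByInstanceId()` only mirrors the idea its name suggests (drop a queued task by instance id) and is not the real implementation. Killing a task that is already running is not modeled.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical queued task: only the instance id matters for the kill path.
record QueuedTask(int taskInstanceId, String name) {}

// Hypothetical illustration of killing a task before it starts, not DolphinScheduler code.
class KillBeforeExecuteSketch {
    final BlockingQueue<QueuedTask> workerExecuteQueue = new LinkedBlockingQueue<>();
    final List<String> sentToMaster = new ArrayList<>();

    // Drop the task if it is still waiting in the queue; returns whether anything was removed.
    boolean killTaskBeforeExecuteByInstanceId(int taskInstanceId) {
        return workerExecuteQueue.removeIf(t -> t.taskInstanceId() == taskInstanceId);
    }

    // Try the queue first, then answer the master.
    void processKill(int taskInstanceId) {
        boolean removed = killTaskBeforeExecuteByInstanceId(taskInstanceId);
        // Not modeled: a task that has already started would need its OS process killed instead.
        sentToMaster.add("TaskKillResponse:" + taskInstanceId + (removed ? ":removed" : ":not-queued"));
    }
}
```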

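3.2.5. Execute-task request

TaskExecuteProcessor calls TaskCallbackService.sendAck(), which uses a NettyRemoteChannel to send a message to the master.

3.2.6. Retry thread

RetryReportTaskStatusThread continuously reads command information from the cache and calls TaskCallbackService.sendResult()/sendAck() to resend the messages to the master.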
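
A sketch of the retry idea, assuming unconfirmed commands live in two maps keyed by task instance id: each pass resends everything still present, and an entry stops being resent once a confirmation removes it. `RetryReportSketch`, the map names and the use of a `ScheduledExecutorService` for the repetition are assumptions; the interval the real thread uses is not stated in this article.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical illustration of the RetryReportTaskStatusThread idea, not DolphinScheduler code.
class RetryReportSketch {
    // Commands the master has not confirmed yet; the DB ACK/response processors remove entries.
    final Map<Integer, String> ackCache = new ConcurrentHashMap<>();
    final Map<Integer, String> responseCache = new ConcurrentHashMap<>();
    final List<String> resent = new CopyOnWriteArrayList<>(); // stands in for sendAck()/sendResult()

    // One pass of the retry loop: resend everything still unconfirmed, in id order.
    void retryOnce() {
        new TreeMap<>(ackCache).keySet().forEach(id -> resent.add("sendAck:" + id));
        new TreeMap<>(responseCache).keySet().forEach(id -> resent.add("sendResult:" + id));
    }

    // Repeats retryOnce() with a fixed delay; call shutdown() on the returned executor to stop it.
    ScheduledExecutorService start(long delayMillis) {
        ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor();
        s.scheduleWithFixedDelay(this::retryOnce, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
        return s;
    }
}
```

Resending until confirmed gives at-least-once delivery, so the receiving side should tolerate the same ACK or result arriving more than once.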

3.2.7. Registering with ZooKeeper

WorkerRegistry registers the worker with ZooKeeper.

Original article: https://www.cnblogs.com/wangbin2188/p/14960457.html