海豚调度Dolphinscheduler源码分析(四)

今天来分析 server 模块中 master 的入口类:MasterServer。

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.server.master;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.server.worker.WorkerServer;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;

import javax.annotation.PostConstruct;




/**
 * Entry point of the master process.
 *
 * <p>{@link #main(String[])} boots a non-web Spring application from this class. Component
 * scanning covers {@code org.apache.dolphinscheduler} and excludes {@link WorkerServer}.
 * Once the injected fields are populated, {@link #run()} starts the master's services and
 * registers a JVM shutdown hook that stops them again.
 */
@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
        @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {WorkerServer.class})
})
public class MasterServer {

    /**
     * logger of MasterServer
     */
    private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);

    /**
     * master config (injected configuration bean)
     */
    @Autowired
    private MasterConfig masterConfig;

    /**
     *  spring application context
     *  only use it for initialization
     */
    @Autowired
    private SpringApplicationContext springApplicationContext;

    /**
     * netty remote server; created in {@link #run()}, so it is null until then
     */
    private NettyRemotingServer nettyRemotingServer;

    /**
     * master registry
     */
    @Autowired
    private MasterRegistry masterRegistry;

    /**
     * zk master client
     */
    @Autowired
    private ZKMasterClient zkMasterClient;

    /**
     * scheduler service
     */
    @Autowired
    private MasterSchedulerService masterSchedulerService;

    /**
     * master server startup
     *
     * master server not use web service
     * @param args arguments
     */
    public static void main(String[] args) {
        Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
        new SpringApplicationBuilder(MasterServer.class).web(WebApplicationType.NONE).run(args);
    }

    /**
     * run master server
     *
     * <p>{@code @PostConstruct}: invoked after dependency injection of this bean has
     * completed, so the {@code @Autowired} fields are usable here.
     */
    @PostConstruct
    public void run(){

        //init remoting server
        NettyServerConfig serverConfig = new NettyServerConfig();
        serverConfig.setListenPort(masterConfig.getListenPort());
        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
        this.nettyRemotingServer.start();

        // register
        // (author's note) adds the node/master/XXX:5678 node and listens for events
        this.masterRegistry.registry();

        // self tolerant
        // (author's note) takes the distributed lock /dolphinscheduler/lock/failover/startup-masters,
        // creates the znode, and checks whether master/worker tasks need failover
        this.zkMasterClient.start();

        // scheduler start
        // (author's note) MasterSchedulerService extends Thread
        this.masterSchedulerService.start();

        // start QuartzExecutors
        // what system should do if exception
        try {
            logger.info("start Quartz server...");
            QuartzExecutors.getInstance().start();
        } catch (Exception e) {
            // best effort: try to shut Quartz down again, then keep the master running
            try {
                QuartzExecutors.getInstance().shutdown();
            } catch (SchedulerException e1) {
                logger.error("QuartzExecutors shutdown failed : {}", e1.getMessage(), e1);
            }
            logger.error("start Quartz failed", e);
        }

        // register hooks, which are called before the process exits
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                // Must not end in System.exit(): calling it while shutdown hooks are
                // running blocks indefinitely and would hang the JVM shutdown.
                shutdown("shutdownHook", false);
            }
        }));

    }

    /**
     * gracefully close
     *
     * <p>Does nothing if {@link Stopper} already reports stopped. If stopping fails with an
     * exception, the error is logged and the JVM is terminated via {@code System.exit(-1)}.
     *
     * @param cause close cause
     */
    public void close(String cause) {
        shutdown(cause, true);
    }

    /**
     * Stops the master's services in order: scheduler service, netty server, registry,
     * zk client, Quartz.
     *
     * @param cause close cause, used for logging only
     * @param exitOnFailure whether to call {@code System.exit(-1)} when stopping fails;
     *                      must be {@code false} when called from a shutdown hook
     */
    private void shutdown(String cause, boolean exitOnFailure) {

        // remember an interrupt received while sleeping and restore it after cleanup,
        // so the cleanup calls below do not run with the interrupt flag already set
        boolean interrupted = false;
        try {
            //execute only once
            if(Stopper.isStopped()){
                return;
            }

            logger.info("master server is stopping ..., cause : {}", cause);

            // set stop signal is true
            Stopper.stop();

            try {
                //thread sleep 3 seconds for thread quietly stop
                Thread.sleep(3000L);
            }catch (InterruptedException e){
                interrupted = true;
                logger.warn("thread sleep exception ", e);
            }

            this.masterSchedulerService.close();
            this.nettyRemotingServer.close();
            this.masterRegistry.unRegistry();
            this.zkMasterClient.close();
            //close quartz
            try{
                QuartzExecutors.getInstance().shutdown();
                logger.info("Quartz service stopped");
            }catch (Exception e){
                // trailing throwable argument keeps the stack trace in the log
                logger.warn("Quartz service stopped exception:{}", e.getMessage(), e);
            }
        } catch (Exception e) {
            logger.error("master server stop exception ", e);
            if (exitOnFailure) {
                System.exit(-1);
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

有两个点需要看一下:

一:Runtime.getRuntime().addShutdownHook(shutdownHook);

// Excerpt: register a JVM shutdown hook; the hook thread calls close("shutdownHook").
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                close("shutdownHook");
            }
        }));


   这个方法的含义说明:
    这个方法的意思就是在jvm中增加一个关闭的钩子。当jvm正常关闭时(例如程序正常退出、调用System.exit,或收到SIGTERM信号),会执行所有通过addShutdownHook方法添加的钩子,这些钩子全部执行完之后,jvm才会真正关闭。所以可以在钩子里进行内存清理、对象销毁、资源释放等操作。需要注意:如果进程被强制终止(例如kill -9,或调用Runtime.halt),钩子不会被执行。


二:PostConstruct

// Excerpt: name the current thread with Constants.THREAD_NAME_MASTER_SERVER, then start a
// Spring application from MasterServer.class with WebApplicationType.NONE.
public static void main(String[] args) {
        Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
        new SpringApplicationBuilder(MasterServer.class).web(WebApplicationType.NONE).run(args);
    }

    /**
     * run master server (abridged excerpt of the full listing)
     *
     * <p>{@code @PostConstruct}: invoked after dependency injection of this bean has completed.
     */
    @PostConstruct
    public void run(){

        //init remoting server
        NettyServerConfig serverConfig = new NettyServerConfig();
        serverConfig.setListenPort(masterConfig.getListenPort());
        // the server has to be built from serverConfig before start() can be called
        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
        // ... processor registration omitted in this excerpt ...
        this.nettyRemotingServer.start();

        // register
        // (author's note) adds the node/master/XXX:5678 node and listens for events
        this.masterRegistry.registry();

        // ... remaining startup steps omitted in this excerpt ...
    }

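在main方法中,把MasterServer.class交给SpringApplicationBuilder来启动Spring容器,由容器创建MasterServer这个bean。构造方法执行时,@Autowired标注的字段(如masterConfig)还没有被注入,所以无法在构造方法里完成初始化;需要借助@PostConstruct:依赖注入完成之后,Spring会自动调用run()方法,完成MasterServer类的对象的初始化。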


这个参考https://www.cnblogs.com/erlou96/p/13753824.html
原文地址:https://www.cnblogs.com/erlou96/p/13758943.html