Zookeeper 源码(四)Zookeeper 服务端源码

Zookeeper 源码(四)Zookeeper 服务端源码

Zookeeper 服务端架构

Zookeeper 服务端的启动入口为 QuorumPeerMain

/**
 * Command-line entry point of the replicated ZooKeeper server: creates a
 * QuorumPeerMain and delegates all work to initializeAndRun(args).
 *
 * NOTE(review): initializeAndRun declares ConfigException, IOException and
 * AdminServerException, which this excerpt neither catches nor declares, so it
 * does not compile as shown. The exception handling (and any exit codes) was
 * presumably elided by the article; confirm against the real source.
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    main.initializeAndRun(args);
}

/**
 * Loads the configuration, starts the data-directory cleanup task and then runs
 * either a replicated (quorum) server or a standalone server.
 *
 * @param args command-line arguments; exactly one argument is parsed as the
 *             configuration file
 * @throws ConfigException      if the configuration cannot be parsed
 * @throws IOException          propagated from server startup
 * @throws AdminServerException propagated from server startup
 */
protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException {
    final boolean configFileGiven = args.length == 1;

    // Step 1: read the configuration file, if one was supplied.
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (configFileGiven) {
        config.parse(args[0]);
    }

    // Step 2: start the cleaner for historical files in the data / data-log dirs.
    DatadirCleanupManager cleanupManager = new DatadirCleanupManager(
            config.getDataDir(),
            config.getDataLogDir(),
            config.getSnapRetainCount(),
            config.getPurgeInterval());
    cleanupManager.start();

    // Quorum mode requires both a config file and a distributed configuration;
    // otherwise fall back to the standalone server (step 4).
    if (!configFileGiven || !config.isDistributed()) {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        ZooKeeperServerMain.main(args);
        return;
    }

    // Step 3: cluster startup.
    runFromConfig(config);
}

一、单机启动

单机启动流程

(1) 启动入口【ZooKeeperServerMain】

/**
 * Command-line entry point of the standalone ZooKeeper server: creates a
 * ZooKeeperServerMain and delegates all work to initializeAndRun(args).
 *
 * NOTE(review): initializeAndRun declares ConfigException, IOException and
 * AdminServerException, which this excerpt neither catches nor declares, so it
 * does not compile as shown. The exception handling was presumably elided by
 * the article; confirm against the real source.
 */
public static void main(String[] args) {
    ZooKeeperServerMain main = new ZooKeeperServerMain();
    main.initializeAndRun(args);
}

/**
 * Registers the log4j JMX beans (best effort), parses the standalone server
 * configuration and starts the server.
 *
 * @param args a single argument is passed to ServerConfig.parse(String);
 *             otherwise the whole array is passed to ServerConfig.parse(String[])
 * @throws ConfigException      if the configuration cannot be parsed
 * @throws IOException          propagated from runFromConfig
 * @throws AdminServerException propagated from runFromConfig
 */
protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException {
    // JMX registration is optional: a failure is only logged.
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    ServerConfig serverConfig = new ServerConfig();
    if (args.length != 1) {
        serverConfig.parse(args);
    } else {
        // Presumably a configuration-file path; confirm against ServerConfig.parse(String).
        serverConfig.parse(args[0]);
    }

    runFromConfig(serverConfig);
}

(2) 核心启动方法【ZooKeeperServerMain】

/**
 * Starts a standalone ZooKeeper server from {@code config}, blocks until the
 * client connection factory terminates, then shuts the server down if it is
 * still running. The transaction-log/snapshot handler is always closed.
 *
 * @param config parsed standalone server configuration
 * @throws IOException          e.g. if the connection factory cannot be instantiated
 * @throws AdminServerException presumably from admin-server startup elided in this excerpt
 */
public void runFromConfig(ServerConfig config) throws IOException, AdminServerException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        // 1. Handler for the transaction log files and snapshot data files.
        txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);

        // 2. Create the server instance. The ZKDatabase argument is null;
        //    startdata() creates one lazily when it is still null.
        ZooKeeperServer zkServer = new ZooKeeperServer(txnLog,
                config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);

        // ... (omitted in this excerpt)
        boolean needStartZKServer = true;
        if (config.getClientPortAddress() != null) {
            // 3. Create the network layer; NIOServerCnxnFactory unless the
            //    zookeeper.serverCnxnFactory system property names another class.
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);

            // 4. Start the server (core step).
            cnxnFactory.startup(zkServer);
            // zkServer has been started. So we don't need to start it again in secureCnxnFactory.
            needStartZKServer = false;
        }

        // ... (omitted in this excerpt)
        if (cnxnFactory != null) {
            cnxnFactory.join();
        }
        if (zkServer.isRunning()) {
            zkServer.shutdown();
        }
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Server interrupted", e);
        // Restore the interrupt status so callers further up the stack can observe it.
        Thread.currentThread().interrupt();
    } finally {
        if (txnLog != null) {
            txnLog.close();
        }
    }
}

(3) ZooKeeperServer【ZooKeeperServer】

/**
 * Creates a server instance.
 *
 * @param txnLogFactory     handler for the transaction log and snapshot files
 * @param tickTime          basic time unit, stored as-is
 * @param minSessionTimeout minimum session timeout, applied via setMinSessionTimeout
 * @param maxSessionTimeout maximum session timeout, applied via setMaxSessionTimeout
 * @param zkDb              in-memory database; may be null, in which case
 *                          startdata() creates one lazily
 */
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
        int minSessionTimeout, int maxSessionTimeout, ZKDatabase zkDb) {
    // Per-server statistics (packets sent/received, latencies, request count).
    serverStats = new ServerStats(this);
    this.txnLogFactory = txnLogFactory;
    this.zkDb = zkDb;
    this.tickTime = tickTime;
    // NOTE(review): the timeout setters run after tickTime is assigned; keep this
    // order in case they derive defaults from tickTime.
    setMinSessionTimeout(minSessionTimeout);
    setMaxSessionTimeout(maxSessionTimeout);
}

ServerStats 记录了服务端的以下信息:

属性 说明
packetsSent Zookeeper 启动后响应的次数
packetsReceived Zookeeper 启动后接收请求的次数
maxLatency、minLatency、totalLatency Zookeeper 启动后最大、最小、总延迟时间
count Zookeeper 启动后处理客户端请求的总次数

(4) createFactory【ServerCnxnFactory】

/** System property naming the ServerCnxnFactory implementation class to use. */
public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";

/**
 * Creates the ServerCnxnFactory used for client communication.
 * <p>
 * The implementation class is read from the {@code zookeeper.serverCnxnFactory}
 * system property and defaults to NIOServerCnxnFactory. It is instantiated
 * reflectively through its no-argument constructor.
 *
 * @return a new, unconfigured connection factory
 * @throws IOException if the class cannot be loaded or instantiated; the
 *         original failure is attached as the cause
 */
public static ServerCnxnFactory createFactory() throws IOException {
    String serverCnxnFactoryName = System.getProperty(
            ZOOKEEPER_SERVER_CNXN_FACTORY, NIOServerCnxnFactory.class.getName());
    try {
        // Constructor.newInstance() replaces Class.newInstance(), which has been
        // deprecated since Java 9 because it propagates constructor exceptions
        // (including checked ones) without wrapping.
        return (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                .getDeclaredConstructor()
                .newInstance();
    } catch (Exception e) {
        throw new IOException("Couldn't instantiate " + serverCnxnFactoryName, e);
    }
}

(5) startup【NettyServerCnxnFactory】(以 Netty 实现为例;默认实现 NIOServerCnxnFactory 的 startup 流程相同)

/**
 * Starts the Netty connection factory and, optionally, the ZooKeeper server.
 *
 * @param zks         the server instance attached to this factory
 * @param startServer when true, also restores local data and starts {@code zks}
 */
@Override
public void startup(ZooKeeperServer zks, boolean startServer)
        throws IOException, InterruptedException {
    start();                 // 1. start Netty (binds the local address)
    setZooKeeperServer(zks);
    if (!startServer) {
        return;
    }
    zks.startdata();         // 2. restore local data
    zks.startup();           // 3. session tracker, request processor chain, etc.
}

/** Starts Netty by binding the server bootstrap to the configured local address. */
@Override
public void start() {
    final String bindingMessage = "binding to port " + localAddress;
    LOG.info(bindingMessage);
    parentChannel = bootstrap.bind(localAddress);
}

(6) startdata【ZooKeeperServer】

/**
 * Restores local data into the in-memory database, creating the database
 * first if none was supplied to the constructor.
 */
public void startdata() throws IOException, InterruptedException {
    if (zkDb == null) {
        zkDb = new ZKDatabase(txnLogFactory);
    }
    // Load only once: skip if the database is already initialized.
    final boolean alreadyLoaded = zkDb.isInitialized();
    if (!alreadyLoaded) {
        loadData();
    }
}

(7) startup【ZooKeeperServer】

/**
 * Starts the session tracker and the request processor chain, registers JMX,
 * then marks the server RUNNING and calls notifyAll() on this instance.
 */
public synchronized void startup() {
    // Create the session tracker only if none has been set yet.
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    // Build the processor chain that handles client requests.
    setupRequestProcessors();

    registerJMX();

    state = State.RUNNING;
    // NOTE(review): presumably wakes threads waiting for the server to reach RUNNING; confirm.
    notifyAll();
}

// Creates (but does not start) the session tracker, passing it the result of
// zkDb.getSessionWithTimeOuts(), tickTime and the server listener.
// NOTE(review): the literal 1 is presumably the server id used by
// SessionTrackerImpl; confirm against its constructor.
protected void createSessionTracker() {
    sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
            tickTime, 1, getZooKeeperServerListener());
}
/** Starts the session tracker; assumes it is a SessionTrackerImpl. */
protected void startSessionTracker() {
    SessionTrackerImpl tracker = (SessionTrackerImpl) sessionTracker;
    tracker.start();
}

/**
 * Builds the request processing chain (the core path for client requests):
 * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.
 * The chain is built tail-first, and the first two stages are started.
 */
protected void setupRequestProcessors() {
    RequestProcessor tail = new FinalRequestProcessor(this);

    SyncRequestProcessor sync = new SyncRequestProcessor(this, tail);
    sync.start();

    PrepRequestProcessor head = new PrepRequestProcessor(this, sync);
    firstProcessor = head;
    head.start();
}

二、集群启动

集群启动流程

集群模式相比单机模式多了一个 Leader 选举的过程。Quorum 指法定人数(多数派),Peer 指对等节点,QuorumPeer 合起来表示参与多数派投票的集群节点。

(1) 核心启动方法【QuorumPeerMain】

/**
 * Starts a replicated (quorum) ZooKeeper server from {@code config} and blocks
 * until the QuorumPeer thread exits.
 *
 * @param config parsed quorum configuration
 * @throws IOException          e.g. if a connection factory cannot be instantiated
 * @throws AdminServerException declared for callers; the visible code does not throw it
 */
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    // JMX registration is best effort: a failure is only logged.
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        // Client port (optional).
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns(), false);
        }

        // Secure client port (optional); configured with the last flag set to true.
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                    config.getMaxClientCnxns(), true);
        }

        // 1. Create the QuorumPeer and copy the configuration into it.
        quorumPeer = new QuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(
                config.getDataLogDir(),
                config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(
                config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        // 2. Attach the in-memory database, backed by the peer's txn/snapshot handler.
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        // 3. Hand over the client connection factories (either may be null).
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

        // 4. Start the peer thread and wait for it to finish.
        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
        // Restore the interrupt status so callers further up the stack can observe it.
        Thread.currentThread().interrupt();
    }
}

(2) start【QuorumPeer】

/**
 * Starts this quorum peer in several steps:
 * <ol>
 *   <li>checks that myid is in the current view;</li>
 *   <li>restores local data;</li>
 *   <li>starts the client connection factories and the admin server;</li>
 *   <li>prepares leader election;</li>
 *   <li>starts this peer's own thread, which executes run().</li>
 * </ol>
 *
 * @throws RuntimeException if myid is not in the peer list
 */
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
     }
    // 1. Restore local data.
    loadDataBase();
    // 2. Start the client-facing connection factories.
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        // Admin server failure is not fatal: it is reported and startup continues.
        // NOTE(review): System.out.println duplicates the LOG.warn above.
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // 3. Set up the election algorithm selected by electionType.
    startLeaderElection();
    // 4. Start the thread (QuorumPeer extends Thread); run() then drives the peer states.
    super.start();
}

// Starts the client connection factory and the secure one, each only if it was
// configured (non-null). For the Netty factory, start() binds the listening address.
private void startServerCnxnFactory() {
    if (cnxnFactory != null) {
        cnxnFactory.start();
    }
    if (secureCnxnFactory != null) {
        secureCnxnFactory.start();
    }
}

(3) startLeaderElection【QuorumPeer】

electionType 由配置文件中的 electionAlg 设置,在 QuorumPeerConfig 中默认为 3,也就是说默认采用 FastLeaderElection 算法进行 Leader 选举。

/**
 * Prepares leader election for this peer.
 * <p>
 * While the peer is LOOKING, its initial vote is for itself: (myid, last
 * logged zxid, current epoch). For the legacy algorithm type 0, a UDP socket
 * and a responder thread are also started. Finally, the election algorithm
 * selected by electionType is created (FastLeaderElection, type 3, by default).
 *
 * @throws RuntimeException wrapping the IOException raised while building the
 *         initial vote, or the SocketException if the type-0 UDP socket cannot
 *         be opened
 */
public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        // Chain the original exception, rather than copying only its message and
        // stack trace, so the IOException type and its own cause are preserved.
        throw new RuntimeException(e.getMessage(), e);
    }

    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    this.electionAlg = createElectionAlgorithm(electionType);
}

/**
 * Creates the election implementation for the given algorithm number.
 * <ul>
 *   <li>0: LeaderElection</li>
 *   <li>1: AuthFastLeaderElection</li>
 *   <li>2: AuthFastLeaderElection with the extra {@code true} flag</li>
 *   <li>3: FastLeaderElection over a new QuorumCnxManager, whose listener and
 *       election thread are started here</li>
 * </ul>
 *
 * @param electionAlgorithm the algorithm number (electionType)
 * @return the election, or null when type 3's listener is null. For an unknown
 *         number, the {@code assert} fails when assertions are enabled;
 *         otherwise null is returned.
 */
protected Election createElectionAlgorithm(int electionAlgorithm){
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        return new LeaderElection(this);
    case 1:
        return new AuthFastLeaderElection(this);
    case 2:
        return new AuthFastLeaderElection(this, true);
    case 3:
        qcm = new QuorumCnxManager(this);
        QuorumCnxManager.Listener cnxListener = qcm.listener;
        if (cnxListener == null) {
            LOG.error("Null listener when initializing cnx manager");
            return null;
        }
        cnxListener.start();
        FastLeaderElection fastElection = new FastLeaderElection(this, qcm);
        fastElection.start();
        return fastElection;
    default:
        assert false;
        return null;
    }
}

(4) 启动 QuorumPeer 线程【QuorumPeer】

/**
 * Main loop of the quorum peer thread.
 * <p>
 * While {@code running}, it dispatches on the current peer state:
 * <ul>
 *   <li>LOOKING runs leader election;</li>
 *   <li>OBSERVING, FOLLOWING and LEADING create the matching role object and
 *       run it until it returns or throws, then tear it down and call
 *       updateServerState().</li>
 * </ul>
 */
@Override
public void run() {

    // ... (omitted in this excerpt)
    try {
        while (running) {
            switch (getPeerState()) {
            // 1. Leader election.
            case LOOKING:
                // ... (omitted in this excerpt)
                try {
                    reconfigFlagClear();
                    if (shuttingDownLE) {
                        shuttingDownLE = false;
                        startLeaderElection();
                    }
                    setCurrentVote(makeLEStrategy().lookForLeader());
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            // 2. Observer.
            case OBSERVING:
                try {
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e );
                } finally {
                    // If makeObserver() throws, observer may still be null. Guard it
                    // (as the LEADING branch does) so an NPE cannot mask the original
                    // exception and terminate this thread.
                    if (observer != null) {
                        observer.shutdown();
                    }
                    setObserver(null);
                    updateServerState();
                }
                break;
            // 3. Follower.
            case FOLLOWING:
                try {
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    // Same guard as above: makeFollower() may fail before a follower is set.
                    if (follower != null) {
                        follower.shutdown();
                    }
                    setFollower(null);
                    updateServerState();
                }
                break;
            // 4. Leader.
            case LEADING:
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    updateServerState();
                }
                break;
            }
            start_fle = Time.currentElapsedTime();
        }
    } finally {
        // ... (omitted in this excerpt)
    }
}

下面两节会重点关注 Leader 选举和请求处理。

参考:

  1. 从 Paxos 到 Zookeeper : 分布式一致性原理与实践

每天用心记录一点点。内容也许不重要,但习惯很重要!

原文地址:https://www.cnblogs.com/binarylei/p/9948790.html