Zookeeper服务器启动

1 单机版启动

单机版启动大致可以分为如下步骤:

  • 配置文件解析
  • 初始化数据管理器
  • 初始化网络IO管理器
  • 数据恢复
  • 对外服务

image

1.1 预启动

QuorumPeerMain作为启动类,该类会进行如下操作:

  • 解析zoo.cfg配置文件
  • 清理历史数据
  • 根据配置文件中服务器地址列表的数量,判断是单机还是集群启动
  • 创建ZooKeeperServerMain真正启动ZK服务器

注意:QuorumPeerMain作为启动类,是在zkServer.sh脚本中进行的配置。

1.2 初始化并启动

ZooKeeperServerMain作为真正启动单机版ZK服务器的核心实现,ZK服务器的启动包括两个方面:

  • 网络IO管理(ServerCnxnFactory):监听2181端口,用于处理客户端网络请求处理
  • 服务启动(ZookeeperServer)

    ① 加载数据
    ② 应用会话管理
    ③ 创建请求处理链
    ④ 将ServerCnxnFactory注册到ZookeeperServer中,以便ZookeeperServer可以利用ServerCnxnFactory获取到客户端请求。

Zookeeper使用链式方式处理客户端请求,单机版启动时,初始化的处理链如下:
image

2 集群版启动

image

QuormPeer类是集群启动过的核心实现:

  • 启动ServerCnxnFactory接收客户端请求
  • 初始化Leader选举算法

    根据zoo.cfg文件中的electionArg属性的值,创建相应的选举算法,默认提供了3中算法:

    • LeaderElection(已废弃)
    • AuthFastLeaderElection(已废弃)
    • FastLeaderElection:
  • 启动QuormPeer线程

    选举流程、执行Leader流程、Follower流程、Observer流程

2.1 初始化选举算法

QuorumPeer中会首先获取选举算法,代码如下:

// 代码已经省略所有try{} catch{}部分
synchronized public void startLeaderElection() {
    currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
    for (QuorumServer p : getView().values()) {
        if (p.id == myid) {
            myQuorumAddr = p.addr;
            break;
        }
    }
    if (myQuorumAddr == null) { /* 异常省略 */ }
    if (electionType == 0) {
        udpSocket = new DatagramSocket(myQuorumAddr.getPort());
        responder = new ResponderThread();
        responder.start();
    }
    this.electionAlg = createElectionAlgorithm(electionType);
}
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
    //此处使用工厂类更加适合
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        qcm = createCnxnManager();
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            listener.start();
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

2.2 异步线程根据服务器角色进入相应流程

集群启动会在QuormPeer的异步线程中循环判断服务器的状态,以便执行服务器相应的流程,简易代码如下:

while (running) {
    switch (getPeerState()) {
        case LOOKING:
            setCurrentVote(makeLEStrategy().lookForLeader());
            break;
        case OBSERVING:
            setObserver(makeObserver(logFactory));
            observer.observeLeader();
            break;
        case FOLLOWING:
            setFollower(makeFollower(logFactory));
            follower.followLeader();
            break;
        case LEADING:
            setLeader(makeLeader(logFactory));
            leader.lead();
            setLeader(null);
            break;
        }
    }
}

3 选举算法

待补充

原文地址:https://www.cnblogs.com/wolfdriver/p/10657528.html