zookeeper原理之Leader选举完成之后的处理逻辑

通过 lookForLeader 方法选举完成以后,会设置当前节点的 PeerState,要么为 Leading、要么就是 FOLLOWING、或者 OBSERVING到这里,只是表示当前的 leader 选出来了,但是 QuorumPeer.run 方法里面还没执行完,我们再回过头看看后续的处理过程。
QuorumPeer.run
分别来看看 case 为 FOLLOWING 和 LEADING,会做什么事情:
 @Override
    public void run() {
        setName("QuorumPeer" + "[myid=" + getId() + "]" + cnxnFactory.getLocalAddress());
        while (running) {
            switch (getPeerState()) {
                case LOOKING:
                case OBSERVING:
                case FOLLOWING:
                    try {
                        LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        follower.shutdown();
                        setFollower(null);
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        setLeader(makeLeader(logFactory));
                        leader.lead();
                        setLeader(null);
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
            }
            //省略部分代码
        }
 
makeFollower
初始化一个 Follower 对象,构建一个 FollowerZookeeperServer,表示 follower 节点的请求处理服务。
 protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
        return new Follower(this, new FollowerZooKeeperServer(logFactory, this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
    }
 
follower.followLeader();
protected Follower
    makeFollower(FileTxnSnapLog logFactory) throws IOException {
        return new Follower(this, new FollowerZooKeeperServer(logFactory, this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
    }
    follower.followLeader();
    void followLeader() throws InterruptedException {
        //省略部分代码
        try {
            //根据 sid 找到对应 leader,拿到 lead 连接信息
            QuorumServer leaderServer = findLeader();
            try {//连接到 Leader
                connectToLeader(leaderServer.addr, leaderServer.hostname);
                //将 Follower 的 zxid 及 myid 等信息封装好发送到 Leader,同步 epoch。 也就是意味着接下来 follower 节点只同步新 epoch 的数据信息
                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
                //如果 leader 的 epoch 比当前 follow 节点的 poch 还小,抛异常
                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                if (newEpoch < self.getAcceptedEpoch()) {
                    LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid) + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                    throw new IOException("Error: Epoch of leader is lower");
                }//和 leader 进行数据同步
                syncWithLeader(newEpochZxid);
                QuorumPacket qp = new QuorumPacket();
                while (this.isRunning()) { //接受 Leader 消息,执行并反馈给 leader,线程在此自旋
                    readPacket(qp);//从 leader 读取数据包
                    processPacket(qp);//处理 packet
                }
            } catch (Exception e) {
                LOG.warn("Exception when following the leader", e);
                try {
                    sock.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
                // clear pending revalidations
                pendingRevalidations.clear();
            }
        } finally {
            zk.unregisterJMX((Learner)this);
        } 
    }
 
makeLeader
初始化一个 Leader 对象,构建一个 LeaderZookeeperServer,用于表示leader 节点的请求处理服务。
 protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
        return new Leader(this, new LeaderZooKeeperServer(logFactory, this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
    }
leader.lead();
在 Leader 端, 则通过 lead()来处理与 Follower 的交互leader 和 follower 的处理逻辑这里就不再展开来分析,朋友们可以自己去分析并且画出他们的交互图。
 
原文地址:https://www.cnblogs.com/47Gamer/p/13553730.html