通过 lookForLeader 方法选举完成以后,会设置当前节点的 PeerState,要么为 Leading、要么就是 FOLLOWING、或者 OBSERVING到这里,只是表示当前的 leader 选出来了,但是 QuorumPeer.run 方法里面还没执行完,我们再回过头看看后续的处理过程。
QuorumPeer.run
分别来看看 case 为 FOLLOWING 和 LEADING,会做什么事情:
@Override public void run() { setName("QuorumPeer" + "[myid=" + getId() + "]" + cnxnFactory.getLocalAddress()); while (running) { switch (getPeerState()) { case LOOKING: case OBSERVING: case FOLLOWING: try { LOG.info("FOLLOWING"); setFollower(makeFollower(logFactory)); follower.followLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { follower.shutdown(); setFollower(null); setPeerState(ServerState.LOOKING); } break; case LEADING: LOG.info("LEADING"); try { setLeader(makeLeader(logFactory)); leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } setPeerState(ServerState.LOOKING); } break; } //省略部分代码 }
makeFollower
初始化一个 Follower 对象,构建一个 FollowerZookeeperServer,表示 follower 节点的请求处理服务。
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException { return new Follower(this, new FollowerZooKeeperServer(logFactory, this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb)); }
follower.followLeader();
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException { return new Follower(this, new FollowerZooKeeperServer(logFactory, this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb)); } follower.followLeader(); void followLeader() throws InterruptedException { //省略部分代码 try { //根据 sid 找到对应 leader,拿到 lead 连接信息 QuorumServer leaderServer = findLeader(); try {//连接到 Leader connectToLeader(leaderServer.addr, leaderServer.hostname); //将 Follower 的 zxid 及 myid 等信息封装好发送到 Leader,同步 epoch。 也就是意味着接下来 follower 节点只同步新 epoch 的数据信息 long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); //如果 leader 的 epoch 比当前 follow 节点的 poch 还小,抛异常 long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid); if (newEpoch < self.getAcceptedEpoch()) { LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid) + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch())); throw new IOException("Error: Epoch of leader is lower"); }//和 leader 进行数据同步 syncWithLeader(newEpochZxid); QuorumPacket qp = new QuorumPacket(); while (this.isRunning()) { //接受 Leader 消息,执行并反馈给 leader,线程在此自旋 readPacket(qp);//从 leader 读取数据包 processPacket(qp);//处理 packet } } catch (Exception e) { LOG.warn("Exception when following the leader", e); try { sock.close(); } catch (IOException e1) { e1.printStackTrace(); } // clear pending revalidations pendingRevalidations.clear(); } } finally { zk.unregisterJMX((Learner)this); } }
makeLeader
初始化一个 Leader 对象,构建一个 LeaderZookeeperServer,用于表示leader 节点的请求处理服务。
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException { return new Leader(this, new LeaderZooKeeperServer(logFactory, this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb)); }
leader.lead();
在 Leader 端, 则通过 lead()来处理与 Follower 的交互leader 和 follower 的处理逻辑这里就不再展开来分析,朋友们可以自己去分析并且画出他们的交互图。