ZooKeeper ZAB协议:Follower和Leader源码分析

Follower处理逻辑

/**
 * Abridged excerpt of the follower main loop: find the leader, connect to it,
 * register, synchronize, then read and process packets while running.
 * The catch/finally sections are elided in this excerpt, so the snippet is
 * not compilable as shown.
 *
 * @throws InterruptedException declared by the signature; the throwing call is not visible here
 */
void followLeader() throws InterruptedException {
    //...
    try {
        // Locate the leader server.
        QuorumServer leaderServer = findLeader();            
        try {
            // Actively open a TCP connection to the leader.
            connectToLeader(leaderServer.addr, leaderServer.hostname);
            // Send the follower's info (last zxid, sid), read the leader's latest zxid,
            // then send our last zxid back to the leader. Returns the leader zxid.
            // 1. The follower first sends its last zxid and sid so the leader can settle the epoch. FOLLOWERINFO
            // 2. The leader replies with the settled epoch. LEADERINFO
            // 3. The follower sends its latest zxid again. ACKEPOCH
            // 4. The new epoch is returned.
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

            //check to see if the leader zxid is lower than ours
            //this should never happen but is just a safety check
            // (The comment above is slightly off: the check compares epochs, not zxids.)
            long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
            if (newEpoch < self.getAcceptedEpoch()) {
                // NOTE(review): getAcceptedEpoch() is passed to zxidToString although the
                // comparison above treats it as an epoch — confirm the log output is as intended.
                LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                        + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                throw new IOException("Error: Epoch of leader is lower");
            }
            // Start syncing with the leader; the first message received decides
            // between DIFF, TRUNC and SNAP.
            syncWithLeader(newEpochZxid);                
            QuorumPacket qp = new QuorumPacket();
            // Steady state: keep reading and processing packets while running.
            while (this.isRunning()) {
                readPacket(qp);
                processPacket(qp);
            }
        }
}

Leader处理逻辑

/**
 * Abridged excerpt of the leader main routine: load data, start accepting
 * learner connections, wait for epoch ACKs, wait for NEWLEADER ACKs, then
 * start the ZooKeeper server.
 * Catch/finally sections are elided in this excerpt, so the snippet is not
 * compilable as shown.
 *
 * @throws IOException declared by the signature; the throwing call is not visible here
 * @throws InterruptedException declared by the signature; the throwing call is not visible here
 */
void lead() throws IOException, InterruptedException {
    try {
        self.tick = 0;
        // Initialise: clean up stale sessions and build the state-machine (data) tree.
        zk.loadData();
        
        leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

        // Start listening; the acceptor also handles the discovery and
        // synchronization phases with the followers.
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();
        
        newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                null, null);
        
        waitForEpochAck(self.getId(), leaderStateSummary);
        // NOTE(review): `epoch` is not assigned anywhere in this excerpt; presumably it
        // is computed by code elided above — confirm against the full source.
        self.setCurrentEpoch(epoch);

        try {
            // Wait for the ACKs to NEWLEADER, which mean synchronization has completed.
            waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
        } 
        
        // Start serving; UPTODATE is sent first.
        startZkServer();
}

发现阶段逻辑处理

Leader建立LearnerCnxAcceptor监听后,会启动LearnerHandler线程来处理与learner的交互,其run()方法如下:

/**
 * Abridged excerpt of the LearnerHandler thread body. It reads the learner's
 * first packet, chooses how to synchronize it (DIFF, TRUNC or SNAP), queues
 * NEWLEADER, writes the sync packet (plus a snapshot for SNAP), starts a sender
 * thread, waits for the NEWLEADER ACK and for the leader's server to be running,
 * queues UPTODATE, and then dispatches on packet type.
 * Large parts of the original are elided, so the snippet is not compilable as shown.
 */
public void run() {
    try {
        // Read FOLLOWERINFO, which carries the follower's sid and peerLastZxid.
        QuorumPacket qp = new QuorumPacket();
        ia.readRecord(qp, "packet");
        byte learnerInfoData[] = qp.getData();
        
        long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
        // NOTE(review): `ss` is not declared in this excerpt; presumably the learner's
        // state summary built by elided code — confirm against the full source.
        peerLastZxid = ss.getLastZxid();
        
        /* the default to send to the follower */
        // Default to a full (snapshot) sync.
        int packetToSend = Leader.SNAP;
        long zxidToSend = 0;
        long leaderLastZxid = 0;
        /** the packets that the follower needs to get updates from **/
        long updates = peerLastZxid;
        
        ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
        ReadLock rl = lock.readLock();
        try {
            rl.lock();   
            // Largest zxid in the committed-log cache: upper bound of the range that can be synced from it.
            final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
            // Smallest zxid in the committed-log cache: lower bound of the range that can be synced from it.
            final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
            // Get all of the current leader's committed log entries (the call is elided here).
            // As shown above, sync only goes up to the leader's last committed entry.
            // If nothing needs syncing, send an empty DIFF.
            if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
                // Follower is already sync with us, send empty diff
                LOG.info("leader and follower are in sync, zxid=0x{}",
                        Long.toHexString(peerLastZxid));
                packetToSend = Leader.DIFF;
                zxidToSend = peerLastZxid;
            // NOTE(review): `proposals` is not declared in this excerpt; presumably the
            // committed-log list fetched by the elided call mentioned above — confirm.
            } else if (proposals.size() != 0) {
                // minCommittedLog <= peerLastZxid <= maxCommittedLog: do a DIFF sync.
                if ((maxCommittedLog >= peerLastZxid)
                        && (minCommittedLog <= peerLastZxid)) {
                        // Special case: TRUNC first, then DIFF sync.
                        // Leader log:   50001, 50002, 60001, 60002
                        // Follower log: 50003
                        // Queue the data that needs syncing for sending (body elided).
                } else if (peerLastZxid > maxCommittedLog) {
                    // Beyond maxCommittedLog: just TRUNC.
                    packetToSend = Leader.TRUNC;
                } else {
                    LOG.warn("Unhandled proposal scenario");
                }
            }    
            leaderLastZxid = leader.startForwarding(this, updates);

        } 
         // NOTE(review): `newEpoch` is not declared in this excerpt — confirm against the full source.
         QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                ZxidUtils.makeZxid(newEpoch, 0), null, null);

        // Add the NEWLEADER packet to the send queue; nothing has been sent yet at this point.
        queuedPackets.add(newLeaderQP);
        bufferedOutput.flush();
        //Need to set the zxidToSend to the latest zxid
        if (packetToSend == Leader.SNAP) {
            zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
        }
        // Send SNAP, DIFF or TRUNC.
        oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
        bufferedOutput.flush();
        
        /* If it is neither DIFF nor TRUNC, send the full snapshot directly */
        if (packetToSend == Leader.SNAP) {
            leader.zk.getZKDatabase().serializeSnapshot(oa);
            oa.writeString("BenWasHere", "signature");
        }
        bufferedOutput.flush();
        
        // Start sending the queued packets on a dedicated thread.
        new Thread() {
            public void run() {
                Thread.currentThread().setName(
                        "Sender-" + sock.getRemoteSocketAddress());
                try {
                    // Send the synchronization packets.
                    sendPackets();
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption",e);
                }
            }
        }.start();
        // Wait for the NEWLEADER ACK; once it arrives, synchronization is complete.
        leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());
        
        // Wait until a majority has finished syncing and the leader has started up.
        // NOTE(review): the closing brace of this synchronized block is missing in the excerpt.
        synchronized(leader.zk){
            while(!leader.zk.isRunning() && !this.isInterrupted()){
            leader.zk.wait(20);
        }
        // Queue the UPTODATE packet; the learner starts serving.
        queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
        // Normal processing loop.
        // NOTE(review): as excerpted, the loop never reads a new packet and never
        // breaks out; presumably both are elided — confirm against the full source.
        while (true) {
            switch (qp.getType()) {
                // Handle propose / commit acknowledgements.
            case Leader.ACK:
                if (this.learnerType == LearnerType.OBSERVER) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Received ACK from Observer  " + this.sid);
                    }
                }
                syncLimitCheck.updateAck(qp.getZxid());
                leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                break;
                // Keep session information in step with the follower.
            case Leader.PING:
                // Process the touches
                ByteArrayInputStream bis = new ByteArrayInputStream(qp
                        .getData());
                DataInputStream dis = new DataInputStream(bis);
                // The payload is read as repeated (long session id, int timeout) pairs.
                while (dis.available() > 0) {
                    long sess = dis.readLong();
                    int to = dis.readInt();
                    leader.zk.touch(sess, to);
                }
                break;
            case Leader.REVALIDATE:
                // Extend the session timeout (body elided).
            case Leader.REQUEST:
                // Add to the processing queue (body elided).
            default:
            }
        }
    } 
}

总结

源码差不多看完了,整体挺复杂的,这里总结一下发现和同步的过程。

image

  • newEpoch:提供服务的epoch
  • acceptedEpoch:没有确认的epoch,LEADERINFO阶段
  • currentEpoch:确认的epoch,接收到UPTODATE后
  • lastLoggedZxid:最后处理的日志(包括提交,未提交)
原文地址:https://www.cnblogs.com/biterror/p/7147448.html