zk observer 节点

zk 集群中有3种节点:leader,follower,observer,其中 observer 节点没有投票权,即它不参与选举和写请求的投票。

比较 Follower 和 Observer 的代码:

//void org.apache.zookeeper.server.quorum.Observer.processPacket(QuorumPacket qp) throws IOException
protected void processPacket(QuorumPacket qp) throws IOException{
    switch (qp.getType()) {
    case Leader.PING:
        ping(qp);
        break;
    case Leader.PROPOSAL:
        LOG.warn("Ignoring proposal");
        break;
    case Leader.COMMIT:
        LOG.warn("Ignoring commit");            
        break;            
    case Leader.UPTODATE:
        LOG.error("Received an UPTODATE message after Observer started");
        break;
    case Leader.REVALIDATE:
        revalidate(qp);
        break;
    case Leader.SYNC:
        ((ObserverZooKeeperServer)zk).sync();
        break;
    case Leader.INFORM:            
        TxnHeader hdr = new TxnHeader();
        Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
        Request request = new Request (null, hdr.getClientId(), 
                                       hdr.getCxid(),
                                       hdr.getType(), null, null);
        request.txn = txn;
        request.hdr = hdr;
        ObserverZooKeeperServer obs = (ObserverZooKeeperServer)zk;
        obs.commitRequest(request);            
        break;
    }
}
// void org.apache.zookeeper.server.quorum.Follower.processPacket(QuorumPacket qp) throws IOException
protected void processPacket(QuorumPacket qp) throws IOException{
    switch (qp.getType()) {
    case Leader.PING:            
        ping(qp);            
        break;
    case Leader.PROPOSAL:            
        TxnHeader hdr = new TxnHeader();
        Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
        if (hdr.getZxid() != lastQueued + 1) {
            LOG.warn("Got zxid 0x"
                    + Long.toHexString(hdr.getZxid())
                    + " expected 0x"
                    + Long.toHexString(lastQueued + 1));
        }
        lastQueued = hdr.getZxid();
        fzk.logRequest(hdr, txn);
        break;
    case Leader.COMMIT:
        fzk.commit(qp.getZxid());
        break;
    case Leader.UPTODATE:
        LOG.error("Received an UPTODATE message after Follower started");
        break;
    case Leader.REVALIDATE:
        revalidate(qp);
        break;
    case Leader.SYNC:
        fzk.sync();
        break;
    }
}

可以看到,observer 不处理 leader 的 proposal 和 commit,但是它会接收 leader 的 inform,所以它是能听到 write 的。

// void org.apache.zookeeper.server.quorum.Leader.processAck(long sid, long zxid, SocketAddress followerAddr)
// 代码片段
commit(zxid); // 向 followers 发送 COMMIT
inform(p); // 向 observers 发送 INFORM
zk.commitProcessor.commit(p.request); // leader 提交
原文地址:https://www.cnblogs.com/allenwas3/p/9389506.html