Zookeeper 源码(五)Leader 选举

Zookeeper 源码(五)Leader 选举

前面学习了 Zookeeper 服务端的相关细节,其中对于集群启动而言,很重要的一部分就是 Leader 选举,接着就开始深入学习 Leader 选举。

一、选举规则

Leader 选举是保证分布式数据一致性的关键所在。当 Zookeeper 集群中的一台服务器出现以下两种情况之一时,需要进入 Leader 选举。

  • 服务器初始化启动
  • 服务器运行期间无法和 Leader 保持连接

下面以服务器启动时期的 Leader 选举为例进行分析讲解。

在集群初始化阶段,当有一台服务器 Server1 启动时,其单独无法进行和完成 Leader 选举,当第二台服务器 Server2 启动时,此时两台机器可以相互通信,每台机器都试图找到L eader,于是进入 Leader 选举过程。选举过程如下

(1) 每个Server发出一个投票。由于是初始情况,Server1 和 Server2 都会将自己作为 Leader 服务器来进行投票,每次投票会包含所推举的服务器的 myid 和 ZXID,使用 (myid, ZXID) 来表示,此时 Server1 的投票为 (1, 0),Server2 的投票为 (2, 0),然后各自将这个投票发给集群中其他机器。

(2) 接受来自各个服务器的投票。集群的每个服务器收到投票后,首先判断该投票的有效性,如检查是否是本轮投票、是否来自 LOOKING 状态的服务器。

(3) 处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行 PK,PK 规则如下

  • 优先检查 ZXID。ZXID 比较大的服务器优先作为 Leader。
  • 如果 ZXID 相同,那么就比较 myid。myid 较大的服务器作为 Leader 服务器。

对于 Server1 而言,它的投票是 (1, 0),接收 Server2 的投票为 (2, 0),首先会比较两者的 ZXID,均为 0,再比较 myid,此时 Server2 的 myid 最大,于是更新自己的投票为 (2, 0),然后重新投票,对于 Server2 而言,其无须更新自己的投票,只是再次向集群中所有机器发出上一次投票信息即可。

(4) 统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对于 Server1、Server2 而言,都统计出集群中已经有两台机器接受了 (2, 0) 的投票信息,此时便认为已经选出了 Leader。

(5) 改变服务器状态。一旦确定了 Leader,每个服务器就会更新自己的状态,如果是 Follower,那么就变更为 FOLLOWING,如果是 Leader,就变更为 LEADING。

Leader 选举

由上面规则可知,通常那台服务器上的数据越新(ZXID 会越大),其成为 Leader 的可能性越大,也就越能够保证数据的恢复。如果 ZXID 相同,则 SID 越大机会越大。

二、Leader 选举实现

2.1 几个基本概念

(1) 服务器的状态

服务器具有四种状态,分别是 LOOKING、FOLLOWING、LEADING、OBSERVING。

  • LOOKING 寻找 Leader 状态。当服务器处于该状态时,它会认为当前集群中没有 Leader,因此需要进入 Leader 选举状态。
  • FOLLOWING 跟随者状态。表明当前服务器角色是 Follower。
  • LEADING 领导者状态。表明当前服务器角色是 Leader。
  • OBSERVING 观察者状态。表明当前服务器角色是 Observer。

(2) Vote 数据结构

属性 说明
id 被推举的 Leader 的 sid(myid)
zxid 被推举的 Leader 的事务 id
electionEpoch 用来判断多个投票是否在同一轮选举周期中,每进行一轮 Leader 选举自增 1
peerEpoch 被推举的 Leader 的 epoch
state 服务器的状态,有 LOOKING、FOLLOWING、LEADING、OBSERVING

2.2 QuorumCnxManager(网络I/O)

每台服务器在启动的过程中,会启动一个 QuorumPeerManager,负责各台服务器之间的底层 Leader 选举过程中的网络通信。

(1) 初始化 QuorumCnxManager 【QuorumPeer】

protected Election createElectionAlgorithm(int electionAlgorithm){
    qcm = new QuorumCnxManager(this);
    QuorumCnxManager.Listener listener = qcm.listener;
    if(listener != null){
        listener.start();
        FastLeaderElection fle = new FastLeaderElection(this, qcm);
        fle.start();
        le = fle;
    } 
}

在 createElectionAlgorithm 会启动 QuorumCnxManager,本小节重点关注 QuorumCnxManager 干了那些事。

public QuorumCnxManager(QuorumPeer self) {
    this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
    this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
    this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
    this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
    
    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if(cnxToValue != null){
        this.cnxTO = Integer.parseInt(cnxToValue);
    }
    
    this.self = self;

    // Starts listener thread that waits for connection requests 
    listener = new Listener();
    listener.setName("QuorumPeerListener");
}

(2) 消息队列

QuorumCnxManager 内部维护了一系列的队列,用来保存接收到的、待发送的消息以及消息的发送器,除接收队列以外,其他队列都按照 SID 分组形成队列集合。

  • recvQueue 消息接收队列,用于存放那些从其他服务器接收到的消息。
  • queueSendMap 消息发送队列,用于保存那些待发送的消息,按照 SID 进行分组。
  • senderWorkerMap 发送器集合,每个 SenderWorker 消息发送器,都对应一台远程 Zookeeper 服务器,负责消息的发送,也按照 SID 进行分组。
  • lastMessageSent 最近发送过的消息,为每个 SID 保留最近发送过的一个消息。

(3) Listener

可以看到 Listener 初始化了一个 ServerSocket,默认端口为 3888 进行底层 Leader 选举通信。

public class Listener extends ZooKeeperThread {
    @Override
    public void run() {
        int numRetries = 0;
        InetSocketAddress addr;

        while((!shutdown) && (numRetries < 3)){
            try {
                // 1. 建立 ServerSocket
                ss = new ServerSocket();
                ss.setReuseAddress(true);
                if (self.getQuorumListenOnAllIPs()) {
                    int port = self.getElectionAddress().getPort();
                    addr = new InetSocketAddress(port);
                } else {
                    self.recreateSocketAddresses(self.getId());
                    addr = self.getElectionAddress();
                }
                LOG.info("My election bind port: " + addr.toString());
                setName(addr.toString());
                ss.bind(addr);
                while (!shutdown) {
                    Socket client = ss.accept();
                    setSockOpts(client);
                    LOG.info("Received connection request "
                            + client.getRemoteSocketAddress());
                    // 2. 处理请求 Socket
                    receiveConnection(client);
                    numRetries = 0;
                }
            } catch (IOException e) {
                if (shutdown) {
                    break;
                }
                LOG.error("Exception while listening", e);
                numRetries++;
                try {
                    ss.close();
                    Thread.sleep(1000);
                } catch (IOException ie) {
                    LOG.error("Error closing server socket", ie);
                } catch (InterruptedException ie) {
                    LOG.error("Interrupted while sleeping. " +
                        "Ignoring exception", ie);
                }
            }
        }
        LOG.info("Leaving listener");
        if (!shutdown) {
            LOG.error("As I'm leaving the listener thread, "
                    + "I won't be able to participate in leader "
                    + "election any longer: "
                    + self.getElectionAddress());
        } else if (ss != null) {
            // Clean up for shutdown.
            try {
                ss.close();
            } catch (IOException ie) {
                // Don't log an error for shutdown.
                LOG.debug("Error closing server socket", ie);
            }
        }
    }
}

为了避免两台机器之间重复地创建 TCP 连接,Zookeeper 只允许 SID 大的服务器主动和其他机器建立连接,否则断开连接。在接收到创建连接请求后,服务器通过对比自己和远程服务器的 SID 值来判断是否接收连接请求,如果当前服务器发现自己的 SID 更大,那么会断开当前连接,然后自己主动和远程服务器建立连接。一旦连接建立,就会根据远程服务器的 SID 来创建相应的消息发送器 SendWorker 和消息接收器 RecvWorker,并启动。

每个 RecvWorker 只需要不断地从这个 TCP 连接中读取消息,并将其保存到 recvQueue 队列中。每个 SendWorker 只需要不断地从对应的消息发送队列中获取出一个消息发送即可,同时将这个消息放入 lastMessageSent 中。

2.3 FastLeaderElection(选举算法核心)

  • 外部投票:特指其他服务器发来的投票。
  • 内部投票:服务器自身当前的投票。
  • 选举轮次:Zookeeper 服务器 Leader 选举的轮次,即 logicalclock。
  • PK:对内部投票和外部投票进行对比来确定是否需要变更内部投票。

FastLeaderElection选举流程

(1) 初始化 FastLeaderElection

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}

private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    // 初始化 sendqueue 和 recvqueue
    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

Messenger(QuorumCnxManager manager) {
    // 1. WorkerSender 选票接收器,负责从 QuorumCnxManager 接收选票后保存到 recvqueue 中
    this.ws = new WorkerSender(manager);
    this.wsThread = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);

    // 2. WorkerReceiver 选票发送器,负责从 sendqueue 中获取待发送的选票并传递给 QuorumCnxManager
    this.wr = new WorkerReceiver(manager);
    this.wrThread = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}

在 FastLeaderElection 中有几个属性需要我们重点关注一下:

  • sendqueue 选票发送队列,用于保存待发送的选票。

  • recvqueue 选票接收队列,用于保存接收到的外部投票。

  • WorkerReceiver 选票接收器。其会不断地从 QuorumCnxManager 中获取其他服务器发来的选举消息,并将其转换成一个选票,然后保存到 recvqueue 中,在选票接收过程中,如果发现该外部选票的选举轮次小于当前服务器的,那么忽略该外部投票,同时立即发送自己的内部投票。

  • WorkerSender 选票发送器,不断地从 sendqueue 中获取待发送的选票,并将其传递到底层 QuorumCnxManager 中。

(2) lookForLeader(核心算法)

public Vote lookForLeader() throws InterruptedException {
    // 省略...
    if (self.start_fle == 0) {
       self.start_fle = Time.currentElapsedTime();
    }
    try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
        int notTimeout = finalizeWait;

        // 1. 启动时先投自己一票并广播给其它服务器
        synchronized(this){
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        sendNotifications();

        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            // 2. 获取其它服务器发送过来的选票
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            // 3. 如果没有选票,则先判断是否存在连接,如存在则是先投自己一票,如没则立即连接
            if(n == null){
                if(manager.haveDelivered()){
                    sendNotifications();
                } else {
                    manager.connectAll();
                }
                // 省略...
            } 
            // 4. 收到投票信息,根据 LOOKING、OBSERVING、FOLLOWING、LEADING 分别处理
            else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
                switch (n.state) {
                // 5. LOOKING 时才会进行选举
                case LOOKING:
                    // 5.1 判断投票是否过时,如果自己过时就清除之前已经接收到的信息
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        // 重新发起投票,PK 一下:如果收到的票据大则更新票据,否则仍投自己一票
                        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            // 更新票据
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            // 仍投自己一票
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        sendNotifications();
                    // 5.2 收到的票据过时则直接忽略
                    } else if (n.electionEpoch < logicalclock.get()) {
                        break;
                    // 5.3 epoch 相等则要 PK
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    // 5.4 统计谁的投票超过半数,就成为 Leader
                    if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch))) {

                        // 5.5 再等一会儿(200ms),看是否有新的投票
                        while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        // 5.6 如果没有发生新的投票,则结束选举过程则结束选举,修改状态为 LEADING
                        if (n == null) {
                            self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());
                            Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                // 6. OBSERVING 不能与投票
                case OBSERVING:
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                // 7. FOLLOWING、LEADING 说明已存在 Leader。
                //    可能在同一轮选举中,也可能是之前就存在的 Leader ,则不在同一轮选举中
                case FOLLOWING:
                case LEADING:
                    // 7.1 在同一轮选举中,则收集所有的选票放到 recvset 中
                    //     如有半数支持则更新状态退出选举
                    if(n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        if(termPredicate(recvset, new Vote(n.leader,
                                        n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                        && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    // 7.2 如果收到的 logicalclock 与当前不相等,那说明在另一个选举中已经有了结果(Leader 已存在)
                    //     收集所有的选票到 outofelection 中,如有半数支持则更新状态退出选举
                    outofelection.put(n.sid, new Vote(n.leader, IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
                    if (termPredicate(outofelection, new Vote(n.leader,
                            IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
                            && checkLeader(outofelection, n.leader, IGNOREVALUE)) {
                        synchronized(this){
                            logicalclock.set(n.electionEpoch);
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    break;
                }
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        // 省略...
    }
}

Leader 选举有两个函数需要重点关注一下,totalOrderPredicate() 对两张选票进行 PK,termPredicate() 判断投票是否可以结束了。

(3) totalOrderPredicate(PK 选票)

// id(sid) zxid(事务id) epoch(选举轮数,每更新一次 Leader 自增 1)
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, 
        long curId, long curZxid, long curEpoch) {
    /*
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch) ||
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}

(4) termPredicate(结束投票)

// 票据占多数则结束选举
private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self
                    .getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }

    // 将支持 vote 的票据放到 set 集合中(Set 可去重)
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        if (vote.equals(entry.getValue())) {
            voteSet.addAck(entry.getKey());
        }
    }
    
    // self.getQuorumVerifier().containsQuorum(set)
    return voteSet.hasAllQuorums();
}

参考:

  1. 《Zookeeper的Leader选举》:https://www.cnblogs.com/leesf456/p/6107600.html
  2. 《ZooKeeper之FastLeaderElection算法详解》:https://www.cnblogs.com/wally/p/4477042.html
  3. 从 Paxos 到 Zookeeper : 分布式一致性原理与实践

每天用心记录一点点。内容也许不重要,但习惯很重要!

原文地址:https://www.cnblogs.com/binarylei/p/9950285.html