zookeeper(五)

集群领导选举机制

多台服务器先比较epoch(届号),epoch大的优先;epoch一样则选zxid(事务id)最大的那台,如果还一样则再选serverid(myid)大的那台。

QuorumPeer#start{
        loadDataBase();
        cnxnFactory.start();        
        startLeaderElection();
        super.start();@10
}
startLeaderElection() {...
    	try {
    		currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); //开始的票都投自己  并且初始化状态 this.state = ServerState.LOOKING; ...
                this.electionAlg = createElectionAlgorithm(electionType);

createElectionAlgorithm{...
            case 3:
            qcm = createCnxnManager();
            QuorumCnxManager.Listener listener = qcm.listener;
            if(listener != null){
            listener.start();@1
            le = new FastLeaderElection(this, qcm);@2
...}
createCnxnManager—>new QuorumCnxManager->this->QuorumCnxManager{...
      queueSendMap //投票发送的map
      recvQueue //接收的queue
      ConcurrentHashMap<Long, SendWorker> senderWorkerMap //发送 queueSendMap的投票 线程。。。
       listener = new Listener();@1
...}
@1 run{...
      while((!shutdown) && (numRetries < 3)){... 
                try {
                    ss = new ServerSocket();
                    ss.setReuseAddress(true);
                    if (listenOnAllIPs) {
                        int port = view.get(QuorumCnxManager.this.mySid)
                            .electionAddr.getPort(); //选举端口,cfg里面它前一个端口是各个服务器之间数据同步用的端口
                        addr = new InetSocketAddress(port);...
                    while (!shutdown) {
                        Socket client = ss.accept(); //等待接收投票
                        setSockOpts(client);...
                  receiveConnection(client)->handleConnection{....
                  if (sid < this.mySid) {... //互相投票 如果对方sid小于当前sid则关闭这个连接,由sid大的一方重新发起连接,避免双方重复连接
                        SendWorker sw = senderWorkerMap.get(sid);
                        LOG.debug("Create new connection to server: " + sid);
                        closeSocket(sock);
                        connectOne(sid);
                  } else {
                        SendWorker sw = new SendWorker(sock, sid);
                        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
                        sw.setRecv(rw);
                        SendWorker vsw = senderWorkerMap.get(sid);
                        if(vsw != null)
                        vsw.finish();
                        senderWorkerMap.put(sid, sw);
                        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
                        sw.start();
                        rw.start();
      ...}
...}
SendWorker.run{...
      ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
      if (bq != null) {
            b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
            send(b);
}
RecvWorker.run{...
      byte[] msgArray = new byte[length];
      din.readFully(msgArray, 0, length); //读数据并且添加进queue
      ByteBuffer message = ByteBuffer.wrap(msgArray);
      addToRecvQueue(new Message(message.duplicate(), sid));
...}

@2 FastLeaderElection{...
      starter{...
        sendqueue = new LinkedBlockingQueue<ToSend>();
        recvqueue = new LinkedBlockingQueue<Notification>();
        this.messenger = new Messenger(manager)
...}
Messenger(QuorumCnxManager manager) {
            this.ws = new WorkerSender(manager);
            Thread t = new Thread(this.ws,
                    "WorkerSender[myid=" + self.getId() + "]");
            t.setDaemon(true);
            t.start();
            this.wr = new WorkerReceiver(manager);
            t = new Thread(this.wr,
                    "WorkerReceiver[myid=" + self.getId() + "]");
            t.setDaemon(true);
            t.start();
}
WorkerSender.run{
      ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
      process(m);
}
process{
      manager.toSend(m.sid, requestBuffer){}
      if (this.mySid == sid) { //如果接收方是自己就直接加入接收队列,不建立连接
             b.position(0);
             addToRecvQueue(new Message(b.duplicate(), sid));
      } else {
             addToSendQueue(bq, b);
             connectOne(sid);
}
WorkerReceiver.run{...
      response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);...
      if((ackstate == QuorumPeer.ServerState.LOOKING)
           && (n.electionEpoch < logicalclock.get())){
           Vote v = getVote();
           ToSend notmsg = new ToSend(ToSend.mType.notification,v.getId(),
            v.getZxid(),logicalclock.get(),self.getPeerState(),
            response.sid,v.getPeerEpoch());sendqueue.offer(notmsg);}
...}

@10 run{...
      while (running) {
                switch (getPeerState()) {
                case LOOKING:
                    try {
                         roZkMgr.start();
                         setBCVote(null);
                         setCurrentVote(makeLEStrategy().lookForLeader());
        ...}
FastLeaderElection#lookForLeader{...
            synchronized(this){
                logicalclock.incrementAndGet();
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); //更新提议为自己
            }

            LOG.info("New election. My id =  " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            sendNotifications(); //启动时先投给自己 因为配置项默认都是自己
            while ((self.getPeerState() == ServerState.LOOKING) &&(!stop)){
                  Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS);
                  if(n == null){
                    if(manager.haveDelivered()){ 
                        sendNotifications();
                    } else {
                        manager.connectAll();//如果还有未发送的就再连接发送
                    }
                  ...else if(validVoter(n.sid) && validVoter(n.leader)) {
                        switch (n.state) {
                          case LOOKING:
                          sendNotifications();//找出可以投票的机器 发送投票
                          recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)) //把收到的投票放到投票箱 在这之前会对比收到的投票和自己投的票:
                          //如果收到的届号、zxid、myid都不如自己就忽略,如果收到的更好就把自己的投票更新成收到的
                          if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) { //对比我们收到的选票和我们自己的投票看下是否有过半的
                            // Verify if there is any change in the proposed leader
                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){
                                    recvqueue.put(n);
                                    break;
                                } //后面紧接着返回while循环中如果是leader就走lead(),follower也是走对应的方法 完成数据同步等
...}
sendNotifications{
        for (QuorumServer server : self.getVotingView().values()) {
            long sid = server.id;

            ToSend notmsg = new ToSend(ToSend.mType.notification,
                    proposedLeader,
                    proposedZxid,
                    logicalclock.get(),
                    QuorumPeer.ServerState.LOOKING,
                    sid,
                    proposedEpoch);
            if(LOG.isDebugEnabled()){
                LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                      Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +
                      " (n.round), " + sid + " (recipient), " + self.getId() +
                      " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
            }
            sendqueue.offer(notmsg);
        }

当leader宕机后,follower会在异常处理中把state改为LOOKING并重新选举(可以手动关闭leader来模拟,然后根据日志中报错的位置查看)。当follower宕机后,leader会先判断剩余节点是否仍满足过半机制:如果满足就不做改变;如果不满足,leader会把自己shutdown,将state改为LOOKING后重新选举。重新选举后的leader可能还是原来那台;也可能选举不出leader,例如4台的集群宕机2台后,剩余2台不满足过半机制(至少需要3台)。

原文地址:https://www.cnblogs.com/leifonlyone/p/12844663.html