zookeeper源码分析:选举流程和请求处理

集群启动:

  1.   QuorumPeerMain. runFromConfig()
  2.   quorumPeer.start();
    • loadDataBase();
    • cnxnFactory.start();          //网络通信交互
    • startLeaderElection();//启动选举类
    • super.start();
  3.   quorumPeer.run()
    • case LOOKING:   
      • new ReadOnlyZooKeeperServer()   //选举期间启用只读服务器,
      • FastLeaderElection.start()  //设置启动投票的后台线程
      • setCurrentVote(makeLEStrategy().lookForLeader()); while循环
    • case LEADING:        new LeaderZooKeeperServer().lead();

case LOOKING:快速选举

  1.   FastLeaderElection.start()
    • sendqueue = new LinkedBlockingQueue<ToSend>();//发出的投票队列
    • recvqueue = new LinkedBlockingQueue<Notification>();//收到的投票对列
    • this.messenger = new Messenger(manager);//投票消息后台服务线程
      • new WorkerSender(manager); //开启投票的发送线程
      • new WorkerReceiver(manager);//开启投票的接收线程
  2.  FastLeaderElection.lookForLeader():
    •   sendNotifications
    •   比较epoch
    •   totalOrderPredicate//对两个投票进行比较,规则是:选举期数 > zxid > serverid
    •   recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));/收集选票
    •   termPredicate//判断是否已经选出leader
  3. case LEADING: termPredicate
  4. ReadOnlyZooKeeperServer.startup()
    • Zookeeper.startup
    • createSessionTracker();//session管理类
    • setupRequestProcessors();//子类重写的方法,设置request处理责任链

case LEADING:

请求处理

  1.   NIOServerCnxnFactory.start()
  2.   setupRequestProcessors
    1. PrepRequestProcessor
    2. ProposalRequestProcessor
    3. CommitProcessor
    4. ToBeAppliedRequestProcessor
    5. FinalRequestProcessor
  3.   ProposalRequestProcessor.run()
    1. LearnerHandler.run();//leader初始化的时候,每accept一个connection,就会生成一个follower专门的LearnerHandler用于处理事务
    2. CommitProcessor.processRequest(request);// LinkedList queuedRequests,是已经发出的提议,要等待收到过半服务器ack的请求队列
    3. Leader.propose(request);//向所有的folllower发送提案
    4. Leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
    5. p.ackSet.add(sid); //对于一个特定的提案,每个follower反馈的ACK信息
    6. if (self.getQuorumVerifier().containsQuorum(p.ackSet)){ //超过半数

toBeApplied.add(p);//用于   ToBeAppliedRequestProcessor处理

commit(zxid);//提交给集群中所有成员

zk.commitProcessor.commit(p.request);//提交给commitProcessor处理,LinkedList committedRequests,是已经收到过半服务器ack的请求队列,以为着该请求可以被提交了。

}

4.  CommitProcessor.run();//匹配queuedRequests和committedRequests中的请求,并交给nextProcessor处理。

5.  ToBeAppliedRequestProcessor.processRequest(request);//交给nextProcessor处理,如果toBeApplied中的第一个元素与request的zxid相等,则可以移除toBeApplied队首元素

原文地址:https://www.cnblogs.com/nazhizq/p/8267695.html