Zookeeper 源码分析-启动

Zookeeper 源码分析-启动

博客分类:
 

   本文主要介绍了zookeeper启动的过程

   运行zkServer.sh start命令可以启动zookeeper。入口的main函数在QuorumPeerMain类中。

   main函数通过initializeAndRun调用了runFromConfig函数,后者创建了QuorumPeer对象,并且调用了它的start函数,从而启动了zookeeper。

Java代码  收藏代码
  1. public class QuorumPeerMain {  // Entry point: builds a QuorumPeer from configuration and starts it.
  2.      protected QuorumPeer quorumPeer;  // assigned in runFromConfig
  3.   
  4.     /** 
  5.      * To start the replicated server specify the configuration file name on 
  6.      * the command line. 
  7.      * @param args path to the configfile 
  8.      */  
  9.     public static void main(String[] args) {  
  10.         QuorumPeerMain main = new QuorumPeerMain();  
  11.         main.initializeAndRun(args);  // NOTE(review): initializeAndRun declares ConfigException/IOException, which main neither catches nor declares in this excerpt
  12.     }  
  13.   
  14.     protected void initializeAndRun(String[] args)  
  15.         throws ConfigException, IOException  
  16.     {  
  17.         runFromConfig(config);  // NOTE(review): 'config' is not declared in this excerpt; the code deriving it from args appears to be omitted
  18.     }  
  19.       
  20.     public void runFromConfig(QuorumPeerConfig config) throws IOException {  // Creates the peer, copies settings from config onto it, starts it and joins it.
  21.       LOG.info("Starting quorum peer");  
  22.       try {  
  23.           NIOServerCnxn.Factory cnxnFactory =  // built from the configured client port address and max client connections
  24.               new NIOServerCnxn.Factory(config.getClientPortAddress(),  
  25.                       config.getMaxClientCnxns());  
  26.     
  27.           quorumPeer = new QuorumPeer();  
  28.           quorumPeer.setClientPortAddress(config.getClientPortAddress());  
  29.           quorumPeer.setTxnFactory(new FileTxnSnapLog(  // constructed from the data-log directory and the data directory
  30.                       new File(config.getDataLogDir()),  
  31.                       new File(config.getDataDir())));  
  32.           quorumPeer.setQuorumPeers(config.getServers());  
  33.           quorumPeer.setElectionType(config.getElectionAlg());  
  34.           quorumPeer.setMyid(config.getServerId());  
  35.           quorumPeer.setTickTime(config.getTickTime());  
  36.           quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());  
  37.           quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());  
  38.           quorumPeer.setInitLimit(config.getInitLimit());  
  39.           quorumPeer.setSyncLimit(config.getSyncLimit());  
  40.           quorumPeer.setQuorumVerifier(config.getQuorumVerifier());  
  41.           quorumPeer.setCnxnFactory(cnxnFactory);  
  42.           quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));  // database is created from the txn factory set above
  43.           quorumPeer.setLearnerType(config.getPeerType());  
  44.     
  45.           quorumPeer.start();  
  46.           quorumPeer.join();  // waits here until the peer finishes; an interrupt is logged below rather than rethrown
  47.       } catch (InterruptedException e) {  
  48.           // warn, but generally this is ok  
  49.           LOG.warn("Quorum Peer interrupted", e);  
  50.       }  
  51.     }  
  52. }  

    在QuorumPeer的start函数中,先调用了loadDataBase方法用于恢复数据。然后启动与client交互的线程(cnxnFactory.start()),并调用startLeaderElection开始选举,最后调用super.start()启动QuorumPeer自身。

Java代码  收藏代码
  1. @Override  
  2. public synchronized void start() {  // Loads the database, starts the connection factory and leader election, then calls super.start().
  3.     try {  
  4.         zkDb.loadDataBase();  
  5.     } catch(IOException ie) {  
  6.         LOG.fatal("Unable to load database on disk", ie);  
  7.         throw new RuntimeException("Unable to run quorum server ", ie);  // rethrown unchecked with the IOException kept as the cause
  8.     }  
  9.     cnxnFactory.start(); // handles interaction with clients
  10.     startLeaderElection();// starts the election algorithm
  11.     super.start();  // NOTE(review): presumably Thread.start(), which would invoke run() on a new thread - confirm against the superclass
  12. }  

    调用loadDataBase从磁盘加载数据到内存

Java代码  收藏代码
  1. public long loadDataBase() throws IOException {  // Restores state through snapLog.restore and returns the zxid it yields.
  2.     PlayBackListener listener=new PlayBackListener(){  // callback handed to snapLog.restore below
  3.         public void onTxnLoaded(TxnHeader hdr,Record txn){  
  4.             Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),  // wraps the header/txn pair in a Request
  5.                     null, null);  
  6.             r.txn = txn;  
  7.             r.hdr = hdr;  
  8.             r.zxid = hdr.getZxid();  
  9.             addCommittedProposal(r);  
  10.         }  
  11.     };  
  12.       
  13.     long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);  
  14.     initialized = true;  // set only after restore returns without throwing
  15.     return zxid;  
  16. }  

    调用QuorumPeer的run函数,按照peer的state做不同的处理

Java代码  收藏代码
  1. @Override  
  2.    public void run() {  // Loops while 'running', dispatching on getPeerState(); every non-LOOKING branch resets the state to LOOKING when it ends.
  3.        setName("QuorumPeer:" + cnxnFactory.getLocalAddress());  
  4.   
  5.        try {  
  6.            /* 
  7.             * Main loop 
  8.             */  
  9.            while (running) {  
  10.                switch (getPeerState()) {  
  11.                case LOOKING:  
  12.                    try {  
  13.                        LOG.info("LOOKING");  
  14.                        setCurrentVote(makeLEStrategy().lookForLeader());  // result of lookForLeader() is stored as the current vote
  15.                    } catch (Exception e) {  
  16.                        LOG.warn("Unexpected exception",e);  
  17.                        setPeerState(ServerState.LOOKING);  // on failure stay in LOOKING so the loop retries
  18.                    }  
  19.                    break;  
  20.                case OBSERVING:  
  21.                    try {  
  22.                        LOG.info("OBSERVING");  
  23.                        setObserver(makeObserver(logFactory));  
  24.                        observer.observeLeader();  
  25.                    } catch (Exception e) {  
  26.                        LOG.warn("Unexpected exception",e );                          
  27.                    } finally {  
  28.                        observer.shutdown();  // NOTE(review): no null check here, unlike the LEADING branch; may fail if makeObserver threw before the field was set - confirm
  29.                        setObserver(null);  
  30.                        setPeerState(ServerState.LOOKING);  
  31.                    }  
  32.                    break;  
  33.                case FOLLOWING:  
  34.                    try {  
  35.                        LOG.info("FOLLOWING");  
  36.                        setFollower(makeFollower(logFactory));  
  37.                        follower.followLeader();  
  38.                    } catch (Exception e) {  
  39.                        LOG.warn("Unexpected exception",e);  
  40.                    } finally {  
  41.                        follower.shutdown();  // NOTE(review): no null check here, unlike the LEADING branch; may fail if makeFollower threw before the field was set - confirm
  42.                        setFollower(null);  
  43.                        setPeerState(ServerState.LOOKING);  
  44.                    }  
  45.                    break;  
  46.                case LEADING:  
  47.                    LOG.info("LEADING");  
  48.                    try {  
  49.                        setLeader(makeLeader(logFactory));  
  50.                        leader.lead();  
  51.                        setLeader(null);  // reached only when lead() returns normally; presumably clears 'leader' so the finally block skips the forced shutdown
  52.                    } catch (Exception e) {  
  53.                        LOG.warn("Unexpected exception",e);  
  54.                    } finally {  
  55.                        if (leader != null) {  
  56.                            leader.shutdown("Forcing shutdown");  
  57.                            setLeader(null);  
  58.                        }  
  59.                        setPeerState(ServerState.LOOKING);  
  60.                    }  
  61.                    break;  
  62.                }  
  63.            }  
  64.        }    // NOTE(review): this try has no catch/finally in the excerpt; the remainder of the method appears to be omitted
  65.    } 
http://blog.csdn.net/xhh198781/article/details/10949697
原文地址:https://www.cnblogs.com/shaohz2014/p/5040763.html