使用Zookeeper 实现选主从或者分布式锁

概述


1.zookeeper实现选主从的原理

2.zookeeper实现选主从代码

选主从的原理

在分布式场景中经常会用到zookeeper,常用的有利用zookeeper来选举主从,管理节点状态,或者使用zookeeper来实现分布式锁;具体原理是什么呢?

这里只将实现方式的一种,根据编号大小来实现:(其他方式有通过创建节点实现的,等等)

所有的节点向zk的某个路径下注册,创建临时节点(临时节点,zookeeper会主动监控,一旦连接失效,zk会删除该临时节点),每个注册者创建时会有一个编号,每次选举编号最小的为主节点,其他节点就为从节点,从节点会监控主节点是否失效(怎么监控? zk有事件,监听事件的状态变化,然后重新选举),为避免“惊群”现象,每个节点只监控比它小的一个临近节点。

代码实现

选主从代码:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Created by  on 17/11/23.
 */
public class ChooseMaster implements Watcher {


    private ZooKeeper zk=null;
    private String selfPath=null;
    private String waitPath=null;
    private static final String ZK_ROOT_PATH="/zkmaster";  //选主从的跟路径
    private static final String ZK_SUB_PATH=ZK_ROOT_PATH+"/register";
    private CountDownLatch successCountDownLatch=new CountDownLatch(1);
    private CountDownLatch threadCompleteLatch=null;

    public ChooseMaster(CountDownLatch countDownLatch){

        this.threadCompleteLatch=countDownLatch;
    }

    @Override
    public void process(WatchedEvent watchedEvent) {  //监听事件

        Event.KeeperState keeperState=watchedEvent.getState(); 
        Event.EventType eventType=watchedEvent.getType();
        if(Event.KeeperState.SyncConnected==keeperState){  //建立连接

            if(Event.EventType.None==eventType){

                System.out.println(Thread.currentThread().getName()+" connected to server");
                successCountDownLatch.countDown();
            }else if(Event.EventType.NodeDeleted==eventType && watchedEvent.getPath().equals(waitPath)){ //监测到节点删除,且为当前线程的等待节点

                System.out.println(Thread.currentThread().getName() + " some node was deleted,I'll check if I am the minimum node");
                try{

                    if(checkMinPath()){  //判断自己是不是最小的编号

                        processMasterEvent();  //处理主节点做的事情
                    }
                }catch (Exception e){

                    e.printStackTrace();
                }

            }

        }else if(Event.KeeperState.Disconnected==keeperState){  //连接断开

            System.out.println(Thread.currentThread().getName()+ " release connection");
        }else if(Event.KeeperState.Expired==keeperState){  //超时

            System.out.println(Thread.currentThread().getName()+ " connection expire");
        }
    }


    public void chooseMaster() throws Exception {

        selfPath=zk.create(ZK_SUB_PATH,null,ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);  //创建临时节点
        System.out.println(Thread.currentThread().getName()+ "create path "+selfPath);
        if(checkMinPath()){  //判断是否为主节点

            processMasterEvent();  
        }
    }

    public boolean createPersistPath(String path,String data,boolean needWatch) throws KeeperException, InterruptedException {

        if(zk.exists(path,needWatch)==null){

            zk.create(path,data.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            System.out.println(Thread.currentThread().getName()+" create persist path "+path);
        }
        return true;


    }

    public void createConnection(String connection,int timeout) throws IOException, InterruptedException {

        zk=new ZooKeeper(connection,timeout,this);
        successCountDownLatch.await();

    }

    private void processMasterEvent() throws KeeperException, InterruptedException {

        if(zk.exists(selfPath,false)==null){

            System.out.println(Thread.currentThread().getName()+ " selfnode is not exist "+ selfPath);
            return;
        }
        System.out.println(Thread.currentThread().getName()+ " I'm the master,now do work");
        Thread.sleep(2000);
        System.out.println(Thread.currentThread().getName()+" Finish do work,leave master");
        //zk.delete(selfPath,-1);
        releaseConnection();
        threadCompleteLatch.countDown();

    }

    private void releaseConnection() {

        if(zk!=null){

            try {
                zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private boolean checkMinPath() throws Exception {

//获取根节点下的所有子节点,进行排序,取当前路径的index,如果排在第一个,则为主,否则检测前一个节点是否存在,不存在则重新选举最小的节点 List
<String> subNodes=zk.getChildren(ZK_ROOT_PATH,false); System.out.println(subNodes.toString()); Collections.sort(subNodes); System.out.println(Thread.currentThread().getName()+" tmp node index is "+selfPath.substring(ZK_ROOT_PATH.length()+1)); int index=subNodes.indexOf(selfPath.substring(ZK_ROOT_PATH.length()+1)); switch (index){ case -1: System.out.println(Thread.currentThread().getName()+" create node is not exist"); return false; case 0: System.out.println(Thread.currentThread().getName()+" I'm the master"); return true; default: waitPath=ZK_ROOT_PATH+"/"+subNodes.get(index-1); System.out.println(Thread.currentThread().getName()+" the node before me is "+waitPath); try{ zk.getData(waitPath,true,new Stat()); return false; }catch (Exception e){ if(zk.exists(waitPath,false)==null){ System.out.println(Thread.currentThread().getName()+" the node before me is not exist,now is me"); return checkMinPath(); }else{ throw e; } } } } }

测试代码:

import org.apache.zookeeper.KeeperException;

import java.io.IOException;
import java.util.concurrent.*;

/**
 * Created by  on 17/11/23.
 */
public class MasterChoiceTest {

    private final static String ZK_CONNECT_STRING="127.0.0.1:2181";
    private final static String ZK_ROOT_PATH="/zkmaster";
    private final static int SESSION_TIMEOUT=10000;
    private static final int THREAD_NUM=5;
    private static int threadNo=0;
    private static ExecutorService executorService=null;
    private static CountDownLatch threadCompleteLatch=new CountDownLatch(THREAD_NUM);

    public static void main(String[] args){

        executorService= Executors.newFixedThreadPool(THREAD_NUM, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {

                String name=String.format("The %s thread",++threadNo);
                Thread ret=new Thread(Thread.currentThread().getThreadGroup(),r,name,0);
                ret.setDaemon(false);
                return ret;
            }
        });
        if(executorService!=null){

            startProcess();
        }
    }

    private static void startProcess() {

        Runnable masterChoiceTest=new Runnable() {
            @Override
            public void run() {

                String threadName=Thread.currentThread().getName();
                ChooseMaster chooseMaster=new ChooseMaster(threadCompleteLatch);
                try {
                    chooseMaster.createConnection(ZK_CONNECT_STRING,SESSION_TIMEOUT);
                    System.out.println(Thread.currentThread().getName()+" connected to server");
                    synchronized (MasterChoiceTest.class){

                        chooseMaster.createPersistPath(ZK_ROOT_PATH,"thread "+threadName,true);
                    }
                    chooseMaster.chooseMaster();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        for(int i=0;i<THREAD_NUM;i++){

            executorService.execute(masterChoiceTest);
        }
        executorService.shutdown();
        try {
            threadCompleteLatch.await();
            System.out.println("All thread finished");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

输出结果:

The 1 thread-EventThread connected to server
The 5 thread-EventThread connected to server
The 3 thread-EventThread connected to server
The 2 thread-EventThread connected to server
The 5 thread connected to server
The 3 thread connected to server
The 2 thread connected to server
The 1 thread connected to server
The 4 thread-EventThread connected to server
The 4 thread connected to server
The 5 thread create persist path /zkmaster
The 5 threadcreate path /zkmaster/register0000000000
[register0000000000]
The 5 thread tmp node index is register0000000000
The 5 thread I'm the master
The 1 threadcreate path /zkmaster/register0000000001
The 2 threadcreate path /zkmaster/register0000000002
The 5 thread I'm the master,now do work
The 3 threadcreate path /zkmaster/register0000000003
[register0000000000, register0000000002, register0000000001]
The 1 thread tmp node index is register0000000001
The 1 thread the node before me is /zkmaster/register0000000000
[register0000000000, register0000000002, register0000000001, register0000000003]
[register0000000000, register0000000002, register0000000001, register0000000003]
The 3 thread tmp node index is register0000000003
The 2 thread tmp node index is register0000000002
The 3 thread the node before me is /zkmaster/register0000000002
The 2 thread the node before me is /zkmaster/register0000000001
The 4 threadcreate path /zkmaster/register0000000004
[register0000000000, register0000000002, register0000000001, register0000000004, register0000000003]
The 4 thread tmp node index is register0000000004
The 4 thread the node before me is /zkmaster/register0000000003
The 5 thread Finish do work,leave master
The 1 thread-EventThread some node was deleted,I'll check if I am the minimum node
[register0000000002, register0000000001, register0000000004, register0000000003]
The 1 thread-EventThread tmp node index is register0000000001
The 1 thread-EventThread I'm the master
The 1 thread-EventThread I'm the master,now do work
The 1 thread-EventThread Finish do work,leave master
The 2 thread-EventThread some node was deleted,I'll check if I am the minimum node
[register0000000002, register0000000004, register0000000003]
The 2 thread-EventThread tmp node index is register0000000002
The 2 thread-EventThread I'm the master
The 2 thread-EventThread I'm the master,now do work
The 2 thread-EventThread Finish do work,leave master
The 3 thread-EventThread some node was deleted,I'll check if I am the minimum node
[register0000000004, register0000000003]
The 3 thread-EventThread tmp node index is register0000000003
The 3 thread-EventThread I'm the master
The 3 thread-EventThread I'm the master,now do work
The 3 thread-EventThread Finish do work,leave master
The 4 thread-EventThread some node was deleted,I'll check if I am the minimum node
[register0000000004]
The 4 thread-EventThread tmp node index is register0000000004
The 4 thread-EventThread I'm the master
The 4 thread-EventThread I'm the master,now do work
The 4 thread-EventThread Finish do work,leave master
All thread finished

分布式锁的原理也是一样,每次编号最小的获取锁。

原文地址:https://www.cnblogs.com/dpains/p/7885645.html