基于ZooKeeper的分布式锁实现

夜深了,先贴代码保存一下明天再整理

WatchCallBack.java

package com.breeze.lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.LockSupport;

public class WatchCallBack implements Watcher, AsyncCallback.Create2Callback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
    ZooKeeper zk ;
    String threadName;
    String pathName;

    CountDownLatch cc = new CountDownLatch(1);
    //Watcher要实现的方法
    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                zk.getChildren("/",false,this,"test");
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
            case PersistentWatchRemoved:
                break;
        }
    }

    public void tryLock(){
        try {
            zk.create("/lock",threadName.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL,this,"test");
            cc.await();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public void unLock(){
        try {
            zk.delete(pathName,-1);

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }

    }

    //Create2Callback的需实现方法
    @Override
    public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
        if(name != null){
            pathName = name;
            System.out.println(threadName + "create path name: "+name);
            //this 是callback 回调下面的processResult
            zk.getChildren("/",false,this,"test");

        }
    }

    //Children2Callback 的实现方法 zk.getChildren()

    /**
     * 每一个node 只watch 它的前一个node ,这样,zk在通知时,只给第一个通知,减少zk服务器压力
     *
     * @param rc
     * @param path
     * @param ctx
     * @param children
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        //先排序,第一个放行,表示可以获得锁
        Collections.sort(children);
        int i = children.indexOf(pathName.substring(1));
        if(i == 0){
            //是第一个
            System.out.println(threadName+"i am the first ...");
            cc.countDown();
        }else {
            //不是
            zk.exists("/"+children.get(i-1),this,this,"test");
        }

        //可以看到自己前面的node
//        System.out.println(threadName+"look locks...");
//        for (String child : children) {
//            System.out.println(child);
//        }


    }

    public ZooKeeper getZk() {
        return zk;
    }

    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }

    public String getThreadName() {
        return threadName;
    }

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public String getPathName() {
        return pathName;
    }

    public void setPathName(String pathName) {
        this.pathName = pathName;
    }


    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        //如果不是第一个children的状态回调

    }
}

TestLock.java

package com.breeze.lock;

import com.breeze.zk.ZKUtils;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestLock {

    ZooKeeper zk;


    @Before
    public void conn(){
        zk = ZKUtils.getZK();
    }

    @After
    public void close(){
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void lock(){
        for (int i = 0; i < 10; i++) {
            new Thread(){
                public void run(){
                    String threadName = Thread.currentThread().getName();
                    WatchCallBack watchCallBack = new WatchCallBack();
                    watchCallBack.setZk(zk);
                    watchCallBack.setThreadName(threadName);
                    //抢锁
                    watchCallBack.tryLock();
                    //干活
                    System.out.println(threadName+"is working...");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    //释放锁
                    watchCallBack.unLock();


            }
            }.start();
        }

        while (true);

    }
}

ZKUtils.java

package com.breeze.zk;

import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZKUtils {
    private static ZooKeeper zk;

    private static String address = "zkserver ip1:2181,zk server ip2:2181/testLock";

    private static DefaultWatch watch = new DefaultWatch();

    private static CountDownLatch cdLatch = new CountDownLatch(1);
    public static ZooKeeper getZK(){
        try {
            zk = new ZooKeeper(address,1000,watch);
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            watch.setCc(cdLatch);
            cdLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return zk;
    }
}


原文地址:https://www.cnblogs.com/shengjm/p/13252857.html