Zookeeper实现分布式锁

1、引入maven包

    <dependencies>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>
    </dependencies>

2、创建zookeeper分布式锁

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DisClient {
    final String disLock = "/distrilock";
    String beforeNodePath;
    String currentNodePath;
    CountDownLatch countDownLatch = null;
    String threadNname;
    //获取到zkClient
    private ZkClient zkClient = new ZkClient("192.168.3.120:2181");
    // 把抢锁过程为量部分,一部分是创建节点,比较序号,另一部分是等待锁
    // 完整获取锁方法

    public DisClient() {
        threadNname = Thread.currentThread().getName();
        synchronized (DisClient.class) {
            if (!zkClient.exists(disLock)) {
                zkClient.createPersistent(disLock);
            }
        }
    }

    public void deleteLock() {
        if (zkClient != null && currentNodePath != null && currentNodePath.equals("") == false) {
            zkClient.delete(currentNodePath);
            System.out.println(threadNname + "释放锁");
        } else {
            System.out.println(threadNname + "释放锁,失败" + currentNodePath);
        }
    }

    //这个方法就是抢锁的入口,需要不停迭代使用
    public void getDisLock() {
        //第一步:肯定是去创建临时节点,如果创建后的节点的第一个节点就是自己,
        // 说明锁就抢到了
        if (tryGetDisLock()) {
            //说明获取到锁
            System.out.println(threadNname + ":获取到了锁,成功");
        } else {
            System.out.println(threadNname + ": 获取到了锁,失败,等待");
            waitForLock();
            //递归获取锁
            getDisLock();
        }
    }

    // 尝试获取锁
    public Boolean tryGetDisLock() {
        //判断节点释放创建过,如果第一次调用,则去创建
        if (currentNodePath == null || currentNodePath.equals("")) {
            // 创建临时节点成功,返回 节点的路径
            currentNodePath = zkClient.createEphemeralSequential(disLock + "/", "lock");
        }
        //获取到 /distrilock下面所有的子节点
        final List childs = zkClient.getChildren(disLock);
        //对节点信息进行排序
        Collections.sort(childs);//默认是升序
        String minNode = childs.get(0).toString();
        //如果此时自己是最早的节点,就抢到锁了,可以执行自己的业务逻辑了
        if (currentNodePath.equals(disLock + "/" + minNode)) {
            return true;
        } else {
            //说明最小节点不是自己创建的,要监控自己当前节点序号前一个节点
            final int i = Collections.binarySearch(childs, currentNodePath.substring((disLock + "/").length()));
            beforeNodePath = disLock + "/" + childs.get(i - 1);
        } return false;
    }

    /**
     * 等待之前节点释放锁,如何判断锁被释放,
     * 需要唤醒线程继续尝试 tryGetDisLock
     */
    public void waitForLock() {
        //注册一个监听器
        IZkDataListener izk = new IZkDataListener() {
            public void handleDataChange(String s, Object o) throws Exception {
            }

            public void handleDataDeleted(String s) throws Exception {
                System.out.println(threadNname + " 监控到了," + beforeNodePath + "节点发生变化了");
                countDownLatch.countDown();
                //把值减1变为0,唤醒之前await线程
            }
        };
        // 监控前一个节点
        zkClient.subscribeDataChanges(beforeNodePath, izk);
        //在监听的通知没来之前,该线程应该处于等待
        if (zkClient.exists(beforeNodePath)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
                //阻塞
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        zkClient.unsubscribeDataChanges(beforeNodePath, izk);
    }
}

3、创建测试类

public class DisLockTest {
    public static void main(String[] args) {
        //使用3个线程模拟分布式环境
        for (int i = 0; i < 10; i++) {
            new Thread(new DisLockRunnable()).start();
        }
    }

    static class DisLockRunnable implements Runnable {
        public void run() {
            //每个线程就是去抢锁
            final DisClient client = new DisClient();
            client.getDisLock();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            client.deleteLock();
        }
    }
}
原文地址:https://www.cnblogs.com/raorao1994/p/15458710.html