zookeeper分布式锁的实现

* 实现思路:
* 使用Zookeeper最小节点的方式
* 执行过程:
* 1、创建根节点,在根节点下创建顺序节点
* 2、如当前创建的节点为根节点的所有子节点中最小的,则获取锁成功;
* 否则,找到当前节点的前一个节点,watch前一个节点,当前一个节点被删除时获得锁;另外,等待超时也不能获得锁

代码能跑通,但还要改,先记一下:

1.先创建一个抽象类实现Lock接口:
package com...zookeeper.zkLock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @author com.
 * @date 
 */
public abstract class AbstractLock implements Lock {
    /**
     * 获取锁的重试次数
     */
    private static final int RE_TRY = 10;

    @Override
    public void lock() {
        int count = RE_TRY;
        while (!this.tryLock() && count > 0) {
            count--;
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}
2.具体实现代码:
package com..zookeeper.zkLock;



import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @author com.
 * @date 
 */

public class ZookeeperLock extends AbstractLock {

    private ZooKeeper zooKeeper;

    /**
     * 锁根节点
     */
    private final String lockNamespace;
    /**
     * 锁值节点
     */
    private final String lockKey;
    /**
     * 当前节点
     */
    private String currentNode;
    /**
     * 等待的前一个节点
     */
    private String waitNode;
    /**
     * 竞争的节点列表
     */
    private List<String> lockNodes;
    /**
     * 计数器
     */
    private volatile CountDownLatch countDownLatch;
    /**
     * 是否持有锁
     */
    private volatile boolean locked = false;

    public ZookeeperLock(String address, int timeout, String lockNamespace, String lockKey) {
        this.init(address, timeout);
        this.lockNamespace = lockNamespace;
        this.lockKey = lockKey;
    }

    private void init(String address, int timeout) {
        try {
            zooKeeper = new ZooKeeper(address, timeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("事件类型为:" + event.getType());
                    System.out.println("事件发生的路径:" + event.getPath());
                    System.out.println("通知状态为:" +event.getState());

                }
            });
        } catch (Exception e) {
              System.out.println(e.getMessage()+ e);
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    @Override
    public boolean tryLock() {

        while (true) {
            String lock = lockNamespace + "/" + lockKey;
            try {
                // 确保zookeeper连接成功
                this.ensureZookeeperConnect();
                // 确保根节点存在
                ensureNameSpaceExist(lockNamespace);
                // 创建临时顺序节点,节点目录为/lockNamespace/lockKey_xxx,节点为lockKey_xxx
                currentNode = zooKeeper.create(lock, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL_SEQUENTIAL).replace(lockNamespace + "/", "");
                // 取出所有子节点
                List<String> childrenList = zooKeeper.getChildren(lockNamespace, false);
                // 竞争的节点列表
                lockNodes = new ArrayList<>();
                for (String children : childrenList) {
                    if (children.startsWith(lockKey)) {
                        lockNodes.add(children);
                    }
                }
                // 排序
                Collections.sort(lockNodes);
//                System.out.println("排序后的所有子节点--->:" + lockNodes);
                // 如当前节点为最小节点,则成功获取锁
                if (currentNode.equals(lockNodes.get(0))) {
                    locked = true;
                }
                System.out.println(Thread.currentThread().getName() + "   "+currentNode +  "  比较  " +  lockNodes.get(0)+   "  为  "+locked + "  创建了临时节点");
                return locked;
            } catch (InterruptedException | KeeperException e) {
                System.out.println(Thread.currentThread().getName() + "   获得锁异常");
                System.out.println(e.getMessage() + e);
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void unlock() {
        try {
            zooKeeper.delete(lockNamespace + "/" + currentNode, -1);
            zooKeeper.close();
            locked = false;
            System.out.println(Thread.currentThread().getName() + "   释放锁~~~");
        } catch (InterruptedException | KeeperException e) {
              System.out.println(e.getMessage()+e);
        }
    }

    /**
     * Zookeeper分布式锁
     * 实现思路:
     * 使用Zookeeper最小节点的方式
     * 执行过程:
     * 1、创建根节点,在根节点下创建顺序节点
     * 2、如当前创建的节点为根节点的所有子节点中最小的,则获取锁成功;
     * 否则,找到当前节点的前一个节点,watch前一个节点,当前一个节点被删除时获得锁;另外,等待超时也不能获得锁
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        //等待锁
        try {
            if (tryLock()) {
                System.out.println(Thread.currentThread().getName() + "   获得锁~~~");
                return locked;
            }
            System.out.println(Thread.currentThread().getName() + "   等待锁~~~");
            //找到当前节点的前一个节点
            waitNode = lockNodes.get(Collections.binarySearch(lockNodes, currentNode) - 1);
            this.waitLock(time, unit);
            return locked;
        } catch (KeeperException e) {
              System.out.println(e.getMessage()+ e);
            throw new RuntimeException(e);
        }
    }

    /**
     * 等待锁
     */
    private void waitLock(long time, TimeUnit unit) throws KeeperException, InterruptedException {
        String waitLock = lockNamespace + "/" + waitNode;
        System.out.println(Thread.currentThread().getName() +"   等待锁 {} 释放    " + waitLock);
        Stat stat = zooKeeper.exists(waitLock, watchedEvent -> {
            if (countDownLatch != null) {
                locked = true;
                countDownLatch.countDown();
            }
        });
        // 前一个节点此刻存在,等待,节点消失则成功获取锁
        if (stat != null) {
            countDownLatch = new CountDownLatch(1);
            countDownLatch.await(time, unit);
            countDownLatch = null;
        } else {
            // 前一个节点此刻不存在,获得锁
            locked = true;
        }
    }

    /**
     * 确保根节点存在
     */
    private void ensureNameSpaceExist(String lockNamespace) throws KeeperException, InterruptedException {
        Stat statS = zooKeeper.exists(lockNamespace, false);
        if (statS == null) {
            //如果根节点不存在,创建
            zooKeeper.create(lockNamespace, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    /**
     * 确保zookeeper连接成功,防止出现连接还未完成就执行zookeeper的get/create/exsit操作出现错误KeeperErrorCode = ConnectionLoss
     */
    private void ensureZookeeperConnect() throws InterruptedException {
        CountDownLatch connectedLatch = new CountDownLatch(1);
        zooKeeper.register(watchedEvent -> {
            if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                connectedLatch.countDown();
            }
        });
        // zookeeper连接中则等待
        if (ZooKeeper.States.CONNECTING == zooKeeper.getState()) {
            connectedLatch.await();
        }
    }
}

3.测试类:

package com..zookeeper.zkLock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import wfc.service.database.RecordSet;
import wfc.service.database.SQL;

import java.io.IOException;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

public class Main {

    private static final int  Thread_num   = 5;
    private static final CyclicBarrier cb = new CyclicBarrier(Thread_num);

    public static void main(String[] args) throws Exception {

        Thread [] threads = new Thread[Thread_num];
        for(int i=0;i<Thread_num;i++){
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        cb.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }

                    Lock lock = new ZookeeperLock("localhost:2181",5000,"/zkLock","lockKey");
                    try {
                        if(lock.tryLock(1000, TimeUnit.MILLISECONDS)){
                            System.out.println(Thread.currentThread().getName() + "   获得锁执行业务");
                            //减数据库
                            int count = 1;
                            String st_id = "keys";
                            String updateSql = "update dang_fj set count = count-? where st_id = ? and count>=1";
                            Object[] updateObject = new Object[] {count,st_id};
                            RecordSet updateRs  = SQL.execute(updateSql,updateObject);
                            int number = updateRs.TOTAL_RECORD_COUNT;
                            //影响行数
                            System.out.println("数据库影响行数:"+number);

                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        lock.unlock();

                    }

                }
            });
            threads [i] = thread;
            thread.start();
        }
        for(Thread thread : threads){
            thread.join();
        }
        System.out.println("执行结束======");

    }                
原文地址:https://www.cnblogs.com/lifan12589/p/13573009.html