zk分布锁的java实现

只做记录,直接上代码

父类:

package com.ylcloud.common.lock;

import com.alibaba.fastjson.JSON;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


/**
 * @author cjh
 * @Description: zk分布锁
 * @date: 2018/9/27 11:36
 */
public class ZkLock {

    private static Logger logger = LogManager.getLogger(ZkLock.class);

    public static String ZOOKEEPER_IP_PORT = "127.0.0.1:2181";

    public static Integer sessionTimeout = 30000;

    public static Integer connectTimeout = 30000;

    /**
     * 节点锁标记
     */
    public String lockPath;

    /**
     * 前一个节点(设置用户添加监听器)
     */
    public String beforeNode;

    /**
     * 当前执行节点(设置用于删除)
     */
    public String currentNode;

    /**
     * 当前请求节点
     */
    public String threadTag = null;

    private String lock1 = null;

    private String lock2 = null;

    public static final ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, sessionTimeout, connectTimeout, new SerializableSerializer());

    private static final ThreadLocal<String> NODES = new ThreadLocal<String>();

    public ZkLock() {
    }

    public void init(String code) {
        this.lockPath = code;
        this.lock1 = code + "LOCK";
        this.lock2 = code + "UNLOCK";
        client.deleteRecursive(lockPath);
    }

    public void lock() {

        synchronized (lock1) {
            if (!client.exists(lockPath)) {
                client.createPersistent(lockPath);
            }

            if (!tryLock()) {
            }

        }

    }

    public void unlock() {

        List<String> childrens = client.getChildren(lockPath);
        Collections.sort(childrens);

        String nodes = NODES.get();
        logger.info(JSON.toJSONString(childrens) + " ==== " + nodes + " ==== " + (nodes.equals(lockPath + '/' + childrens.get(0))));
        if (childrens.size() > 0 && nodes.equals(lockPath + '/' + childrens.get(0))) {
            client.delete(nodes);
        }

    }

    private boolean tryLock() {

        threadTag = client.createEphemeralSequential(lockPath + '/', "");

        NODES.set(threadTag);

        List<String> childrens = client.getChildren(lockPath);
        Collections.sort(childrens);

        currentNode = lockPath + '/' + childrens.get(0);

        if (threadTag.equals(currentNode)) {
            return true;
        } else {

            currentNode = threadTag;

            int wz = Collections.binarySearch(childrens, threadTag.substring(lockPath.length() + 1));
            beforeNode = lockPath + '/' + childrens.get(wz - 1);

            final CountDownLatch latch = new CountDownLatch(1);
            try {

                client.subscribeDataChanges(beforeNode, new IZkDataListener() {

                    @Override
                    public void handleDataDeleted(String dataPath) throws Exception {
                        if (latch != null && latch.getCount() > 0) {
                            latch.countDown();
                        }
                    }

                    @Override
                    public void handleDataChange(String dataPath, Object data) throws Exception {
                    }
                });

                if (client.exists(beforeNode)) {
                    latch.await(sessionTimeout, TimeUnit.MILLISECONDS);
                }

            } catch (Exception e) {
                return true;
            } finally {
            }

        }
        return false;
    }

}

子类

package com.ylcloud.common.lock.ext;

import com.ylcloud.common.lock.ZkLock;

/**
  * @Description: 用户编码锁
  * @author cjh
  * @date: 2018/10/10 14:47
  */
public class ZkLockUserCode extends ZkLock {

    public ZkLockUserCode() {
        super.init("/USER_CODE");
    }

}

使用示例:

private ZkLock zkLock = new ZkLockUserCode();

public void addUser() {
    
    try {

        zkLock.lock();

            /**
             *业务实现
             */
            
    } catch (Exception e) {
        logger.info("err {} {} ", e.getMessage(), e.getCause());


    } finally {
        zkLock.unlock();
    }
        
}     

注意:unlock必须写在finally里面,否则一旦业务出现运行错误造成没有解锁,下一次访问的人就需要等待一个sessionTime了

题外话:zk在linux上启动命令 ./zkServer.sh start

自己实现的很多细节没考虑到导致在高并发的项目中出现了问题,然后我改用了Curator框架来操控zookeeper实现分布锁:

    <curator.version>4.0.0</curator.version> 

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>${curator.version}</version>
        </dependency>    

锁实现代码:

package com.ylcloud.common.lock;

import com.ylcloud.common.lock.ext.ZkLockRoleCode;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


/**
 * @author cjh
 * @Description: zk分布锁
 * @date: 2018/9/27 11:36
 */
public class ZkLock {

    private static Logger logger = LogManager.getLogger(ZkLock.class);

    public static String ZOOKEEPER_IP_PORT = "127.0.0.1:2181";

    public static String NAME_SPACE = "YL_CLOUD";

    public static Integer sessionTimeout = 15000;

    public static Integer connectTimeout = 30000;

    private static Object initLock = new Object();

    public static CuratorFramework client = null;

    private static Map<String,ZkLock> tagLocks = new HashMap<String,ZkLock>();

    public ZkLock() {
    }

    public InterProcessMutex mutex = null;

    static {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client =
                CuratorFrameworkFactory.builder()
                        .connectString(ZOOKEEPER_IP_PORT)
                        .sessionTimeoutMs(sessionTimeout)
                        .connectionTimeoutMs(connectTimeout)
                        .retryPolicy(retryPolicy)
                        .namespace(NAME_SPACE)
                        .build();

        client.start();
    }

    public void init(String code) {
        this.mutex = new InterProcessMutex(client,code);
    }

    //prefix 标记  code 主键
    public static ZkLock getTagLock(String prefix,String code){
        String tag = prefix + "/" + code;

        if(!tagLocks.containsKey(tag)){
            synchronized (initLock){
                if(!tagLocks.containsKey(tag)){
                    ZkLock zkLock = new ZkLock();
                    zkLock.init(tag);
                    tagLocks.put(tag,zkLock);
                }
            }
        }

        return tagLocks.get(tag);
    }

    public void lock(){
        try {
            this.mutex.acquire();
        } catch (Exception e) {
            logger.error("加锁失败");
        }
    }

    public void unlock(){
        try {
            this.mutex.release();
        } catch (Exception e) {
            logger.error("锁释放失败");
        }
    }

}

调用代码(不变)

转载请注明博客出处:http://www.cnblogs.com/cjh-notes/

原文地址:https://www.cnblogs.com/cjh-notes/p/9744183.html