分布式锁

1.redis锁

1.工具类代码:

package com.lhw.chche;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisCommands;

import java.util.ArrayList;
import java.util.List;

public class RedisLock {
    private final static Logger LOG = LoggerFactory.getLogger(RedisLock.class);
    static StringBuilder sb = new StringBuilder();
    static {
        sb.append("if redis.call("get",KEYS[1]) == ARGV[1] ");
        sb.append("then ");
        sb.append("    return redis.call("del",KEYS[1]) ");
        sb.append("else ");
        sb.append("    return 0 ");
        sb.append("end ");
    }
    public static final String UNLOCK_LUA = sb.toString();
    
    /**
     * redis加锁
     * @param key lock
     * @param expire
     * @return
     */
    public static String setLock(final String key, final long expire) {
        //System.out.println("1--------------setLock,key="+key);
        try {
            RedisCallback<String> callback = new RedisCallback<String>() {
                @Override
                public String doInRedis(RedisConnection connection) throws DataAccessException {
                    JedisCommands commands = (JedisCommands) connection.getNativeConnection();
                    return commands.set(key, key, "NX", "PX", expire);
                }
            };
            String result = RedisOperationConfig.getStringRedisTemplate().execute(callback);
            //System.out.println("1--------------setLock,result="+result);
            if (StringUtils.isEmpty(result)) {
                LOG.warn("key="+key+",获取锁失败。。。。。");
                return null;
            }else {
                if(LOG.isInfoEnabled()) {
                    LOG.info("key="+key+",获取锁成功。。。。。");
                }
                return key;
            }
        } catch (Exception e) {
            LOG.error("",e);
            LOG.warn("key="+key+",获取锁异常。。。。。");
        }
        return null;
    }
    /**
     * redis 解锁
     * @param key
     * @param requestId
     * @return
     */
    public static boolean releaseLock(String key, String requestId) {
        try {
            final List keys = new ArrayList();
            keys.add(key);
            final List args = new ArrayList();
            args.add(requestId);

            RedisCallback callback = new RedisCallback() {
                @Override
                public Long doInRedis(RedisConnection connection) throws DataAccessException {
                    Object nativeConnection = connection.getNativeConnection();

                    if ((nativeConnection instanceof JedisCluster)) {
                        return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
                    }

                    if ((nativeConnection instanceof Jedis)) {
                        return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
                    }
                    return Long.valueOf(0L);
                }
            };
            Long result = (Long) RedisOperationConfig.getStringRedisTemplate().execute(callback);

            boolean bool = (result != null) && (result.longValue() > 0L);
            return bool;
        } catch (Exception e) {

        } finally {
        }
        return false;
    }

    /**
     * 获取redis
     * @param key
     * @return
     */
    public static String getLockValue(final String key){
        try {
            RedisCallback<String> callback = new RedisCallback<String>() {
                @Override
                public String doInRedis(RedisConnection connection) throws DataAccessException {
                    JedisCommands commands = (JedisCommands) connection.getNativeConnection();
                    return commands.get(key);
                }
            };
            String result = RedisOperationConfig.getStringRedisTemplate().execute(callback);
            if (StringUtils.isEmpty(result)) {
                LOG.warn("key="+key+",获取锁值失败。。。。。");
                return null;
            }else {
                if(LOG.isInfoEnabled()) {
                    LOG.info("key="+key+",获取锁值成功。。。。。");
                }
                return result;
            }
        } catch (Exception e) {
            LOG.error("",e);
            LOG.warn("key="+key+",获取锁异常。。。。。");
        }
        return null;
    }
}

  

2.使用场景顺序

1 --> taskNo是一个任务的唯一值,先进行上锁.看是否成功,成功则继续

2 --> taskNo+时间(年月日),先进行上锁.看是否成功,成功则继续,执行具体的任务

3 --> 设置 2里面 taskNo+时间(年月日)的过期时间

4 --> 释放 1里面 taskNo 的锁

1.
        String rediskey = "com.lhw.lock."+taskNo;
        String lock = RedisLock.setLock(rediskey,outTime);
2.
        String redisDealkey = "com.lhw.dealLock."+taskNo+"."+ymd;
        String dealLock = RedisLock.getLockValue(redisDealkey);
3.
        //设置当天任务已执行的开关值,默认12小时过期时间
        RedisLock.setLock(redisDealkey,12*60*60*1000);
4.
     RedisLock.releaseLock(rediskey,rediskey);

2.zookeeper锁

图解:公平锁和可重入锁 模型

分布式锁的概念和原理,比较抽象难懂。如果用一个简单的故事来类比,估计就简单多了。

很久以前,在一个村子有一口井,水质非常的好,村民们都抢着取井里的水。井就那么一口,村里的人很多,村民为争抢取水打架斗殴,甚至头破血流。

问题总是要解决,于是村长绞尽脑汁,最终想出了一个凭号取水的方案。井边安排一个看井人,维护取水的秩序。

说起来,秩序很简单,取水之前,先取号。号排在前面的,就可以先取水。先到的排在前面,那些后到的,没有排在最前面的人,一个一个挨着,在井边排成一队。取水示意图如下 :

这种排队取水模型,就是一种锁的模型。排在最前面的号,拥有取水权,就是一种典型的独占锁。另外,先到先得,号排在前面的人先取到水,取水之后就轮到下一个号取水,至少,看起来挺公平的,说明它是一种公平锁。

在公平独占锁的基础上,再进一步,看看可重入锁的模型。

假定,取水时以家庭为单位,哪个家庭任何人拿到号,就可以排号取水,而且如果一个家庭有一个人拿到号,其它家人这时候过来打水不用再取号。新的排号取水示意图如下 :

如上图的1号,老公有号,他的老婆来了,直接排第一个,妻凭夫贵。再看上图的2号,父亲正在打水,他的儿子和女儿也到井边了,直接排第二个,这个叫做子凭父贵。 等等,如果是同一个家庭,可以直接复用排号,不用重新取号从后面排起。

以上这个故事模型,就是可以重入锁的模型。只要满足条件,同一个排号,可以用来多次取水。在锁的模型中,相当于一把锁,可以被多次锁定,这就叫做可重入锁。

zookeeper分布式锁的原理

理解了锁的原理后,就会发现,Zookeeper 天生就是一副分布式锁的胚子。

首先,Zookeeper的每一个节点,都是一个天然的顺序发号器。

在每一个节点下面创建子节点时,只要选择的创建类型是有序(EPHEMERAL_SEQUENTIAL 临时有序或者PERSISTENT_SEQUENTIAL 永久有序)类型,那么,新的子节点后面,会加上一个次序编号。这个次序编号,是上一个生成的次序编号加一

比如,创建一个用于发号的节点“/test/lock”,然后以他为父亲节点,可以在这个父节点下面创建相同前缀的子节点,假定相同的前缀为“/test/lock/seq-”,在创建子节点时,同时指明是有序类型。如果是第一个创建的子节点,那么生成的子节点为/test/lock/seq-0000000000,下一个节点则为/test/lock/seq-0000000001,依次类推,等等。

 

其次,Zookeeper节点的递增性,可以规定节点编号最小的那个获得锁。

一个zookeeper分布式锁,首先需要创建一个父节点,尽量是持久节点(PERSISTENT类型),然后每个要获得锁的线程都会在这个节点下创建个临时顺序节点,由于序号的递增性,可以规定排号最小的那个获得锁。所以,每个线程在尝试占用锁之前,首先判断自己是排号是不是当前最小,如果是,则获取锁。

第三,Zookeeper的节点监听机制,可以保障占有锁的方式有序而且高效。

每个线程抢占锁之前,先抢号创建自己的ZNode。同样,释放锁的时候,就需要删除抢号的Znode。抢号成功后,如果不是排号最小的节点,就处于等待通知的状态。等谁的通知呢?不需要其他人,只需要等前一个Znode 的通知就可以了。当前一个Znode 删除的时候,就是轮到了自己占有锁的时候。第一个通知第二个、第二个通知第三个,击鼓传花似的依次向后。

Zookeeper的节点监听机制,可以说能够非常完美的,实现这种击鼓传花似的信息传递。具体的方法是,每一个等通知的Znode节点,只需要监听linsten或者 watch 监视排号在自己前面那个,而且紧挨在自己前面的那个节点。 只要上一个节点被删除了,就进行再一次判断,看看自己是不是序号最小的那个节点,如果是,则获得锁。

为什么说Zookeeper的节点监听机制,可以说是非常完美呢?

一条龙式的首尾相接,后面监视前面,就不怕中间截断吗?比如,在分布式环境下,由于网络的原因,或者服务器挂了或则其他的原因,如果前面的那个节点没能被程序删除成功,后面的节点不就永远等待么?

其实,Zookeeper的内部机制,能保证后面的节点能够正常的监听到删除和获得锁。在创建取号节点的时候,尽量创建临时znode 节点而不是永久znode 节点,一旦这个 znode 的客户端与Zookeeper集群服务器失去联系,这个临时 znode 也将自动删除。排在它后面的那个节点,也能收到删除事件,从而获得锁。

说Zookeeper的节点监听机制,是非常完美的。还有一个原因。

Zookeeper这种首尾相接,后面监听前面的方式,可以避免羊群效应。所谓羊群效应就是每个节点挂掉,所有节点都去监听,然后做出反映,这样会给服务器带来巨大压力,所以有了临时顺序节点,当一个节点挂掉,只有它后面的那一个节点才做出反映。

1.简单的分布式锁

代码实现

AbstractLock.java

package zklock;
 
import org.I0Itec.zkclient.ZkClient;
 
public abstract class AbstractLock {
 
    //zk地址和端口
    public static final String ZK_ADDR = "192.168.0.230:2181";
    //超时时间
    public static final int SESSION_TIMEOUT = 10000;
    //创建zk
    protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT);
    
    
    /**
     * 可以认为是模板模式,两个子类分别实现它的抽象方法
     * 1,简单的分布式锁
     * 2,高性能分布式锁
     */
    public void getLock() {
        String threadName = Thread.currentThread().getName();
        if (tryLock()) {
            System.out.println(threadName+"-获取锁成功");
        }else {
            System.out.println(threadName+"-获取锁失败,进行等待...");
            waitLock();
            //递归重新获取锁
            getLock();
        }
    }
    
    public abstract void releaseLock();
    
    public abstract boolean tryLock();
    
    public abstract void waitLock();
}

 AbstractLock类是个抽象类,里面getLock使用模板模式,子类分别是简单的zk锁和高性能的zk锁

SimpleZkLock.java

package zklock;
 
import java.util.concurrent.CountDownLatch;
 
import org.I0Itec.zkclient.IZkDataListener;
 
/**
 * 简单的分布式锁的实现
 */
public class SimpleZkLock extends AbstractLock {
 
    private static final String NODE_NAME = "/test_simple_lock";
    
    private CountDownLatch countDownLatch;
    
    @Override
    public void releaseLock() {
        if (null != zkClient) {
            //删除节点
            zkClient.delete(NODE_NAME);
            zkClient.close();
            System.out.println(Thread.currentThread().getName()+"-释放锁成功");
        }
        
    }
 
    //直接创建临时节点,如果创建成功,则表示获取了锁,创建不成功则处理异常
    @Override
    public boolean tryLock() {
        if (null == zkClient) return false;
        try {
            zkClient.createEphemeral(NODE_NAME);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
 
    @Override
    public void waitLock() {
        //监听器
        IZkDataListener iZkDataListener = new IZkDataListener() {
            //节点被删除回调
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
            }
            //节点改变被回调
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                // TODO Auto-generated method stub
                
            }
        };
        zkClient.subscribeDataChanges(NODE_NAME, iZkDataListener);
        //如果存在则阻塞
        if (zkClient.exists(NODE_NAME)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName()+" 等待获取锁...");
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        //删除监听
        zkClient.unsubscribeDataChanges(NODE_NAME, iZkDataListener);
    }
 
}

SimpleZkLock 表示简单的zk分布式锁,逻辑还是相对比较简单,下面看下测试

LockTest.java

package zklock;
 
public class LockTest {
    public static void main(String[] args) {
        //模拟多个10个客户端
        for (int i=0;i<10;i++) {
            Thread thread = new Thread(new LockRunnable());
            thread.start();
        }
        
    }
    
    static class LockRunnable implements Runnable{
 
        @Override
        public void run() {
            AbstractLock zkLock = new SimpleZkLock();
            //AbstractLock zkLock = new HighPerformanceZkLock();
            zkLock.getLock();
            //模拟业务操作
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            zkLock.releaseLock();
        }
        
    }
}

zk实现的简单的分布式锁存在的性能问题

二。高性能分布式锁

1、思路:客户端在抢锁的时候进行排队,客户端只要监听它前一个节点的变化就行,如果前一个节点释放了锁,客户端才去进行抢锁操作,这个时候我们就需要创建顺序节点了

 

 HighPerformanceZkLock .java

package zklock;
 
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
 
import org.I0Itec.zkclient.IZkDataListener;
 
/**
 * 高性能分布式锁
 * @author hongtaolong
 *
 */
public class HighPerformanceZkLock extends AbstractLock {
 
    private static final String PATH = "/highPerformance_zklock";
    //当前节点路径
    private String currentPath;
    //前一个节点的路径
    private String beforePath;
    
    private CountDownLatch countDownLatch = null;
    
    public HighPerformanceZkLock() {
        //如果不存在这个节点,则创建持久节点
        if (!zkClient.exists(PATH)) {        
            zkClient.createPersistent(PATH);
        }
    }
    
    @Override
    public void releaseLock() {
        if (null != zkClient) {
            zkClient.delete(currentPath);
            zkClient.close();
        }
 
    }
 
    @Override
    public boolean tryLock() {
        //如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
        if (null == currentPath || "".equals(currentPath)) {
            //在path下创建一个临时的顺序节点
            currentPath = zkClient.createEphemeralSequential(PATH+"/", "lock");
        }
        //获取所有的临时节点,并排序
        List<String> childrens = zkClient.getChildren(PATH);
        Collections.sort(childrens);
        if (currentPath.equals(PATH+"/"+childrens.get(0))) {
            return true;
        }else {//如果当前节点不是排名第一,则获取它前面的节点名称,并赋值给beforePath
            int pathLength = PATH.length();
            int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1));
            beforePath = PATH+"/"+childrens.get(wz-1);
        }
        return false;
    }
 
    @Override
    public void waitLock() {
        IZkDataListener lIZkDataListener = new IZkDataListener() {
            
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                if (null != countDownLatch){
                    countDownLatch.countDown();
                }
            }
            
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                
            }
        };
        //监听前一个节点的变化
        zkClient.subscribeDataChanges(beforePath, lIZkDataListener);
        if (zkClient.exists(beforePath)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener);
    }
 
}

这里只要帖高性能锁的代码了,AbstractLock没变化,LockTest中只要修改一行代码

//AbstractLock zkLock = new SimpleZkLock();

AbstractLock zkLock = new HighPerformanceZkLock();

 测试结果:

 转载自:https://blog.csdn.net/crazymakercircle/article/details/85956246

    https://blog.csdn.net/hongtaolong/java/article/details/88898875

3.数据库锁

mysql行锁。

存储引擎使用innoDB

行锁的使用方式为:

Select * from student_game where id=3 for update;

这样我们的业务代码大概是这样的:

conn.setAutoCommit(false);//关闭自动提交
select scores from test.student where id= 4 for update;//上锁
//获取学生id,scores数据,查询es记录,等到总分。
//更新学生得分表scores记录
Conn.commit;//释放锁

原文链接:https://blog.csdn.net/humanity11/java/article/details/93356344

mysql分布式锁

要实现分布式锁,最简单的方式可能就是直接创建一张锁表,然后通过操作该表中的数据来实现了。
     当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。
     创建这样一张数据库表:
CREATE TABLE `methodLock` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
  `desc` varchar(1024) NOT NULL DEFAULT '备注信息',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';

当我们想要锁住某个方法时,执行以下SQL:

insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)
因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。
当方法执行完毕之后,想要释放锁的话,需要执行以下Sql:
delete from methodLock where method_name ='method_name'

上面这种简单的实现有以下几个问题:

1、这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
2、这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
3、这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
4、这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。

当然,我们也可以有其他方式解决上面的问题。
1.数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉快速切换到备库上。
2.没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
3.非阻塞的?搞一个while循环,直到insert成功再返回成功。
4.非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。


链接:https://www.jianshu.com/p/7b57ebb25900

原文地址:https://www.cnblogs.com/linhongwenBlog/p/13344031.html