【分布式锁】

一、分布式锁应用场景:

  • 效率:使用分布式锁可以避免不同节点重复相同的工作,这些工作会浪费资源。比如用户付了钱之后有可能不同节点会发出多封短信。    // 比如确保集群只有一个节点获取锁,同步数据到Redis缓存

  • 正确性:加分布式锁同样可以避免破坏正确性的发生,如果两个节点在同一条数据上面操作,比如多个节点机器对同一个订单操作不同的流程有可能会导致该笔订单最后状态出现错误,造成损失。 // 解决秒杀商品超卖问题

二、分布式锁的特点如下:

  • 互斥性:和我们本地锁一样互斥性是最基本,但是分布式锁需要保证在不同节点的不同线程的互斥。

  • 可重入性:同一个节点上的同一个线程如果获取了锁之后那么也可以再次获取这个锁。

  • 锁超时:和本地锁一样支持锁超时,防止死锁。避免获取锁的对象异常,导致无法删除锁。

  • 高效,高可用:加锁和解锁需要高效,同时也需要保证高可用防止分布式锁失效,可以增加降级。

  • 支持阻塞和非阻塞:和 ReentrantLock 一样支持 lock 和 trylock 以及 tryLock(long timeOut)。

  • 支持公平锁和非公平锁(可选):公平锁的意思是按照请求加锁的顺序获得锁,非公平锁就相反是无序的。这个一般来说实现的比较少。

三、常见问题

  1、调用的操作非原子操作,导致高并发场景下问题

        2、释放别人的锁:     

    步骤1:客户端 1 加锁成功,开始操作共享资源

    步骤2:客户端 1 操作共享资源的时间,「超过」了锁的过期时间,锁被「自动释放」

    步骤3:客户端 2 加锁成功,开始操作共享资源

    步骤4:客户端 1 操作共享资源完成,释放锁(但释放的是客户端 2 的锁

    解决: 删除前,检查当前锁是否为自己所有(客户端在加锁时,设置value为只有自己知道的「唯一标识」如UUID进去。是否时GET判断锁是否为该UUID,再DELETE)

  3、锁过期时间不好评估,导致锁在业务执行完前被提前释放问题:

        解决:Redisson后台实现

                          1)加锁时,后台实现启动了watchdog线程,先设置一个过期时间,然后我们开启一个「守护线程」,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源还未完成,那么就自动对锁进行「续期,原有超时时间1/3」,重新设置过期时间。

                          2)获取锁后台实现依然通过Lua脚本实现,保证了原子性                    

    4、如何检查监听锁被释放

        1)可以通过客户端轮询的方式解决该问题,当未获取到锁时,等待一段时间重新获取锁,直到成功获取锁或等待超时。这种方式比较消耗服务器资源,当并发量比较大时,会影响服务器的效率。

        2)使用 Redis 的发布订阅功能,当获取锁失败时,订阅锁释放消息,获取锁成功后释放时,发送锁释放消息。

  5、集群场景问题(生产问题):

            1)Master/Slave时,但在Master上持有分布式锁未同步问题  参考:https://xiaomi-info.github.io/2019/12/17/redis-distributed-lock/

        2)集群脑裂  参考:https://xiaomi-info.github.io/2019/12/17/redis-distributed-lock/

             3) 因分布式锁带来的高并发性能问题

四、秒杀样例代码

  参考 https://blog.csdn.net/xlgen157387/article/details/79036337 

  基于Redis的分布式锁定义: 如下存在问题,如果执行expire时,程序宕机,则该锁将永远无法释放,解决方案:setNx+lua脚本,或使用新的set() API(见下面Redis章节)

 1 public class DistributedLock {
 2     private final JedisPool jedisPool;
 3 
 4     public DistributedLock(JedisPool jedisPool) {
 5         this.jedisPool = jedisPool;
 6     }
 7 
 8     /**
 9      * 加锁
10      * @param lockName       锁的key
11      * @param acquireTimeout 获取超时时间
12      * @param timeout        锁的超时时间
13      * @return 锁标识
14      */
15     public String lockWithTimeout(String lockName, long acquireTimeout, long timeout) {
16         Jedis conn = null;
17         String retIdentifier = null;
18         try {
19             // 获取连接
20             conn = jedisPool.getResource();
21             // 随机生成一个value
22             String identifier = UUID.randomUUID().toString();
23             // 锁名,即key值
24             String lockKey = "lock:" + lockName;
25             // 超时时间,上锁后超过此时间则自动释放锁
26             int lockExpire = (int) (timeout / 1000);
27 
28             // 获取锁的超时时间,超过这个时间则放弃获取锁
29             long end = System.currentTimeMillis() + acquireTimeout;
30             while (System.currentTimeMillis() < end) {
31                 if (conn.setnx(lockKey, identifier) == 1) {
32                     conn.expire(lockKey, lockExpire);
33                     // 返回value值,用于释放锁时间确认
34                     retIdentifier = identifier;
35                     return retIdentifier;
36                 }
37                 // 返回-1代表key没有设置超时时间,为key设置一个超时时间
38                 if (conn.ttl(lockKey) == -1) {
39                     conn.expire(lockKey, lockExpire);
40                 }
41 
42                 try {
43                     Thread.sleep(10);
44                 } catch (InterruptedException e) {
45                     Thread.currentThread().interrupt();
46                 }
47             }
48         } catch (JedisException e) {
49             e.printStackTrace();
50         } finally {
51             if (conn != null) {
52                 conn.close();
53             }
54         }
55         return retIdentifier;
56     }
57 
58     /**
59      * 释放锁
60      * @param lockName   锁的key
61      * @param identifier 释放锁的标识
62      * @return
63      */
64     public boolean releaseLock(String lockName, String identifier) {
65         Jedis conn = null;
66         String lockKey = "lock:" + lockName;
67         boolean retFlag = false;
68         try {
69             conn = jedisPool.getResource();
70             while (true) {
71                 // 监视lock,准备开始事务
72                 conn.watch(lockKey);
73                 // 通过前面返回的value值判断是不是该锁,若是该锁,则删除,释放锁
74                 if (identifier.equals(conn.get(lockKey))) {
75                     Transaction transaction = conn.multi();
76                     transaction.del(lockKey);
77                     List<Object> results = transaction.exec();
78                     if (results == null) {
79                         continue;
80                     }
81                     retFlag = true;
82                 }
83                 conn.unwatch();
84                 break;
85             }
86         } catch (JedisException e) {
87             e.printStackTrace();
88         } finally {
89             if (conn != null) {
90                 conn.close();
91             }
92         }
93         return retFlag;
94     }
95 }
View Code

  秒杀Service实现(借助上一步分布式锁实现)

 1 public class Service {
 2     private static JedisPool pool = null;
 3 
 4     static {
 5         JedisPoolConfig config = new JedisPoolConfig();
 6         config.setMaxTotal(200);
 7         config.setMaxIdle(8);
 8         config.setMaxWaitMillis(1000 * 100);
 9         config.setTestOnBorrow(true);
10         pool = new JedisPool(config, "127.0.0.1", 6379, 3000);
11     }
12 
13     private DistributedLock lock = new DistributedLock(pool);
14 
15     int n = 500;
16 
17     public void secKill() {
18         String identifier = lock.lockWithTimeOut("resource", 500, 1000);
19         if (identifier != null) {
20             System.out.println((Thread.currentThread().getName() + "获取锁"));
21             System.out.println(--n);
22             lock.releaseLock("resource", identifier);
23         }
24     }
25 
26 }
View Code

   模拟秒杀调用

public class ThreadA extends Thread {
    private Service service;

    public ThreadA(Service service) {
        this.service = service;
    }

    @Override
    public void run() {
        service.seckill();
    }
}

public class Test {
    public static void main(String[] args) {
        Service service = new Service();
        for (int i = 0; i < 50; i++) {
            ThreadA threadA = new ThreadA(service);
            threadA.start();
        }
    }
}

五、常见分布式锁

1)zk: 

     ZooKeeper 有四种节点类型,包括持久节点、持久顺序节点、临时节点和临时顺序节点,利用 ZooKeeper 支持临时顺序节点的特性,可以实现分布式锁。

  1.持久节点 (PERSISTENT)
    默认的节点类型。创建节点的客户端与zookeeper断开连接后,该节点依旧存在

  2.持久节点顺序节点(PERSISTENT_SEQUENTIAL)

    所谓顺序节点,就是在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号

  3.临时节点(EPHEMERAL)
    和持久节点相反,当创建节点的客户端与zookeeper断开连接后,临时节点会被删除

  4.临时顺序节点(EPHEMERAL_SEQUENTIAL)
    顾名思义,临时顺序节点结合和临时节点和顺序节点的特点:在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号;当创建节点的客户端与zookeeper断开连接后,临时节点会被删除。

  原理:当客户端对某个方法加锁时,在 ZooKeeper 中该方法对应的指定节点目录下,生成一个唯一的临时有序节点。当自己创建的有序节点是当前目录下编号最小的,则认为获取锁成功

  具体实现

    (1)创建一个目录mylock;
    (2)线程A:想获取锁就在mylock目录下 1)创建临时顺序节点;2)获取mylock目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁;
    (3)线程B:获取所有节点,判断自己不是最小节点,设置监听比自己次小的节点;
    (4)线程A:处理完,删除自己的节点,线程B监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。

  这里推荐一个Apache的开源库Curator,它是一个ZooKeeper客户端,Curator提供的InterProcessMutex是分布式锁的实现,acquire方法用于获取锁,release方法用于释放锁。

      优点:具备高可用、可重入、阻塞锁特性,可解决失效死锁问题。

      缺点:因为需要频繁的创建和删除节点,性能上不如Redis方式。

2)Redis

   参考: https://www.cnblogs.com/linjiqin/p/8003838.html

  1、加锁:

    错误实现:setnx(set if not exist 成功返回1,否则返回0)+ expire(设置过期时间):非原子操作,当执行expire时程序宕机,则导致锁永远不会被释放

    正确实现:根据Jedis版本有差异

private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";

/**
     * 尝试获取分布式锁
    * @param jedis Redis客户端
    * @param lockKey 锁
    * @param requestId 请求标识  常 UUID.randomUUID().toString()
    * @param expireTime 超期时间
    * @return 是否获取成功
*/
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

    String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

    if (LOCK_SUCCESS.equals(result)) {
        return true;
    }
    return false;

}

       基于3.5.2样例(Jedis API有调整)   

SetParams params = new SetParams();
params.ex(100); // 设置超时时间
params.nx(); // 若锁不存在才进行写操作
jedis.set(key, requestId, params); // 成功返回OK

解决方案:通过lua脚本 参考https://xiaomi-info.github.io/2019/12/17/redis-distributed-lock/

  2、解锁 (通过lua脚本实现)

private static final Long RELEASE_SUCCESS = 1L;

/**
* 释放分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @return 是否释放成功
*/
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {

    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

    if (RELEASE_SUCCESS.equals(result)) {
        return true;
    }
    return false;

}

  错误的实现:多线程场景,可能if判断成功后,锁已被别的客户端获取,此时执行删除的是别人创建的锁

// 判断加锁与解锁是不是同一个客户端
if (requestId.equals(jedis.get(lockKey))) {
    // 若在此时,这把锁突然不是这个客户端的,则会误解锁
    jedis.del(lockKey);
}

3)Redisson分布式锁

    失效时间设置多长时间为好?这个问题在 redisson 的做法是:每获得一个锁时,只设置一个很短的超时时间,同时起一个线程在每次快要到超时时间时去刷新锁的超时时间。在释放锁的同时结束这个线程。

    主要使用接口:redisson.getLock(lockKey)、lock.lock(获取不到会一直阻塞)、lock.unLock ,

    同时封装了可重入锁、乐观锁、公平锁、读写锁(缓存双写不一致),

      RReadWriteLock lock = redisson.getReadWriteLock("lockKey");

   RLock rLock = lock.readLock();    lock.lock(); lock.unlock();

   RLock wLock = lock.writeLock();   lock.lock(); lock.unlock();

    如何开发参考官方的github样例


 缓存数据库双写不一致

读多写少场景:使用redisson.getReadWriteLock("lockKey"), 获取读写锁,同时可以提升并发。

读多写多场景:1)减少数据库缓存过期时间;2)直接对接数据库;

原文地址:https://www.cnblogs.com/clarino/p/15245678.html