使用redis做分布式锁

为什么会有这个需求:

例如一个简单用户的操作,一个线程去修改用户状态,首先在在内存中读出用户的状态,然后在内存中进行修改,然后在存到数据库中。在单线程中,这是没有问题的。但是在多线程中由于读取,修改,写入是三个操作,不是原子操作(同时成功或失败),因此在多线程中会存在数据的安全性问题。

这个问题的话,就可以用分布式锁在限制程序的并发执行。

实现思路:

就是进来一个先占位,当别的线程进来操作的时候,发现有人占位了,就会放弃或者稍后再试。

占位的实现:

在redis中的setnx命令来实现,redis命令可以参考我这篇博客https://www.cnblogs.com/javazl/p/12657280.html,默认set命令就是存值,当key存在的时候,set就会覆盖key的value值,而setnx则不会。当没有key的时候,setnx就会进来先占位,当key存在了,其他的setnx就进不来了。。等到第一个执行完成后,在del命令释放位子。

代码实现:

public class LockTest {
    public static void main(String[] args) {
        Redis redis = new Redis();
        redis.execute(jedis->{
            Long setnx = jedis.setnx("k1", "v1");
          //setnx的返回值为long类型
            if (setnx == 1) {
                //没人占位
                jedis.set("name", "zl");
                String name = jedis.get("name");
                System.out.println(name);
                //释放资源
                 jedis.del("k1");
           }else{
                //有人占位,停止/暂缓 操作
           }
       });
   }
}

上边代码中,就是一个简易的分布式锁的实现,但是有一个问题。就是如果在占位后释放前挂了。那么这个线程会一直释放不了,也就是del命令没有调用,后面的全部请求都阻塞到这里,锁就变成了死锁。因此这里需要去优化。

优化的方法就是加过期时间,确保锁在一定时间后能够释放.

public class LockTest {
    public static void main(String[] args) {
        Redis redis = new Redis();
        redis.execute(jedis->{
            Long setnx = jedis.setnx("k1", "v1");
            if (setnx == 1) {
                //给锁添加一个过期时间,防止应用在运行过程中抛出异常导致锁无法及时得到释放
                jedis.expire("k1", 5);
                //没人占位
                jedis.set("name", "zl");
                String name = jedis.get("name");
                System.out.println(name);
                jedis.del("k1");
           }else{
                //有人占位,停止/暂缓 操作
           }
       });
   }

这样处理后,就可以保证锁可以正常的释放。但是会有一个新的问题,就是如果在取锁和设置过期时间服务器挂掉了,因为取锁,也就是setnx和设置过期时间是两个操作,不具备原子性所以不可能同时完成。这个锁就会被一直占用,无法得到释放,成为死锁。那么如何解决呢?

在redis2.8之后,setnx和expireke可以通过一个命令一起执行,让两个操作变成一个,就会解决这个问题。

优化实现:

public class LockTest {
    public static void main(String[] args) {
        Redis redis = new Redis();
        redis.execute(jedis->{
         //将两个操作合并成一个,nx就是setnx,ex就是expire
            String set = jedis.set("k1", "v1", new SetParams().nx().ex(5));
          //操作结果为okhuo或者error
            if (set !=null && "OK".equals(set)) {
          //给锁添加一个过期时间,防止应用在运行过程中抛出异常导致锁无法及时得到释放
                jedis.expire("k1", 5);
                //没人占位
                jedis.set("name", "zl);
                String name = jedis.get("name");
                System.out.println(name);
            //释放资源
                jedis.del("k1");
           }else{
                //有人占位,停止/暂缓 操作
           }
       });
   }
}

用过期时间优化后,虽然解决了死锁的问题,但是又有一个新的问题产生,就是超时问题:

举个例子:如果要执行的业务很耗时,可能会出现紊乱,当地一个线程获取到锁的时候,开始执行业务代码,但是业务代码很耗时,假如过期时间是3秒,而业务执行需要5秒,这样,锁就会提前释放,然后第二个线程获取到锁并开始执行。当执行到第2秒的时候,第一个锁也执行完了,此时第一个线程会释放第二个线程的锁,然后第三个线程继续获取锁并执行,当到第3秒的时候第二个线程执行完了,那么又会提前释放锁,一直如此循环,会造成线程的紊乱。

那么解决的思路主要有两种

  1. 尽量避免耗时操作。
  2. 去处理锁,给锁的value设置随机数或随机字符串,每当要释放的时候去判断这个value的值,如果是的话就去释放,如果不是就不释放,举个例子,假设第一个线程进来,它获取锁的value是1,如果发生超时就会进入下一个线程,下一个线程会获取新的value为3,在释放第二个所之前先去获取value并比较,发现1不等于三,那么就不去释放锁。

第一种的话没啥说的,但是第二种的话会有一个问题,就是释放锁会查看value,然后比较,然后释放,会有三个操作,那么就不具备原子性,这样操作的话,会出现死锁。这里我们可以使用Lua脚本去处理。

Lua脚本的特点:

 1.使用方便,redis内置了对Lua脚本的支持。

 2.Lua可以在redis服务端原子性的执行多个redis命令

 3.由于网络的原因会影响到redis的性能,因此,使用Lua可以让多个命令同时执行,降低了网络给redis带来的性能问题。

在redis中如何使用Lua脚本:

1.在redis服务端写好,然后在java业务中调用脚本

2.可以直接在java中直接去写,写好后,需要执行时,每次将脚本发送到redis中去执行。

创建Lua脚本:

//用redis.call调用一个redis命令,调的是get命令,这个key是从外面传进来的key
if
redis.call("get",KEYS[1])==ARGV[1] then
//如果相等就去操作释放命令  
return redis.call("del",KEYS[1]) else
 return 0 end

 可以给Lua脚本求一个SHA1和:

cat lua/equal.lua | redis-cli -a root script load --pipe

script load这个命令会在Redis中缓存Lua脚本,并返回脚本内容的SHA1校验和,然后在java中调用时,传入SHA1校验和作为参数,这样redis服务端就知道执行那个脚本了。

接下来在java中编写

 public static void main(String[] args) {
        Redis redis = new Redis();
        for (int i = 0; i < 2; i++) {
            redis.execute(jedis -> {
                //1.先获取一个随机字符串
                String value = UUID.randomUUID().toString();
                //2.获取锁
                String k1 = jedis.set("k1", value, new SetParams().nx().ex(5));
                //3.判断是否成功拿到锁
                if (k1 != null && "OK".equals(k1)) {
                    //4. 具体的业务操作
                    jedis.set("site", "zl");
                    String site = jedis.get("site");
                    System.out.println(site);
                    //5.释放锁
                    jedis.evalsha("b8059ba43af6ffe8bed3db65bac35d452f8115d8", 
Arrays.asList("k1"), Arrays.asList(value));
               } else {
                    System.out.println("没拿到锁");
               }
           });
       }
   }
}

这样处理的话,就解决了死锁的问题。

原文地址:https://www.cnblogs.com/javazl/p/12661730.html