基于Redis分布式锁的秒杀实现

一、使用分布式锁要满足的几个条件

1、系统是一个分布式系统(关键是分布式,单机的可以使用ReentrantLock或者synchronized代码块来实现)

2、共享资源(各个系统访问同一个资源,资源的载体可能是传统关系型数据库或者NoSQL)

3、同步访问(即有很多个进程同事访问同一个共享资源。没有同步访问,谁管你资源竞争不竞争)

二、应用的场景例子

  开发环境部署架构(多台tomcat服务器+redis【多台tomcat服务器访问一台redis】+mysql【多台tomcat服务器访问一台服务器上的mysql】)就满足使用分布式锁的条件。多台服务器要访问redis全局缓存的资源,如果不使用分布式锁就会出现问题。

  从redis获取值N,对数值N进行边界检查,自加1,然后N写回redis中。 这种应用场景很常见,像秒杀,全局递增ID、IP访问限制等。以IP访问限制来说,恶意攻击者可能发起无限次访问,并发量比较大,分布式环境下对N的边界检查就不可靠,因为从redis读的N可能已经是脏数据。传统的加锁的做法(如java的synchronized和Lock)也没用,因为这是分布式环境,这个同步问题的救火队员也束手无策。在这危急存亡之秋,分布式锁终于有用武之地了。

  分布式锁可以基于很多种方式实现,比如zookeeper、redis...。不管哪种方式,他的基本原理是不变的:用一个状态值表示锁,对锁的占用和释放通过状态值来标识。

  这里主要讲如何用redis实现分布式锁。

三、使用redis的setNX命令实现分布式锁

1、实现的原理

  Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问。且多客户端对Redis的连接并不存在竞争关系。redis的SETNX命令可以方便的实现分布式锁。

  PS:Redis客户端对服务端的每次调用都经历了发送命令,执行命令,返回结果三个过程。其中执行命令阶段,由于Redis是单线程来处理命令的,所有到达服务端的命令都不会立刻执行,所有的命令都会进入一个队列中,然后逐个执行,并且多个客户端发送的命令的执行顺序是不确定的,但是可以确定的是不会有两条命令被同时执行,不会产生并发问题,这就是Redis的单线程基本模型。

2、基本命令解析

1)setNX(SET if Not eXists),如果不存在,则 SET。

SETNX key value

说明:当且仅当 key 不存在时,将 key 的值设为 value。若给定的 key 已经存在,则 SETNX 不做任何动作。

返回值:设置成功,返回 1 。设置失败,返回 0 。

redis> EXISTS job                # job 不存在
(integer) 0

redis> SETNX job "programmer"    # job 设置成功
(integer) 1

redis> SETNX job "code-farmer"   # 尝试覆盖 job ,失败
(integer) 0

redis> GET job                   # 没有被覆盖
"programmer"

 所以可以执行下面的命令

SETNX lock.foo <current Unix time + lock timeout + 1> 

说明:

  • 如返回1,则该客户端获得锁,把lock.foo的键值设置为时间值表示该键已被锁定,该客户端最后可以通过DEL lock.foo来释放该锁。

  • 如返回0,表明该锁已被其他客户端取得,这时我们可以先返回或进行重试等对方完成或等待锁超时。

2)getSET

GETSET key value

说明:将给定 key 的值设为 value ,并返回 key 的旧值(old value)。当 key 存在但不是字符串类型时,返回一个错误。

返回值:返回给定 key 的旧值。当 key 没有旧值时,也即是, key 不存在时,返回 nil 。

3)get
GET key

返回值:当 key 不存在时,返回 nil ,否则,返回 key 的值。如果 key 不是字符串类型,那么返回一个错误

四、解决死锁

上面的锁定逻辑有一个问题:如果一个持有锁的客户端失败或崩溃了不能释放锁,该怎么解决

我们可以通过锁的键对应的时间戳来判断这种情况是否发生了,如果当前的时间已经大于lock.foo的值,说明该锁已失效,可以被重新使用。 

  发生这种情况时,可不能简单的通过DEL来删除锁,然后再SETNX一次(讲道理,删除锁的操作应该是锁拥有这执行的,这里只需要等它超时即可),当多个客户端检测到锁超时后都会尝试去释放它,这里就可能出现一个竞态条件,让我们模拟一下这个场景: 

C0操作超时了,但它还持有着锁,C1和C2读取lock.foo检查时间戳,先后发现超时了。 
C1 发送DEL lock.foo 
C1 发送SETNX lock.foo 并且成功了。 
C2 发送DEL lock.foo 
C2 发送SETNX lock.foo 并且成功了。 
这样一来,C1,C2都拿到了锁!问题大了! 

  幸好这种问题是可以避免的,让我们来看看C3这个客户端是怎样做的: 

C3发送SETNX lock.foo 想要获得锁,由于C0还持有锁,所以Redis返回给C3一个0 
C3发送GET lock.foo 以检查锁是否超时了,如果没超时,则等待或重试。 
反之,如果已超时,C3通过下面的操作来尝试获得锁: 
GETSET lock.foo <current Unix time + lock timeout + 1> 
通过GETSET,C3拿到的时间戳如果仍然是超时的,那就说明,C3如愿以偿拿到锁了。 
如果在C3之前,有个叫C4的客户端比C3快一步执行了上面的操作,那么C3拿到的时间戳是个未超时的值,这时,C3没有如期获得锁,需要再次等待或重试。留意一下,尽管C3没拿到锁,但它改写了C4设置的锁的超时值,不过这一点非常微小的误差带来的影响可以忽略不计。 

  注意:为了让分布式锁的算法更稳键些,持有锁的客户端在解锁之前应该再检查一次自己的锁是否已经超时,再去做DEL操作,因为可能客户端因为某个耗时的操作而挂起,操作完的时候锁因为超时已经被别人获得,这时就不必解锁了。

五、代码实现

  public synchronized boolean lock() throws InterruptedException {
        int timeout = timeoutMsecs;
        while (timeout >= 0) {
            long expires = System.currentTimeMillis() + expireMsecs + 1;
            String expiresStr = String.valueOf(expires); // 锁到期时间
            if (this.setNX(lockKey, expiresStr)) {
                // lock acquired
                locked = true;
                return true;
            }

            String currentValueStr = this.get(lockKey); // redis里的时间
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                // 判断是否为空,不为空的情况下,如果被其他线程设置了值,则第二个条件判断是过不去的
                // lock is expired

                String oldValueStr = this.getSet(lockKey, expiresStr);
                // 获取上一个锁到期时间,并设置现在的锁到期时间,
                // 只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的
                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    // 防止误删(覆盖,因为key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,但是因为什么相差了很少的时间,所以可以接受

                    // [分布式的情况下]:如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁
                    // lock acquired
                    locked = true;
                    return true;
                }
            }
            timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;

            /*
             * 延迟100 毫秒, 这里使用随机时间可能会好一点,可以防止饥饿进程的出现,即,当同时到达多个进程,
             * 只会有一个进程获得锁,其他的都用同样的频率进行尝试,后面有来了一些进行,也以同样的频率申请锁,这将可能导致前面来的锁得不到满足.
             * 使用随机的等待时间可以一定程度上保证公平性
             */
            Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);

        }
        return false;
    }

    /**
     * Acqurired lock release.
     */
    public synchronized void unlock() {
        if (locked) {
            redisTemplate.delete(lockKey);
            locked = false;
        }
    }

六、SpringBoot整合Redis实现分布式锁

SpringBoot提供的RedisTemplate来操作Redis,其中可以使用 redisTemplate.opsForValue().setIfAbsent()来实现Redis的setNX()功能。该方法还自带设置键过期时间功能。可以简化上述示例中的代码操作。

1、Redis工具类

    //===============================锁=================================
    /**
     * 赋值操作,存在值返回true,不存在返回false;
     *
     * @param key   键
     * @param value 值
     * @param millisecond 有效期
     * @return 赋值结果
     */
    public boolean setIfAbsent(String key,String value, long millisecond) {
        Boolean success = redisTemplate.opsForValue().setIfAbsent(key, value,
                millisecond, TimeUnit.MILLISECONDS);
        return success != null && success;
    }
    /**
     * 移除key对应的value。
     *
     * @param key   键
     *
     */
    public void delete(String key) {
        redisTemplate.delete(key);
    }

2、锁工具

@Component
public class RedisLock {
    @Autowired
    RedisUtil redisUtil;

    public boolean lock() {
        /*
        第一个加锁的用户,可以直接加锁,第二个加锁的用户,等待锁释放。 释放后再次加锁。
     每个锁的时间为5秒,若是5秒没有执行完业务操作,该锁就会被释放。
*/ while (true) { if (redisUtil.setIfAbsent("lockName","lock", 5000)) { return true; } else { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } public void releaseLock() { redisUtil.delete("lockName"); } }

 3、Controller类

  @GetMapping("get")
    public String ms(HttpServletRequest request) {
        int id = (int) (Math.random() * 100);
        String str = null;
        try {
            if (redisLock.lock()) {
                if (redisUtil.sGetSetSize("red") < 10) {
                    redisUtil.sSet("red", id);
                    str = request.getLocalPort() + " 抢购成功@!";
                } else {
                    str = request.getLocalPort() + " 抢光了!";
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            redisLock.releaseLock();
        }
        return str;

    }

 参考:https://www.cnblogs.com/zuolun2017/p/8028208.html

原文地址:https://www.cnblogs.com/david1216/p/13714525.html