基于Redis实现分布式锁

1、前言

  众所周知,对于高并发业务场景通常会考虑加锁机制保证线程安全,比如使用Synchronized对象锁。Synchronized为JVM进程级别,在项目采取单实例部署情况下几乎可以胜任。但是当项目采用分布式架构,考虑采用多实例高可用部署情况时,Synchronized对象锁应对高并发场景已经力不从心。

分布式高可用部署架构:

  那么,分布式部署架构下如何避免高并发造成的“超买/超卖现象”等类似线程安全问题呢?还好,目前也有不少成熟解决方案,整体上都是围绕实现分布式锁,常见的实现方案有:

  • 基于Redis(缓存等)实现分布式锁。
  • 基于ZooKeeper实现分布式锁。
  • 基于数据库实现分布式锁。

  本文将重点探讨如何采用Redis缓存实现分布式锁。

2、Redis SETNX

  通常,采用Redis SETNX指令实现基于Redis实现分布式锁。Redis为单线程模型,可以将高并发场景操作映射为单点指令操作。

  Redis数据库指令:SETNX key value  ,SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。

  (Refer to :http://redisdoc.com/string/setnx.html)

  • 指令特性

只在键 key 不存在的情况下, 将键 key 的值设置为 value 。
若键 key 已经存在, 则 SETNX 命令不做任何动作。
  • 返回值

命令在设置成功时返回 1 , 设置失败时返回 0
  • setnx是Redis命令中的方法,java中对应的实现方法是setIfAbsent()。

3、代码验证

   下文我将展示一段购物库存简单的demo示例,若采用分布式部署多实例,那么在高并发情况下会存在哪些重要问题。

本文将采用JMeter性能测试工具,模拟高并发业务场景,完成高并发压力测试。 

  • 代码

    @PostMapping("/buyProduct1")
    public String buyProduct1() {
        String buyerName = "顾客" + Thread.currentThread().getId();
        Object stObj = redisTemplate.opsForValue().get("stockNum");
        int stockNum = Integer.parseInt(stObj.toString());
        if (stockNum > 0) {
            redisTemplate.opsForValue().set("stockNum", --stockNum);
            System.out.println(buyerName + "下单成功,库存剩余件数:" + stockNum);
        } else {
            System.out.println(buyerName + "下单失败,库存不足.");
            return buyerName + "下单失败!";
        }
        return buyerName + "下单成功!";
    }
  • JMeter测试

  JMeter设置10个用户线程,0.5s内并发请求一次。

  执行成功,模拟购物成功。

 

  •  执行结果

  从IDE控制台日志可以看到,0.5s内10次请求,出现了“超卖现象”,很明显的线程安全问题。

 

   当前代码如果在单实例部署架构中,可以采用Synchronized对象锁实现线程安全控制(在业务代码上添加锁),但是在分布式部署架构下将无法实现有效控制。

4、优化代码

   采用Redis实现分布式锁,并对上述简单代码添加分布式锁机制,实现线程安全控制。通常,也有两种具体的实现方式,详细见下文代码实现。

方式一:基于Redis SETNX指令

  • 代码实现

    @PostMapping("/buyProduct2")
    public String buyProduct2() {
        String buyerName = "顾客" + Thread.currentThread().getId();
        String lockKey = "buyProductLock";
        String lockValue = UUID.randomUUID().toString().concat(UUID.randomUUID().toString());
        try {
            // setIfAbsent是java中的方法,setnx是redis命令中的方法
            // 1.保证系统崩溃可以自然释放锁
            // 2.保证redis操作原子性,避免设置超时时刻系统崩溃
            Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);
            if (!isSuccess) {
                System.out.println("系统繁忙,请稍后重试.");
                return "系统繁忙,请稍后重试.";
            }
            int stockNum = Integer.parseInt(redisTemplate.opsForValue().get("stockNum").toString());
            if (stockNum > 0) {
                redisTemplate.opsForValue().set("stockNum", --stockNum);
                System.out.println(buyerName + "下单成功,库存剩余件数:" + stockNum);
            } else {
                System.out.println(buyerName + "下单失败,库存不足.");
                return buyerName + "下单失败!";
            }
        } finally {//3.保证操作成功和系统异常情况下都能释放锁
            //4.采用线程标识主动检查,保证仅删除自己的锁。避免redis超时时间小于业务逻辑执行时间,前一个线程释放了后一个线程的加锁,造成锁永久失效。
            //lockValue存储方法栈中线程私有
            if (lockValue.equals(redisTemplate.opsForValue().get(lockKey))) {
                //释放锁
                redisTemplate.delete(lockKey);
            }
        }
        return buyerName + "下单成功!";
    }
  • 运行结果

  部分线程执行成功,部分线程执行被拦截,保证了用户并发下单库存数据正确性,实现了线程安全控制。

 

 

方式二:采用Redisson 实现

  • 实现原理

 

  加锁失败情况下,可以设置超时时间T,在时间T内自旋加锁,超过时间T之后加锁失败返回,避免死锁。

  当Redis集群为多Master-Slave模式时,Redis根据hash算法选择一个master尝试加锁。

  Redisson是通过执行lua脚本完成对Redis加锁操作。

  • maven依赖

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.15.0</version>
        </dependency>
  • Redisson配置

@Component
public class redissonConfig {

    @Bean
    public Redisson redisson() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://127.0.0.1:6379")
                .setDatabase(0);
        return (Redisson) Redisson.create(config);
    }
}
  •  代码实现

    @PostMapping("/buyProduct3")
    public String buyProduct3() {
        String buyerName = "顾客" + Thread.currentThread().getId();
        String lockKey = "buyProductLock";
        // redisson加锁
        RLock redissonLock = redisson.getLock(lockKey);
        try {
            //redisson设置锁时间
            redissonLock.lock(10, TimeUnit.SECONDS);
            int stockNum = Integer.parseInt(redisTemplate.opsForValue().get("stockNum").toString());
            if (stockNum > 0) {
                redisTemplate.opsForValue().set("stockNum", --stockNum);
                System.out.println(buyerName + "下单成功,库存剩余件数:" + stockNum);
            } else {
                System.out.println(buyerName + "下单失败,库存不足.");
                return buyerName + "下单失败!";
            }
        } finally {
            //redisson释放锁
            redissonLock.unlock();
        }
        return buyerName + "下单成功!";
    }
  • 执行结果

 

 

5、总结

  • 比较

  方法一与方法二都实现了在分布式部署场景下,控制高并发业务请求下线程安全。方法一拦截并发线程,直接结束在业务逻辑执行过程中其他线程并发请求,并发吞入量较小。方法二基于Redisson可以设置并发线程等待状态,保证每个线程请求都能完成业务,提高了系统并发吞吐量。另外,方式二的实现代码量较少。

  方法一基于Redis指令面临的问题:当Redis设置超时时间<应用程序执行时间,Redis分布式锁先于程序执行完成释放,导致当前加锁失效。方式二Redisson分布式锁,通过加锁时候开启分线程,定期(小于redis超时时间,eg:1/3)检查redis锁标记,如果存在再延时机制,解决了这类时间差问题。

  • 共同存在的问题

  分布式架构下,Redis也以集群模式部署,当Redis master节点加锁成功之后,返回成功,这是Redis主节点可能宕机故障,slave从节点晋升为主节点,造成Redis锁标识丢失,从而导致分布式锁失效。

解决方案:

  采取Redlock或者 Zookeeper。Redisson性能更高,确保绝对数据安全采用Zookeeper(也是主从结构)。

6、防重复提交代码优化

   对前文《防止重复提交解决方案》进行代码优化,支持高并发场景线程安全控制。

  • 代码

    @Around("preventDuplication()")
    public Object before(ProceedingJoinPoint joinPoint) throws Throwable {
        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = attributes.getRequest();
        Assert.notNull(request, "request cannot be null.");
        //获取执行方法
        Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
        //获取防重复提交注解
        PreventDuplication annotation = method.getAnnotation(PreventDuplication.class);
        // 获取token以及方法标记,生成redisKey和redisValue
        String token = request.getHeader(IdempotentConstant.TOKEN);
        String redisKey = IdempotentConstant.PREVENT_DUPLICATION_PREFIX
                .concat(token)
                .concat(getMethodSign(method, joinPoint.getArgs()));
        String redisValue = redisKey.concat(annotation.value()).concat("submit duplication");
        System.out.print("当前线程号:" + Thread.currentThread().getId());
        System.out.println("存储redisKey: " + redisKey);
        redisValue.concat(UUID.randomUUID().toString() + Thread.currentThread().getId());
        try {
            //设置防重复操作限时标记(前置通知)
            //redisTemplate实现jedis.setnx(key,value),setIfAbsent 是java中的方法,setnx 是 redis命令中的方法
            Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(redisKey, redisValue, annotation.expireSeconds(), TimeUnit.SECONDS);
            System.out.println("当前线程号:" + Thread.currentThread().getId() + "," + "startTime:" + isSuccess);
            long startTime = System.currentTimeMillis();
            if (!isSuccess) {
                throw new RuntimeException("请勿重复提交");
            }
            System.out.println("当前线程号:" + Thread.currentThread().getId() + "," + "startTime:" + startTime + "ms耗时");
            //ProceedingJoinPoint类型参数可以决定是否执行目标方法,且环绕通知必须要有返回值,返回值即为目标方法的返回值
            Object proceed = joinPoint.proceed();
            long endStart = System.currentTimeMillis();
            System.out.println("当前线程号:" + Thread.currentThread().getId() + "," + "endStart:" + endStart + "ms耗时");
            return proceed;
        } finally {
            //释放锁校验是否为当前线程
            if (redisValue.equals(redisTemplate.opsForValue().get(redisKey))) {
                //释放锁
                redisTemplate.delete(redisKey);
            }
        }
    }

7、源代码

 本文代码已经上传托管至GitHub以及Gitee,有需要的读者请自行下载。

  • GitHub:https://github.com/gavincoder/distributedlock.git
  • Gitee:https://gitee.com/gavincoderspace/distributedlock.git
原文地址:https://www.cnblogs.com/gavincoder/p/14413436.html