Redis实现分布式锁

假设有一个秒杀程序,库存为50,代码如下:

    @GetMapping("/lock")
    public String Redis() {
        String retVal;
        synchronized (this) {
            int stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int remainStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", String.valueOf(remainStock));
                retVal = "剩余库存:" + remainStock;
            } else {
                retVal = "库存不足";
            }
            log.info(retVal);
        }
        return retVal;
    }

单机下,上面代码没有任何问题,但是在集群下,使用synchronized 就不好使了,启动2台机器,分别是8001,8002,压测情况如下:

可以看到,出现了重复消费的情况,接下来使用分布式锁来解决上面的问题

分布式锁

redis有一个setnx操作,如果key存在,就不进行操作,否则就操作,使用setnx后代码如下:

    @GetMapping("/lock1")
    public String RedisTest1() {
        String retVal;
        String lockKey="lockKey";
        //加锁
        Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "");
        if(!isLock){
            retVal="服务器繁忙";
        }
        int stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stock"));
        if (stock > 0) {
            int remainStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(remainStock));
            retVal = "剩余库存:" + remainStock;
        } else {
            retVal = "库存不足";
        }
        log.info(retVal);
        //解锁
        stringRedisTemplate.delete(lockKey);
        return retVal;
    }

上面的代码表面上看似实现了加锁,实际上有很多问题,假设有A,B两个请求同时到达,由于redis是执行命令时是单线程,所以只会有一个请求拿到锁,假设A拿到,存在的问题有:

  • 1、如果A线程在执行的过程中发生了异常,锁就不会释放;针对这个问题使用try{}finally{}

  • 2、如果A线程还未释放锁,但所在的机器突然宕机了,锁也不会释放;针对这个问题设置过期时间

为了解决以上问题,改进后的代码如下:

 @GetMapping("/lock2")
    public String RedisTest2() {
        String retVal;
        String lockKey="lockKey";
        //加锁
        Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "");
        stringRedisTemplate.expire(lockKey, 10,TimeUnit.SECONDS);
        if(!isLock){
            retVal="服务器繁忙";
        }
        try{
            int stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int remainStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", String.valueOf(remainStock));
                retVal = "剩余库存:" + remainStock;
            } else {
                retVal = "库存不足";
            }
            log.info(retVal);
        }finally {
            //解锁
            stringRedisTemplate.delete(lockKey);
        }
        return retVal;
    }

仔细分析上面的程序,还是会有一系列问题:

  • 1.由于设置过期时间不是原子性的,如果刚拿到锁,还未来得及设置过期时间,机器宕掉了,锁不会释放;

  • 2.加锁A线程先拿到锁,还未执行完成,时间到期,然后B线程也拿到了锁,过一段时间后,A执行结束,释放锁,但B还未结束,此时其他请求也可以拿到锁了;

针对第一个问题,可以使用set命令,可以同时到达setnx和设置过期时间的效果,由于只有jedis才有相应的api,RedisTemplate未提供相应的功能,所以需要自己拿到jedis实例,然后调用set方法;

针对第二个问题,可以加锁后设置一个标识,只有锁是自己的,才释放;

改进代码如下:

  @GetMapping("/lock2")
    public String RedisTest2() {
        String retVal;
        String lockKey="lockKey";
        String clientId= UUID.randomUUID().toString();
        //加锁
       Boolean isLock = stringRedisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
            Jedis jedis = (Jedis) redisConnection.getNativeConnection();
            String result = jedis.set(lockKey, clientId, "NX", "EX", 10);
            if ("OK".equals(result)) {
                return Boolean.TRUE;
            }
            return Boolean.FALSE;
        });
        if(!isLock){
            retVal="服务器繁忙";
        }
        try{
            int stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int remainStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", String.valueOf(remainStock));
                retVal = "剩余库存:" + remainStock;
            } else {
                retVal = "库存不足";
            }
            log.info(retVal);
        }finally {
            //解锁
            if(clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
                stringRedisTemplate.delete("stock");
            }
        }
        return retVal;
    }

实际上以上代码仍然有问题,体现如下:

  • 1.解锁不是原子性的,仍会刚判断了是自己的锁,还未来得及释放就宕机了;针对这个问题要实现原子操作,需要写脚本解决;

  • 2.过期时间到底设置多少合适,如果设置短了,可能程序还未执行完,锁就释放了,如果设置长了,万一机器宕掉了,其他机器就会等待很长的时间才能获取锁;针对这个问题,可以拿到锁后,开启一个线程定时检测是否程序持有锁,未完成就把过期时间延迟(重新

设置),具体实现自己动手比较麻烦,后面会使用redisson框架来解决该问题。

改进后的代码如下:

   @GetMapping("/lock2")
    public String RedisTest2() {
        String retVal;
        String lockKey="lockKey";
        String clientId= UUID.randomUUID().toString();
        //加锁
       Boolean isLock = stringRedisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
            Jedis jedis = (Jedis) redisConnection.getNativeConnection();
            String result = jedis.set(lockKey, clientId, "NX", "EX", 10);
            if ("OK".equals(result)) {
                return Boolean.TRUE;
            }
            return Boolean.FALSE;
        });
        if(!isLock){
            retVal="服务器繁忙";
            return retVal;
        }
        try{
            int stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int remainStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", String.valueOf(remainStock));
                retVal = "剩余库存:" + remainStock;
            } else {
                retVal = "库存不足";
            }
            log.info(retVal);
        }finally {
            //解锁
            String RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            stringRedisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
                Jedis jedis = (Jedis) redisConnection.getNativeConnection();
                Object result = jedis.eval(RELEASE_LOCK_SCRIPT, Collections.singletonList(lockKey),
                        Collections.singletonList(clientId));
                Long RELEASE_SUCCESS = 1L;
                if (RELEASE_SUCCESS.equals(result)) {
                    return Boolean.TRUE;
                }
                return Boolean.FALSE;
            });
        }
        return retVal;
    }

最终把代码整理后如下:

  @GetMapping("/lock")
    public String RedisTest() {
        String retVal;
        String lockKey = "lockKey";
        String stockKey = "stock";
        String clientId = UUID.randomUUID().toString();
        //加锁
        Boolean isLock = lockService.tryLock(lockKey, clientId, 10);
        if (!isLock) {
            retVal = "服务器繁忙";
            return retVal;
        }
        try {
            BoundValueOperations<String, String> valueOps = stringRedisTemplate.boundValueOps(stockKey);
            Integer stock = Integer.valueOf(valueOps.get());
            if (stock > 0) {
                int remainStock = stock - 1;
                valueOps.set(String.valueOf(remainStock));
                retVal = "剩余库存:" + remainStock;
            } else {
                retVal = "库存不足";
            }
            log.info(retVal);
        } finally {
            //解锁
            lockService.releaseLock(lockKey,clientId);
        }
        return retVal;
    }
package com.yyb.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.scripting.support.StaticScriptSource;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 */
@Component
public class NewLockService {
    private static final String RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 该加锁方法仅针对单实例 Redis 可实现分布式加锁
     * 对于 Redis 集群则无法使用
     *
     * @param lockKey  加锁键
     * @param clientId 加锁客户端唯一标识(采用UUID)
     * @param seconds  锁过期时间
     * @return
     */
    public Boolean tryLock(String lockKey, String clientId, long seconds) {
        return redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
            return redisConnection.set(lockKey.getBytes(), clientId.getBytes(), Expiration.seconds(seconds), RedisStringCommands.SetOption.ifAbsent());
        });
    }

    /**
     * 释放锁操作
     * @param key
     * @param value
     * @return
     */
    public boolean releaseLock(String key, String value) {
        DefaultRedisScript<Boolean> lockScript = new DefaultRedisScript<>();
        lockScript.setScriptSource(
                new StaticScriptSource(RELEASE_LOCK_SCRIPT));
        lockScript.setResultType(Boolean.class);
        Boolean result = redisTemplate.execute(lockScript,Collections.singletonList(key),value);
        return result;
    }
}

上述代码实现,仅对 redis 单实例架构有效,当面对 redis 哨兵模式或集群时就无效了。原因是当在主机宕机,从机被升级为主机的一瞬间的时候,如果恰好在这一刻,由于 redis 主从复制的异步性,导致从机中数据没有即时同步,那么上述代码就会无效,导致同

一资源有可能会产生两把锁,违背了分布式锁的原则。

使用Redisson解决分布式问题

Redisson实现分布式锁

  @GetMapping("/lock")
    public String RedisTest() {
        String retVal = "";
        String stockKey = "stock";
        //加锁
        RLock lock = redissonClient.getLock("myLock");
        lock.lock();
        try {
            BoundValueOperations<String, String> valueOps = stringRedisTemplate.boundValueOps(stockKey);
            Integer stock = Integer.valueOf(valueOps.get());
            Thread.sleep(3000);
            if (stock > 0) {
                int remainStock = stock - 1;
                valueOps.set(String.valueOf(remainStock));
                retVal = "剩余库存:" + remainStock;
            } else {
                retVal = "库存不足";
            }
            log.info(retVal);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //解锁
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
        return retVal;
    }


@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redisson = Redisson.create(config);
        return redisson;
    }
}
原文地址:https://www.cnblogs.com/ginb/p/14517254.html