【连载】redis库存操作,分布式锁的四种实现方式[二]--基于Redisson实现分布式锁

一、redisson介绍

redisson实现了分布式和可扩展的java数据结构,支持的数据结构有:List, Set, Map, Queue, SortedSet, ConcureentMap, Lock, AtomicLong, CountDownLatch。并且是线程安全的,底层使用Netty 4实现网络通信。和jedis相比,功能比较简单,不支持排序,事务,管道,分区等redis特性,可以认为是jedis的补充,不能替换jedis。

二、redisson几种锁介绍

1、可重入锁(Reentrant Lock)

基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口

1 RLock lock = redisson.getLock("anyLock");
2 // 最常见的使用方法
3 lock.lock();

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

1 // 加锁以后10秒钟自动解锁
2 // 无需调用unlock方法手动解锁
3 lock.lock(10, TimeUnit.SECONDS);
4 
5 // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
6 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
7 ...
8 lock.unlock();

Redisson同时还为分布式锁提供了异步执行的相关方法:

1 RLock lock = redisson.getLock("anyLock");
2 lock.lockAsync();
3 lock.lockAsync(10, TimeUnit.SECONDS);
4 Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);

2.公平锁(Fair Lock)

基于Redis的Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。

1 RLock fairLock = redisson.getFairLock("anyLock");
2 // 最常见的使用方法
3 fairLock.lock();

同样的,Fair lock也提供加锁时间

1 // 10秒钟以后自动解锁
2 // 无需调用unlock方法手动解锁
3 fairLock.lock(10, TimeUnit.SECONDS);
4 
5 // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
6 boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
7 ...
8 fairLock.unlock();

Redisson同时还为分布式可重入公平锁提供了异步执行的相关方法:

1 RLock fairLock = redisson.getFairLock("anyLock");
2 fairLock.lockAsync();
3 fairLock.lockAsync(10, TimeUnit.SECONDS);
4 Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);

3.联锁(MultiLock)

基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。

 1 RLock lock1 = redissonInstance1.getLock("lock1");
 2 RLock lock2 = redissonInstance2.getLock("lock2");
 3 RLock lock3 = redissonInstance3.getLock("lock3");
 4 
 5 RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
 6 // 同时加锁:lock1 lock2 lock3
 7 // 所有的锁都上锁成功才算成功。
 8 lock.lock();
 9 ...
10 lock.unlock();

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

1 RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
2 // 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开
3 lock.lock(10, TimeUnit.SECONDS);
4 
5 // 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开
6 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
7 ...
8 lock.unlock();

4.红锁(Red Lock)

基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。

 1 RLock lock1 = redissonInstance1.getLock("lock1");
 2 RLock lock2 = redissonInstance2.getLock("lock2");
 3 RLock lock3 = redissonInstance3.getLock("lock3");
 4 
 5 RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
 6 // 同时加锁:lock1 lock2 lock3
 7 // 红锁在大部分节点上加锁成功就算成功。
 8 lock.lock();
 9 ...
10 lock.unlock();

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

1 RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
2 // 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开
3 lock.lock(10, TimeUnit.SECONDS);
4 
5 // 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开
6 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
7 ...
8 lock.unlock();

5. 读写锁(ReadWriteLock)

基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。同时还支持自动过期解锁。该对象允许同时有多个读取锁,但是最多只能有一个写入锁。

1 RReadWriteLock rwlock = redisson.getLock("anyRWLock");
2 // 最常见的使用方法
3 rwlock.readLock().lock();
4 //
5 rwlock.writeLock().lock();

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
//
rwlock.writeLock().lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
//
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

6. 闭锁(CountDownLatch)

基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();

// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();

三、redisson分布式锁在业务中的应用

1、引入相关pom

        <!-- redisson -->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.5.0</version>
        </dependency>

2、将redisson交由spring管理,本文采用redisson的集群模式装配,另外可配置哨兵模式,单点等

/**
 * redisson客户端参数配置类
 */
@Configuration
@Data
public class RedissonProperties {

    private int idleConnectionTimeout = 10000;

    private int pingTimeout = 1000;

    private int connectTimeout = 10000;

    private int timeout = 3000;

    private int retryAttempts = 3;

    private int retryInterval = 1500;

    private int reconnectionTimeout = 3000;

    private int failedAttempts = 3;

    private int subscriptionsPerConnection = 5;

    private String clientName = "none";

    private int subscriptionConnectionMinimumIdleSize = 64;

    private int subscriptionConnectionPoolSize = 256;

    private int slaveConnectionMinimumIdleSize = 64;

    private int slaveConnectionPoolSize = 256;

    private int masterConnectionMinimumIdleSize = 64;

    private int masterConnectionPoolSize = 256;

    private ReadMode readMode = ReadMode.MASTER;

    private SubscriptionMode subscriptionMode = SubscriptionMode.MASTER;

    private int scanInterval = 1000;

    @Value("${rediscluster.pwd}")
    private String password;

    @Value("${redis.cluster}")
    private String nodeAddress;

    @Value("${redis.cluster1}")
    private String nodeAddress1;

    @Value("${redis.cluster2}")
    private String nodeAddress2;

}
/**
 * 初始化redisson Bean
 *
 * @author LiJunJun
 * @date 2018/10/19
 */
@Configuration
public class RedissonAutoConfiguration {

    @Autowired
    private RedissonProperties redssionProperties;

    /**
     * 集群模式自动装配
     *
     * @return
     */
    @Bean
    public RedissonClient redissonClient() {

        Config config = new Config();
        String passWord = redssionProperties.getPassword();
        ClusterServersConfig serverConfig = config.useClusterServers();
        serverConfig.addNodeAddress(redssionProperties.getNodeAddress(), redssionProperties.getNodeAddress1(), redssionProperties.getNodeAddress2());
        serverConfig.setPingTimeout(redssionProperties.getPingTimeout());
        serverConfig.setConnectTimeout(redssionProperties.getConnectTimeout());
        serverConfig.setTimeout(redssionProperties.getTimeout());
        serverConfig.setRetryAttempts(redssionProperties.getRetryAttempts());
        serverConfig.setRetryInterval(redssionProperties.getRetryInterval());
        serverConfig.setReconnectionTimeout(redssionProperties.getReconnectionTimeout());
        serverConfig.setFailedAttempts(redssionProperties.getFailedAttempts());
        serverConfig.setSubscriptionsPerConnection(redssionProperties.getSubscriptionsPerConnection());
        serverConfig.setClientName(redssionProperties.getClientName());
        serverConfig.setSubscriptionConnectionMinimumIdleSize(redssionProperties.getSubscriptionConnectionMinimumIdleSize());
        serverConfig.setSubscriptionConnectionPoolSize(redssionProperties.getSubscriptionConnectionPoolSize());
        serverConfig.setSlaveConnectionMinimumIdleSize(redssionProperties.getSlaveConnectionMinimumIdleSize());
        serverConfig.setSlaveConnectionPoolSize(redssionProperties.getSlaveConnectionPoolSize());
        serverConfig.setMasterConnectionMinimumIdleSize(redssionProperties.getMasterConnectionMinimumIdleSize());
        serverConfig.setMasterConnectionPoolSize(redssionProperties.getMasterConnectionPoolSize());
        serverConfig.setReadMode(redssionProperties.getReadMode());
        serverConfig.setSubscriptionMode(redssionProperties.getSubscriptionMode());
        serverConfig.setScanInterval(redssionProperties.getScanInterval());
        serverConfig.setPassword(StringUtils.isNotBlank(passWord) && !"null".equals(passWord) ? passWord : null);
        return Redisson.create(config);
    }
}

3、业务代码中的应用。此处使用的是悲观锁,即必须拿到锁之后才能继续往下执行,也可使用乐观锁,tryLock,利用重试去获取锁

    /**
     * redissonClient
     */
    @Resource
    private RedissonClient redissonClient;

    /**
     * 减库存
     *
     * @param trace 请求流水
     * @param stockManageReq(stockId、decrNum)
     * @return -1为失败,大于-1的正整数为减后的库存量,-2为库存不足无法减库存
     */
    @Override
    @ApiOperation(value = "减库存", notes = "减库存")
    @RequestMapping(value = "/decrByStock", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public int decrByStock(@RequestHeader(name = "Trace") String trace, @RequestBody StockManageReq stockManageReq) {

        long startTime = System.currentTimeMillis();

        LOGGER.reqPrint(Log.CACHE_SIGN, Log.CACHE_REQUEST, trace, "decrByStock", JSON.toJSONString(stockManageReq));

        int res = 0;
        String stockId = stockManageReq.getStockId();
        Integer decrNum = stockManageReq.getDecrNum();

        // 添加分布式锁
        RLock stockLock = null;

        try {
            if (null != stockId && null != decrNum) {

                stockId = PREFIX + stockId;

                // 添加分布式锁
                stockLock = redissonClient.getFairLock(stockId);

                stockLock.lock();

                // redis 减库存逻辑
                String vStock = redisStockPool.get(stockId);
                long realV = 0L;
                if (StringUtils.isNotEmpty(vStock)) {
                    realV = Long.parseLong(vStock);
                }
                //库存数  大于等于 要减的数目,则执行减库存
                if (realV >= decrNum) {
                    Long v = redisStockPool.decrBy(stockId, decrNum);
                    res = v.intValue();
                } else {
                    res = -2;
                }

                stockLock.unlock();
            }
        } catch (Exception e) {
            LOGGER.error(trace, "decr sku stock failure.", e);
            res = -1;
        } finally {
            if (stockLock != null && stockLock.isLocked() && stockLock.isHeldByCurrentThread()) {
                stockLock.unlock();
            }
            LOGGER.respPrint(Log.CACHE_SIGN, Log.CACHE_RESPONSE, trace, "decrByStock", System.currentTimeMillis() - startTime, String.valueOf(res));
        }
        return res;
    }

四、ab压测及结果分析
同样的,我们以5000的请求量100的并发量来压、tps在330左右,相对于zk做分布式锁来看,提升了10倍的性能,但仍然不能满足我们的要求

 五、总结

redisson提供了丰富的分布式锁实现机制,并且使用起来相对比较简单方便,具体选用哪种锁,可以根据业务来选择,但在高并发的情况下,性能还是有些差强人意,下一篇,我们使用redis的watch来实现分布式锁。

原文地址:https://www.cnblogs.com/ft535535/p/10149526.html