关于Redisson MultiLock 的改良

1. 背景:什么时候需要联锁(MultiLock)?

     当我们需要对多个实例进行锁定,禁止别人同时修改任意一个锁定的实例,我们就需要一个联锁(MultiLock);
     比如业务上,我需要同时操作1000条单据,处理过程是原子的,无法拆分;那么我们就必须使用上联锁(当然乐观锁也可以是一种选择);

2. 我想要什么:优化Redisson MultiLock在非常大的锁数量时的性能问题

     在Redission官方提供的联锁(MultiLock)示例中,在如下示例中,即使使用同一个redissonInstrance实例,IO消耗还是令人沮丧。


RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();


MultiLock性能测试


     上图是我在本地同一机器上测试的性能趋势,可以看到上锁时间,随着锁的数量,线性上升(由于图标X轴是非线性的,所以折线无法体现);平均下来每增加一个锁,就要增加1~2毫秒。如果有成千上万个锁对象,那么时间消耗会是非常难以忽视的;

3. 解决办法:使用Redis中的Hash数据结构来实现一个MultiLock

     基本思路是,把所有需要锁定的对象集合,放到一个公共缓存空间中,每次执行锁定任务时,检查是否有锁对象已经在缓存空间中了,如果任一锁对象已经存在,那么lock fail,如果不存在,那么将所有锁对象存到缓存空间中。
     当任务执行完毕后,将之前锁定的锁对象集合,从缓存空间中移除;
     如果只是这样,那么redis的Set数据结构已经够用了,但是因为在Set中的数据没法做独立的过期时间设置,如果一个锁对象没有清除成功,那么其他任何线程都无法在对这个锁对象进行操作;
     所以这里使用Hash的key来存储锁对象集合,value是每个锁对象的过期时间
     那么当任务执行时,需要比对是否有锁对象已经在缓存空间的同时,如果存在,还要校验缓存空间的锁对象是否已经过期


/**
     * 同步执行任务
     *
     * @param leaseTime              锁占用释放时间,超时会自动释放锁对象, 如果锁对象太多,leaseTime不能小于上锁的耗时
     * @param timeUnit               时间单位
     * @param runnable               待执行的任务,不返回任何值
     * @param lockName             锁名称,避免不同地方使用同一个锁名称
     * @param multiLockValue         锁名称集合
     * @param lockAcquireFailMessage 锁获取失败的时候,log.error 的错误信息
     */
    public <T> void runWithMultiLock(int leaseTime, TimeUnit timeUnit, Runnable runnable, String lockName, List<T> multiLockValue, String lockAcquireFailMessage) {

        Long start = System.currentTimeMillis();

        Assert.notEmpty(multiLockValue, "multiLockValue can not be empty!");

        String[] multiLockValueStrArr = new String[multiLockValue.size()];
        int i = 0;
        for (T t : multiLockValue) {
            if (t == null) {
                throw new IllegalArgumentException("lock value can not be null!");
            }
            multiLockValueStrArr[i] = String.valueOf(t);
            ++i;
        }
        long afterConvert = System.currentTimeMillis();

        String realLockName = MULTI_LOCK_MAP_PREFIX + lockName;

        boolean lockSuccess = tryLockWithMultiLock(realLockName, multiLockValueStrArr, leaseTime, timeUnit);

        Long afterLock = System.currentTimeMillis();

        if (lockSuccess) {
            try {
                runnable.run();
            } finally {
                releaseMultiLock(realLockName, multiLockValueStrArr);
            }

            Long finished = System.currentTimeMillis();
            log.info("afterConvert:{}, lockTime:{}, releaseTime:{}", afterConvert - start, afterLock - afterConvert, finished - afterLock );

        } else {
            log.error("DistributionSyncJob execute error! lock require fail, 
 errorMsg:{}", lockAcquireFailMessage);
            throw new LockFailException();
        }
    }






    /**
     * 释放锁
     * @param lockName 锁名
     * @param multiLockValue 多个key值
     */
    public void releaseMultiLock(String lockName, String[] multiLockValue) {
        RMap<String, Long> originalLocksAndExpires = redissonClient.getMap(lockName);
        originalLocksAndExpires.fastRemove(multiLockValue);
    }


    /**
     * 上锁方法
     * 如果multiLockValue size 太大,那么可能执行上锁的时间太长
     * @param lockName 锁名
     * @param multiLockValue 多个key值
     * @param leaseTime 释放时间限制
     * @param timeUnit 时间单位
     * @return
     */
    public boolean tryLockWithMultiLock(String lockName, String[] multiLockValue, int leaseTime, TimeUnit timeUnit) {

        return this.execute(() -> {
            //原始的lock集合(RMap是没有本次缓存的,所以基于RMap的每次操作都是一次IO)
            RMap<String, Long> originalLocksAndExpires = redissonClient.getMap(lockName);
            //需要新增的lock集合
            HashMap<String, Long> addOnLockAndExpires = new HashMap<>(multiLockValue.length);
            //新增lock集合的stl时间
            long newExpireTime = System.currentTimeMillis() + timeUnit.toMillis(leaseTime);
            //在这里一次获取,缓存本地,而不是在for循环内,循环获取(循环IO)
            Map<String, Long> repeatLockAndExpires = originalLocksAndExpires.getAll(Arrays.stream(multiLockValue).collect(Collectors.toSet()));

            for (String lockValue : multiLockValue) {
                if (repeatLockAndExpires != null && ! repeatLockAndExpires.isEmpty()) {
                    Long expireTime = repeatLockAndExpires.get(lockValue);
                    if (expireTime != null
                            && expireTime > System.currentTimeMillis()) {
                        //如果任何一个lockValue已经存在,且过期时间大于当前时间,那么获锁失败
                        return false;
                    }
                }

                addOnLockAndExpires.put(lockValue, newExpireTime);
            }
            //新锁加入RMap中
            originalLocksAndExpires.putAll(addOnLockAndExpires);
            //重新设置map的整体过期时间
            originalLocksAndExpires.expire(defaultMultiLockMapExpireTime, defaultTimeUnit);

            if (newExpireTime <= System.currentTimeMillis()) {
                //整体的multiLock上锁时间,超过了multiLock的leaseTime,这意味着,上锁完成后,就已经部分锁失效了
                //所以不能算作上锁成功
                return false;
            }
            return true;

        }, lockName + "_OUT_LOCK");
    }

    改造后的性能趋势如图

改造后的性能趋势


4. 缺点 及 待优化:

  1. 还不支持象java原生锁一样能够在获取不到锁的时候阻塞住,直到获取锁;当然可以优化一下代码,做成循环尝试获取锁对象的方式,但是在大数据量的情况下并不划算;
  2. 目前还是只支持单节点,如果redis节点挂掉,那么就无法正常工作
  3. 如果没有把锁释放,程序崩溃了,那么可能这个锁对象会长时间在缓存空间中,虽然有过期时间,对别的线程影响不是特别大,但是还是会占用空间,成为废数据。以后可以加一个定时清除过期锁对象的定时任务。
原文地址:https://www.cnblogs.com/IC1101/p/14236373.html