Redisson分布式锁原理

分布式锁产生的场景

分布式锁在需要分布式同步的场景下使用,也就是在分布式系统下才能发挥作用,传统的单台系统使用java提供的锁,分布式场景多实例项目需要多个项目之间同步。

都有哪些实现方式

1 数据库锁:通过在数据库中创建一条记录,根据创建结果来判断是否上锁成功。实现简单但是数据库效率低。
2 redis实现:通过redis缓存中间件实现,比较繁琐,效率高。
2 zookeeper实现:通过临时节点实现,实现简单,失效时间可以控制。
效率:redis > zookeeper > 数据库

redis实现

判断 key 是否存在,如果不存在,设置key,加锁成功,并设置过期是时间,每过一段时间判断是否执行完,没执行完重设过期时间。
如果key存在,就循环判断,直到获取锁。

Redisson

Redisson是redis官方出的分布式锁工具。

基本使用方式如下:

Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient client = Redisson.create(config);
RLock lock = client.getLock("abcde");
lock.lock();
try {
    ...
} finally {
    lock.unlock();
}

创建客户端连接

根据给出的配置 创建连接
RedissonClient client = Redisson.create(config);
创建时包括,连接管理器、命令执行器和调度器

 protected Redisson(Config config) {
    this.config = config;
    Config configCopy = new Config(config);
    
    connectionManager = ConfigSupport.createConnectionManager(configCopy);
    commandExecutor = new CommandSyncService(connectionManager);
    evictionScheduler = new EvictionScheduler(commandExecutor);
}

获取锁

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    Long ttl = tryAcquire(leaseTime, unit);
    // 获取了锁 返回
    if (ttl == null) {
        return;
    }

    long threadId = Thread.currentThread().getId();
    // 订阅锁消息
    Future<RedissonLockEntry> future = subscribe(threadId);
    get(future);//

    try {
        while (true) {
            ttl = tryAcquire(leaseTime, unit);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
}

tryAcquire 获取锁方法

<T> Future<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command){
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call('exists', KEYS[1]) == 0) then " +	
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +	
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +	
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +	
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

上边一段lua脚本可以看到redis获取锁的逻辑:
假设key:abcde,Lock的id为123456789 线程为thread1 ,有效期为 10
1 如果不存在 key:abcde
2 设置 abcde 然后里面一个键值对 123456789:thread1 值为 1
3 设置 abcde 有效期 10
4 如果存在abcde并且123456789:thread1
5 将123456789:thread1 的值加 1
6 重设过期时间

这里数据结构用的是redis的hash表
抢到锁后有一个线程会一直循环续命默认是10秒执行一次,一个锁默认是30秒
如果没有抢到锁:
subscribe 订阅锁的释放消息
然后循环获取锁

释放锁

public void unlock() {
    Boolean opStatus= commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                    "end;" +
                    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                    "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; "+
                    "end; " +
                    "return nil;",
                    Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId()));
    if (opStatus == null) {
        throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                + id + " thread-id: " + Thread.currentThread().getId());
    }
    if (opStatus) {
        cancelExpirationRenewal();
    }
}

假设 参数1是key:abcde,参数2:redisson_lock__channel__{abcde},参数3:123456789:thread1

1 判断是否有abcde这个key
2 不存在广播解锁消息
3 存在key,判断key是否有123456789:thread1参数
4 存在参数将数值减一
5 判断减一后数字是否大于0
6 大于0 重设过期时间
7 不大于 删除key,并广播解锁消息,其他线程就可以开始抢占锁了

原文地址:https://www.cnblogs.com/paper-man/p/13284598.html