Redis之品鉴之旅(七)

分布式锁

1)阻塞锁:

尝试在redis中创建一个字符串结构缓存,方法传入的key,value为锁的过期时间timeout的时间戳。
若redis中没有这个key,则创建成功(即抢到锁),然后立即返回。
若已经有这个key,则先watch,然后校验value中的时间戳是否已经超过当前时间。
若已超过,则尝试使用提交事务的方式覆盖新的时间戳,事务提交成功(即抢到锁),然后立即返回;
若未超过当前时间或事务提交失败(即被别人抢到锁),

如果没有抢到锁,则进入 一个内部优化过的微循环,不断重试。

//这个是阻塞的锁
public static void Show(int i, string key, TimeSpan timeout)
{
	using (var client = new RedisClient("127.0.0.1", 6379, "12345", 10))
	{
		//尝试在redis中创建一个字符串结构缓存,方法传入的key,value为锁的过期时间timeout的时间戳。
        // 加了这句话,下面所有的代码都是单线程的执行
		using (var datalock = client.AcquireLock("DataLock:" + key,timeout))
		{
			//库存数量
			var inventory = client.Get<int>("inventoryNum");
			if (inventory > 0)
			{
				client.Set<int>("inventoryNum", inventory - 1);
				//订单数量
				var orderNum = client.Incr("orderNum");
				Console.WriteLine($"{i}抢购成功*****线程id:{ Thread.CurrentThread.ManagedThreadId.ToString("00")},库存:{inventory},订单数量:{orderNum}");
			}
			else
			{
				Console.WriteLine($"{i}抢购失败");
			}

			//client.Remove("DataLock:" + key);
			Thread.Sleep(100);

		}

	}

}

using完成方法调用或者显式调用dispose,都会直接清除key。

AcquireLock这句话对应的redis内部源码:

public RedisLock(IRedisClient redisClient, string key, TimeSpan? timeOut)
        {
            this.redisClient = redisClient;
            this.key = key;
    		//如果返回fasle,则进入一个内部优化过的微循环,不断重试
            ExecUtils.RetryUntilTrue(delegate
            {
                TimeSpan value = timeOut ?? new TimeSpan(365, 0, 0, 0);
                DateTime dateTime = DateTime.UtcNow.Add(value);
                string lockString = (dateTime.ToUnixTimeMs() + 1).ToString();
                //若redis中没有这个key,则创建成功(即抢到锁),然后立即返回。
                if (redisClient.SetValueIfNotExists(key, lockString))
                {
                    return true;
                }
				
                //若已经有这个key,则先watch,然后校验value中的时间戳是否已经超过当前时间
                redisClient.Watch(key);
                if (!long.TryParse(redisClient.Get<string>(key), out long result))
                {
                    redisClient.UnWatch();
                    return false;
                }
                //通过检查value中时间戳来判断是否过期,并不是利用redis在key上设置expire time来通过key的过期实现,下面的代码是靠事务实现的,如果key不存在了,事务也就不能使用了,所以这个过期时间使用的是value,而不是set方法设置的过期时间。
				//若未超过当前时间(即被别人抢到锁)
                if (result > DateTime.UtcNow.ToUnixTimeMs())
                {
                    redisClient.UnWatch();
                    return false;
                }
				//若已超过,则尝试使用提交事务的方式覆盖新的时间戳,事务提交成功(即抢到锁),然后立即返回;事务提交失败(即被别人抢到锁)
                using (IRedisTransaction redisTransaction = redisClient.CreateTransaction())
                {
                    redisTransaction.QueueCommand((Func<IRedisClient, bool>)((IRedisClient r) => r.Set(key, lockString)));
                    return redisTransaction.Commit();
                }
            }, timeOut);//传入的timeout还有一个作用,就是控制重试时间,重试超时后则抛异常。
        }

//内部优化过的微循环,不断重试,直到
public static void RetryUntilTrue(Func<bool> action, TimeSpan? timeOut = null)
        {
            int num = 0;
            DateTime utcNow = DateTime.UtcNow;
            while (!timeOut.HasValue || DateTime.UtcNow - utcNow < timeOut.Value)
            {
                num++;
                if (action())
                {
                    return;
                }

                SleepBackOffMultiplier(num);
            }

            throw new TimeoutException($"Exceeded timeout of {timeOut.Value}");
        }

可以看出,timeout有两个意思,1:如果成功加锁后锁的过期时间, :2:未成功加锁后阻塞等待的时间。数据锁服务通过检查value中时间戳来判断是否过期,并不是利用redis在key上设置expire time来通过key的过期实现。

2)非阻塞锁

尝试在redis中创建一个字符串结构缓存项,方法传入的key,value无意义,过期时间为传入的timeout。
若redis中没有这个key,则创建成功(即抢到锁),然后立即返回true。若已经有这个key,则立即返回false。
以上过程为全局单线程原子操作,整个过程为独占式操作。
IsLock可以检测key是否存在。

public static void Show(int i, string key, TimeSpan timeout)
{
	using (var client = new RedisClient("127.0.0.1", 6379, "12345", 10))
	{
		// 非阻塞加锁   如果已经存在当前的key,则执行失败,然后false
		// 把这个时间 timeout 设置长不就行了吗,但是你需要悠着点
		// 没有完全之策 ,一般在生产环境,给一个不要超过3s就可以
		bool isLocked = client.Add<string>("DataLock:" + key, key, timeout);
		if (isLocked)
		{
			try
			{
				//库存数量
				var inventory = client.Get<int>("inventoryNum");
				if (inventory > 0)
				{
					client.Set<int>("inventoryNum", inventory - 1);
					//订单数量
					var orderNum = client.Incr("orderNum");
					Console.WriteLine($"{i}抢购成功*****线程id:{ Thread.CurrentThread.ManagedThreadId.ToString("00")},库存:{inventory},订单数量:{orderNum}");
				}
				else
				{
					Console.WriteLine($"{i}抢购失败:原因,没有库存");
				}
			}
			catch
			{
				throw;
			}
			finally
			{
				client.Remove("DataLock:" + key);
			}
		}

		else
		{
			Console.WriteLine($"{i}抢购失败:原因:没有拿到锁");
		}
	}
}

注意:

timeout即成功加锁后锁的过期时间
利用redis在key上设置expire time来通过key的过期实现。
不要先用IsLock判断是否有锁再用Add加锁,因为这两个操作非原子性操作,期间会被其他操作干

针对上面的两种情况,非阻塞锁会出现库存卖不完的情况,但是性能比较高。

如果架构中采用微服务的形式,并且使用.net进行开发,肯定会选用上面两种情况实现。根据业务需求进行两种选择就可以了。

阻塞锁在asp.net core 3.1中的应用:demo

原文地址:https://www.cnblogs.com/vigorous/p/13558842.html