Redis--分布式系统--封装Redis消息队列--流量削峰

一、未做消息队列
缺陷:用户秒杀,请求到了上游服务秒杀服务,然后上游服务调用下游服务下订单,减去库存,更新余额。
上游服务秒杀服务的并发量能力有10000,下游服务的并发量能力有1000,当真的客户端并发量是10000,上游服务秒杀服务能接收10000个请求,但是下游服务只能接收1000个请求,那么下游服务就宕机了。

二、配合消息队列

上游服务并发来了10000个请求,只把1000个请求写入消息队列。

三、封装Redis消息队列,优化流量请求

namespace MyRedisUnitty
{
    /// <summary>
    /// 封装Redis消息队列
    /// </summary>
    public class RedisMsgQueueHelper : IDisposable
    {
        /// <summary>
        /// Redis客户端
        /// </summary>
        public RedisClient redisClient { get; }

        public RedisMsgQueueHelper(string redisHost)
        {
            redisClient = new RedisClient(redisHost);
        }

        /// <summary>
        /// 入队
        /// </summary>
        /// <param name="qKeys">入队key</param>
        /// <param name="qMsg">入队消息</param>
        /// <returns></returns>
        public long EnQueue(string qKey, string qMsg)
        {
            //1、编码字符串
            byte[] bytes = System.Text.Encoding.UTF8.GetBytes(qMsg);

            //2、Redis消息队列入队
            long count = redisClient.LPush(qKey, bytes);

            return count;
        }

        /// <summary>
        /// 出队(非阻塞) === 拉
        /// </summary>
        /// <param name="qKey">出队key</param>
        /// <returns></returns>
        public string DeQueue(string qKey)
        {
            //1、redis消息出队
            byte[] bytes = redisClient.RPop(qKey);
            string qMsg = null;
            //2、字节转string
            if (bytes == null)
            {
                Console.WriteLine($"{qKey}队列中数据为空");
            }
            else
            {
                qMsg = System.Text.Encoding.UTF8.GetString(bytes);
            }

            return qMsg;
        }

        /// <summary>
        /// 出队(阻塞) === 推
        /// </summary>
        /// <param name="qKey">出队key</param>
        /// <param name="timespan">阻塞超时时间</param>
        /// <returns></returns>
        public string DeQueueBlock(string qKey, TimeSpan? timespan)
        {
            // 1、Redis消息出队
            string qMsg = redisClient.BlockingPopItemFromList(qKey, timespan);

            return qMsg;
        }

        /// <summary>
        /// 获取队列数量
        /// </summary>
        /// <param name="qKey">队列key</param>
        /// <returns></returns>
        public long GetQueueCount(string qKey)
        {
            return redisClient.GetListCount(qKey);
        }

        /// <summary>
        /// 关闭Redis
        /// </summary>
        public void Dispose()
        {
            redisClient.Dispose();
        }
    }
}

四、上游服务--使用Redis消息队列优化流量请求
发送消息,模拟下游可以接受请求量100,队列中数量超出100则抛出异常,否则写入队列。

namespace MyRedisFlowPeak.FlowLimit
{
    /// <summary>
    /// 秒杀上游服务:客户接受请求量100
    /// </summary>
    class SecKillUpstream
    {
        /// <summary>
        /// 处理的最大请求数
        /// </summary>
        private int HandlerRequestCounts = 100;

        /// <summary>
        /// 秒杀方法
        /// </summary>
        /// <param name="RequestCounts">请求数量</param>
        public void CreateSkillOrder(int requestCounts)
        {
            //1、创建秒杀订单
            Console.WriteLine($"秒杀请求数量:{requestCounts}");

            //如何使用Redis消息队列优化流量请求?
            //Redis优化
            using (var msgQueue = new RedisMsgQueueHelper("localhost:6379"))
            {
                //1、循环写入队列
                for (int i = 0; i < requestCounts; i++)
                {
                    //1.1、获取消息队列数量
                    long count = msgQueue.GetQueueCount("My_Order");

                    //1.2、判断是否已满
                    if (count >= HandlerRequestCounts)
                    {
                        Console.WriteLine($"系统正常繁忙,请稍后...");
                    }
                    else
                    {
                        //1.3、写入队列订单编号
                        Console.WriteLine($"入队成功");
                        msgQueue.EnQueue("My_Order", "OrderNo:" + i + "");
                    }
                }
            }
        }
}

五、下游服务

消费上游消息,做下游服务的业务逻辑。

namespace MyRedisSecKillDownStream
{
    class Program
    {
        static void Main(string[] args)
        {
            //Redis优化
            using (var msgQueue = new RedisMsgQueueHelper("localhost:6379"))
            {
                Console.WriteLine("上游消息......");
                //1、获取上游消息
                while (true)
                {
                    string rm_sms = msgQueue.DeQueueBlock("My_Order", TimeSpan.FromSeconds(60));

                    Console.WriteLine($"*****************开始秒杀下游业务调用**********************");
                    //2、下游业务逻辑,秒杀业务处理
                    SecKillDownstream secKillDownstream = new SecKillDownstream();
                    secKillDownstream.HandlerRequest(rm_sms);
                    Console.WriteLine($"*****************秒杀下游业务调用完成**********************");
                }
            }
        }
    }
}
namespace MyRedisSecKillDownStream.FlowLimit
{
    /// <summary>
    /// 下游服务:最大请求处理量为100
    /// </summary>
    class SecKillDownstream
    {
        /// <summary>
        /// 处理请求
        /// </summary>
        /// <param name="requestCount"></param>
        public void HandlerRequest(string requestCount)
        {
            Thread.Sleep(10);
            //1、创建订单
            Console.WriteLine($"秒杀订单创建成功");
            //2、扣减库存
            Console.WriteLine($"秒杀订单库存扣减生成");
            //3、扣减余额
            Console.WriteLine($"用户余额扣减成功");

            Console.WriteLine($"处理的请求数:{requestCount}");
        }
    }
}

六、调用客户端

namespace MyRedisFlowPeak
{
    class Program
    {
        static void Main(string[] args)
        {
            SecKillUpstream secKillUpstream = new SecKillUpstream();
            secKillUpstream.CreateSkillOrder(1000);
        }
    }
}

七、运行效果

、项目截图

原文地址:https://www.cnblogs.com/menglin2010/p/12981424.html