(原).NET Core 3.1 基于 BlockingCollection 实现内存消息队列

原文链接地址:https://www.cnblogs.com/ruyun/p/12219054.html

利用 BlockingCollection 的阻塞特性实现的内存消息队列,并没有经过测试,仅供学习使用。实现思路及代码参考了 MediatR 源码,有兴趣的同学可以下载源码学习研究。

利用MediatR.Extensions.Microsoft.DependencyInjection框架中的ServiceRegistrar类可以自动扫描IConsumer接口的所有实现类,并注入到IServiceCollection 中,当IMessageQueue中Publish一个消息时,从IServiceCollection中获取消息消费者并将消息传递过去

  • 定义消费者接口
    /// <summary>
    /// Contract for a consumer that handles messages of type <typeparamref name="TMessage"/>.
    /// </summary>
    /// <typeparam name="TMessage">
    /// Message type accepted by the consumer; must derive from <see cref="MessageArgument"/>.
    /// </typeparam>
    public interface IConsumer<in TMessage> where TMessage : MessageArgument
    {
        /// <summary>
        /// Name of the queue this consumer is associated with.
        /// </summary>
        string QueueName { get; }

        /// <summary>
        /// Handles a single message.
        /// </summary>
        /// <param name="message">The message to handle.</param>
        /// <param name="serviceFactory">Service-resolution delegate handed to the consumer.</param>
        void Handle(TMessage message, ServiceFactory serviceFactory);
    }
  • 定义消息队列
    /// <summary>
    /// Message queue abstraction: accepts messages addressed to a named queue.
    /// </summary>
    public interface IMessageQueue
    {
        /// <summary>
        /// Publishes a message to the named queue.
        /// </summary>
        /// <typeparam name="T">Message type; must derive from <see cref="MessageArgument"/>.</typeparam>
        /// <param name="queueName">Name of the target queue.</param>
        /// <param name="message">The message to publish.</param>
        /// <returns>A flag reporting the outcome of the publish; its exact meaning is defined by the implementation.</returns>
        bool Publish<T>(string queueName, T message) where T : MessageArgument;
    }
    
    /// <summary>
    /// In-memory <see cref="IMessageQueue"/> implementation. Each queue name maps to one
    /// <see cref="MessageQueueHandler"/>; the consumer registered under that name is bound
    /// to the handler the first time a message is published to the queue.
    /// </summary>
    public class MemoryMessageQueue : IMessageQueue
    {
        private readonly ServiceFactory serviceFactory;
        private readonly ConcurrentDictionary<string, MessageQueueHandler> pairs;

        // Serializes consumer binding so that concurrent first publishes to the same
        // queue cannot each start their own consumer loop.
        private readonly object bindGate = new object();

        /// <summary>
        /// Creates the queue.
        /// </summary>
        /// <param name="serviceFactory">Delegate used to resolve the registered consumers.</param>
        public MemoryMessageQueue(ServiceFactory serviceFactory)
        {
            this.pairs = new ConcurrentDictionary<string, MessageQueueHandler>();
            this.serviceFactory = serviceFactory;
        }

        /// <summary>
        /// Publishes a message to the named queue, binding the matching consumer on first use.
        /// </summary>
        /// <typeparam name="TMessage">Declared message type; also the type the queue's handler is created for.</typeparam>
        /// <param name="queueName">Name of the target queue; must match a consumer's <c>QueueName</c>.</param>
        /// <param name="message">The message to publish.</param>
        /// <returns>The value returned by the underlying handler's <c>Publish</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="queueName"/> or <paramref name="message"/> is null.</exception>
        /// <exception cref="InvalidOperationException">No <c>IConsumer&lt;TMessage&gt;</c> is registered under <paramref name="queueName"/>.</exception>
        public bool Publish<TMessage>(string queueName, TMessage message) where TMessage : MessageArgument
        {
            if (queueName == null)
                throw new ArgumentNullException(nameof(queueName));
            if (message == null)
                throw new ArgumentNullException(nameof(message));

            // Resolve the consumer first so that an unknown queue name is rejected
            // before a handler gets cached for it.
            var consumer = serviceFactory
                .GetServices<IConsumer<TMessage>>()
                .FirstOrDefault(candidate => candidate.QueueName == queueName);
            if (consumer == null)
                throw new InvalidOperationException($"无效的对列名称:{queueName}");

            // Create the handler for the declared message type rather than the runtime
            // type of the first message: otherwise a derived first message would pin the
            // queue to the subtype and later base-type messages would fail the cast.
            var queue = pairs.GetOrAdd(queueName, key => new MessageQueueHandlerImpl<TMessage>());

            // Bind the consumer exactly once; the second check inside the lock covers
            // the case where another thread bound it while this one was waiting.
            if (!queue.IsBindReceivedEvent)
            {
                lock (bindGate)
                {
                    if (!queue.IsBindReceivedEvent)
                        queue.BindReceivedEvent((received, services) => consumer.Handle((TMessage)received, services), serviceFactory);
                }
            }

            return queue.Publish(message);
        }
    }
  • 定义消息参数
    /// <summary>
    /// Base class for every message carried by the queue.
    /// </summary>
    public abstract class MessageArgument
    {
        /// <summary>
        /// Identifier generated for the message when it is constructed.
        /// </summary>
        public Guid Id { get; } = Guid.NewGuid();

        /// <summary>
        /// Construction time, taken from <see cref="DateTime.UtcNow"/> and converted
        /// with the <c>GetTimeStamp</c> extension.
        /// </summary>
        public long Timestamp { get; } = DateTime.UtcNow.GetTimeStamp();

        protected MessageArgument()
        {
        }
    }

    internal static class UtilHelper
    {
        /// <summary>
        /// Converts a <see cref="DateTime"/> to the number of milliseconds elapsed since
        /// 1970-01-01 00:00:00 UTC.
        /// </summary>
        /// <param name="date">
        /// The instant to convert. A value whose <see cref="DateTime.Kind"/> is
        /// <see cref="DateTimeKind.Local"/> is converted to UTC first; <see cref="DateTimeKind.Utc"/>
        /// and <see cref="DateTimeKind.Unspecified"/> values are used as they are.
        /// </param>
        /// <returns>Milliseconds since the epoch, rounded to a whole number by <see cref="Convert.ToInt64(double)"/>.</returns>
        public static long GetTimeStamp(this DateTime date)
        {
            // DateTime subtraction ignores Kind, so a local value has to be normalized
            // explicitly or the result is off by the local UTC offset.
            DateTime utc = date.Kind == DateTimeKind.Local ? date.ToUniversalTime() : date;
            TimeSpan ts = utc - new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
            return Convert.ToInt64(ts.TotalMilliseconds);
        }
    }
  • 消息实际处理者
    /// <summary>
    /// Base type for the object that actually processes the messages of one queue.
    /// </summary>
    internal abstract class MessageQueueHandler
    {
        /// <summary>
        /// Gets a value indicating whether a consumer has been bound to this queue.
        /// </summary>
        public abstract bool IsBindReceivedEvent { get; }

        /// <summary>
        /// Binds the consumer callback and starts waiting for published messages.
        /// </summary>
        /// <param name="received">Callback invoked for each message taken from the queue.</param>
        /// <param name="serviceFactory">Service-resolution delegate passed along to the callback.</param>
        public abstract void BindReceivedEvent(Action<MessageArgument, ServiceFactory> received, ServiceFactory serviceFactory);

        /// <summary>
        /// Publishes a message to this queue.
        /// </summary>
        /// <param name="message">The message to publish.</param>
        /// <returns>A value indicating whether the message was published successfully.</returns>
        public abstract bool Publish(MessageArgument message);
    }

    /// <summary>
    /// <see cref="MessageQueueHandler"/> backed by a <see cref="BlockingCollection{T}"/>:
    /// <see cref="Publish"/> adds to the collection and a background task hands each
    /// item to the bound callback.
    /// </summary>
    /// <typeparam name="TMessage">Type of message stored in the queue.</typeparam>
    internal class MessageQueueHandlerImpl<TMessage> : MessageQueueHandler where TMessage : MessageArgument
    {
        private readonly BlockingCollection<TMessage> _queue;
        private Action<MessageArgument, ServiceFactory> _received;

        public MessageQueueHandlerImpl()
        {
            _queue = new BlockingCollection<TMessage>();
        }

        /// <summary>
        /// Gets a value indicating whether a callback has been bound.
        /// </summary>
        public override bool IsBindReceivedEvent => _received != null;

        /// <summary>
        /// Stores the callback and, when it is not null, starts the task that consumes the queue.
        /// </summary>
        /// <param name="received">Callback invoked for each message taken from the queue.</param>
        /// <param name="serviceFactory">Service-resolution delegate passed to the callback.</param>
        [DebuggerStepThrough]
        public override void BindReceivedEvent(Action<MessageArgument, ServiceFactory> received, ServiceFactory serviceFactory)
        {
            this._received = received;
            if (_received == null)
                return;

            // LongRunning: the loop lives as long as the queue does, so it should not
            // occupy a regular thread-pool worker.
            Task.Factory.StartNew(() =>
            {
                // GetConsumingEnumerable blocks while the queue is empty and ends once the
                // collection is marked complete and drained. Polling TryTake without a
                // timeout would instead spin the CPU whenever there is nothing to take.
                // NOTE: an exception thrown by the callback propagates out of this loop
                // and ends the consumer task.
                foreach (TMessage args in _queue.GetConsumingEnumerable())
                {
                    _received(args, serviceFactory);
                }
            }, TaskCreationOptions.LongRunning);
        }

        /// <summary>
        /// Adds the message to the queue.
        /// </summary>
        /// <param name="message">The message to enqueue; cast to <typeparamref name="TMessage"/>.</param>
        /// <returns>Always <c>true</c> once the message has been added.</returns>
        public override bool Publish(MessageArgument message)
        {
            _queue.Add((TMessage)message);
            return true;
        }
    }
 /// <summary>
 /// Delegate that returns a service object for the requested type.
 /// </summary>
 /// <param name="serviceType">The type of service to resolve.</param>
 public delegate object ServiceFactory(Type serviceType);

    /// <summary>
    /// Strongly-typed helpers on top of <see cref="ServiceFactory"/>.
    /// </summary>
    public static class ServiceFactoryExtensions
    {
        /// <summary>
        /// Asks the factory for <typeparamref name="T"/> and casts the result.
        /// </summary>
        public static T GetService<T>(this ServiceFactory factory)
        {
            object resolved = factory(typeof(T));
            return (T)resolved;
        }

        /// <summary>
        /// Asks the factory for <c>IEnumerable&lt;T&gt;</c> and casts the result.
        /// </summary>
        public static IEnumerable<T> GetServices<T>(this ServiceFactory factory)
        {
            Type sequenceType = typeof(IEnumerable<T>);
            object resolved = factory(sequenceType);
            return (IEnumerable<T>)resolved;
        }
    }

调用如下

    /// <summary>
    /// Application entry point: builds and runs the generic host.
    /// </summary>
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        /// <summary>
        /// Configures the host: JSON and environment-variable configuration, console and
        /// debug logging, the <see cref="Worker"/> hosted service and the memory queue.
        /// </summary>
        /// <param name="args">Command-line arguments forwarded to the default builder.</param>
        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureAppConfiguration((context, configure) =>
                {
                    configure
                        .SetBasePath(AppDomain.CurrentDomain.BaseDirectory)
                        .AddJsonFile("appsettings.json", true, true)
                        // Use EnvironmentName: interpolating the HostingEnvironment object
                        // itself yields its type name, so the per-environment file
                        // (e.g. appsettings.Development.json) would never be found.
                        .AddJsonFile($"appsettings.{context.HostingEnvironment.EnvironmentName}.json", true, true)
                        .AddEnvironmentVariables();
                })
                .ConfigureLogging((context, logging) =>
                {
                    logging.AddConsole();
                    logging.AddDebug();
                })
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddHostedService<Worker>();
                    services.AddMemoryQueue(typeof(Program).Assembly);
                });
    }
    
    /// <summary>
    /// Demo hosted service that periodically publishes test messages to the queue.
    /// </summary>
    public class Worker : BackgroundService
    {
        // Pause between publish rounds. Without it the loop never yields: it keeps the
        // host from finishing startup and fills the unbounded in-memory queue faster
        // than the sleeping demo consumers can drain it.
        private static readonly TimeSpan PublishInterval = TimeSpan.FromSeconds(1);

        private readonly IMessageQueue _queue;

        public Worker(IMessageQueue queue)
        {
            this._queue = queue;
        }

        /// <summary>
        /// Publishes one message to each demo queue per round until the host requests shutdown.
        /// </summary>
        /// <param name="stoppingToken">Signalled when the host is stopping.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    //_queue.Publish("TestConsumer2", new TestMessage2(DateTime.Now.ToString()));
                    _queue.Publish("TestConsumer3", new TestMessage(DateTime.Now.ToString()));
                    _queue.Publish("TestConsumer", new TestMessage(DateTime.Now.ToString()));

                    // Awaiting here returns control to the host and paces the publisher.
                    await Task.Delay(PublishInterval, stoppingToken);
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Shutdown was requested while waiting; leave the loop quietly.
            }
        }
    }
/// <summary>
/// Sample message carrying a text payload.
/// </summary>
public class TestMessage : MessageArgument
    {
        /// <summary>
        /// Text payload supplied at construction.
        /// </summary>
        public string body;

        public TestMessage(string body) => this.body = body;
    }
    /// <summary>
    /// Sample message derived from <see cref="TestMessage"/>; adds nothing of its own.
    /// </summary>
    public class TestMessage2 : TestMessage
    {
        public TestMessage2(string body) : base(body) { }
    }

    /// <summary>
    /// Sample consumer for the "TestConsumer" queue: waits 100 ms, then logs the message.
    /// </summary>
    public class TestConsumer : IConsumer<TestMessage>
    {
        private readonly ILogger<TestConsumer> _log;

        public TestConsumer(ILogger<TestConsumer> logger) => _log = logger;

        /// <summary>
        /// Queue name this consumer is looked up by.
        /// </summary>
        public string QueueName
        {
            get { return "TestConsumer"; }
        }

        /// <summary>
        /// Pauses briefly, then writes the message body, timestamp and id at information level.
        /// </summary>
        /// <param name="message">The message to log.</param>
        /// <param name="serviceFactory">Service-resolution delegate; not used here.</param>
        public void Handle(TestMessage message, ServiceFactory serviceFactory)
        {
            Thread.Sleep(100);

            string line = $"Logger TestConsumer Handle --- {message.body} -- {message.Timestamp} -- {message.Id}";
            _log.LogInformation(line);
        }
    }
    /// <summary>
    /// Sample consumer for the "TestConsumer3" queue: waits one second, then logs the
    /// message as a warning.
    /// </summary>
    public class TestConsumer3 : IConsumer<TestMessage>
    {
        // Typed on TestConsumer3 so log entries carry this class's category rather
        // than TestConsumer's.
        private readonly ILogger<TestConsumer3> logger;

        public TestConsumer3(ILogger<TestConsumer3> logger)
        {
            this.logger = logger;
        }

        /// <summary>
        /// Queue name this consumer is looked up by.
        /// </summary>
        public string QueueName => "TestConsumer3";

        /// <summary>
        /// Pauses for one second, then writes the message body, timestamp and id at warning level.
        /// </summary>
        /// <param name="message">The message to log.</param>
        /// <param name="service">Service-resolution delegate supplied by the queue.</param>
        public void Handle(TestMessage message, ServiceFactory service)
        {
            Thread.Sleep(1000);

            // Shows that services can be resolved through the supplied factory; the
            // result is intentionally discarded.
            _ = service.GetService<ILogger<TestConsumer3>>();

            logger.LogWarning($"TestConsumer3 Handle --- {message.body} -- {message.Timestamp} -- {message.Id}");
        }
    }


    //public class TestConsumer2 : IConsumer<TestMessage2>
    //{
    //    public string QueueName => "TestConsumer2";
    //    public void Handle(TestMessage2 message, ServiceFactory serviceFactory)
    //    {
    //        Console.WriteLine($"TestConsumer2 Handle --- {message.body} -- {message.Timestamp} -- {message.Id}");
    //    }
    //}

全部代码

原文地址:https://www.cnblogs.com/ruyun/p/12219054.html