设计系列事件总线

一、前言

  在C#中事件是基于委托的发布订阅机制,定义事件(基于windows消息处理机制),发布事件,订阅事件并且关联处理业务的逻辑。发布订阅机制提供一种天然的业务解耦方式,所以在系统使用事件定义业务状态、业务场景,让关联的业务订阅事件,当事件产生时订阅事件者执行业务,这样发布者不必知道订阅者具体的细节、订阅者也不必了解发布者。但是单纯的事件无法使用在生产中,主要是生产中的业务是复杂的,多样的,所以就希望设计一种框架来统一管理事件包括事件的存储、容错、重发、异步、路由等功能,对事件进行封装,设计成事件总线。

二、设计

  对事件总线进行抽象成如上接口关系图,IEventBus作为事件总线进行协调管理事件的发布订阅,相当于事件中心,承担的职责除了事件的发布和订阅外,包括对事件存储、容错、路由、重发等。

   

三、代码

  定义了一个完整事件总线,具体实现就是定义一种事件(TestEvent),并且定义该事件的处理器(TestEventHandler)。在事件总线(EventBus)中实现发布和订阅事件的功能,如果希望事件进行存储管理,还必须引入相应的存储事件的结构。完成上述具体实现后在使用时候,创建一个事件总线,将所有事件处理器注册到事件总线中,发布事件时候,事件总线依据发布的事件路由到注册的事件处理器执行业务。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace TQF.EventBus.EventFramwork
{

    /// <summary>
    /// 事件消息接口,定义一个事件,事件载体
    /// </summary>
    public interface IEvent
    {
        /// <summary>
        /// 事件Id
        /// </summary>
        Guid EventId { get; }

        /// <summary>
        /// 事件时间
        /// </summary>
        DateTime EventDate { get; }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace TQF.EventBus.EventFramwork
{
    /// <summary>
    /// 事件处理接口,定义一个事件对应的处理逻辑
    /// </summary>
    public interface IEventHandler
    {
        /// <summary>
        /// 定义事件异步的处理器
        /// </summary>
        /// <param name="event">使用@区分关键字</param>
        /// <param name="cancellationToken">任务取消令牌,取消任务</param>
        /// <returns></returns>
        Task<bool> HandlerAsync(IEvent @event, CancellationToken cancellationToken = default);
        bool CanHandle(IEvent @event);
    }

    /// <summary>
    /// 泛型事件处理器
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public interface IEventHandler<in T> : IEventHandler where T : IEvent
    {
        Task<bool> HandleAsync(T @event, CancellationToken cancellationToken = default);
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace TQF.EventBus.EventFramwork
{
    /// <summary>
    /// 事件发布器,发布消息
    /// </summary>
    public interface IEventPublisher:IDisposable
    {
        /// <summary>
        /// 事件发布器,发布事件
        /// </summary>
        /// <typeparam name="TEvent"></typeparam>
        /// <param name="event">事件</param>
        /// <param name="cancellationToken">任务取消令牌,取消任务</param>
        /// <returns></returns>
        Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
        where TEvent : IEvent;
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace TQF.EventBus.EventFramwork
{
    /// <summary>
    /// 事件订阅器订阅消息
    /// </summary>
    public interface IEventSubscriber:IDisposable
    {
        void Subscriber();
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace TQF.EventBus.EventFramwork
{
    /// <summary>
    /// 事件总线接口,定义事件通讯渠道,消息订阅功能,消息派发功能(消息的路由,过滤,选择)等。
    /// 通过事件总线管理两者
    /// </summary>
    public interface IEventBus:IEventPublisher,IEventSubscriber
    {
        
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace TQF.EventBus.EventFramwork
{
    /// <summary>
    /// 定义事件传递的参数
    /// </summary>
    public class EventProcessedEventArgs:EventArgs
    {
        public IEvent Event { get; }

        public EventProcessedEventArgs(IEvent @event)
        {
            this.Event = @event;
        }
    }
}

四、总结

  1、事件总线的目的是对业务的解耦,通过设计合理的架构来达到这个目标,既然是合理的框架,就涉及到存储、容错、路由、重发问题处理,所以上述提供是一个事件总线雏形。

  2、对于模式和架构,一个模式,一个架构其构成的最小单位可以理解为对象,通过对对象的划分、割裂,单独成个体,通过关系连接这些个体,建立起一个模型->模式->架构。

原文地址:https://www.cnblogs.com/tuqunfu/p/15606395.html