【DDD-Apworks框架】事件总线和事件聚合器

第一步:事件总线和事件聚合器

   【1】事件总线 IEventBus

   IUnitOfWork.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Keasy5.Infrastructure
{
    /// <summary>
    /// Indicates that every type implementing this interface is an implementation of the Unit of Work pattern.
    /// </summary>
    /// <remarks>
    /// For details about Unit of Work, see the pattern description:
    /// http://martinfowler.com/eaaCatalog/unitOfWork.html
    /// </remarks>
    public interface IUnitOfWork
    {
        /// <summary>
        /// Gets a <see cref="System.Boolean"/> value indicating whether the current
        /// Unit of Work supports the Microsoft distributed transaction mechanism.
        /// </summary>
        bool DistributedTransactionSupported { get; }
        /// <summary>
        /// Gets a <see cref="System.Boolean"/> value indicating whether the current
        /// Unit of Work transaction has been committed.
        /// </summary>
        bool Committed { get; }
        /// <summary>
        /// Commits the current Unit of Work transaction.
        /// </summary>
        void Commit();
        /// <summary>
        /// Rolls back the current Unit of Work transaction.
        /// </summary>
        void Rollback();
    }
}
View Code

   IBus.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Keasy5.Infrastructure;

namespace Keasy5.Events.Bus
{
    /// <summary>
    /// Represents the message bus. A bus is also an <see cref="IUnitOfWork"/> and is disposable.
    /// </summary>
    public interface IBus : IUnitOfWork, IDisposable
    {
        /// <summary>
        /// Gets the <see cref="Guid"/> identifier exposed by this bus.
        /// </summary>
        Guid ID { get; }
        /// <summary>
        /// Publishes the specified message to the bus.
        /// </summary>
        /// <typeparam name="TMessage">The message type; a reference type that implements <c>IEvent</c>.</typeparam>
        /// <param name="message">The message to be published.</param>
        void Publish<TMessage>(TMessage message)
            where TMessage : class, IEvent;
        /// <summary>
        /// Publishes a collection of messages to the bus.
        /// </summary>
        /// <typeparam name="TMessage">The message type; a reference type that implements <c>IEvent</c>.</typeparam>
        /// <param name="messages">The messages to be published.</param>
        void Publish<TMessage>(IEnumerable<TMessage> messages)
            where TMessage : class, IEvent;
        /// <summary>
        /// Clears the published messages waiting for commit.
        /// </summary>
        void Clear();
    }
}
View Code

  接口: IEventBus.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Keasy5.Events.Bus
{
    /// <summary>
    /// Represents an event bus. The interface declares no members of its own;
    /// everything is inherited from <see cref="IBus"/>.
    /// </summary>
    public interface IEventBus : IBus
    {
    }
}
View Code

实现类:EventBus

EventBus.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Keasy5.Infrastructure;

namespace Keasy5.Events.Bus
{
    /// <summary>
    /// An in-process event bus that queues published events and forwards them to an
    /// <see cref="IEventAggregator"/> when <see cref="Commit"/> is called.
    /// </summary>
    /// <remarks>
    /// The pending queue and the committed flag are both kept in <see cref="ThreadLocal{T}"/>
    /// storage, so each thread only sees (and commits) the events it published itself.
    /// </remarks>
    public class EventBus : DisposableObject, IEventBus
    {
        private readonly Guid id = Guid.NewGuid();
        private readonly ThreadLocal<Queue<object>> messageQueue = new ThreadLocal<Queue<object>>(() => new Queue<object>());
        private readonly IEventAggregator aggregator;
        private readonly ThreadLocal<bool> committed = new ThreadLocal<bool>(() => true);
        private readonly MethodInfo publishMethod;

        /// <summary>
        /// Creates an event bus that dispatches committed events through the given aggregator.
        /// </summary>
        /// <param name="aggregator">
        /// The aggregator that receives the events on commit, for example an
        /// <see cref="EventAggregator"/>.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="aggregator"/> is null.</exception>
        /// <exception cref="InvalidOperationException">
        /// The aggregator type exposes no public generic single-parameter method named "Publish".
        /// </exception>
        /// <remarks>
        /// Background reading on reflecting over generics in C#:
        /// http://www.cnblogs.com/easy5weikai/p/3790589.html
        /// </remarks>
        public EventBus(IEventAggregator aggregator)
        {
            if (aggregator == null)
                throw new ArgumentNullException("aggregator");

            this.aggregator = aggregator;

            // Locate the open generic Publish<TEvent>(TEvent) on the concrete aggregator type.
            // Requiring a generic method definition guarantees that MakeGenericMethod in
            // Commit can close it over the runtime event type.
            publishMethod = aggregator.GetType().GetMethods()
                .First(m => m.Name == "Publish" &&
                            m.IsGenericMethodDefinition &&
                            m.GetParameters().Length == 1);
        }

        /// <summary>
        /// Releases the thread-local storage owned by this bus.
        /// </summary>
        /// <param name="disposing">True when called from an explicit dispose.</param>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                messageQueue.Dispose();
                committed.Dispose();
            }
        }

        #region IBus Members

        /// <summary>
        /// Gets the identifier generated for this bus instance at construction time.
        /// </summary>
        public Guid ID
        {
            get { return id; }
        }

        /// <summary>
        /// Queues a message on the calling thread; it is dispatched on the next <see cref="Commit"/>.
        /// </summary>
        /// <typeparam name="TMessage">The message type.</typeparam>
        /// <param name="message">The message to queue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        public void Publish<TMessage>(TMessage message)
            where TMessage : class, IEvent
        {
            // Reject null here: a queued null would only fail later, inside Commit,
            // far away from the caller that introduced it.
            if (message == null)
                throw new ArgumentNullException("message");

            messageQueue.Value.Enqueue(message);
            committed.Value = false;
        }

        /// <summary>
        /// Queues every message of the sequence on the calling thread.
        /// </summary>
        /// <typeparam name="TMessage">The message type.</typeparam>
        /// <param name="messages">The messages to queue.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="messages"/> or one of its elements is null.
        /// </exception>
        public void Publish<TMessage>(IEnumerable<TMessage> messages)
            where TMessage : class, IEvent
        {
            if (messages == null)
                throw new ArgumentNullException("messages");

            foreach (var message in messages)
                Publish(message);
        }

        /// <summary>
        /// Discards the calling thread's pending messages and marks the bus as committed.
        /// </summary>
        public void Clear()
        {
            messageQueue.Value.Clear();
            committed.Value = true;
        }

        #endregion

        #region IUnitOfWork Members

        /// <summary>
        /// Always false: this bus does not take part in distributed transactions.
        /// </summary>
        public bool DistributedTransactionSupported
        {
            get { return false; }
        }

        /// <summary>
        /// Gets whether the calling thread has no uncommitted messages.
        /// </summary>
        public bool Committed
        {
            get { return committed.Value; }
        }

        /// <summary>
        /// Dispatches the calling thread's pending messages, in publish order, to the aggregator.
        /// </summary>
        /// <remarks>
        /// Dispatch goes through reflection, so an exception raised by the aggregator
        /// surfaces wrapped in a <see cref="TargetInvocationException"/>. When that happens
        /// the failing message has already been dequeued and the bus stays uncommitted.
        /// </remarks>
        public void Commit()
        {
            var pending = messageQueue.Value;
            while (pending.Count > 0)
            {
                var evnt = pending.Dequeue();
                // Close Publish<TEvent> over the runtime type of the event so that the
                // aggregator looks up handlers registered for that concrete type.
                var method = publishMethod.MakeGenericMethod(evnt.GetType());
                method.Invoke(aggregator, new object[] { evnt });
            }
            committed.Value = true;
        }

        /// <summary>
        /// Discards the calling thread's pending messages; equivalent to <see cref="Clear"/>.
        /// </summary>
        public void Rollback()
        {
            Clear();
        }

        #endregion
    }
}
View Code

   【2】事件聚合器IEventAggregator

接口:IEventAggregator.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Keasy5.Events
{
    /// <summary>
    /// Defines an event aggregator: handlers are subscribed per event type and
    /// published events are delivered to them.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the single-delegate overloads take <see cref="Action{T}"/> while the
    /// collection/params delegate overloads take <c>Func&lt;TEvent, bool&gt;</c>; confirm
    /// that this asymmetry is intended.
    /// </remarks>
    public interface IEventAggregator
    {
        /// <summary>Subscribes a handler object for events of type <typeparamref name="TEvent"/>.</summary>
        /// <typeparam name="TEvent">The event type; a reference type implementing <c>IEvent</c>.</typeparam>
        /// <param name="domainEventHandler">The handler to subscribe.</param>
        void Subscribe<TEvent>(IEventHandler<TEvent> domainEventHandler)
            where TEvent : class, IEvent;
        /// <summary>Subscribes each handler of a sequence for events of type <typeparamref name="TEvent"/>.</summary>
        /// <typeparam name="TEvent">The event type.</typeparam>
        /// <param name="domainEventHandlers">The handlers to subscribe.</param>
        void Subscribe<TEvent>(IEnumerable<IEventHandler<TEvent>> domainEventHandlers)
            where TEvent : class, IEvent;
        /// <summary>Subscribes each of the given handlers for events of type <typeparamref name="TEvent"/>.</summary>
        /// <typeparam name="TEvent">The event type.</typeparam>
        /// <param name="domainEventHandlers">The handlers to subscribe.</param>
        void Subscribe<TEvent>(params IEventHandler<TEvent>[] domainEventHandlers)
            where TEvent : class, IEvent;
        /// <summary>Subscribes a delegate as a handler for events of type <typeparamref name="TEvent"/>.</summary>
        /// <typeparam name="TEvent">The event type.</typeparam>
        /// <param name="domainEventHandlerFunc">The delegate to invoke with the event.</param>
        void Subscribe<TEvent>(Action<TEvent> domainEventHandlerFunc)
            where TEvent : class, IEvent;
        /// <summary>Subscribes each delegate of a sequence for events of type <typeparamref name="TEvent"/>.</summary>
        /// <typeparam name="TEvent">The event type.</typeparam>
        /// <param name="domainEventHandlerFuncs">The delegates to subscribe.</param>
        void Subscribe<TEvent>(IEnumerable<Func<TEvent, bool>> domainEventHandlerFuncs)
            where TEvent : class, IEvent;
        /// <summary>Subscribes each of the given delegates for events of type <typeparamref name="TEvent"/>.</summary>
        /// <typeparam name="TEvent">The event type.</typeparam>
        /// <param name="domainEventHandlerFuncs">The delegates to subscribe.</param>
        void Subscribe<TEvent>(params Func<TEvent, bool>[] domainEventHandlerFuncs)
            where TEvent : class, IEvent;
        /// <summary>Removes a handler object subscribed for events of type <typeparamref name="TEvent"/>.</summary>
        /// <typeparam name="TEvent">The event type.</typeparam>
        /// <param name="domainEventHandler">The handler to remove.</param>
        void Unsubscribe<TEvent>(IEventHandler<TEvent> domainEventHandler)
            where TEvent : class, IEvent;
        /// <summary>Removes each handler of a sequence subscribed for events of type <typeparamref name="TEvent"/>.</summary>
        /// <typeparam name="TEvent">The event type.</typeparam>
        /// <param name="domainEventHandlers">The handlers to remove.</param>
        void Unsubscribe<TEvent>(IEnumerable<IEventHandler<TEvent>> domainEventHandlers)
            where TEvent : class, IEvent;
        /// <summary>Removes each of the given handlers subscribed for events of type <typeparamref name="TEvent"/>.</summary>
        /// <typeparam name="TEvent">The event type.</typeparam>
        /// <param name="domainEventHandlers">The handlers to remove.</param>
        void Unsubscribe<TEvent>(params IEventHandler<TEvent>[] domainEventHandlers)
            where TEvent : class, IEvent;
        /// <summary>Removes a delegate subscribed for events of type <typeparamref name="TEvent"/>.</summary>
        /// <typeparam name="TEvent">The event type.</typeparam>
        /// <param name="domainEventHandlerFunc">The delegate to remove.</param>
        void Unsubscribe<TEvent>(Action<TEvent> domainEventHandlerFunc)
            where TEvent : class, IEvent;
        /// <summary>Removes each delegate of a sequence subscribed for events of type <typeparamref name="TEvent"/>.</summary>
        /// <typeparam name="TEvent">The event type.</typeparam>
        /// <param name="domainEventHandlerFuncs">The delegates to remove.</param>
        void Unsubscribe<TEvent>(IEnumerable<Func<TEvent, bool>> domainEventHandlerFuncs)
            where TEvent : class, IEvent;
        /// <summary>Removes each of the given delegates subscribed for events of type <typeparamref name="TEvent"/>.</summary>
        /// <typeparam name="TEvent">The event type.</typeparam>
        /// <param name="domainEventHandlerFuncs">The delegates to remove.</param>
        void Unsubscribe<TEvent>(params Func<TEvent, bool>[] domainEventHandlerFuncs)
            where TEvent : class, IEvent;
        /// <summary>Removes every handler subscribed for events of type <typeparamref name="TEvent"/>.</summary>
        /// <typeparam name="TEvent">The event type.</typeparam>
        void UnsubscribeAll<TEvent>()
            where TEvent : class, IEvent;
        /// <summary>Removes every handler for every event type.</summary>
        void UnsubscribeAll();
        /// <summary>Gets the handlers subscribed for events of type <typeparamref name="TEvent"/>.</summary>
        /// <typeparam name="TEvent">The event type.</typeparam>
        /// <returns>
        /// The subscribed handlers. NOTE(review): the EventAggregator implementation returns
        /// null, not an empty sequence, when nothing is registered — callers should null-check.
        /// </returns>
        IEnumerable<IEventHandler<TEvent>> GetSubscriptions<TEvent>()
            where TEvent : class, IEvent;
        /// <summary>Publishes an event to its subscribed handlers.</summary>
        /// <typeparam name="TEvent">The event type.</typeparam>
        /// <param name="domainEvent">The event to publish.</param>
        void Publish<TEvent>(TEvent domainEvent)
            where TEvent : class, IEvent;
        /// <summary>
        /// Publishes an event to its subscribed handlers and reports the outcome through a callback.
        /// </summary>
        /// <typeparam name="TEvent">The event type.</typeparam>
        /// <param name="domainEvent">The event to publish.</param>
        /// <param name="callback">
        /// Receives the event, a success flag and an exception (or null); the exact
        /// semantics are defined by the implementation.
        /// </param>
        /// <param name="timeout">An optional time limit; defaults to null.</param>
        void Publish<TEvent>(TEvent domainEvent, Action<TEvent, bool, Exception> callback, TimeSpan? timeout = null)
            where TEvent : class, IEvent;
    }
    
}
View Code

 

实现类:EventAggregator

EventAggregator.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;

namespace Keasy5.Events
{
    /// <summary>
    /// Default <see cref="IEventAggregator"/>: keeps a list of handlers per event type and
    /// delivers published events to the handlers registered for the event's runtime type.
    /// </summary>
    /// <remarks>
    /// All access to the handler registry is guarded by a private lock, and publishing
    /// iterates over a snapshot of the handler list, so handlers may subscribe or
    /// unsubscribe while an event is being dispatched.
    /// </remarks>
    public class EventAggregator : IEventAggregator
    {
        #region private property
        private readonly object sync = new object();
        private readonly Dictionary<Type, List<object>> eventHandlers = new Dictionary<Type, List<object>>();

        // Maps (event type, Func delegate) to the Action adapter that was actually subscribed,
        // so that a later Unsubscribe with the same Func can find the registered delegate again.
        private readonly Dictionary<Tuple<Type, Delegate>, Delegate> funcAdapters = new Dictionary<Tuple<Type, Delegate>, Delegate>();

        private readonly MethodInfo registerEventHandlerMethod;

        // Decides whether two event handlers count as the same registration.
        // Two action-delegated handlers are compared with Equals (which, per the original
        // author's note, ActionDelegatedEventHandler overrides to compare the wrapped delegates).
        // Any other pair is compared by handler type, because the same handler type never
        // needs to be registered more than once for a given event.
        private readonly Func<object, object, bool> eventHandlerEquals = (o1, o2) =>
        {
            var o1Type = o1.GetType();
            var o2Type = o2.GetType();
            if (o1Type.IsGenericType &&
                o1Type.GetGenericTypeDefinition() == typeof(ActionDelegatedEventHandler<>) &&
                o2Type.IsGenericType &&
                o2Type.GetGenericTypeDefinition() == typeof(ActionDelegatedEventHandler<>))
                return o1.Equals(o2);
            return o1Type == o2Type;
        };
        #endregion

        #region Ctor

        /// <summary>
        /// Creates an aggregator with no subscriptions.
        /// </summary>
        public EventAggregator()
        {
            // Find the open generic Subscribe<TEvent>(IEventHandler<TEvent>) overload.
            // IsGenericType is checked first because GetGenericTypeDefinition throws on
            // non-generic parameter types such as the array of the params overload.
            registerEventHandlerMethod = this.GetType().GetMethods()
                .First(m =>
                {
                    if (m.Name != "Subscribe")
                        return false;
                    var parameters = m.GetParameters();
                    if (parameters.Length != 1)
                        return false;
                    var parameterType = parameters[0].ParameterType;
                    return parameterType.IsGenericType &&
                           parameterType.GetGenericTypeDefinition() == typeof(IEventHandler<>);
                });
        }

        /// <summary>
        /// Creates an aggregator and subscribes every given handler for each
        /// <c>IEventHandler&lt;TEvent&gt;</c> interface it implements.
        /// </summary>
        /// <param name="handlers">The handler objects to register.</param>
        /// <exception cref="ArgumentNullException"><paramref name="handlers"/> is null.</exception>
        /// <remarks>
        /// 1. Background reading on reflecting over generics in C#:
        ///    http://www.cnblogs.com/easy5weikai/p/3790589.html
        /// 2. Dependency-injection configuration that feeds this constructor:
        ///       <!--Event Aggregator-->
        ///          <register type="Keasy5.Events.IEventAggregator, Keasy5.Events" mapTo="Keasy5.Events.EventAggregator, Keasy5.Events">
        ///            <constructor>
        ///              <param name="handlers">
        ///                <array>
        ///                  <dependency name="orderDispatchedSendEmailHandler" type="Keasy5.Events.IEventHandler`1[[Keasy5.Domain.Events.OrderDispatchedEvent, Keasy5.Domain]], Keasy5.Events" />
        ///                  <dependency name="orderConfirmedSendEmailHandler" type="Keasy5.Events.IEventHandler`1[[Keasy5.Domain.Events.OrderConfirmedEvent, Keasy5.Domain]], Keasy5.Events" />
        ///                </array>
        ///              </param>
        ///            </constructor>
        ///          </register>
        /// </remarks>
        public EventAggregator(object[] handlers)
            : this()
        {
            if (handlers == null)
                throw new ArgumentNullException("handlers");

            foreach (var obj in handlers)
            {
                foreach (var implementedInterface in obj.GetType().GetInterfaces())
                {
                    if (!implementedInterface.IsGenericType ||
                        implementedInterface.GetGenericTypeDefinition() != typeof(IEventHandler<>))
                        continue;

                    // Close Subscribe<TEvent> over the event type the handler declares.
                    var eventType = implementedInterface.GetGenericArguments().First();
                    var method = registerEventHandlerMethod.MakeGenericMethod(eventType);
                    method.Invoke(this, new object[] { obj });
                }
            }
        }
        #endregion

        #region private helpers

        // Returns a copy of the handlers registered for eventType, or null when there are none.
        private List<object> SnapshotHandlers(Type eventType)
        {
            lock (sync)
            {
                List<object> handlers;
                if (eventHandlers.TryGetValue(eventType, out handlers) &&
                    handlers != null &&
                    handlers.Count > 0)
                    return new List<object>(handlers);
                return null;
            }
        }

        // Returns the Action adapter for a Func handler, creating and caching it on first use.
        private Action<TEvent> GetOrAddFuncAdapter<TEvent>(Func<TEvent, bool> eventHandlerFunc)
            where TEvent : class, IEvent
        {
            if (eventHandlerFunc == null)
                throw new ArgumentNullException("eventHandlerFunc");

            var key = Tuple.Create(typeof(TEvent), (Delegate)eventHandlerFunc);
            lock (sync)
            {
                Delegate adapter;
                if (!funcAdapters.TryGetValue(key, out adapter))
                {
                    // The bool result of the Func is discarded; handlers are invoked as actions.
                    adapter = new Action<TEvent>(e => eventHandlerFunc(e));
                    funcAdapters.Add(key, adapter);
                }
                return (Action<TEvent>)adapter;
            }
        }

        // Removes and returns the cached Action adapter of a Func handler, if there is one.
        private bool TryRemoveFuncAdapter<TEvent>(Func<TEvent, bool> eventHandlerFunc, out Action<TEvent> adapter)
            where TEvent : class, IEvent
        {
            if (eventHandlerFunc == null)
                throw new ArgumentNullException("eventHandlerFunc");

            var key = Tuple.Create(typeof(TEvent), (Delegate)eventHandlerFunc);
            lock (sync)
            {
                Delegate cached;
                if (funcAdapters.TryGetValue(key, out cached))
                {
                    funcAdapters.Remove(key);
                    adapter = (Action<TEvent>)cached;
                    return true;
                }
            }
            adapter = null;
            return false;
        }

        // Casts a registered handler to the typed interface. Keeps the exception type the
        // original code raised when the handler does not implement IEventHandler<TEvent>.
        private static IEventHandler<TEvent> AsTypedHandler<TEvent>(object handler)
            where TEvent : class, IEvent
        {
            var eventHandler = handler as IEventHandler<TEvent>;
            if (eventHandler == null)
                throw new ArgumentNullException("eventHandler");
            return eventHandler;
        }

        // True when the handler type is marked to be run on a task instead of inline.
        private static bool HandlesAsynchronously(object eventHandler)
        {
            return eventHandler.GetType().IsDefined(typeof(HandlesAsynchronouslyAttribute), false);
        }

        #endregion

        #region interface IEventAggregator members

        /// <summary>
        /// Subscribes a handler for <typeparamref name="TEvent"/> unless an equal handler
        /// (same handler type, or same delegate for action-delegated handlers) is already registered.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="eventHandler"/> is null.</exception>
        public void Subscribe<TEvent>(IEventHandler<TEvent> eventHandler)
            where TEvent : class, IEvent
        {
            if (eventHandler == null)
                throw new ArgumentNullException("eventHandler");

            lock (sync)
            {
                var eventType = typeof(TEvent);
                List<object> handlers;
                if (!eventHandlers.TryGetValue(eventType, out handlers) || handlers == null)
                {
                    // Store the new list: a list that is not put into the dictionary
                    // would silently drop the subscription.
                    handlers = new List<object>();
                    eventHandlers[eventType] = handlers;
                }
                if (!handlers.Exists(deh => eventHandlerEquals(deh, eventHandler)))
                    handlers.Add(eventHandler);
            }
        }

        /// <summary>Subscribes every handler of the sequence.</summary>
        public void Subscribe<TEvent>(IEnumerable<IEventHandler<TEvent>> eventHandlers)
            where TEvent : class, IEvent
        {
            foreach (var eventHandler in eventHandlers)
                Subscribe<TEvent>(eventHandler);
        }

        /// <summary>Subscribes every given handler.</summary>
        public void Subscribe<TEvent>(params IEventHandler<TEvent>[] eventHandlers)
            where TEvent : class, IEvent
        {
            foreach (var eventHandler in eventHandlers)
                Subscribe<TEvent>(eventHandler);
        }

        /// <summary>Subscribes a delegate, wrapped in an <c>ActionDelegatedEventHandler</c>.</summary>
        public void Subscribe<TEvent>(Action<TEvent> eventHandlerFunc)
            where TEvent : class, IEvent
        {
            Subscribe<TEvent>(new ActionDelegatedEventHandler<TEvent>(eventHandlerFunc));
        }

        /// <summary>
        /// Subscribes every delegate of the sequence. Each delegate is invoked as an action;
        /// its <see cref="bool"/> result is ignored.
        /// </summary>
        public void Subscribe<TEvent>(IEnumerable<Func<TEvent, bool>> eventHandlerFuncs)
            where TEvent : class, IEvent
        {
            // Passing the Func straight to Subscribe<TEvent>(...) would bind to the params
            // overload in expanded form and recurse forever, so adapt it to an Action first.
            foreach (var eventHandlerFunc in eventHandlerFuncs)
                Subscribe<TEvent>(GetOrAddFuncAdapter<TEvent>(eventHandlerFunc));
        }

        /// <summary>
        /// Subscribes every given delegate. Each delegate is invoked as an action;
        /// its <see cref="bool"/> result is ignored.
        /// </summary>
        public void Subscribe<TEvent>(params Func<TEvent, bool>[] eventHandlerFuncs)
            where TEvent : class, IEvent
        {
            foreach (var eventHandlerFunc in eventHandlerFuncs)
                Subscribe<TEvent>(GetOrAddFuncAdapter<TEvent>(eventHandlerFunc));
        }

        /// <summary>
        /// Removes the first registered handler that equals <paramref name="eventHandler"/>.
        /// Does nothing when no such handler is registered.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="eventHandler"/> is null.</exception>
        public void Unsubscribe<TEvent>(IEventHandler<TEvent> eventHandler)
            where TEvent : class, IEvent
        {
            if (eventHandler == null)
                throw new ArgumentNullException("eventHandler");

            lock (sync)
            {
                List<object> handlers;
                if (eventHandlers.TryGetValue(typeof(TEvent), out handlers) && handlers != null)
                {
                    var index = handlers.FindIndex(deh => eventHandlerEquals(deh, eventHandler));
                    if (index >= 0)
                        handlers.RemoveAt(index);
                }
            }
        }

        /// <summary>Unsubscribes every handler of the sequence.</summary>
        public void Unsubscribe<TEvent>(IEnumerable<IEventHandler<TEvent>> eventHandlers)
            where TEvent : class, IEvent
        {
            foreach (var eventHandler in eventHandlers)
                Unsubscribe<TEvent>(eventHandler);
        }

        /// <summary>Unsubscribes every given handler.</summary>
        public void Unsubscribe<TEvent>(params IEventHandler<TEvent>[] eventHandlers)
            where TEvent : class, IEvent
        {
            foreach (var eventHandler in eventHandlers)
                Unsubscribe<TEvent>(eventHandler);
        }

        /// <summary>Unsubscribes a delegate previously subscribed as an action.</summary>
        public void Unsubscribe<TEvent>(Action<TEvent> eventHandlerFunc)
            where TEvent : class, IEvent
        {
            Unsubscribe<TEvent>(new ActionDelegatedEventHandler<TEvent>(eventHandlerFunc));
        }

        /// <summary>
        /// Unsubscribes every delegate of the sequence that was subscribed through one of
        /// the <c>Func</c> overloads. Delegates that were never subscribed are ignored.
        /// </summary>
        public void Unsubscribe<TEvent>(IEnumerable<Func<TEvent, bool>> eventHandlerFuncs)
            where TEvent : class, IEvent
        {
            foreach (var eventHandlerFunc in eventHandlerFuncs)
            {
                Action<TEvent> adapter;
                if (TryRemoveFuncAdapter<TEvent>(eventHandlerFunc, out adapter))
                    Unsubscribe<TEvent>(adapter);
            }
        }

        /// <summary>
        /// Unsubscribes every given delegate that was subscribed through one of the
        /// <c>Func</c> overloads. Delegates that were never subscribed are ignored.
        /// </summary>
        public void Unsubscribe<TEvent>(params Func<TEvent, bool>[] eventHandlerFuncs)
            where TEvent : class, IEvent
        {
            foreach (var eventHandlerFunc in eventHandlerFuncs)
            {
                Action<TEvent> adapter;
                if (TryRemoveFuncAdapter<TEvent>(eventHandlerFunc, out adapter))
                    Unsubscribe<TEvent>(adapter);
            }
        }

        /// <summary>Removes every handler registered for <typeparamref name="TEvent"/>.</summary>
        public void UnsubscribeAll<TEvent>()
            where TEvent : class, IEvent
        {
            lock (sync)
            {
                var eventType = typeof(TEvent);
                List<object> handlers;
                if (eventHandlers.TryGetValue(eventType, out handlers) && handlers != null)
                    handlers.Clear();

                // Drop the cached Func adapters of this event type as well.
                var staleKeys = funcAdapters.Keys.Where(k => k.Item1 == eventType).ToList();
                foreach (var key in staleKeys)
                    funcAdapters.Remove(key);
            }
        }

        /// <summary>Removes every handler of every event type.</summary>
        public void UnsubscribeAll()
        {
            lock (sync)
            {
                eventHandlers.Clear();
                funcAdapters.Clear();
            }
        }

        /// <summary>
        /// Gets a copy of the handlers registered for <typeparamref name="TEvent"/>.
        /// </summary>
        /// <returns>
        /// A new list of the registered handlers, or null when the event type has never
        /// been subscribed to (callers must null-check).
        /// </returns>
        public IEnumerable<IEventHandler<TEvent>> GetSubscriptions<TEvent>()
            where TEvent : class, IEvent
        {
            lock (sync)
            {
                List<object> handlers;
                if (eventHandlers.TryGetValue(typeof(TEvent), out handlers) && handlers != null)
                    return handlers.Select(p => p as IEventHandler<TEvent>).ToList();
                return null;
            }
        }

        /// <summary>
        /// Delivers the event to every handler registered for its runtime type.
        /// </summary>
        /// <remarks>
        /// Handlers whose type carries <c>HandlesAsynchronouslyAttribute</c> are started on a
        /// task and not awaited; all other handlers run inline, in registration order.
        /// </remarks>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="evnt"/> is null, or a registered handler does not implement
        /// <c>IEventHandler&lt;TEvent&gt;</c>.
        /// </exception>
        public void Publish<TEvent>(TEvent evnt)
            where TEvent : class, IEvent
        {
            if (evnt == null)
                throw new ArgumentNullException("evnt");

            var handlers = SnapshotHandlers(evnt.GetType());
            if (handlers == null)
                return;

            foreach (var handler in handlers)
            {
                var eventHandler = AsTypedHandler<TEvent>(handler);
                if (HandlesAsynchronously(eventHandler))
                    Task.Factory.StartNew((o) => eventHandler.Handle((TEvent)o), evnt);
                else
                    eventHandler.Handle(evnt);
            }
        }

        /// <summary>
        /// Delivers the event to every handler registered for its runtime type, waits for
        /// the asynchronous handlers, and reports the outcome through the callback exactly once.
        /// </summary>
        /// <param name="evnt">The event to publish.</param>
        /// <param name="callback">
        /// Called with (event, true, null) on success; (event, false, exception) when a
        /// handler failed; (event, false, <see cref="TimeoutException"/>) when the asynchronous
        /// handlers did not finish within <paramref name="timeout"/>; and (event, false, null)
        /// when no handler is registered.
        /// </param>
        /// <param name="timeout">Maximum time to wait for asynchronous handlers; null waits indefinitely.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="evnt"/> or <paramref name="callback"/> is null.
        /// </exception>
        public void Publish<TEvent>(TEvent evnt,
            Action<TEvent, bool, Exception> callback,
            TimeSpan? timeout = null)
            where TEvent : class, IEvent
        {
            if (evnt == null)
                throw new ArgumentNullException("evnt");
            if (callback == null)
                throw new ArgumentNullException("callback");

            var handlers = SnapshotHandlers(evnt.GetType());
            if (handlers == null)
            {
                callback(evnt, false, null);
                return;
            }

            var succeeded = false;
            Exception error = null;
            try
            {
                var tasks = new List<Task>();
                foreach (var handler in handlers)
                {
                    var eventHandler = AsTypedHandler<TEvent>(handler);
                    if (HandlesAsynchronously(eventHandler))
                        tasks.Add(Task.Factory.StartNew((o) => eventHandler.Handle((TEvent)o), evnt));
                    else
                        eventHandler.Handle(evnt);
                }

                if (tasks.Count == 0)
                {
                    succeeded = true;
                }
                else if (timeout == null)
                {
                    Task.WaitAll(tasks.ToArray());
                    succeeded = true;
                }
                else
                {
                    // WaitAll returns false when the time limit elapses first;
                    // that must not be reported as success.
                    succeeded = Task.WaitAll(tasks.ToArray(), timeout.Value);
                    if (!succeeded)
                        error = new TimeoutException("The asynchronous event handlers did not complete within the specified timeout.");
                }
            }
            catch (Exception ex)
            {
                succeeded = false;
                error = ex;
            }

            // Invoked outside the try block so that an exception thrown by the callback
            // itself is not caught and answered with a second callback invocation.
            callback(evnt, succeeded, error);
        }
        #endregion
    }
}
View Code

第二步:使用:

private readonly IEventBus eventBus;
。。。。
eventBus.Publish<OrderDispatchedEvent>(evnt);
原文地址:https://www.cnblogs.com/easy5weikai/p/3791005.html