.Net 5 简单实现消息订阅和事件总线

  同样分两步走:先实现消息订阅和发布功能。相关概念我就不说了,Baidu一大堆。

先定义一下事件处理器必须实现的接口:

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 
 7 namespace Syspetro.Core.PubSub
 8 {
 9     /// <summary>
10     /// Event handler interface.
11     /// </summary>
12     /// <typeparam name="T">Type of event this handler processes.</typeparam>
13     public interface IEventHandler<T> where T : IEventBase
14     {
15         T DataSource { get; set; } // event being handled; EventHandlerContainer assigns it before calling Run/RunAsync
16         bool Run(); // synchronous entry point; Publish calls Callback only when this returns true
17         Task<bool> RunAsync(); // asynchronous counterpart, awaited by PublishAsync
18         bool Callback(); // invoked by the container after a successful run; the container ignores its return value
19     }
20 }

有了事件,是不是还需要给事件提供参数呢(消息)?接下来定义消息基类:

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using System.Threading;
 7 using Syspetro.Core.ErrorException;
 8 
 9 namespace Syspetro.Core.PubSub
10 {
11     public interface IEventBase
12     {
13         /// <summary>
14         /// Time at which the event occurred.
15         /// </summary>
16         DateTime EventTime { get; set; }
17         /// <summary>
18         /// Time to wait for the result (EventBase.Awaiting treats it as seconds).
19         /// </summary>
20         int WaitingTime { get; set; }
21         /// <summary>
22         /// Number of retries.
23         /// </summary>
24         int Retry { get; set; }
25         /// <summary>
26         /// Flag indicating that execution has finished.
27         /// </summary>
28         bool Finished { get; set; }
29         /// <summary>
30         /// Waits for the execution result to come back.
31         /// </summary>
32         void Awaiting();
33     }
34 
35     /// <summary>
36     /// Event source: an event that carries a typed input value.
37     /// </summary>
38     public interface IEventData<T> : IEventBase
39     {
40         /// <summary>
41         /// Input parameter required to trigger the event.
42         /// </summary>
43         T EventSource { get; set; }
44     }
45     /// <summary>
46     /// 此处有点粗糙了,有待优化
47     /// </summary>
48     public class EventBase : IEventBase
49     {
50         public DateTime EventTime { get; set; } = DateTime.Now;
51         public int WaitingTime { get; set; } = 0;
52         public int Retry { get; set; } = 0;
53         public bool Finished { get; set; } = false;
54         public void Awaiting()
55         {
56             if (WaitingTime > 0)
57             {
58                 var oneTime = 0.2;
59                 var _WaitingTime = WaitingTime - oneTime;
60                 while (_WaitingTime > 0)
61                 {
62                     Thread.Sleep((int)(oneTime * 1000));
63                     _WaitingTime -= oneTime;
64                     if (_WaitingTime <= 0 && !Finished) throw new ExceptionTimeOut();
65                     if (Finished) _WaitingTime = 0;
66                 }
67             }
68         }
69     }
70 }

下面实现的功能就是订阅消息和发布事件了:

  1 using Microsoft.Extensions.Logging;
  2 using Syspetro.Core.Extensions;
  3 using System;
  4 using System.Collections.Generic;
  5 using System.Linq;
  6 using System.Linq.Expressions;
  7 using System.Reflection;
  8 using System.Text;
  9 using System.Threading.Tasks;
 10 
 11 namespace Syspetro.Core.PubSub
 12 {
 13     /// <summary>
 14     /// 事件处理器容器
 15     /// </summary>
 16     public sealed class EventHandlerContainer
 17     {
 18         private readonly IServiceProvider _serviceProvider;
 19         private readonly Dictionary<string, List<Type>> _mappings = new Dictionary<string, List<Type>>();
 20         public EventHandlerContainer()
 21         {
 22             _serviceProvider = InternalApp.ServicesProvider;
 23         }
 24         public static string GetEventKey(Type type, string group)
 25         {
 26             return type.Name + "_" + group;
 27         }
 28         public static string GetEventGroup(string key)
 29         {
 30             var keys = key.Split("_");
 31             if (keys.Length >= 2)
 32                 return keys[keys.Length-1];
 33             return null;
 34         }
 35         /// <summary>
 36         /// 订阅事件
 37         /// </summary>
 38         /// <typeparam name="T"></typeparam>
 39         /// <typeparam name="THandler"></typeparam>
 40         /// <param name="group"></param>
 41         public void Subscribe<T, THandler>(string group = null) where T : IEventBase where THandler : IEventHandler<T>
 42         {
 43             var name = GetEventKey(typeof(T), group);
 44             if (!_mappings.ContainsKey(name))
 45             {
 46                 _mappings.Add(name, new List<Type> { });
 47             }
 48             _mappings[name].Add(typeof(THandler));
 49         }
 50         /// <summary>
 51         /// 取消订阅事件
 52         /// </summary>
 53         /// <typeparam name="T"></typeparam>
 54         /// <typeparam name="THandler"></typeparam>
 55         public void Unsubscribe<T, THandler>(string group = null) where T : IEventBase where THandler : IEventHandler<T>
 56         {
 57             var name = GetEventKey(typeof(T), group);
 58             _mappings[name].Remove(typeof(THandler));
 59 
 60             if (_mappings[name].Count == 0)
 61             {
 62                 _mappings.Remove(name);
 63             }
 64         }
 65         #region 发布事件
 66         /// <summary>
 67         /// 
 68         /// </summary>
 69         /// <typeparam name="T"></typeparam>
 70         /// <param name="o"></param>
 71         /// <param name="group"></param>
 72         /// <returns></returns>
 73         public bool Publish<T>(T o, string group = null) where T : IEventBase
 74         {
 75             var name = GetEventKey(typeof(T), group);
 76             if (_mappings.ContainsKey(name))
 77             {
 78                 try
 79                 {
 80                     foreach (var handler in _mappings[name])
 81                     {
 82                         var bl = false;
 83                         var service = _serviceProvider.GetService(handler);
 84                         var _service = (IEventHandler<T>)service;
 85                         _service.DataSource = o;
 86                         bl = _service.Run();
 87                         if (bl) _service.Callback();
 88                         o.Finished = true;
 89                         return bl;
 90                     }
 91                 }
 92                 catch (Exception ex)
 93                 {
 94                     InternalApp.CreatLogger("").LogError("发布事件失败", ex);
 95                     return false;
 96                 }
 97                 finally
 98                 {
 99                     o.Finished = true;
100                 }
101             }
102             return false;
103         }
104         /// <summary>
105         /// 
106         /// </summary>
107         /// <typeparam name="T"></typeparam>
108         /// <param name="o"></param>
109         /// <param name="group"></param>
110         /// <returns></returns>
111         public async Task<bool> PublishAsync<T>(T o, string group) where T : IEventBase
112         {
113             var name = GetEventKey(typeof(T), group);
114             if (_mappings.ContainsKey(name))
115             {
116                 try
117                 {
118                     foreach (var handler in _mappings[name])
119                     {
120                         var service = _serviceProvider.GetService(handler);
121                         var _service = (IEventHandler<T>)service;
122                         _service.DataSource = o;
123                         var bl = await _service.RunAsync();
124                         if (bl) _service.Callback();
125                         o.Finished = true;
126                         return bl;
127                     }
128                 }
129                 catch (Exception ex)
130                 {
131                     InternalApp.CreatLogger("").LogError("发布事件失败", ex);
132                     return false;
133                 }
134                 finally
135                 {
136                     o.Finished = true;
137                 }
138             }
139             return false;
140         }
141         #endregion
142     }
143 }

到此就已经实现了消息订阅和发布,下面来看看事件总线,直接上代码吧:

  1 using System;
  2 using System.Collections.Concurrent;
  3 using System.Collections.Generic;
  4 using System.Linq;
  5 using System.Threading.Tasks;
  6 
  7 namespace Syspetro.Core.PubSub
  8 {
  9     public class EventBus
 10     {
 11         private readonly EventHandlerContainer eventHandler = new EventHandlerContainer();
 12         private readonly object Lock = new object();
 13         private bool IsTrigger { get; set; } = false;
 14         /// <summary>
 15         /// 定义线程安全集合
 16         /// </summary>
 17         private readonly ConcurrentDictionary<string, List<IEventBase>> _eventAndHandlerMapping = new ConcurrentDictionary<string, List<IEventBase>>();
 18         public void Subscribe<T, THandler>(string group = null) where T : IEventBase where THandler : IEventHandler<T>
 19         {
 20             eventHandler.Subscribe<T, THandler>(group);
 21         }
 22         private void SetTrigger(bool bl)
 23         {
 24             lock (Lock)
 25             {
 26                 IsTrigger = bl;
 27             }
 28         }
 29         /// <summary>
 30         /// 消息添加队列并尝试发布
 31         /// </summary>
 32         /// <typeparam name="TEventData"></typeparam>
 33         /// <param name="eventHandler"></param>
 34         /// <param name="group"></param>
 35         public void RegisterAsync<TEventData>(TEventData eventHandler, string group = null) where TEventData : IEventBase
 36         {
 37             var task = new Task(() =>
 38             {
 39                 Register(eventHandler, group);
 40             });
 41             task.Start();
 42             return;
 43         }
 44         /// <summary>
 45         /// 消息添加队列并尝试发布
 46         /// </summary>
 47         /// <typeparam name="TEventData"></typeparam>
 48         /// <param name="eventHandler"></param>
 49         /// <param name="group"></param>
 50         public void Register<TEventData>(TEventData eventData, string group = null) where TEventData : IEventBase
 51         {
 52             if (!IsTrigger)
 53             {
 54                 Trigger(eventData, group);
 55                 NextTrigger();
 56             }
 57             else
 58             {
 59                 var type = EventHandlerContainer.GetEventKey(typeof(TEventData), group);
 60                 if (!_eventAndHandlerMapping.TryGetValue(type, out List<IEventBase> handlerTypes) || handlerTypes == null)
 61                 {
 62                     handlerTypes = new List<IEventBase> { eventData };
 63                 }
 64                 else
 65                 {
 66                     handlerTypes.Add(eventData);
 67                 }
 68                 _eventAndHandlerMapping[type] = handlerTypes;
 69                 if (!IsTrigger) Trigger();
 70             }
 71         }
 72         /// <summary>
 73         /// 触发绑定的事件
 74         /// </summary>
 75         /// <param name="eventHandler"></param>
 76         public void Trigger()
 77         {
 78             if (!_eventAndHandlerMapping.IsEmpty)
 79             {
 80                 Trigger(_eventAndHandlerMapping.Keys.First());
 81             }
 82         }
 83         /// <summary>
 84         /// 根据事件源类型触发绑定的事件处理
 85         /// </summary>
 86         /// <param name="eventHandler"></param>
 87         private void Trigger(string key)
 88         {
 89             try
 90             {
 91                 if (_eventAndHandlerMapping.TryRemove(key, out List<IEventBase> events))
 92                 {
 93                     if (!IsTrigger)
 94                     {
 95                         SetTrigger(true);
 96                     }
 97                     foreach (var e in events)
 98                     {
 99                         var group = EventHandlerContainer.GetEventGroup(key);
100                         Trigger(e, group);
101                     }
102                 }
103             }
104             finally
105             {
106                 NextTrigger();
107             }
108         }
109         /// <summary>
110         /// 根据事件源触发绑定的事件处理
111         /// </summary>
112         /// <typeparam name="TEventData"></typeparam>
113         /// <param name="eventData"></param>
114         /// <param name="group"></param>
115         private void Trigger<TEventData>(TEventData eventData, string group) where TEventData : IEventBase
116         {
117             int retry = eventData.Retry;
118             if (!IsTrigger)
119             {
120                 SetTrigger(true);
121             }
122             if (!eventHandler.Publish(eventData, group))
123             {
124                 while (retry > 0)//重试
125                 {
126                     if (eventHandler.Publish(eventData, group)) break;
127                     retry--;
128                 }
129             }
130         }
131         private void NextTrigger()
132         {
133             if (!_eventAndHandlerMapping.IsEmpty)
134             {
135                 Trigger(_eventAndHandlerMapping.Keys.First());
136             }
137             else
138             {
139                 SetTrigger(false);
140             }
141         }
142     }
143 }

如何使用呢:

第一步是声明一个事件总线:

1 namespace Syspetro.Core.PubSub
2 {
3     public  class EventSchedulerController // holder for the application-wide event bus
4     {
5         public static readonly EventBus DefaultBus = new EventBus(); // single shared bus; the samples subscribe on it in ConfigureServices and publish to it from the API
6     }
7 }

第二步定义自己的消息和事件如何执行:

 1 using SP.ToolMatLabDomain.AlgCore;
 2 using Syspetro.Core.PubSub;
 3 using Syspetro.Deploy.Service;
 4 
 5 namespace Sp.ToolMatLabScheduler.EventHandler
 6 {
 7     public class test : EventDataBase // sample event type; subscriptions for it are keyed by this type's name
 8     {
 9         public test()
10         {
11             //this.WaitingTime = 10; (disabled) NOTE(review): assuming EventDataBase derives from EventBase, WaitingTime stays 0 and Awaiting() returns without waiting - confirm
12         }
13     }
14     public interface IFcc1T1EventHandler : ITransientService, IEventHandler<test> // handler contract for the "test" event; this is the type passed to Subscribe
15     {
16 
17     }
18     public class Fcc1T1EventHandler : EventHandlerBase<test>, IFcc1T1EventHandler // sample handler for the "test" event
19     {
20         public override bool DoRun()
21         {
22             if (DataSource?.EventSource != null)// run your own business logic here
23             {
24                 var db = DataSource.EventSource.Data2["_"];
25                 var calBack = Cal_Fcc1T1.Get(db);
26                 DataSource.OutPut = calBack.Item2; // second tuple item becomes the event's output
27                 DataSource.Msg = calBack.Item1; // first tuple item becomes the event's message
28             }
29             return false; // NOTE(review): returns false even when the calculation ran; if EventHandlerBase.Run forwards this value, Publish reports failure and skips Callback - confirm this is intended
30         }
31     }
32 }

第三步:Startup中订阅消息

 1 public void ConfigureServices(IServiceCollection services)
 2         {
 3 // Option 1: subscribe by event type
 4             EventSchedulerController.DefaultBus.Subscribe<test, IFcc1T1EventHandler>();
 5 
 6 // Option 2: one event type, routed to different handlers by group name
 7             EventSchedulerController.DefaultBus.Subscribe<EventDataBase, IFcc1T3EventHandler>("Fcc1T3");
 8             EventSchedulerController.DefaultBus.Subscribe<EventDataBase, IFcc1T4EventHandler>("Fcc1T4");
 9             EventSchedulerController.DefaultBus.Subscribe<EventDataBase, IFcc2EventHandler>("Fcc2");
10         }

第四步:如何触发事件呢(发布消息)

 1 using Microsoft.AspNetCore.Mvc;
 2 using Sp.ToolMatLabScheduler;
 3 using Sp.ToolMatLabScheduler.EventHandler;
 4 using Syspetro.Core.AppDeal;
 5 using Syspetro.Core.DynamicApi;
 6 using Syspetro.Core.PubSub;
 7 using Syspetro.Entity;
 8 using System;
 9 using System.Collections.Generic;
10 using System.Reflection;
11 
12 namespace SP.ToolMatLabApp
13 {
14     public class ServiceApp : BaseDynamicApi
15     {
16         [HttpPost]
17         public SpActionResult<MatLabRes> test([FromBody] MatLabRep param)
18         {
19             if (param == null)
20                 return new SpActionResult<MatLabRes>() { Result = false, Msg = "参数不可为空" };
21             return CalEventScheduler(new test { EventSource = param }, null);
22         }
23         [HttpPost]
24         public SpActionResult<MatLabRes> Cal([FromBody] MatLabRep param)
25         {
26             if (param == null)
27                 return new SpActionResult<MatLabRes>() { Result = false, Msg = "参数不可为空" };
28             return CalEventScheduler(new EventDataBase { EventSource = param }, param.CalName);
29         }
30         private SpActionResult<MatLabRes> CalEventScheduler(EventDataBase param, string group)
31         {
32             try
33             {
34                 if (param.EventSource != null)
35                 {
36                     EventSchedulerController.DefaultBus.RegisterAsync(param, group);
37 
38                     param.Awaiting();
39                     var ret = new MatLabRes() { Datas = param.OutPut, Msg = param.Msg };
40                     return new SpActionResult<MatLabRes>() { Data = ret };
41 
42                 }
43                 else
44                 {
45                     return new SpActionResult<MatLabRes>()
46                     {
47                         Msg = "计算出错了",
48                         Result = false
49                     };
50                 }
51             }
52             catch (Exception ex)
53             {
54                 return new SpActionResult<MatLabRes>()
55                 {
56                     Msg = ex.Message,
57                     Result = false
58                 };
59             }
60         }
61     }
62 }
作者:听枫xl
本文版权归作者和博客园共有,欢迎转载,但必须给出原文链接,并保留此段声明,否则保留追究法律责任的权利。
原文地址:https://www.cnblogs.com/xl-tf/p/14529518.html