领域事件、集成事件、事件总线区别与关系

微软eShopOnContainers

eShopOnContainers是一个简化版的基于.NET Core和Docker等技术开发的面向微服务架构的参考应用
其中不仅包含了很多术语、设计模式、架构风格,还使用了一系列的常见技术(RabbitMQ、EventBus、IdentityServer4、Polly、Api Gateway、Redis、CQRS、CAP、CI/CD等),还有一些相关工具(Docker、K8S等)。可以说是一份全面的技术整合实现的应用参考。学习它会对涉及到的技术的实际运用有更加清晰的了解。

相对于eshoponcontainer的其他内容,它的领域事件、集成(整合)事件、事件总线之间的协作关系还是比较难懂的。接下来我们来分析一下。

领域事件(DomainEvent)

使用域事件显式实现域中的更改的副作用。 如果使用 DDD 术语表述,即使用域事件跨多个聚合显式实现副作用。 (可选)为了提高可伸缩性并减小对数据库锁定的影响,可在相同域的聚合之间使用最终一致性。

例如在 eShopOnContainers 应用程序中,当创建订单时,用户会成为买家,因此 OrderStartedDomainEvent 会被引发并在 ValidateOrAddBuyerAggregateWhenOrderStartedDomainEventHandler 中进行处理
eshop中,一个领域事件对应一个handler,使用mediator的INotificationHandler实现

域事件和集成事件是相同的:都是对已发生事件的通知。但是,它们的实现必须不同。域事件是推送到域事件调度程序的消息,可基于IoC 容器或任何其他方法作为内存中转存进程实现(就是Mediator)

集成事件的目的是将已提交事务和更新传播到其他子系统,无论它们是其他微服务、绑定上下文,还是外部应用程序。(集成事件是跨服务的,领域事件则不是)

例如:

  1. 当用户发起订单时,订单聚合将发送 OrderStarted 域事件。 OrderStarted 域事件基于标识微服务中的原始用户信息(包含 CreateOrder 命令中提供的信息),由买方聚合处理,以在订购微服务时创建买家对象。
  2. 每个 OrderItem 子实体可以在项目价格高于特定金额,或产品项目金额过高时,引发事件。 然后,聚合根可以接收这些事件,并执行全局计算Order的总额。
  3. 某系统中,当文章类目新增或者删除时,需要刷新缓存,从而实现统一,则可以在新增类目或者删除类目时,出发刷新换缓存的一个事件:
/// <summary>
/// 添加类目
/// </summary>
/// <param name="request"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<CommandResult> Handle(CreateCategoryCommand request, CancellationToken cancellationToken)
{
    var rep = _uow.GetBaseRepository<Category>();
    var model = rep.GetFirstOrDefault(predicate:x => x.DisplayName == request.Name,disableTracking:true);
    if (model!=null)
    {
        return CommandResult.Fail("名称重复");
    }
    var entity = new Category(categoryName: request.Name, displayName: request.Name);
    var res= await rep.InsertAsync(entity);
    if (res.Entity!=null)
    {
        entity.AddCacheChangeDomainEvent(new List<string> { _cacheKeyMgr.PostCategoryListlKey() }); // 发出事件后,对应的handler会刷新缓存
        return CommandResult.Success();
    }
    return CommandResult.Fail("添加失败");
}

集成事件(IntegrationEvent)

基于事件的通信时,当值得注意的事件发生时,微服务会发布事件,例如更新业务实体时。 其他微服务订阅这些事件。 微服务收到事件时,可以更新其自己的业务实体,这可能会导致发布更多事件。这是最终一致性概念的本质。 通常通过使用事件总线实现来执行此发布/订阅系统。最终一致事务由一系列分布式操作组成。在每个操作中,微服务会更新业务实体,并发布可触发下一个操作的事件。

集成事件用于跨多个微服务或外部系统保持域状态同步。这可通过在微服务外发布集成事件完成。 将事件发布到多个接收方微服务时,每个接收方微服务中的相应事件处理程序会处理该事件。(消息队列的生产者和消费者)

集成事件是单个应用程序级别的,不建议跨应用使用同一个集成事件,这将导致事件来源混乱(微服务必须独立)

事件总线(EventBus)

事件总线可实现发布/订阅式通信,无需组件之间相互显式识别,
微服务 A 发布到事件总线,这会分发到订阅微服务 B 和 C,发布服务器无需知道订阅服务器。:

如何实现发布服务器和订阅服务器之间的匿名? 一个简单方法是让中转站处理所有通信。事件总线是一个这样的中转站。

事件总线通常由两部分组成:

  1. 抽象或接口。
  2. 一个或多个实现。(RabbitMQAzure.ServiceBuskafka等)

接口的功能很简单,就是只有发布(发布本系统的集成事件)和订阅(订阅其余子系统的事件),发布和订过程中自身无需知道时那些个子系统进行了参与。

领域事件、集成事件、事件总线的协作(以eshop中的实现为例)

三种事件相互作用,最终是为了解决整个微服务系统的最终一致性,微服务A自身数据发生了变化,那个这个变化所引起的一系列反应有可能导致整个系统产生个各种不同结果。为了确保结果与期望一致,就需要实现一致性,还有就是分布式系统当中的CAP原则。

eShopOnContainers中,领域事件激发时,对应的handler将此次事件的信息保存到集成事件的日志表中,保存这个操作,使用到了对应发生事件的领域实体所在的上下文的事务对象,以保证内部强一致性(在领域事件的handler中,通过获取实体db上下文的事务对象,将事件保存到日志记录表中)
具体实现:

// 此处OrderCancelledDomainEvent被激发时的处理程序 OrderCancelledDomainEventHandler.cs
public async Task Handle(OrderCancelledDomainEvent orderCancelledDomainEvent, CancellationToken cancellationToken)
        {
            _logger.CreateLogger<OrderCancelledDomainEvent>()
                .LogTrace("Order with Id: {OrderId} has been successfully updated to status {Status} ({Id})",
                    orderCancelledDomainEvent.Order.Id, nameof(OrderStatus.Cancelled), OrderStatus.Cancelled.Id);

            var order = await _orderRepository.GetAsync(orderCancelledDomainEvent.Order.Id);
            var buyer = await _buyerRepository.FindByIdAsync(order.GetBuyerId.Value.ToString());

            var orderStatusChangedToCancelledIntegrationEvent = new OrderStatusChangedToCancelledIntegrationEvent(order.Id, order.OrderStatus.Name, buyer.Name);
            // 通过集成事件服务来保存此次领域事件的信息
            await _orderingIntegrationEventService.AddAndSaveEventAsync(orderStatusChangedToCancelledIntegrationEvent);
        }
        
// 继承事件服务OrderingIntegrationEventService.cs
public async Task AddAndSaveEventAsync(IntegrationEvent evt)
{
    _logger.LogInformation("----- Enqueuing integration event {IntegrationEventId} to repository ({@IntegrationEvent})",
evt.Id, evt);
    // _orderingContext.GetCurrentTransaction() 获取实体发出域事件时当前上下问的事务对象
    // 由于获取上下文事务对象的存在,导致eshop中使用了TransactionBehaviour。确保了当前事务对象的存在。
    await _eventLogService.SaveEventAsync(evt, _orderingContext.GetCurrentTransaction());
}

然后再TransactionBehaviour中,提交事务,实体的更改和对应域事件都会被记录到数据库中。然后通过集成事件服务,根据次事务id查询出需要激发的领域事件数据,然后遍历操作:

  1. 标记为处理中
  2. 通过事件总线(消息队列或者其他组件进行发布)
  3. 标记为处理完成、遇到异常标记为失败(此处可增加policy的策略重试功能,但是会影响本次操作的响应时间),由后台任务定时重试这些失败的事件。确保最终一致性。但是消费者端的一致性无法保证(需要其他策略机行处理)。
原文地址:https://www.cnblogs.com/pluto-net/p/13970581.html