eShopOnContainers 看微服务⑤:消息通信

1.消息通信

传统的单体应用,组件间的调用都是使用代码级的方法函数。比如用户登录自动签到,增加积分。我们可以在登录函数调用积分模块的某个函数,为了解耦我们使用以来注入并放弃new Class()这种方式。但是不管哪种方式都是在同一个进程里。

讲一个单体应用改为微服务应用的最大挑战就是改变通信机制,直接把进程内方法调用改成服务间的 RPC 调用会导致在分布式环境中性能低下的、零散的和低效的通信。

通信类型

异步还是同步的:
• 同步协议。

  HTTP 是一种同步协议。客户端发起一个请求然后等待服务端响应。客户端的代码可以独立地实现同步(线程被阻塞)或异步(线程非阻塞,最终的响应通过回调来处理)的执行方式。这里的重点在于协议(HTTP / HTTPS)是同步的,客户端代码只能在收到 HTTP 服务端的响应后才可以继续先前的任务。•

异步协议。

  其他协议比如 AMQP(很多操作系统和云环境支持的一种协议)使用了异步消息。客户端代码或消息发送者通常不需要等待响应,只要把消息发送给 RabbitMQ 队列或其他消息代理。
第二个维度是看有单个接收者还是多个接收者:
  • 单个接受者。每个请求必须准确地被一个接收者或服务来处理,一个例子就是命令模式。
  • 多个接收者。每个请求能被 0 个或多个接收者来处理,这类通信必须是异步的,一个例子是事件驱动架构里的发布/订阅机制。它基于事件总线接口或消息代理在多个微服务间通过事件来传送数据。

2、事件总线

事件总线跟观察者(发布-订阅)模式非常相似也可以说是发布-订阅模式的一种实现,跟传统观察者的差别只是一个代码级一个是架构级的。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦的目的。

为什么使用事件总线这种异步模式?

  异步整合方式增强微服务自治能力。

  创建微服务应用的重点在于整合微服务的方式。理想情况下,应该减少内部微服务间的通信,微服务之间的交互越少越好。核心规则是微服务间的交互需要异步。并不意味着一定要使用某种特定的协议(比如异步消息或同步的HTTP),只是表明微服务间通过异步传输数据来通信,但请不要依赖于其他内部微服务作为自己 HTTP请求/响应的一部分。
  如果可能,即便只是用于查询,也绝不要依赖微服务间的同步通信(请求/响应)。每个微服务的目标是自治的,对客户端是可用的,即使作为端到端应用一部分的其他服务发生故障或不稳定。如果您认为需要从一个微服务调用其他微服务(比如发起一个 HTTP 请求来查询数据)为客户端应用提供响应结果,那么这样的架构在其他微服务发生故障时就变得不稳定。此外,在微服务之间如果存在 HTTP 依赖,比如串联 HTTP 请求创建很长的请求/响应周期,这样不仅使您的微服务不能自治,而且一旦这个链条上的某个服务有性能问题,整个服务的性能都受到影响。微服务间添加的同步依赖(比如查询请求)越多,客户端应用的总响应时间就会越长。

工作原理

从上图可知,核心就4个角色:

  1. 事件(事件源+事件处理)
  2. 事件发布者
  3. 事件订阅者
  4. 事件总线

实现事件总线的关键是:

  1. 事件总线维护一个事件源与事件处理的映射字典;
  2. 通过单例模式,确保事件总线的唯一入口;
  3. 利用反射完成事件源与事件处理的初始化绑定;
  4. 提供统一的事件注册、取消注册和触发接口。

3. eshop的事件总线

EventBus Class Diagram

  事件源:IntegrationEvent,通过继承扩展这个类,完善事件的描述信息。

  事件处理:IIntegrationEventHandler,IDynamicIntegrationEventHandler,两个接口都定义了Handle方法来响应事件。IIntegrationEventHandler接收强类型的IntegrationEvent,IDynamicIntegrationEventHandler接收动态类型dynamic。

Integration Event(集成事件)。因为在微服务中事件的消费不再局限于当前领域内,而是多个微服务可能共享同一个事件,所以这里要和DDD中的领域事件区分开来。集成事件可用于跨多个微服务或外部系统同步领域状态,这是通过在微服务之外发布集成事件来实现的。

      事件总线:IEventBus,提供Publish用来发布事件,Subscriber用来订阅事件。

为了方便进行订阅管理,系统提供了额外的一层抽象IEventBusSubscriptionsManager,其用于维护事件的订阅和注销,以及订阅信息的持久化。其默认的实现InMemoryEventBusSubscriptionsManager就是使用内存进行存储事件源和事件处理的映射字典。

从类图中看InMemoryEventBusSubscriptionsManager中定义了一个内部类SubscriptionInfo,其主要用于表示事件订阅方的订阅类型和事件处理的类型。

我们来近距离看下InMemoryEventBusSubscriptionsManager的定义:

//定义的事件名称和事件订阅的字典映射(1:N)
private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
//保存所有的事件处理类型
private readonly List<Type> _eventTypes;
//定义事件移除后事件
public event EventHandler<string> OnEventRemoved;

//构造函数初始化
public InMemoryEventBusSubscriptionsManager()
{
    _handlers = new Dictionary<string, List<SubscriptionInfo>>();
    _eventTypes = new List<Type>();
}
//添加动态类型事件订阅(需要手动指定事件名称)
public void AddDynamicSubscription<TH>(string eventName)
    where TH : IDynamicIntegrationEventHandler
{
    DoAddSubscription(typeof(TH), eventName, isDynamic: true);
}
//添加强类型事件订阅(事件名称为事件源类型)
public void AddSubscription<T, TH>()
    where T : IntegrationEvent
    where TH : IIntegrationEventHandler<T>
{
    var eventName = GetEventKey<T>();

    DoAddSubscription(typeof(TH), eventName, isDynamic: false);

    if (!_eventTypes.Contains(typeof(T)))
    {
        _eventTypes.Add(typeof(T));
    }
}
//移除动态类型事件订阅
public void RemoveDynamicSubscription<TH>(string eventName)
    where TH : IDynamicIntegrationEventHandler
{
    var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName);
    DoRemoveHandler(eventName, handlerToRemove);
}

//移除强类型事件订阅
public void RemoveSubscription<T, TH>()
    where TH : IIntegrationEventHandler<T>
    where T : IntegrationEvent
{
    var handlerToRemove = FindSubscriptionToRemove<T, TH>();
    var eventName = GetEventKey<T>();
    DoRemoveHandler(eventName, handlerToRemove);
}

添加了这么一层抽象,即符合了单一职责原则,又完成了代码重用。IEventBus的具体实现通过注入对IEventBusSubscriptionsManager的依赖,即可完成订阅管理。

你这里可能会好奇,为什么要暴露一个OnEventRemoved事件?这里先按住不表,留给大家思考。

4. EventBusRabbitMQ实现EventBus

我们这里不纠结为什么使用RabbitMQ,其实可以替代的方案很多。我们只要知道RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。

EventBusRabbitMQ

public class EventBusRabbitMQ : IEventBus, IDisposable
{
    const string BROKER_NAME = "eshop_event_bus";

    private readonly IRabbitMQPersistentConnection _persistentConnection;
    private readonly ILogger<EventBusRabbitMQ> _logger;
    private readonly IEventBusSubscriptionsManager _subsManager;
    private readonly ILifetimeScope _autofac;
    private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus";
    private readonly int _retryCount;

    private IModel _consumerChannel;
    private string _queueName;

    public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger,
        ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5)
    {
        _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
        _queueName = queueName;
        _consumerChannel = CreateConsumerChannel();
        _autofac = autofac;
        _retryCount = retryCount;
        _subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
    }

    private void SubsManager_OnEventRemoved(object sender, string eventName)
    {
        if (!_persistentConnection.IsConnected)
        {
            _persistentConnection.TryConnect();
        }

        using (var channel = _persistentConnection.CreateModel())
        {
            channel.QueueUnbind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);

            if (_subsManager.IsEmpty)
            {
                _queueName = string.Empty;
                _consumerChannel.Close();
            }
        }
    }
//....
}

构造函数主要做了以下几件事:

  1. 注入IRabbitMQPersistentConnection用来管理链接。
  2. 使用空对象模式注入IEventBusSubscriptionsManager,进行订阅管理。
  3. 创建消费者信道,用于消息消费。
  4. 注册OnEventRemoved事件,取消队列的绑定。(这也就回答了上面遗留的问题)

订阅:

        /// <summary>
        /// 动态类型订阅
        /// </summary>
        /// <typeparam name="TH"></typeparam>
        /// <param name="eventName"></param>
        public void SubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            DoInternalSubscription(eventName);
            _subsManager.AddDynamicSubscription<TH>(eventName);
        }

        /// <summary>
        /// 强类型订阅
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <typeparam name="TH"></typeparam>
        public void Subscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = _subsManager.GetEventKey<T>();
            DoInternalSubscription(eventName);
            _subsManager.AddSubscription<T, TH>();
        }

        /// <summary>
        /// rabbitmq队列的绑定。以eventName为routingKey进行路由
        /// </summary>
        /// <param name="eventName">事件名称</param>
        private void DoInternalSubscription(string eventName)
        {
            var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
            if (!containsKey)
            {
                if (!_persistentConnection.IsConnected)
                {
                    _persistentConnection.TryConnect();
                }

                using (var channel = _persistentConnection.CreateModel())
                {
                    channel.QueueBind(queue: _queueName,
                                      exchange: BROKER_NAME,
                                      routingKey: eventName);
                }
            }
        }

发布:

        /// <summary>
        /// 发布
        /// </summary>
        /// <param name="event">事件</param>
        public void Publish(IntegrationEvent @event)
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }
            //使用Polly进行重试
            var policy = RetryPolicy.Handle<BrokerUnreachableException>()
                .Or<SocketException>()
                .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
                {
                    _logger.LogWarning(ex.ToString());
                });

            using (var channel = _persistentConnection.CreateModel())
            {
                var eventName = @event.GetType()
                    .Name;
                //使用direct全匹配、单播形式的路由机制进行消息分发
                channel.ExchangeDeclare(exchange: BROKER_NAME,
                                    type: "direct");
                //消息主体是json字符串
                var message = JsonConvert.SerializeObject(@event);
                var body = Encoding.UTF8.GetBytes(message);

                policy.Execute(() =>
                {
                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2; // 进行消息持久化

                    channel.BasicPublish(exchange: BROKER_NAME,
                                     routingKey: eventName,
                                     mandatory:true,//告知服务器当根据指定的routingKey和消息找不到对应的队列时,直接返回消息给生产者。
                                     basicProperties: properties,
                                     body: body);
                });
            }
        }

 监听:

构造函数中有一句

_consumerChannel = CreateConsumerChannel();
        private IModel CreateConsumerChannel()
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }
            //创建信道Channel
            var channel = _persistentConnection.CreateModel();
            //申明Exchange使用direct模式
            channel.ExchangeDeclare(exchange: BROKER_NAME,
                                 type: "direct");
            //声明队列绑定Channel的消费者实例
            channel.QueueDeclare(queue: _queueName,
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);


            var consumer = new EventingBasicConsumer(channel);
            //注册Received事件委托处理消息接收事件
            consumer.Received += async (model, ea) =>
            {
                var eventName = ea.RoutingKey;
                var message = Encoding.UTF8.GetString(ea.Body);
                //事件处理的逻辑
                await ProcessEvent(eventName, message);

                channel.BasicAck(ea.DeliveryTag,multiple:false);
            };
            //启动监听
            channel.BasicConsume(queue: _queueName,
                                 autoAck: false,
                                 consumer: consumer);

            channel.CallbackException += (sender, ea) =>
            {
                _consumerChannel.Dispose();
                _consumerChannel = CreateConsumerChannel();
            };

            return channel;
        }

事件处理逻辑

       /// <summary>
        /// 事件处理逻辑
        /// </summary>
        /// <param name="eventName">事件名称</param>
        /// <param name="message">消息</param>
        /// <returns></returns>
        private async Task ProcessEvent(string eventName, string message)
        {
            if (_subsManager.HasSubscriptionsForEvent(eventName))
            {
                using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
                {
                    var subscriptions = _subsManager.GetHandlersForEvent(eventName);
                    foreach (var subscription in subscriptions)
                    {
                        if (subscription.IsDynamic)
                        {
                            //Event Handler实例
                            var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                            if (handler == null) continue;
                            //反序列化为动态类型
                            dynamic eventData = JObject.Parse(message);
                            //调用Handle方法
                            await handler.Handle(eventData);
                        }
                        else
                        {
                            //Event Handler实例
                            var handler = scope.ResolveOptional(subscription.HandlerType);
                            if (handler == null) continue;
                            var eventType = _subsManager.GetEventTypeByName(eventName);
                            //反序列化为强类型
                            var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                            var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                            //调用Handle方法
                            await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                        }
                    }
                }
            }
        }

5. EventBus的使用

微服务的集成

各个Startup类中注册
①注册IRabbitMQPersistentConnection服务用于设置RabbitMQ连接

services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{
    var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
    //...
    return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});

②注册单例模式的IEventBusSubscriptionsManager用于订阅管理

services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();

③ 注册单例模式的EventBusRabbitMQ

services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
    var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
    var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
    var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
    var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();

    var retryCount = 5;
    if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
    {
        retryCount = int.Parse(Configuration["EventBusRetryCount"]);
    }

    return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
});

④发布事件

若要发布事件,需要根据是否需要事件源(参数传递)来决定是否需要申明相应的集成事件,需要则继承自IntegrationEvent进行申明。然后在需要发布事件的地方进行实例化,并通过调用IEventBus的实例的Publish方法进行发布。

//事件源的声明
public class ProductPriceChangedIntegrationEvent : IntegrationEvent
{        
    public int ProductId { get; private set; }

    public decimal NewPrice { get; private set; }

    public decimal OldPrice { get; private set; }

    public ProductPriceChangedIntegrationEvent(int productId, decimal newPrice, decimal oldPrice)
    {
        ProductId = productId;
        NewPrice = newPrice;
        OldPrice = oldPrice;
    }
}
//声明事件源
var priceChangedEvent = new ProductPriceChangedIntegrationEvent(1001, 200.00, 169.00)
//发布事件
_eventBus.Publish(priceChangedEvent)

 事件总线实现对象将被注入控制器构造函数

 

然后,您可以从控制器中的方法中使用它,如 UpdateProduct 方法:

在这种情况下,由于原始微服务是简单的 CRUD 微服务,因此该代码被直接放置在 Web API 控制器中。 在更高级的微服务中,比如使用 CQRS 方法时,它可以在提交原始数据之后,在CommandHandler 类中实现。 

⑤ 订阅事件
若要订阅事件,需要根据需要处理的事件类型,申明对应的事件处理类,继承自IIntegrationEventHandlerIDynamicIntegrationEventHandler,并注册到IOC容器。然后创建IEventBus的实例调用Subscribe方法进行显式订阅。

//定义事件处理
public class ProductPriceChangedIntegrationEventHandler : IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>
{
    public async Task Handle(ProductPriceChangedIntegrationEvent @event)
    {
        //do something
    }
}
//事件订阅
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>();

⑥跨服务事件消费

在微服务中跨服务事件消费很普遍,这里有一点需要说明的是如果订阅的强类型事件非当前微服务中订阅的事件,需要复制定义订阅的事件类型。换句话说,比如在A服务发布的TestEvent事件,B服务订阅该事件,同样需要在B服务复制定义一个TestEvent
这也是微服务的一个通病,重复代码。

6. 保证数据的一致性,事件日志持久化

我们使用事件总线处理了微服务间的异步通信问题。但是既然是异步通信那么就要考虑一致性问题。当遇到网络中断,系统断电,包括我们的服务异常等情况的时候怎么办。

比如当产品价格更改后,代码将数据提交给数据库,然后发布ProductPriceChangedIntegrationEvent 事件。

如果服务在数据库更新后崩溃,但又发生在集成事件成功发布前,就会导致本地微服务价格已成功更新,但集成事件未发布的问题。就会导致目录微服务中定义的价格和顾客购物车中缓存的价格不一致。

以上问题的关键在于是如何确保两个独立的操作的原子性。如果单从单体应用的角度来处理的话,我们完全是可以将他们放到同一个事务中去保证。然而在微服务中,就违背了其高可用的基本要求。因为一旦事件总线处于瘫痪状态,那么整个目录微服务就不可用了。这种强制通过事务保证的一致性,就引入了太多的问题依赖。

如果从微服务的角度来看,每个微服务负责各自的业务逻辑,对于目录微服务来说,它的关注点是产品的更新是否成功。至于借助事件总线通过异步事件实现微服务间的通信,并不是其关注点。这也就是关注点分离。换句话说,产品的更新不应该依赖外部状态。在这里,外部状态就是事件总线的可用性。

A、持久化事件源

解决上面的问题就要确保事件总线能够正确进行事件转发。
换句话说:事件总线挂了,但是事件消息不能丢失,这样我们还有机会挽救(重新发布消息)。这里我们就要对事件进行持久化了。
eShopOnContainers已经考虑了这一点,集成了事件日志用于持久化。
主要是定义了一个IntegrationEventLogEntry实体、EventStateEnum事件状态枚举和IntegrationEventLogContextEF上下文用于事件日志持久化。暴露IIntegrationEventLogService用于事件状态的更新。 

其他微服务通过在启动类中注册IntegrationEventLogContext即可完成事件日志的集成。

B、借助事件日志确保高可用

主要分两步走:

  1. 应用程序开始本地数据库事务,然后更新领域实体状态,并将集成事件插入集成事件日志表中,最后提交事务来确保领域实体更新和保存事件日志所需的原子性。
  2. 发布事件

第一步毋庸置疑,第二步发布事件,我们又有多种实现方式:

  1. 在提交事务后立即发布集成事件,并将其标记为已发布。当微服务发生故障时,可以通过遍历存储的集成事件(未发布)执行补救措施。
  2. 将事件日志表用作一种队列。使用单独的线程或进程查询事件日志表,将事件发布到事件总
    线,然后将事件标记为已发布。

这里很显然第二种方式更为稳妥。而eShopOnContainers出于简单考虑,采用了第一种方案,具体代码如下:

 

至此,eShopOnContainers确保事件总线能够正确转发消息的解决方案阐述完毕。你可能会问,这对应的是引言中的哪一种方案?都不是,你可以看作其是基于事件日志的简化版的事件溯源。

C、其它问题

通过持久化事件日志来避免事件发布失败导致的一致性问题,是一种有效措施。然而消息从发送到接收再到正常消费的过程中,每一个环节都可能故障,所以仅仅在消息发送端使用事件日志只是确保最终一致性的一小步。还有很多问题有待完善:

  1. 消息发送成功了,但未被成功接收
  2. 消息发送且成功接收,但未被正确消费
  3. 消息重复发送,导致多次消费问题
  4. 消息被多个微服务订阅,如何确保每个微服务都成功接收并消费
  5. 等等
这里要实现完整的代码其实还挺复杂的。大家可以参考下杨晓东的CAP方案
 
原文地址:https://www.cnblogs.com/tianyamoon/p/10206736.html