【异步编程】实战之EventBus

事件驱动

  这篇文章记录一下我使用了两年多自研的EventBus,刚开始接触事件驱动程序的时候是结合状态机一起玩的,后来编程经验越来越足后,事件驱动就用的更多了。一方面,我们比较了解MQ,但是MQ作为中间件,在系统之间的事件传递是很有必要存在的。我所提及的事件驱动,特指进程内事件驱动。也就是进程事件总线。

  需求

  • 假设我们现在写的系统有异步需求
  • 希望利用事件进行组件之间的解耦合
  • 要保证程序具有极高的吞吐量要求

  那恭喜,这篇文章介绍的框架工具非常适合你用,称之为:事件框架,那为什么我需要这个世界框架呢?其实有经验的同学会发现,Spring其实也拥有发布事件和订阅事件的能力,而且还支持异步,支持拓展,我为何要自己写一个事件框架?

  因为:

  • 不要过分第三方,即使是Spring,因为这些代码不在你的控制范围内,例如你有中介者模式需求的时候;
  • 你可以得到比拓展Spring更高的拓展性,例如:你完全可以开发一个和Netty一样的EventLoop支持你的每一个事务;
  • 分离调度,采用Spring,你很难把过程的调度完全掌控在你手上,没有安全感;

  特色:

  • 自定义事件执行的顺序一致性
  • 支持事件的发送和接收者配置化
  • 支持对事件依赖对象的异步跟踪能力

事件框架

  事件框架的主要组件有:事件总线EventBus,事件中心EventBub,事件,事件主题,事件处理者EventHandler,下面分别介绍各个组件的作用;

  事件:Event,具体事件类,通常可以用一个值对象表示,但也可以用实例表示,用实例表示的时候可以带上事件源对象,用Object存事件源

  事件总线:EventBus,事件对外的接口。具有三个发送事件的核心方法:

    • 点对点模式,send(toObj , event , payload),其中toObj是处理者的实际引用
    • 点对点模式,send(toId , event , payload),其中toId是处理者的符号引用,这种是中介者模式的实现
    • 发布订阅模式,publish(event,payload),发布事件出去,订阅该事件的所有EventHandler都可以收到消息处理。

  事件源:创建事件的对象,使用事件总线发送事件出去的对象;

  事件处理者:EventHandler,事件处理者,实现该接口的对象都是事件可以者,可以支持被注册到事件主题中。

  事件主题:发布订阅模式的主题,一个事件类唯一对应一个事件主题类,事件主题类主要持有订阅各种事件处理者的集合;

  事件中心:EventHub,主要作用有:

    • 用作调度,异步执行事件;
    • 创建事件主题,管理维护事件主题;
    • 注册事件处理者,用作中介者模式实现;也就是为了支持 点对点模式 send(toId , event , payload)而诞生的

  事件循环:EventLoop,一个线程对应一个Loop,里面有一个事件队列等待处理,无事件的时候会阻塞。通常一个事务绑定一个Loop,而一个Loop对应多个事务。这样可以保证事务线程安全,带同步功能,具体可以参考Netty的设计,就是把channel绑定在一个EventLoop上面的。

  看一下类图,下面类图反应了部分类的关系:

设计模式

  上面提到了很多次中介者模式,下面提下这个框架主要用到的设计模式

  中介者模式  

  中介者模式的核心是一个Map,也就是所有对象之间的通讯不是直接耦合,而是通过一个中介者类通讯,达到解耦合目的,使得架构成功星型结构。

  1. 所以每一个需要被通知的对象,都要实现一个同一个抽象接口,
  2. 然后把自己注册到中介者类中,中介者类用一个Map作为容器来存储维护被通讯对象的引用,key为被通知者唯一ID,value为被通知者。
  3. 而通知者只需知道被通知者的ID就可以依赖中介者,委托中介调用被通知者的抽象方法。

  观察者模式

  这个比较简单了,EventSubject就是被观察者,EventHandler就是观察者,而Event就是他们更新和需要同步的数据。

  下面给出一段代码,表示整个事件总线接口哪个是使用哪种模式:

/**
 * 消息总线
 * @date 2019年1月9日
 */
@Component
public class EventBus  {


  public EventBus() {
  }

  /**
   * 直接发送到EventBus中
   *
   * @param eventType
   * @param toItem
   * @param object
   */
  public void push(Object toItem, int eventType, Object payload) {
    ......
  }

  /**
   * 观察者模式,也就是发布订阅模式,支持自动创建事件主题
   *
   * @param eventType
   * @param object
   */
  public void publish(int eventType, Object payload) {
    ......
  }

  /**
   * 点对点模式,中介者的使用接口
   *
   * @param eventType
   * @param toItem
   * @param object
   */
  public void push(String toItemId, int eventType, Object payload) {
    ......
  }


}

结合Netty

  我们一般作的应用都会作为服务发布出去,Java搭建微服务最快的方式无疑是SpringBoot,但我们知道,如果我们的系统采用领域内事件,那么就注定了我们的设计是异步的了。那怎么办?其实这是一个异步实现同步的问题。

  方法1:用一个线程,发送异步的时候阻塞等待条件condition,等异步线程返回后notify该condition激活阻塞线程即可,这种方法编程简单,但不可取,因为线程开销很大。

  方法2:使用NIO,Netty就是我们最好的朋友了。很简单,把pipeline存起开,然后异步线程处理完后,直接把回复的任务pipeline.writeAndFlush的任务提交到对应的EventLoop中即可。

后面有具体的案例,请看我的文章

【异步编程】实战之Netty不二之选

后期会持续细化这个框架...........

原文地址:https://www.cnblogs.com/iCanhua/p/12521531.html