消息总线

  第一次接触,总结下。

  消息总线,即Message Bus,传递消息,消息即是数据。

  

  消息通道(宿主)监听发送应用程序发来的消息,将消息分发给接收应用程序(处理器),处理器对消息进行相应的业务处理。

  一、总体系统图

    

  二、总线服务说明

    

  1、为了实现热更新,使用配置文件。程序读取配置文件,加载程序集,通过反射,获得该实例,执行相应的操作。同时系统初始化时启动文件监听系统,对文件进行实时监听。若文件改变,则重新加载程序。

  2、在多宿主多处理器的情况下,为了解决宿主,消息队列与处理器的强耦合性,引入消息路由,通过宿主地址寻址去找到相对应的路由,具体的实现方式是:

  

  在每个宿主的配置文件中配置该宿主指定的处理器配置文件所在的路径。

  3、为了消息的持久化,启动宿主以及处理器对应用程序的监听,当应用程序退出时,宿主将接收到的还没有分发给其处理器的消息,以及处理器消息队列中还没有处理的消息进行本地化,以XML的格式写到磁盘中,同时记录消息写入的时间。

  4、消息通道(宿主)与发送应用程序的通信,使用TCP/IP短链接的方式。宿主监听端口,异步接收链接请求,处理消息。

  5、每个Host类中具体实现启动宿主方法Start(),目前是加载未发送的且木有过期的消息,启动宿主监听端口。

  6、在BaseHost类的构造函数中注入

        public BaseHost(FBUS.HostConfig.HostConfig config)
        {
            _HostConfig = config;
            //清空消息队列
            _MsgQueue.Clear();
            //注册退出监听
            System.Windows.Forms.Application.ApplicationExit += new EventHandler(OnApplicationExit);
            //生产该宿主的处理器实例
            string ProPath = Root + _HostConfig.HostProcessor.Dir;
            _ProcessorList = ProcessorFactory.CreateFactoryInstance(ProPath);
            //计时器机制:发送消息
            //SendTimer = new Timer(_SendMsg_TimeCallBack, null, DueTime, Timeout.Infinite);
            //委托处理机制:向处理器分发消息
            ListenHandler = new ListenMsqQueueEventHandler<MsgQueue>(_SendMsg_Handler);
        }

  宿主实时监听宿主的消息队列,当消息队列接收到新消息时,则向处理该消息的处理器分发该消息

  三、处理器说明

  

  1、BaseProcessor抽象实现IProcessor的处理消息方法HandleMsg(Queue<MsgQueue> MsgQueue),实时监听消息队列,当接收当消息时,则异步处理消息。

每个处理器显示调用基类构造函数,同时重写处理消息的HanleMsg(Queue<MsgQueue> MsgQueue)方法,对消息进行业务处理。

  2、向构造函数中注入application.exit事件,以及对消息队列监听事件;

        public BaseProcessor(ProcessorConfig proConfig)
        {
            //清空消息队列
            ProcessorMsgQueue.Clear();
            this.ProConfig = proConfig;
            //加载未处理的文件消息
            LoadMsgFromFile();
            //注册退出监听
            System.Windows.Forms.Application.ApplicationExit += new EventHandler(OnApplicationExit);
            //计时器处理机制:处理消息
            //ReceiveTimer = new System.Threading.Timer(_HandleMsg_TimerCallBack, null, DueTime, Timeout.Infinite);
            //事件处理机制:监听消息队列
            Listen += new ListenMsqQueueEventHandler<MsgQueue>(HandlerMsg);
            //ListenHandler = new ListenMsqQueueEventHandler<MsgQueue>(HandlerMsg);
            
        }

  3、宿主根据接收到的不同消息向不同的处理器分发消息,那么宿主接收到一条消息,怎么知道应该向哪个处理器发送消息呢?为了解决这个问题,那么在程序加载时需要处理器向宿主注册。

在开发时,一直在纠结处理器该如何向宿主注册呢,最后通过消息去注册,每个消息的key即是他所对应的处理器。

    /// <summary>
    /// 消息队列
    /// </summary>
    [Serializable]
    [XmlRoot("Queue")]
    public  class MsgQueue
    {
        /// <summary>
        /// 处理器
        /// </summary>
        [XmlElement("Key")]
        public string Key { get; set; }
        /// <summary>
        /// 消息内容
        /// </summary>
       [XmlElement("Contents")]
        public List< MsgContent> Content { get; set; }
    }

  附上一篇关于消息总线的好文章:http://www.infoq.com/cn/articles/message-based-distributed-architecture

原文地址:https://www.cnblogs.com/echogreat/p/4752819.html