消息队列工具类(MSMQ)

所要做的是简化msmq的调用代码以及做到可替代性,实现后,调用消息队列代码变为如下所示:

QueueService srv = QueueService.Instance();

//检查存储DTO1的队列是否存在,如不存在则自动建立
srv.Prepare<DTO1>();

//发送类型为DTO1的消息
srv.Send<DTO1>(new DTO1() {  p1="1",  p2="2" });

//发送类型为DTO1的消息,并且将发送的消息Id保存到msgId变量中
string msgId=srv.Send<DTO1>(new DTO1() { p1 = "1", p2 = "2" });

//接收末尾消息
DTO1 msg = srv.Receive<DTO1>();

//接收末尾消息,并且将这个消息Id保存在msgId变量中
DTO1 msg = srv.Receive<DTO1>(ref msgId);

//发送回复消息,并且指定这个回复消息是特定消息ID所专有的回复消息
srv.SendResponse<DTO1>(msg, msgId);

//接收特定消息ID的回复消息
msg=srv.ReceiveResponse<DTO1>(msgId);

主要的地方有2个:

  • msmq消息大小限制的突破(4M突破)
  • 泛型T对象的序列化、反序列化

突破大小限制

  • 如果大小在4M内,则直接msmq封装(MessageLocation=InQueue)
  • 如果在4M外,则通过网络共享文件来封装(MessageLocation=InNetwork)

泛型T对象的序列化、反序列化

  • 固定住所要传递的对象类型为MessageWrapper
  • 在MessageWrapper内部嵌入用户想要传递的其他对象以及相应的type、module名,这样MessageWrapper就能进行自动xml化以及反xml化了

MessageWrapper代码如下:

public class MessageWrapper
    {
        private ShareFileBroker fileBroker;
        public MessageWrapper()
        {
            PersistenceType = MessageLocation.InQueue;
            fileBroker = new ShareFileBroker(FileService.FileService.Instance());
        }

        public string RealObjectType { get; set; }
        public string RealObjectModule { get; set; }
        public string RealObjectXml { get; set; }
        public string NetworkLocation { get; set; }
        public MessageLocation PersistenceType { get; set; }

        public void Inject<T>(T obj)
        {
            this.RealObjectType = typeof(T).FullName;
            this.RealObjectModule = typeof(T).Module.Name;
            string xml = SerializeUtils.Serialize2XML(typeof(T), obj);
            SaveXML(xml);
        }
        public T Extract<T>()
        {
            Assembly assembly = AppDomain.CurrentDomain.Load(this.RealObjectModule.TrimEnd(".dll".ToCharArray()));
            Type type = assembly.GetType(this.RealObjectType);
            string xml = GetXML();
            return (T)SerializeUtils.DeserializeFromXML(type, xml);
        }

        private string GetXML()
        {
            string xml = "";
            if (this.PersistenceType == MessageLocation.InQueue)
                xml = this.RealObjectXml;
            else if (this.PersistenceType == MessageLocation.InNetwork)
                xml = fileBroker.GetContentAndDelete(this.NetworkLocation);
            return xml;
        }
        private void SaveXML(string xml)
        {
            if (xml.Length > QueueConfiguration.QueueConfiguration.MaxQueueBodyLength)
            {
                this.NetworkLocation = fileBroker.Save(xml);
                this.PersistenceType = MessageLocation.InNetwork;
            }
            else
            {
                this.RealObjectXml = xml;
                this.PersistenceType = MessageLocation.InQueue;
            }
        }
    }

 代码比较简单,就不介绍了。

原文地址:https://www.cnblogs.com/aarond/p/QueueService.html