消息队列

消息队列简介及应用 

MSMQ(MicroSoft Message Queue,微软消息队列)是在多个不同的应用之间实现相互通信的一种异步传输模式,相互通信的应用可以分布于同一台机器上,也可以分布于相连的网络空间中的任一位置。
它的实现原理是:
    消息的发送者把自己想要发送的信息放入一个容器中(我们称之为Message),然后把它保存
至一个系统公用空间的消息队列(Message Queue)中;本地或者是异地的消息接收程序再从该队列中取出发给它
的消息进行处理。
在消息传递机制中,有两个比较重要的概念。一个是消息,一个是队列。消息是由通信的双方所需要传递的信息,它可以是各式各样的媒体,如文本、声音、图象等等。
消息最终的理解方式,为消息传递的双方事先商定,这样做的好处是,一是相当于对数据进行了简单的加密,二则采用自己定义的格式可以节省通信的传递量。消息可以含有发送和接收者的标识,这样只有指定的用户才能看到只传递给他的信息和返回是否操作成功的回执。消息也可以含有时间戳,以便于接收方对某些与时间相关的应用进行处理。消息还可以含有到期时间,它表明如果在指定时间内消息还未到达则作废,这主要应用与时间性关联较为紧密的应用。
    消息队列是发送和接收消息的公用存储空间,它可以存在于内存中或者是物理文件中。
    消息可以以两种方式发送,即快递方式(express)和可恢复模式(recoverable),它们的区别在于,快递方式为了消息的快速传递,把消息放置于内存中,而不放于物理磁盘上,以获取较高的处理能力;可恢复模式在传送过程的每一步骤中,都把消息写入物理磁盘中,以得到较好的故障恢复能力。
    消息队列可以放置在发送方、接收方所在的机器上,也可以单独放置在另外一台机器上。正是由于消息队列在放置方式上的灵活性,形成了消息传送机制的可靠性。当保存消息队列的机器发生故障而重新启动以后,以可恢复模式发送的消息可以恢复到故障发生之前的状态,而以快递方式发送的消息则丢失了。另一方面,采用消息传递机制,发送方必要再担心接收方是否启动、是否发生故障等等非必要因素,只要消息成功发送出去,就可以认为处理完成,而实际上对方可能甚至未曾开机,或者实际完成交易时可能已经是第二天了。
采用MSMQ带来的好处是:
    由于是异步通信,无论是发送方还是接收方都不用等待对方返回成功消息,就可以执行余下的代码,因而大大地提高了事物处理的能力;当信息传送过程中,信息发送机制具有一定功能的故障恢复能力;MSMQ的消息传递机制使得消息通信的双方具有不同的物理平台成为可能。
在微软的.net平台上利用其提供的MSMQ功能,可以轻松创建或者删除消息队列、发送或者接收消息、甚至于对消息队列进行管理。
在.NET产品中,提供了一个MSMQ类库“System.Messaging.dll”。它提供了两个类分别对消息对象和消息队列对象
进行操作。在能够使用MSMQ功能之前,你必须确定你的机器上安装了MSMQ消息队列组件,并确保服务正在运行中。在使用ASP.NET编程时,应在头部使用:
<%@ Assembly Name=”System.Messaging”%>
<%@ Import NameSpace=”System.Messsaging”%>

    利用
MSMQMicrosoft Message Queue),应用程序开发人员可以通过发送和接收消息方便地与应用程序进行快速可靠的通信。消息处理为您提供了有保障的消息传递和执行许多业务处理的可靠的防故障方法。

MSMQXML Web Services.Net Remoting一样,是一种分布式开发技术。但是在使用XML Web Services.Net Remoting组件时,Client端需要和Server端实时交换信息,Server需要保持联机。MSMQ则可以在Server离线的情况下工作,将Message临时保存在Client端的消息队列中,以后联机时再发送到Server端处理。

显然,MSMQ不适合于Client需要Server端及时响应的这种情况,MSMQ以异步的方式和Server端交互,不用担心等待Server端的长时间处理过程。

虽然XML Web Services.Net Remoting都提供了[OneWay]属性来处理异步调用,用来解决Server端长方法调用长时间阻碍Client端。但是不能解决大量Client负载的问题,此时Server接受的请求快于处理请求。

一般情况下,[OneWay]属性不用于专门的消息服务中。

1. 基本术语和概念(Basic terms and concepts

“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。

消息被发送到队列中。“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。

“消息队列”是 Microsoft 的消息处理技术,它在任何安装了 Microsoft Windows 的计算机组合中,为任何应用程序提供消息处理和消息队列功能,无论这些计算机是否在同一个网络上或者是否同时联机。

“消息队列网络”是能够相互间来回发送消息的任何一组计算机。网络中的不同计算机在确保消息顺利处理的过程中扮演不同的角色。它们中有些提供路由信息以确定如何发送消息,有些保存整个网络的重要信息,而有些只是发送和接收消息。

“消息队列”安装期间,管理员确定哪些服务器可以互相通信,并设置特定服务器的特殊角色。构成此“消息队列”网络的计算机称为“站点”,它们之间通过“站点链接”相互连接。每个站点链接都有一个关联的“开销”,它由管理员确定,指示了经过此站点链接传递消息的频率。

“消息队列”管理员还在网络中设置一台或多台作为“路由服务器”的计算机。路由服务器查看各站点链接的开销,确定经过多个站点传递消息的最快和最有效的方法,以此决定如何传递消息。

2. 队列类型(Queue Type

有两种主要的队列类型:由您或网络中的其他用户创建的队列和系统队列。

用户创建的队列可能是以下任何一种队列:

“公共队列”在整个“消息队列”网络中复制,并且有可能由网络连接的所有站点访问。

“专用队列”不在整个网络中发布。相反,它们仅在所驻留的本地计算机上可用。专用队列只能由知道队列的完整路径名或标签的应用程序访问。

“管理队列”包含确认在给定“消息队列”网络中发送的消息回执的消息。指定希望 MessageQueue 组件使用的管理队列(如果有的话)。

“响应队列”包含目标应用程序接收到消息时返回给发送应用程序的响应消息。指定希望 MessageQueue 组件使用的响应队列(如果有的话)。

系统生成的队列一般分为以下几类:

“日记队列”可选地存储发送消息的副本和从队列中移除的消息副本。每个“消息队列”客户端上的单个日记队列存储从该计算机发送的消息副本。在服务器上为每个队列创建了一个单独的日记队列。此日记跟踪从该队列中移除的消息。

“死信队列”存储无法传递或已过期的消息的副本。如果过期或无法传递的消息是事务性消息,则被存储在一种特殊的死信队列中,称为“事务性死信队列”。死信存储在过期消息所在的计算机上。有关超时期限和过期消息的更多信息,请参见默认消息属性。

“报告队列”包含指示消息到达目标所经过的路由的消息,还可以包含测试消息。每台计算机上只能有一个报告队列。

“专用系统队列”是一系列存储系统执行消息处理操作所需的管理和通知消息的专用队列。

在应用程序中进行的大多数工作都涉及访问公共队列及其消息。但是,根据应用程序的日记记录、确认和其他特殊处理需要,在日常操作中很可能要使用几种不同的系统队列。

3. 同步和异步通信(Synchronous VS. Asynchronous Communication

队列通信天生就是异步的,因为将消息发送到队列和从队列中接收消息是在不同的进程中完成的。另外,可以异步执行接收操作,因为要接收消息的人可以对任何给定的队列调用 BeginReceive 方法,然后立即继续其他任务而不用等待答复。这与人们所了解的“同步通信”截然不同。

在同步通信中,请求的发送方在执行其他任务前,必须等待来自预定接收方的响应。发送方等待的时间完全取决于接收方处理请求和发送响应所用的时间。

4. 同消息队列交互(Interacting with Message Queues

消息处理和消息为基于服务器的应用程序组件之间的进程间通信提供了强大灵活的机制。同组件间的直接调用相比,它们具有若干优点,其中包括:

  • 稳定性组件失败对消息的影响程度远远小于组件间的直接调用,因为消息存储在队列中并一直留在那里,直到被适当地处理。消息处理同事务处理相似,因为消息处理是有保证的。
  • 消息优先级更紧急或更重要的消息可在相对不重要的消息之前接收,因此可以为关键的应用程序保证足够的响应时间。
  • 脱机能力发送消息时,它们可被发送到临时队列中并一直留在那里,直到被成功地传递。当因任何原因对所需队列的访问不可用时,用户可以继续执行操作。同时,其他操作可以继续进行,如同消息已经得到了处理一样,这是因为网络连接恢复时消息传递是有保证的。
  • 事务性消息处理将多个相关消息耦合为单个事务,确保消息按顺序传递、只传递一次并且可以从它们的目标队列中被成功地检索。如果出现任何错误,将取消整个事务。
  • 安全性 — MessageQueue 组件基于的消息队列技术使用 Windows 安全来保护访问控制,提供审核,并对组件发送和接收的消息进行加密和验证。

5. .Net环境下编写简单的Message Queue程序

1)先安装Message Queuing Services

通过Control Panel“Add/Remove Programs” – “Add/Remove Windows Components”步骤安装MSMQ

MSMQ可以安装为工作组模式或域模式。如果安装程序没有找到一台运行提供目录服务的消息队列的服务器,则只可以安装为工作组模式,此计算机上的“消息队列”只支持创建专用队列和创建与其他运行“消息队列”的计算机的直接连接。

2)配置MSMQ

打开Computer Management – Message Queuing,在Private Queues下创建MSMQDemo队列
 3)代码实例
 

 1  public partial class Form1 : Form
 2    {
 3        private MessageQueue queue = null;
 4        private string QueuePath = ".\\Private$\\MSMQDemo"//"FormatName:DIRECT=TCP:10.129.3.133\\Private$\\MSMQDemo";
 5        private IPAddress IPAddress;
 6        public Form1()
 7        {
 8            InitializeComponent();
 9        }

10        private void Form1_Load(object sender, EventArgs e)
11        {
12            GetIPAddress();
13            //检查队列,如果队列不存在,则建立 
14            EnsureQueueExists(QueuePath);
15           
16            queue = new System.Messaging.MessageQueue(QueuePath);
17        }

18        private void  GetIPAddress()
19        {
20            IPAddress = new System.Net.IPAddress(Dns.GetHostByName(Dns.GetHostName()).AddressList[0].Address);
21        }

22
23        private void SendButton_Click(object sender, EventArgs e)
24        {
25            //不允许发空消息
26            if (textBox1.Text == "")
27            {
28                MessageBox.Show("发送消息不能为空!");
29                return;
30            }

31            try
32            {
33                // Create message 
34                System.Messaging.Message message = new System.Messaging.Message();
35                message.Body = textBox1.Text.Trim();
36                message.Formatter = new System.Messaging.XmlMessageFormatter(new Type[] typeof(string) });
37                // Put message into queue 
38                queue.Send(message);
39            }

40            catch (Exception ex1)
41            {
42                MessageBox.Show(ex1.ToString());
43            }

44        }

45
46        //判断队列是否存在
47        private static void EnsureQueueExists(string path)
48        {
49            if (!MessageQueue.Exists(path))
50            {
51                if (!MessageQueue.Exists(path))
52                {
53                    MessageQueue.Create(path);
54                    MessageQueue mqTemp = new MessageQueue(path);
55                    //Everone全部权限
56                    mqTemp.SetPermissions("Everyone", System.Messaging.MessageQueueAccessRights.FullControl); 
57                }

58            }

59        }
 
60        //获取消息队列中的消息
61        public System.Collections.ArrayList GetMessage()
62        {
63            System.Messaging.MessageQueue mq = new System.Messaging.MessageQueue(QueuePath, false);
64            mq.Formatter = new XmlMessageFormatter(new Type[] typeof(string) });
65            System.Messaging.Message[] arrM = mq.GetAllMessages();
66            mq.Close();
67            System.Collections.ArrayList al = new System.Collections.ArrayList();
68            if (arrM.Length == 0)
69                return null;
70            foreach (System.Messaging.Message m in arrM)
71            {
72                al.Add(m.Body.ToString());
73            }

74            return al;
75        }
 
76
77        private void ReciveButton_Click(object sender, EventArgs e)
78        {
79            //获取全部消息
80            System.Collections.ArrayList MsgList = GetMessage();
81            //if (MsgList != null)
82            //{
83            //    foreach (object str in MsgList)
84            //    {
85            //        textBox2.AppendText(str.ToString());
86            //    }
87            //}
88            //直接读取的方法
89            //System.Messaging.MessageQueue queue = new System.Messaging.MessageQueue(".\\Private$\\MSMQDemo");
90            // Receive message, 同步的Receive方法阻塞当前执行线程,直到一个message可以得到 
91            System.Messaging.Message message = queue.Receive();
92            message.Formatter = new System.Messaging.XmlMessageFormatter(new Type[] typeof(string) });
93            textBox2.AppendText("IP:" + IPAddress.ToString() + "" + ": Message:" + message.Body.ToString()); 
94        }

95    }
运行结果:
原文地址:https://www.cnblogs.com/abcdwxc/p/1014922.html