使用WCF双工通讯实现发布订阅

发布——订阅

通俗一点的解释,就是推送。由服务端发布消息,所有订阅了这条消息的客户端,都会收到服务端广播的这条消息。

WCF的双工机制的运行方式如下

客户端发起请求->服务端收到请求并对客户端发起回调->客户端回复回调->服务端回复客户端请求

发布订阅其实是双工的变种,它的原理大概是这样的

客户端发起请求->服务端调用回调->服务端调用回调.....

基本原理就这些,我们来用代码说话吧,现在瞅瞅,代码比说话都亲切....

首先我们定义一个基础的WCF服务契约

IPushService
/// <summary>
    /// 推送服务契约
    /// 
    /// Tips:
    /// 契约提供两个服务,一个是订阅,一个是退订。
    /// 服务端会向订阅的客户端发布消息
    /// </summary>
    [ServiceContract(CallbackContract = typeof(IPushCallback))]
    public interface IPushService
    {
        /// <summary>
        /// 订阅服务
        /// </summary>
        [OperationContract(IsOneWay = true)]
        void Regist();

        /// <summary>
        /// 退订服务
        /// </summary>
        [OperationContract(IsOneWay = true)]
        void UnRegist();
    }

在这里定义了两个行为,订阅和退订。

同时还有一个回调契约如下

IPushCallback
 /// <summary>
    /// 回调接口 IOC思想的体现
    /// </summary>
    public interface IPushCallback
    {
        [OperationContract(IsOneWay = true)]
        void NotifyMessage(string message);
    }

在这里,我们对每一个行为都标记为OneWay,表示客户端在调用完服务之后不需要等待服务的回复,同理回调时服务端只管广播,同样不需要理会客户端

广播的过程,就是服务端调用每一个连接的回调通道进行操作的过程,因此这里需要注意2点。

1、有一个集合,用于维护回调通道。

2、客户端与服务端建立的通道不能在调用之后就被回收,而是需要保持通道的正常状态

在这里,我们建立一个ChannelManager类型,其中维护了一个回调通道的列表,并且提供了服务端操作的几个行为。这个类型以单例模式实现,保证唯一的通道列表。

ChannelManager
  /// <summary>
    /// 通道管理
    /// </summary>
    public class ChannelManager
    {
        #region Fields

        /// <summary>
        /// 回调通道列表
        /// </summary>
        private List<IPushCallback> callbackChannelList = new List<IPushCallback>();

        /// <summary>
        /// 用于互斥锁的对象
        /// </summary>
        public static readonly object SyncObj = new object();

        #endregion

        #region Single

        private static readonly Lazy<ChannelManager> instance = new Lazy<ChannelManager>(() => new ChannelManager());

        public static ChannelManager Instance
        {
            get { return instance.Value; }
        }

        protected ChannelManager() { }

        #endregion

        #region Methods
        /// <summary>
        /// 将回调通道加入到通道列表中进行管理
        /// </summary>
        /// <param name="callbackChannel"></param>
        public void Add(IPushCallback callbackChannel)
        {
            if (callbackChannelList.Contains(callbackChannel))
            {
                Console.WriteLine("已存在重复通道");
            }
            else
            {
                lock (SyncObj)
                {
                    callbackChannelList.Add(callbackChannel);
                    Console.WriteLine("添加了新的通道");
                }
            }
        }

        /// <summary>
        /// 从通道列表中移除对一个通道的管理
        /// </summary>
        /// <param name="callbackChannel"></param>
        public void Remove(IPushCallback callbackChannel)
        {
            if (!callbackChannelList.Contains(callbackChannel))
            {
                Console.WriteLine("不存在待移除通道");
            }
            else
            {
                lock (SyncObj)
                {
                    callbackChannelList.Remove(callbackChannel);
                    Console.WriteLine("移除了一个通道");
                }
            }
        }

        /// <summary>
        /// 广播消息
        /// </summary>
        /// <param name="message"></param>
        public void NotifyMessage(string message)
        {
            if (callbackChannelList.Count > 0)
            {
                //避免对callbackChannelList的更改对广播造成的影响
                IPushCallback[] callbackChannels = callbackChannelList.ToArray();

                foreach (var channel in callbackChannels)
                {
                    try
                    {
                        //广播消息
                        channel.NotifyMessage(message);

                    }
                    catch
                    {
                        //对异常的通道进行处理
                        callbackChannelList.Remove(channel);
                    }
                }
            }
        }
        #endregion
    }

下面就是我们服务的实现了,服务的实现很简单,仅仅是捕获到客户端的回调通道,对集合进行操作。这里需要注意的是ServiceBehavior标记的InstanceContextMode属性的设置。我们需要为每一个单独的通道创建新的实例,但是在调用玩服务后,不对通道立刻进行回收,因此我们需要设置为InstanceContextModel.Single。

PushService
 /// <summary>
    /// 服务的实现
    /// Tips:
    /// 实现发布订阅,要注意:每个信道在调用后不要回收,否则会在回调时报错
    /// </summary>
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)]
    public class PushService : IPushService
    {
        public void Regist()
        {
            IPushCallback callbackChannel = OperationContext.Current.GetCallbackChannel<IPushCallback>();
            //添加到管理列表中
            ChannelManager.Instance.Add(callbackChannel);

        }

        public void UnRegist()
        {
            IPushCallback callbackChannel = OperationContext.Current.GetCallbackChannel<IPushCallback>();
            //从管理列表中移除
            ChannelManager.Instance.Remove(callbackChannel);
        }
    }

这样我们的服务基本就完成了,置于客户端的调用,需要创建一个双工通道对象,并且将实现了回调契约的类型,传给InstanceContext属性。

这篇文章很简单,没有涉及到对产生异常的通道的处理,也没有考虑到Silverlight调用时遇到的跨域问题,同时不支持HttpGet。

文章的目的只有一个,就是尽可能简洁的体现发布订阅服务的原理。

有兴趣的读者,可以对服务进行扩展,对服务在广播时可能产生的各种异常进行处理。

由于文章的例子是基于TCP/IP的通迅方式,使用的是netTcpBinding的全双工模式,因此使用HttpGet的时候需要进行一些额外的设置。

并且由于是自托管服务,寄宿在控制台应用程序中,因此跨域文件的提供方式也不太相同。

下面提供完整的代码:

SimplePush.rar

另外提供一个Silverlight调用服务的例子,涉及HttpGet及跨域文件的提供

Chat.rar

这两个例子都是基于Vs2010 .Net4.0开发,Silverlight版本为5.0

原文地址:https://www.cnblogs.com/ShadowLoki/p/2663931.html