WCF ChannelDispatcher 通道的管理

               

                   WCF  Channel管理

在ChannelDispatcher里面有一个关键的字段

ListenerHandler listenerHandler//用来侦听接收新的通道

CommunicationObjectManager<IChannel> channels;//通道存放的位置

CommunicationObjectManager<IChannel>的定义大概如下:

{

  // Fields

    private bool inputClosed;

    private Hashtable table;//存放了通道

    // Methods

    public CommunicationObjectManager(object mutex);

    public void Add(ItemType item);//添加一个新的通道

    public void CloseInput();

    public void DecrementActivityCount();

    public void IncrementActivityCount();

    private void OnItemClosed(object sender, EventArgs args);

    public void Remove(ItemType item);//移除一个新通道

    public ItemType[] ToArray();

}

在ChannelDispatcher.OnOpened()中有如下代码

{

IListenerBinderlistenerBinder=ListenerBinder.GetBinder(this.listener, this.messageVersion);

    this.listenerHandler = new ListenerHandler(listenerBinder, this, this.host, serviceThrottle, this.timeouts);//创建,同时注意限流参数也传入

    this.listenerHandler.Open();//打开

}

ListenerHandler 构造函数

internal ListenerHandler(IListenerBinder listenerBinder, ChannelDispatcher channelDispatcher, ServiceHostBase host, ServiceThrottle throttle, IDefaultCommunicationTimeouts timeouts)
{
//保留了listern,用来侦听新的连接
    this.listenerBinder = listenerBinder;
      if (this.listenerBinder == null)
    {
        throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("listenerBinder");
    }
//保留了channelDispatcher;
    this.channelDispatcher = channelDispatcher;
    if (this.channelDispatcher == null)
    {
        throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("channelDispatcher");
    }
    this.host = host;
    if (this.host == null)
    {
        throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("host");
    }
    this.throttle = throttle;
    if (this.throttle == null)
    {
        throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("throttle");
    }
    this.timeouts = timeouts;
    this.endpoints = channelDispatcher.EndpointDispatcherTable;
    this.acceptor = new ErrorHandlingAcceptor(listenerBinder, channelDispatcher);
}

调用构造函数后调用了Open();在该函数里调用了ListenerHandler.Onopen()

protected override void OnOpened()
{
    base.OnOpened();
    this.channelDispatcher.Channels.IncrementActivityCount();
    if ((this.channelDispatcher.IsTransactedReceive && this.channelDispatcher.ReceiveContextEnabled) && (this.channelDispatcher.MaxTransactedBatchSize > 0))
    {
        throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString("IncompatibleBehaviors")));
    }
    this.NewChannelPump();//关键一句,开始侦听新连接或者数据
}
internal void NewChannelPump()
{
    ActionItem.Schedule(initiateChannelPump, this);//这是什么,来看看initiateChannelPump  
}

原来在ListenerHandler里有个静态构造函数
static ListenerHandler()
{
    acceptCallback = Fx.ThunkCallback(new AsyncCallback(ListenerHandler.AcceptCallback));
    initiateChannelPump = new Action<object>(ListenerHandler.InitiateChannelPump);
}
private static void InitiateChannelPump(object state)
{
    ListenerHandler handler = state as ListenerHandler;
    if (handler.ChannelDispatcher.IsTransactedAccept)
    {
        handler.TransactedChannelPump();
    }
    else
    {
        handler.ChannelPump();
    }
}
private void ChannelPump()
{
    IChannelListener listener = this.listenerBinder.Listener;
Label_000C:
    if (this.acceptedNull || (listener.State == CommunicationState.Faulted))
    {
        this.DoneAccepting();
    }
//关键的一个函数
    else if (this.AcceptAndAcquireThrottle())
     {
        //关键的一句分配
        this.Dispatch();
        goto Label_000C;
    }
}
private bool AcceptAndAcquireThrottle()
{
    IAsyncResult result = this.acceptor.BeginTryAccept(TimeSpan.MaxValue, acceptCallback, this);
    return (result.CompletedSynchronously && this.HandleEndAccept(result));
}
在 BeginTryAccept(TimeSpan timeout, AsyncCallback callback, object state)
{
  try
    {
      return this.binder.BeginAccept(timeout, callback, state);
  }
}开始异步调用侦听接收一个新的请求
在回调后,会调用
this.HandleEndAccept(result));
private bool HandleEndAccept(IAsyncResult result)
{
 this.channel // 完成接受,并把新的channel赋值给本地变量
    this.channel = this.CompleteAccept(result);
    if (this.channel != null)
    {
            //看懂该句很重要,因为 this.idleManager管理着需要会话的通道
         this.idleManager = ServiceChannel.SessionIdleManager.CreateIfNeeded(this.channel.Binder, this.channelDispatcher.DefaultCommunicationTimeouts.ReceiveTimeout);
        return this.AcquireThrottle();//限流控制,限制session
    }
    this.DoneAccepting();
    return true;
}
 
 
internal static ServiceChannel.SessionIdleManager CreateIfNeeded(IChannelBinder binder, TimeSpan idle)
{
      //判断新的channel是否是回话模式
    if (binder.HasSession && (idle != TimeSpan.MaxValue))
    {
        return new ServiceChannel.SessionIdleManager(binder, idle);
    }
    return null;
}
 
 

在调用完this.AcceptAndAcquireThrottle())后会触发this.Dispatch();
private void Dispatch()

{
// this.channel保留了刚才创建的channel,
    ListenerChannel channel = this.channel
//idleManager是管理需要session双工通信的一个管理器,每个双工通信的channel都对应这一个idleManage,在idleManager有一个类似心包概念的定时器,检测到多长时间没有收到消息就认为客户端断连了,就会调用关闭
    ServiceChannel.SessionIdleManager idleManager = this.idleManager;
    this.channel = null;
    this.idleManager = null;
    try
    {
        if (channel != null)
        {
            ChannelHandler handler = new ChannelHandler(this.listenerBinder.MessageVersion, channel.Binder, this.throttle, this, channel.Throttle != null, this.wrappedTransaction, idleManager);
             //判断新的channel是否是一个session信道,如果不是管理到 CommunicationObjectManager<IChannel> channels
           if (!channel.Binder.HasSession)
              {
//添加
                this.channelDispatcher.Channels.Add(channel.Binder.Channel);
            }
             //如果是双工的
            if (channel.Binder is DuplexChannelBinder)
            {
                DuplexChannelBinder binder = channel.Binder as DuplexChannelBinder;
                binder.ChannelHandler = handler;
                binder.DefaultCloseTimeout = this.DefaultCloseTimeout;
                if (this.timeouts == null)
                {
                    binder.DefaultSendTimeout = ServiceDefaults.SendTimeout;
                }
                else
                {
                    binder.DefaultSendTimeout = this.timeouts.SendTimeout;
                }
            }
                    //开始尝试异步接收数据,注意,如果handler不是双工的就在CommunicationObjectManager中管理的,如果是双工的就没在
                    //它里面,而是与一个idleManager关联,idleManager里面有相关的定时器,超时就关闭,实行习惯里了,故不需要放在一个集合里面了
            ChannelHandler.Register(handler);
            channel = null;
            idleManager = null;
        }
    }
    catch (Exception exception)
    {
        if (Fx.IsFatal(exception))
        {
            throw;
        }
        this.HandleError(exception);
    }
    finally
    {
        if (channel != null)
        {
            channel.Binder.Channel.Abort();
            if ((this.throttle != null) && this.channelDispatcher.Session)
            {
                this.throttle.DeactivateChannel();
            }
            if (idleManager != null)
            {
                idleManager.CancelTimer();
            }
        }
    }
}
 
   System.ServiceModel.Dispatcher.ImmutableDispatchRuntime.TransferChannelFromPendingList(MessageRpc&)

private void TransferChannelFromPendingList(ref MessageRpc rpc)
{
    if (rpc.Channel.IsPending)
    {
        rpc.Channel.IsPending = false;
        ChannelDispatcher channelDispatcher = rpc.Channel.ChannelDispatcher;
        IInstanceContextProvider instanceContextProvider = this.instance.InstanceContextProvider;

      //如果提供了session就不会执行下面的
        if (!InstanceContextProviderBase.IsProviderSessionful(instanceContextProvider) && !InstanceContextProviderBase.IsProviderSingleton(instanceContextProvider))
        {
            IChannel proxy = rpc.Channel.Proxy as IChannel;
            if (!rpc.InstanceContext.IncomingChannels.Contains(proxy))
            {
                channelDispatcher.Channels.Add(proxy);//没看懂这里什么意思,明白的恳请告知
            }
        }
        channelDispatcher.PendingChannels.Remove(rpc.Channel.Binder.Channel);
    }
}

原文地址:https://www.cnblogs.com/qianyz/p/2653424.html