SuperSocket源码解析之消息处理

一 简述

  Tcp消息的处理本身是与Tcp消息传输过程独立的,是消息的两个不同阶段,从前面的会话生命周期我们已经知道消息的传输主要有SocketSession实现,而真正处理则交由AppSession实现,SuperSocket的层次划分也是非常清晰明了。

  SuperSocket消息处理主要流程:接收=》原始过滤=》协议解析=》命令路由并执行=》找不到命令则直接一分不动发给客户端

二 消息接收

1 开始接收

代码位置:AsyncSocketSession=》StartReceive

 1  private void StartReceive(SocketAsyncEventArgs e)
 2         {
 3             StartReceive(e, 0);
 4         }
 5 
 6         private void StartReceive(SocketAsyncEventArgs e, int offsetDelta)
 7         {
 8             bool willRaiseEvent = false;
 9 
10             try
11             {
12                 if (offsetDelta < 0 || offsetDelta >= Config.ReceiveBufferSize)
13                     throw new ArgumentException(string.Format("Illigal offsetDelta: {0}", offsetDelta), "offsetDelta");
14 
15                 var predictOffset = SocketAsyncProxy.OrigOffset + offsetDelta;
16 
17                 if (e.Offset != predictOffset)
18                 {
19                     e.SetBuffer(predictOffset, Config.ReceiveBufferSize - offsetDelta);
20                 }
21 
22                 if (IsInClosingOrClosed)
23                     return;
24 
25                 OnReceiveStarted();
26                 willRaiseEvent = Client.ReceiveAsync(e);
27             }
28             catch (Exception exc)
29             {
30                 LogError(exc);
31 
32                 OnReceiveError(CloseReason.SocketError);
33                 return;
34             }
35 
36             if (!willRaiseEvent)
37             {
38                 ProcessReceive(e);
39             }
40         }
View Code

在接收数据前后触发ReceiveStarted,ReceiveEnd事件

2 接收有消息处理并转入处理

 1 public void ProcessReceive(SocketAsyncEventArgs e)
 2         {
 3             if (!ProcessCompleted(e))
 4             {
 5                 OnReceiveError(CloseReason.ClientClosing);
 6                 return;
 7             }
 8 
 9             OnReceiveEnded();
10 
11             int offsetDelta;
12 
13             try
14             {
15                 //交给app会话处理接收到的数据,而appsession又交给接收请求处理器处理
16                 offsetDelta = this.AppSession.ProcessRequest(e.Buffer, e.Offset, e.BytesTransferred, true);
17             }
18             catch (Exception exc)
19             {
20                 LogError("Protocol error", exc);
21                 this.Close(CloseReason.ProtocolError);
22                 return;
23             }
24 
25             //read the next block of data sent from the client
26             StartReceive(e, offsetDelta);
27         }
View Code

三 消息处理

1 入口:按照协议解析,每次只处理一个数据包,因此便有了如下的入口代码

 1 int IAppSession.ProcessRequest(byte[] readBuffer, int offset, int length, bool toBeCopied)
 2         {
 3             int rest, offsetDelta;
 4 
 5             while (true)
 6             {
 7                 var requestInfo = FilterRequest(readBuffer, offset, length, toBeCopied, out rest, out offsetDelta);
 8 
 9                 if (requestInfo != null)
10                 {
11                     try
12                     {
13                         AppServer.ExecuteCommand(this, requestInfo);
14                     }
15                     catch (Exception e)
16                     {
17                         HandleException(e);
18                     }
19                 }
20 
21                 if (rest <= 0)
22                 {
23                     return offsetDelta;
24                 }
25 
26                 //Still have data has not been processed
27                 offset = offset + length - rest;
28                 length = rest;
29             }
30         }
View Code

2 原始过滤

 此处以原始数据接收事件方式,预留给AppServer子类处理该原始数据包,意味着可以对原始数据包执行一次拦截处理,如果经过一些逻辑处理后不能满足,则将终止该数据包继续传播

3 协议解析

协议解析由AppSession的m_ReceiveFilter成员完成,该成员的实例化有2种实现方式

默认实例化 :默认命令行协议,该协议举例

下面是2行命令,行使用 作为结束标识,也就是回车换行,命令行内部使用空格分隔,第一个控制之前 如echo 为消息头也就是key,其后的字符串也使用空格分隔,作为参数,其整体=body

echo cc xxd    
cdc ds mmm

解析后为2个StringRequestInfo对象

 

 默认的协议解析工厂

 

 实例化默认协议解析对象

 通过AppServer构造函数传递解析工厂

 4 命令路由

上面我们已经解析到客户端发来的2条命令分别为echo 参数为cc xxd;cdc ds mmm;

其中key分别为echo和cdc,对于命令模式来说命令本身使用name字段进行标识,如果我们的key与name匹配那么我们即可路由到一个已有的命令,进而执行该命令,来看代码

 对于能够路由到的命令我们执行命令

 对于路由失败来说SuperSocket又是怎么做的呢?

找不到命令来处理该消息,将该命令名字发送会客户端,意思说明服务器没有实现该命令,那么命令从何而来?

5  命令

这还的从CommandLoader说起,CommandLoader又追溯到AppServer的构建过程

 

在没有显示配置CommandLoader的情况下默认为ReflectCommandLoader

 ReflectCommandLoader创建命令

  ReflectCommandLoader将扫描应用程序根目录下所有程序,并将实现了命令接口的实例通过反射创建出来

 1 public override bool TryLoadCommands(out IEnumerable<TCommand> commands)
 2         {
 3             commands = null;
 4 
 5             var commandAssemblies = new List<Assembly>();
 6 
 7             if (m_AppServer.GetType().Assembly != this.GetType().Assembly)
 8                 commandAssemblies.Add(m_AppServer.GetType().Assembly);
 9 
10             string commandAssembly = m_AppServer.Config.Options.GetValue("commandAssembly");
11 
12             if (!string.IsNullOrEmpty(commandAssembly))
13             {
14                 OnError("The configuration attribute 'commandAssembly' is not in used, please try to use the child node 'commandAssemblies' instead!");
15                 return false;
16             }
17 
18 
19             if (m_AppServer.Config.CommandAssemblies != null && m_AppServer.Config.CommandAssemblies.Any())
20             {
21                 try
22                 {
23                     var definedAssemblies = AssemblyUtil.GetAssembliesFromStrings(m_AppServer.Config.CommandAssemblies.Select(a => a.Assembly).ToArray());
24 
25                     if (definedAssemblies.Any())
26                         commandAssemblies.AddRange(definedAssemblies);
27                 }
28                 catch (Exception e)
29                 {
30                     OnError(new Exception("Failed to load defined command assemblies!", e));
31                     return false;
32                 }
33             }
34 
35             if (!commandAssemblies.Any())
36             {
37                 commandAssemblies.Add(Assembly.GetEntryAssembly());
38             }
39 
40             var outputCommands = new List<TCommand>();
41 
42             foreach (var assembly in commandAssemblies)
43             {
44                 try
45                 {
46                     outputCommands.AddRange(assembly.GetImplementedObjectsByInterface<TCommand>());
47                 }
48                 catch (Exception exc)
49                 {
50                     OnError(new Exception(string.Format("Failed to get commands from the assembly {0}!", assembly.FullName), exc));
51                     return false;
52                 }
53             }
54 
55             commands = outputCommands;
56 
57             return true;
58         }
View Code

因此默认的我们只需要定义一些实现命令接口ICommand<TAppSession, TRequestInfo>的命令出来,

6 自定义命令

 直接继承CommandBase抽象类即可

 到此SuperSocket对消息的处理流程差不多就是这样了,SuperSocket的框架的使用 我们只需要自定义自己的AppServer,以及AppServer配套的AppSession,ReciverFilter,以及命令等即可,这些在官方提供的例子中已经很清晰

原文地址:https://www.cnblogs.com/rjjs/p/5623932.html