C#SocketAsyncEventArgs实现高效能多并发TCPSocket通信 (服务器实现)

http://freshflower.iteye.com/blog/2285272

  想着当初到处找不到相关资料来实现.net的Socket通信的痛苦与心酸, 于是将自己写的代码公布给大家, 让大家少走点弯路, 以供参考. 若是觉得文中的思路有哪里不正确的地方, 欢迎大家指正, 共同进步. 

    说到Socket通信, 必须要有个服务端, 打开一个端口进行监听(废话!) 可能大家都会把socket.Accept方法放在一个while(true)的循环里, 当然也没有错, 但个人认为这个不科学, 极大可能地占用服务资源. 赞成的请举手. 所以我想从另外一个方面解决这个问题. 之后是在MSDN找到SocketAsyncEventArgs的一个实例, 然后拿来改改, 有需要的同学可以看看MSDN的官方实例.https://msdn.microsoft.com/en-us/library/system.net.sockets.socketasynceventargs(v=vs.110).aspx

需要了解客户端写法的, 请参考: 客户端实现http://freshflower.iteye.com/blog/2285286

     不多说, 接下来贴代码, 这个实例中需要用到几个类:

     1. BufferManager类, 管理传输流的大小  原封不动地拷贝过来, 

     2. SocketEventPool类: 管理SocketAsyncEventArgs的一个应用池. 有效地重复使用.

     3. AsyncUserToken类: 这个可以根据自己的实际情况来定义.主要作用就是存储客户端的信息.

     4. SocketManager类: 核心,实现Socket监听,收发信息等操作.

   BufferManager类

C#代码  收藏代码
  1. using System;  
  2. using System.Collections.Generic;  
  3. using System.Linq;  
  4. using System.Net.Sockets;  
  5. using System.Text;  
  6.   
  7. namespace Plates.Service  
  8. {  
  9.     class BufferManager  
  10.     {  
  11.         int m_numBytes;                 // the total number of bytes controlled by the buffer pool  
  12.         byte[] m_buffer;                // the underlying byte array maintained by the Buffer Manager  
  13.         Stack<int> m_freeIndexPool;     //   
  14.         int m_currentIndex;  
  15.         int m_bufferSize;  
  16.   
  17.         public BufferManager(int totalBytes, int bufferSize)  
  18.         {  
  19.             m_numBytes = totalBytes;  
  20.             m_currentIndex = 0;  
  21.             m_bufferSize = bufferSize;  
  22.             m_freeIndexPool = new Stack<int>();  
  23.         }  
  24.   
  25.         // Allocates buffer space used by the buffer pool  
  26.         public void InitBuffer()  
  27.         {  
  28.             // create one big large buffer and divide that   
  29.             // out to each SocketAsyncEventArg object  
  30.             m_buffer = new byte[m_numBytes];  
  31.         }  
  32.   
  33.         // Assigns a buffer from the buffer pool to the   
  34.         // specified SocketAsyncEventArgs object  
  35.         //  
  36.         // <returns>true if the buffer was successfully set, else false</returns>  
  37.         public bool SetBuffer(SocketAsyncEventArgs args)  
  38.         {  
  39.   
  40.             if (m_freeIndexPool.Count > 0)  
  41.             {  
  42.                 args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);  
  43.             }  
  44.             else  
  45.             {  
  46.                 if ((m_numBytes - m_bufferSize) < m_currentIndex)  
  47.                 {  
  48.                     return false;  
  49.                 }  
  50.                 args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);  
  51.                 m_currentIndex += m_bufferSize;  
  52.             }  
  53.             return true;  
  54.         }  
  55.   
  56.         // Removes the buffer from a SocketAsyncEventArg object.    
  57.         // This frees the buffer back to the buffer pool  
  58.         public void FreeBuffer(SocketAsyncEventArgs args)  
  59.         {  
  60.             m_freeIndexPool.Push(args.Offset);  
  61.             args.SetBuffer(null, 0, 0);  
  62.         }  
  63.     }  
  64. }  

   SocketEventPool类:

   

C#代码  收藏代码
  1. using System;  
  2. using System.Collections.Generic;  
  3. using System.Linq;  
  4. using System.Net.Sockets;  
  5. using System.Text;  
  6.   
  7. namespace Plates.Service  
  8. {  
  9.     class SocketEventPool  
  10.     {  
  11.         Stack<SocketAsyncEventArgs> m_pool;  
  12.   
  13.   
  14.         public SocketEventPool(int capacity)  
  15.         {  
  16.             m_pool = new Stack<SocketAsyncEventArgs>(capacity);  
  17.         }  
  18.   
  19.         public void Push(SocketAsyncEventArgs item)  
  20.         {  
  21.             if (item == null) { throw new ArgumentNullException("Items added to a SocketAsyncEventArgsPool cannot be null"); }  
  22.             lock (m_pool)  
  23.             {  
  24.                 m_pool.Push(item);  
  25.             }  
  26.         }  
  27.   
  28.         // Removes a SocketAsyncEventArgs instance from the pool  
  29.         // and returns the object removed from the pool  
  30.         public SocketAsyncEventArgs Pop()  
  31.         {  
  32.             lock (m_pool)  
  33.             {  
  34.                 return m_pool.Pop();  
  35.             }  
  36.         }  
  37.   
  38.         // The number of SocketAsyncEventArgs instances in the pool  
  39.         public int Count  
  40.         {  
  41.             get { return m_pool.Count; }  
  42.         }  
  43.   
  44.         public void Clear()  
  45.         {  
  46.             m_pool.Clear();  
  47.         }  
  48.     }  
  49. }  

   AsyncUserToken类

C#代码  收藏代码
  1. using System;  
  2. using System.Collections;  
  3. using System.Collections.Generic;  
  4. using System.Linq;  
  5. using System.Net;  
  6. using System.Net.Sockets;  
  7. using System.Text;  
  8.   
  9. namespace Plates.Service  
  10. {  
  11.     class AsyncUserToken  
  12.     {  
  13.         /// <summary>  
  14.         /// 客户端IP地址  
  15.         /// </summary>  
  16.         public IPAddress IPAddress { get; set; }  
  17.   
  18.         /// <summary>  
  19.         /// 远程地址  
  20.         /// </summary>  
  21.         public EndPoint Remote { get; set; }  
  22.   
  23.         /// <summary>  
  24.         /// 通信SOKET  
  25.         /// </summary>  
  26.         public Socket Socket { get; set; }  
  27.   
  28.         /// <summary>  
  29.         /// 连接时间  
  30.         /// </summary>  
  31.         public DateTime ConnectTime { get; set; }  
  32.   
  33.         /// <summary>  
  34.         /// 所属用户信息  
  35.         /// </summary>  
  36.         public UserInfoModel UserInfo { get; set; }  
  37.   
  38.   
  39.         /// <summary>  
  40.         /// 数据缓存区  
  41.         /// </summary>  
  42.         public List<byte> Buffer { get; set; }  
  43.   
  44.   
  45.         public AsyncUserToken()  
  46.         {  
  47.             this.Buffer = new List<byte>();  
  48.         }  
  49.     }  
  50. }  

  SocketManager类

C#代码  收藏代码
  1. using Plates.Common;  
  2. using System;  
  3. using System.Collections;  
  4. using System.Collections.Generic;  
  5. using System.Linq;  
  6. using System.Net;  
  7. using System.Net.Sockets;  
  8. using System.Text;  
  9. using System.Threading;  
  10.   
  11. namespace Plates.Service  
  12. {  
  13.     class SocketManager  
  14.     {  
  15.   
  16.         private int m_maxConnectNum;    //最大连接数  
  17.         private int m_revBufferSize;    //最大接收字节数  
  18.         BufferManager m_bufferManager;  
  19.         const int opsToAlloc = 2;  
  20.         Socket listenSocket;            //监听Socket  
  21.         SocketEventPool m_pool;  
  22.         int m_clientCount;              //连接的客户端数量  
  23.         Semaphore m_maxNumberAcceptedClients;  
  24.   
  25.         List<AsyncUserToken> m_clients; //客户端列表  
  26.  
  27.         #region 定义委托  
  28.   
  29.         /// <summary>  
  30.         /// 客户端连接数量变化时触发  
  31.         /// </summary>  
  32.         /// <param name="num">当前增加客户的个数(用户退出时为负数,增加时为正数,一般为1)</param>  
  33.         /// <param name="token">增加用户的信息</param>  
  34.         public delegate void OnClientNumberChange(int num, AsyncUserToken token);  
  35.   
  36.         /// <summary>  
  37.         /// 接收到客户端的数据  
  38.         /// </summary>  
  39.         /// <param name="token">客户端</param>  
  40.         /// <param name="buff">客户端数据</param>  
  41.         public delegate void OnReceiveData(AsyncUserToken token, byte[] buff);  
  42.  
  43.         #endregion  
  44.  
  45.         #region 定义事件  
  46.         /// <summary>  
  47.         /// 客户端连接数量变化事件  
  48.         /// </summary>  
  49.         public event OnClientNumberChange ClientNumberChange;  
  50.   
  51.         /// <summary>  
  52.         /// 接收到客户端的数据事件  
  53.         /// </summary>  
  54.         public event OnReceiveData ReceiveClientData;  
  55.  
  56.  
  57.         #endregion  
  58.  
  59.         #region 定义属性  
  60.   
  61.         /// <summary>  
  62.         /// 获取客户端列表  
  63.         /// </summary>  
  64.         public List<AsyncUserToken> ClientList { get { return m_clients; } }  
  65.  
  66.         #endregion  
  67.   
  68.         /// <summary>  
  69.         /// 构造函数  
  70.         /// </summary>  
  71.         /// <param name="numConnections">最大连接数</param>  
  72.         /// <param name="receiveBufferSize">缓存区大小</param>  
  73.         public SocketManager(int numConnections, int receiveBufferSize)  
  74.         {  
  75.             m_clientCount = 0;  
  76.             m_maxConnectNum = numConnections;  
  77.             m_revBufferSize = receiveBufferSize;  
  78.             // allocate buffers such that the maximum number of sockets can have one outstanding read and   
  79.             //write posted to the socket simultaneously    
  80.             m_bufferManager = new BufferManager(receiveBufferSize * numConnections * opsToAlloc, receiveBufferSize);  
  81.   
  82.             m_pool = new SocketEventPool(numConnections);  
  83.             m_maxNumberAcceptedClients = new Semaphore(numConnections, numConnections);  
  84.         }  
  85.   
  86.         /// <summary>  
  87.         /// 初始化  
  88.         /// </summary>  
  89.         public void Init()  
  90.         {  
  91.             // Allocates one large byte buffer which all I/O operations use a piece of.  This gaurds   
  92.             // against memory fragmentation  
  93.             m_bufferManager.InitBuffer();  
  94.             m_clients = new List<AsyncUserToken>();  
  95.             // preallocate pool of SocketAsyncEventArgs objects  
  96.             SocketAsyncEventArgs readWriteEventArg;  
  97.   
  98.             for (int i = 0; i < m_maxConnectNum; i++)  
  99.             {  
  100.                 readWriteEventArg = new SocketAsyncEventArgs();  
  101.                 readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);  
  102.                 readWriteEventArg.UserToken = new AsyncUserToken();  
  103.   
  104.                 // assign a byte buffer from the buffer pool to the SocketAsyncEventArg object  
  105.                 m_bufferManager.SetBuffer(readWriteEventArg);  
  106.                 // add SocketAsyncEventArg to the pool  
  107.                 m_pool.Push(readWriteEventArg);  
  108.             }  
  109.         }  
  110.   
  111.   
  112.         /// <summary>  
  113.         /// 启动服务  
  114.         /// </summary>  
  115.         /// <param name="localEndPoint"></param>  
  116.         public bool Start(IPEndPoint localEndPoint)  
  117.         {  
  118.             try  
  119.             {  
  120.                 m_clients.Clear();  
  121.                 listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);  
  122.                 listenSocket.Bind(localEndPoint);  
  123.                 // start the server with a listen backlog of 100 connections  
  124.                 listenSocket.Listen(m_maxConnectNum);  
  125.                 // post accepts on the listening socket  
  126.                 StartAccept(null);  
  127.                 return true;  
  128.             }  
  129.             catch (Exception)  
  130.             {  
  131.                 return false;  
  132.             }  
  133.         }  
  134.   
  135.         /// <summary>  
  136.         /// 停止服务  
  137.         /// </summary>  
  138.         public void Stop()  
  139.         {  
  140.             foreach (AsyncUserToken token in m_clients)  
  141.             {  
  142.                 try  
  143.                 {  
  144.                     token.Socket.Shutdown(SocketShutdown.Both);  
  145.                 }  
  146.                 catch (Exception) { }  
  147.             }  
  148.             try  
  149.             {  
  150.                 listenSocket.Shutdown(SocketShutdown.Both);  
  151.             }  
  152.             catch (Exception) { }  
  153.   
  154.             listenSocket.Close();  
  155.             int c_count = m_clients.Count;  
  156.             lock (m_clients) { m_clients.Clear(); }  
  157.   
  158.             if (ClientNumberChange != null)  
  159.                 ClientNumberChange(-c_count, null);  
  160.         }  
  161.   
  162.   
  163.         public void CloseClient(AsyncUserToken token)  
  164.         {  
  165.             try  
  166.             {  
  167.                 token.Socket.Shutdown(SocketShutdown.Both);  
  168.             }  
  169.             catch (Exception) { }  
  170.         }  
  171.   
  172.   
  173.         // Begins an operation to accept a connection request from the client   
  174.         //  
  175.         // <param name="acceptEventArg">The context object to use when issuing   
  176.         // the accept operation on the server's listening socket</param>  
  177.         public void StartAccept(SocketAsyncEventArgs acceptEventArg)  
  178.         {  
  179.             if (acceptEventArg == null)  
  180.             {  
  181.                 acceptEventArg = new SocketAsyncEventArgs();  
  182.                 acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);  
  183.             }  
  184.             else  
  185.             {  
  186.                 // socket must be cleared since the context object is being reused  
  187.                 acceptEventArg.AcceptSocket = null;  
  188.             }  
  189.   
  190.             m_maxNumberAcceptedClients.WaitOne();  
  191.             if (!listenSocket.AcceptAsync(acceptEventArg))  
  192.             {  
  193.                 ProcessAccept(acceptEventArg);  
  194.             }  
  195.         }  
  196.   
  197.         // This method is the callback method associated with Socket.AcceptAsync   
  198.         // operations and is invoked when an accept operation is complete  
  199.         //  
  200.         void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)  
  201.         {  
  202.             ProcessAccept(e);  
  203.         }  
  204.   
  205.         private void ProcessAccept(SocketAsyncEventArgs e)  
  206.         {  
  207.             try  
  208.             {  
  209.                 Interlocked.Increment(ref m_clientCount);  
  210.                 // Get the socket for the accepted client connection and put it into the   
  211.                 //ReadEventArg object user token  
  212.                 SocketAsyncEventArgs readEventArgs = m_pool.Pop();  
  213.                 AsyncUserToken userToken = (AsyncUserToken)readEventArgs.UserToken;  
  214.                 userToken.Socket = e.AcceptSocket;  
  215.                 userToken.ConnectTime = DateTime.Now;  
  216.                 userToken.Remote = e.AcceptSocket.RemoteEndPoint;  
  217.                 userToken.IPAddress = ((IPEndPoint)(e.AcceptSocket.RemoteEndPoint)).Address;  
  218.   
  219.                 lock (m_clients) { m_clients.Add(userToken); }  
  220.   
  221.                 if (ClientNumberChange != null)  
  222.                     ClientNumberChange(1, userToken);  
  223.                 if (!e.AcceptSocket.ReceiveAsync(readEventArgs))  
  224.                 {  
  225.                     ProcessReceive(readEventArgs);  
  226.                 }  
  227.             }  
  228.             catch (Exception me)  
  229.             {  
  230.                 RuncomLib.Log.LogUtils.Info(me.Message + " " + me.StackTrace);  
  231.             }  
  232.   
  233.             // Accept the next connection request  
  234.             if (e.SocketError == SocketError.OperationAborted) return;  
  235.             StartAccept(e);  
  236.         }  
  237.   
  238.   
  239.         void IO_Completed(object sender, SocketAsyncEventArgs e)  
  240.         {  
  241.             // determine which type of operation just completed and call the associated handler  
  242.             switch (e.LastOperation)  
  243.             {  
  244.                 case SocketAsyncOperation.Receive:  
  245.                     ProcessReceive(e);  
  246.                     break;  
  247.                 case SocketAsyncOperation.Send:  
  248.                     ProcessSend(e);  
  249.                     break;  
  250.                 default:  
  251.                     throw new ArgumentException("The last operation completed on the socket was not a receive or send");  
  252.             }  
  253.   
  254.         }  
  255.   
  256.   
  257.         // This method is invoked when an asynchronous receive operation completes.   
  258.         // If the remote host closed the connection, then the socket is closed.    
  259.         // If data was received then the data is echoed back to the client.  
  260.         //  
  261.         private void ProcessReceive(SocketAsyncEventArgs e)  
  262.         {  
  263.             try  
  264.             {  
  265.                 // check if the remote host closed the connection  
  266.                 AsyncUserToken token = (AsyncUserToken)e.UserToken;  
  267.                 if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)  
  268.                 {  
  269.                     //读取数据  
  270.                     byte[] data = new byte[e.BytesTransferred];  
  271.                     Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred);  
  272.                     lock (token.Buffer)  
  273.                     {  
  274.                         token.Buffer.AddRange(data);  
  275.                     }  
  276.                     //注意:你一定会问,这里为什么要用do-while循环?   
  277.                     //如果当客户发送大数据流的时候,e.BytesTransferred的大小就会比客户端发送过来的要小,  
  278.                     //需要分多次接收.所以收到包的时候,先判断包头的大小.够一个完整的包再处理.  
  279.                     //如果客户短时间内发送多个小数据包时, 服务器可能会一次性把他们全收了.  
  280.                     //这样如果没有一个循环来控制,那么只会处理第一个包,  
  281.                     //剩下的包全部留在token.Buffer中了,只有等下一个数据包过来后,才会放出一个来.  
  282.                     do  
  283.                     {  
  284.                         //判断包的长度  
  285.                         byte[] lenBytes = token.Buffer.GetRange(0, 4).ToArray();  
  286.                         int packageLen = BitConverter.ToInt32(lenBytes, 0);  
  287.                         if (packageLen > token.Buffer.Count - 4)  
  288.                         {   //长度不够时,退出循环,让程序继续接收  
  289.                             break;  
  290.                         }  
  291.   
  292.                         //包够长时,则提取出来,交给后面的程序去处理  
  293.                         byte[] rev = token.Buffer.GetRange(4, packageLen).ToArray();  
  294.                         //从数据池中移除这组数据  
  295.                         lock (token.Buffer)  
  296.                         {  
  297.                             token.Buffer.RemoveRange(0, packageLen + 4);  
  298.                         }  
  299.                         //将数据包交给后台处理,这里你也可以新开个线程来处理.加快速度.  
  300.                         if(ReceiveClientData != null)  
  301.                             ReceiveClientData(token, rev);  
  302.                         //这里API处理完后,并没有返回结果,当然结果是要返回的,却不是在这里, 这里的代码只管接收.  
  303.                         //若要返回结果,可在API处理中调用此类对象的SendMessage方法,统一打包发送.不要被微软的示例给迷惑了.  
  304.                     } while (token.Buffer.Count > 4);  
  305.   
  306.                     //继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明  
  307.                     if (!token.Socket.ReceiveAsync(e))  
  308.                         this.ProcessReceive(e);  
  309.                 }  
  310.                 else  
  311.                 {  
  312.                     CloseClientSocket(e);  
  313.                 }  
  314.             }  
  315.             catch (Exception xe)  
  316.             {  
  317.                 RuncomLib.Log.LogUtils.Info(xe.Message + " " + xe.StackTrace);  
  318.             }  
  319.         }  
  320.   
  321.         // This method is invoked when an asynchronous send operation completes.    
  322.         // The method issues another receive on the socket to read any additional   
  323.         // data sent from the client  
  324.         //  
  325.         // <param name="e"></param>  
  326.         private void ProcessSend(SocketAsyncEventArgs e)  
  327.         {  
  328.             if (e.SocketError == SocketError.Success)  
  329.             {  
  330.                 // done echoing data back to the client  
  331.                 AsyncUserToken token = (AsyncUserToken)e.UserToken;  
  332.                 // read the next block of data send from the client  
  333.                 bool willRaiseEvent = token.Socket.ReceiveAsync(e);  
  334.                 if (!willRaiseEvent)  
  335.                 {  
  336.                     ProcessReceive(e);  
  337.                 }  
  338.             }  
  339.             else  
  340.             {  
  341.                 CloseClientSocket(e);  
  342.             }  
  343.         }  
  344.   
  345.         //关闭客户端  
  346.         private void CloseClientSocket(SocketAsyncEventArgs e)  
  347.         {  
  348.             AsyncUserToken token = e.UserToken as AsyncUserToken;  
  349.   
  350.             lock (m_clients) { m_clients.Remove(token); }  
  351.             //如果有事件,则调用事件,发送客户端数量变化通知  
  352.             if (ClientNumberChange != null)  
  353.                 ClientNumberChange(-1, token);  
  354.             // close the socket associated with the client  
  355.             try  
  356.             {  
  357.                 token.Socket.Shutdown(SocketShutdown.Send);  
  358.             }  
  359.             catch (Exception) { }  
  360.             token.Socket.Close();  
  361.             // decrement the counter keeping track of the total number of clients connected to the server  
  362.             Interlocked.Decrement(ref m_clientCount);  
  363.             m_maxNumberAcceptedClients.Release();  
  364.             // Free the SocketAsyncEventArg so they can be reused by another client  
  365.             e.UserToken = new AsyncUserToken();  
  366.             m_pool.Push(e);  
  367.         }  
  368.   
  369.   
  370.   
  371.         /// <summary>  
  372.         /// 对数据进行打包,然后再发送  
  373.         /// </summary>  
  374.         /// <param name="token"></param>  
  375.         /// <param name="message"></param>  
  376.         /// <returns></returns>  
  377.         public void SendMessage(AsyncUserToken token, byte[] message)  
  378.         {  
  379.             if (token == null || token.Socket == null || !token.Socket.Connected)  
  380.                 return;  
  381.             try  
  382.             {  
  383.                 //对要发送的消息,制定简单协议,头4字节指定包的大小,方便客户端接收(协议可以自己定)  
  384.                 byte[] buff = new byte[message.Length + 4];  
  385.                 byte[] len = BitConverter.GetBytes(message.Length);  
  386.                 Array.Copy(len, buff, 4);  
  387.                 Array.Copy(message, 0, buff, 4, message.Length);  
  388.                 //token.Socket.Send(buff);  //这句也可以发送, 可根据自己的需要来选择  
  389.                 //新建异步发送对象, 发送消息  
  390.                 SocketAsyncEventArgs sendArg = new SocketAsyncEventArgs();  
  391.                 sendArg.UserToken = token;  
  392.                 sendArg.SetBuffer(buff, 0, buff.Length);  //将数据放置进去.  
  393.                 token.Socket.SendAsync(sendArg);  
  394.             }  
  395.             catch (Exception e){  
  396.                 RuncomLib.Log.LogUtils.Info("SendMessage - Error:" + e.Message);  
  397.             }  
  398.         }  
  399.     }  
  400. }  

    调用方法:

C#代码  收藏代码
  1. SocketManager m_socket = new SocketManager(200, 1024);  
  2. m_socket.Init();  
  3. m_socket.Start(new IPEndPoint(IPAddress.Any, 13909));  

    好了,大功告成, 当初自己在写这些代码的时候, 一个地方就卡上很久, 烧香拜菩萨都没有用, 只能凭网上零星的一点代码给点提示. 现在算是做个总结吧. 让大家一看就明白, Socket通信就是这样, 可简单可复杂.

上面说的是服务器,那客户端的请参考

C#如何利用SocketAsyncEventArgs实现高效能TCPSocket通信 (客户端实现)

原文地址:https://www.cnblogs.com/zxtceq/p/7764986.html