IOCP用法

IOCP:即是IO完成端口。I/O完成端口(s)是一种机制,通过这个机制,应用程序在启动时会首先创建一个线程池,然后该应用程序使用线程池处理异步I/O请求。这些线程被创建的唯一目的就是用于处理I/O请求。对于处理大量并发异步I/O请求的应用程序来说,相比于在I/O请求发生时创建线程来说,使用完成端口(s)它就可以做的更快且更有效率。IOCP是windows平台最高效的通信模型,可以在一个应用程序同时管理为数众多的套接字,以达到最佳的系统性能!从本质上说,完成端口模型要求我们创建一个Win32完成端口对象,通过指定数量的线程,对重叠I/O请求进行管理,以便为已经完成的重叠I/O请求提供服务。


一、完成端口的相关函数或结构:

1、创建完成端口或绑定句柄

HANDLE CreateIoCompletionPort(
HANDLE FileHandle,
HANDLE ExistingCompletionPort,
ULONG_PTR CompletionKey,
DWORD NumberOfConcurrentThreads
);

2、获取排队完成状态

BOOL GetQueuedCompletionStatus(
    HANDLE CompletionPort,
    LPDWORD lpNumberOfBytes,
    PULONG_PTR lpCompletionKey,
    LPOVERLAPPED* lpOverlapped,
    DWORD dwMilliseconds
);

3、用OVERLAPPED Overlapped;开头封装一个自定义OVERLAPPED结构

typedef struct _PER_IO_OPERATION_DATA
{
 //重叠结构
 OVERLAPPED OverLapped;
 //数据缓冲区
 WSABUF RecvDataBuf;
 WSABUF SendDataBuf;
 char RecvBuf[BUFFER_SIZE];
 char SendBuf[BUFFER_SIZE];
 //操作类型表示
 bool OperType;
}PER_IO_OPERATION_DATA,*PPER_IO_OPERATION_DATA;

此结构用来投递IO工作者
PER_IO_OPERATION_DATA PerIoWorker;
WSARecv(socket, ..., (OVERLAPPED *)&PerIoWorker;);

4、向工作者线程都发送一个完成数据包

BOOL PostQueuedCompletionStatus(
    HANDLE CompletionPort,
    DWORD dwNumberOfBytesTransferred,
    ULONG_PTR dwCompletionKey,
    LPOVERLAPPED lpOverlapped
);

二、IOCP实例
包括两个线程工作函数:
1、工作者函数封装GetQueuedCompletionStatus,实现各种状态处理
2、监听工作线程函数,接收链接请求,创建完成端口CreateIoCompletionPort、投递工作者WSARecv

一个发送消息函数,创建PPER_IO_OPERATION_DATA结构工作对象,直接发送WSASend

一个初始化过程:
1、用WSASocket启动SOCKET,绑定并监听
2、启动工作者线程,线程数一般为CPU处理器数量*2+2
3、启动侦听线程

退出IOCP:
1、退出工作线程
2、退出侦听线程
3、关闭网络的侦听
4、切断当前所有连接

具体:
1、工作者线程函数

DWORD WINAPI WorkerProc(LPVOID lParam)
{
 CIocpModeSvr* pSvr=(CIocpModeSvr*)lParam;
 HANDLE CompletionPort=pSvr->CompletionPort;
 DWORD ByteTransferred;
 LPPER_HANDLE_DATA PerHandleData;
 PPER_IO_OPERATION_DATA PerIoData;
 DWORD RecvByte;
 while(true)
 {
  bool bSuccess=GetQueuedCompletionStatus(CompletionPort,
            &ByteTransferred,
            (LPDWORD)&PerHandleData,
            (LPOVERLAPPED* )&PerIoData,
            INFINITE);
  //1、退出信号到达,退出线程
  if(ByteTransferred==-1 && PerIoData==NULL)
  {
   return 1L;
  }
  //2、客户机已经断开连接或者连接出现错误
  if(ByteTransferred==0 &&
     (PerIoData->OperType==RECV_POSTED || PerIoData->OperType==SEND_POSTED))
  {
   //1、将该客户端数据删除
   //2、通知上层该客户端已经断开
   //3、关闭套接口   
   continue;
  }
  //3、接收数据
  if(PerIoData->OperType==RECV_POSTED)
  {
   //调用回调函数,处理数据
   pSvr->m_pProcessRecvData(PerHandleData->IpAddr,
                         PerHandleData->sClient,
          PerIoData->RecvBuf,
          ByteTransferred);
   //将源数据置空
   memset(PerIoData->RecvBuf,0,BUFFER_SIZE);
   ByteTransferred=0;
   //重置IO操作数据
   unsigned long Flag=0;
   ZeroMemory(&(PerIoData->OverLapped),sizeof(OVERLAPPED));
   
   PerIoData->RecvDataBuf.buf=PerIoData->RecvBuf;
   PerIoData->RecvDataBuf.len=BUFFER_SIZE;
   PerIoData->OperType=RECV_POSTED;
   //提交另一个Recv请求
   WSARecv(PerHandleData->sClient,
    &(PerIoData->RecvDataBuf),
    1,
    &RecvByte,
    &Flag,
    &(PerIoData->OverLapped),
    NULL);
  }//4、发送数据完成,置空缓冲区,释放缓冲区  
  else if(PerIoData->OperType==SEND_POSTED)
  {
   memset(PerIoData,0,sizeof(PER_IO_OPERATION_DATA));
   GlobalFree(PerIoData);
   ByteTransferred=0;
  }
 }
 return 0L;
}

2、监听线程并启动工作者

DWORD WINAPI ListenWorkerProc(LPVOID lParam)
{
 CIocpModeSvr* pSvr=(CIocpModeSvr*)lParam;
 SOCKET Accept;
 while(true)
 {
  //1、接收客户的请求
  Accept = WSAAccept(pSvr->ListenSocket,NULL,&nLen,ConnectAcceptCondition,(DWORD)lParam);  

  //2、取得客户端信息
  sockaddr soad;
  sockaddr_in in;
  int len=sizeof(soad);
  if(getpeername(Accept,&soad,&len)==SOCKET_ERROR)
  {
   CString LogStr;
   LogStr.Format("getpeername() faild : %d",GetLastError());
   pSvr->WriteLogString(LogStr);
  }
  else
  {
   memcpy(&in,&soad,sizeof(sockaddr));
  }
  //3、给Socket创建完成端口

  //申请新的句柄操作数据
  LPPER_HANDLE_DATA PerHandleData=(LPPER_HANDLE_DATA) GlobalAlloc(GPTR,sizeof(PER_HANDLE_DATA));
  //句柄数据
  PerHandleData->sClient=Accept;
  PerHandleData->IpAddr=in.sin_addr.S_un.S_addr;
  //存储客户信息
  ::EnterCriticalSection(&pSvr->cInfoSection);
  pSvr->ClientInfo.Add(*PerHandleData);
  ::LeaveCriticalSection(&pSvr->cInfoSection);
  //转储信息
  CString LogStr;
  LogStr.Format("UserIP: %s ,Socket : %d Connected!",inet_ntoa(in.sin_addr),Accept);
  pSvr->WriteLogString(LogStr);
  TRACE("\nUserIP: %s ,Socket : %d Connected!",inet_ntoa(in.sin_addr),Accept);
  //关联客户端口到完成端口,句柄数据在此时被绑定到完成端口
  CreateIoCompletionPort((HANDLE)Accept, pSvr->CompletionPort,(DWORD)PerHandleData,0);
  
  4、创建并投递工作者
  for(int i=0;i<pSvr->IOWorkerNum;i++)
  {
   //Io操作数据标志   
   PPER_IO_OPERATION_DATA PerIoData=(PPER_IO_OPERATION_DATA)GlobalAlloc(GPTR,sizeof(PER_IO_OPERATION_DATA));
   unsigned long  Flag=0;
   DWORD RecvByte;
   ZeroMemory(&(PerIoData->OverLapped),sizeof(OVERLAPPED));
   
   PerIoData->RecvDataBuf.buf=PerIoData->RecvBuf;
   PerIoData->RecvDataBuf.len=BUFFER_SIZE;
   PerIoData->OperType=RECV_POSTED;
   //提交首个接收数据请求
   //这时
   //如果客户端断开连接
   //则也可以以接收数据时得到通知 
   WSARecv(PerHandleData->sClient,
    &(PerIoData->RecvDataBuf),
    1,
    &RecvByte,
    &Flag,
    &(PerIoData->OverLapped),
    NULL);
  }
 }
}

3、发消息

//提交发送消息请求,
//如果提交发送消息失败,
//则将导致在工作线程里将目标客户端的连接切断
bool CIocpModeSvr::SendMsg(SOCKET sClient,char * pData,unsigned long Length)
{
 if(sClient==INVALID_SOCKET || pData==NULL || Length==0 || !IsStart)return false;

 //申请操作键
 PPER_IO_OPERATION_DATA PerIoData=(PPER_IO_OPERATION_DATA) \
           GlobalAlloc(GPTR,
                                sizeof(PER_IO_OPERATION_DATA));

 //准备缓冲
 unsigned long  Flag=0;
 DWORD SendByte;
 ZeroMemory(&(PerIoData->OverLapped),sizeof(OVERLAPPED));
 memcpy(PerIoData->SendBuf,pData,Length);
 PerIoData->SendDataBuf.buf=PerIoData->SendBuf;
 PerIoData->SendDataBuf.len=Length;
 PerIoData->OperType=SEND_POSTED;
 int bRet=WSASend(sClient,
               &(PerIoData->SendDataBuf),
               1,
               &SendByte,
               Flag,
               &(PerIoData->OverLapped),
               NULL);
 if(bRet==SOCKET_ERROR && GetLastError()!=WSA_IO_PENDING)
 {
  CString LogStr;
  LogStr.Format("WSASend With Error : %d",GetLastError());
  WriteLogString(LogStr);
  return false;
 }
 else return true;
 
 return false;
}

4、启动IOCP

int CIocpModeSvr::InitNetWork(unsigned int SvrPort,std::string *pHostIpAddress)
{
 //启动网络
 CString LogStr;
 int Error=0;
 WSADATA wsaData;
 char Name[100];
 hostent *pHostEntry;
 in_addr rAddr;
 //Net Start Up
 Error=WSAStartup(MAKEWORD(0x02,0x02),&wsaData);
 if(Error!=0)
 {
  Error = WSAGetLastError();
  pHostIpAddress->assign( "" );
  
  LogStr.Format("WSAStartUp Faild With Error: %d",Error);
  WriteLogString(LogStr);
  
  return Error;
 }
 //Make Version
 if ( LOBYTE( wsaData.wVersion ) != 2 ||
  HIBYTE( wsaData.wVersion ) != 2 )
 {
  WSACleanup( );
  
  WriteLogString("The Local Net Version Is not 2");
  
  return -1;
 }
 //Get Host Ip
 Error = gethostname ( Name, sizeof(Name) );
 if( 0 == Error )
 {
  pHostEntry = gethostbyname( Name );
  if( pHostEntry != NULL )
  {
   memcpy( &rAddr, pHostEntry->h_addr_list[0], sizeof(struct in_addr) );
   pHostIpAddress->assign( inet_ntoa( rAddr ) );
  }
  else
  {
   Error = WSAGetLastError();
   LogStr.Format("GetHostIp faild with Error: %d",Error);
   WriteLogString(LogStr);
   return Error;
   
  }
 }
 else
 {
  Error = WSAGetLastError();
  LogStr.Format("gethostname faild with Error: %d",Error);
  WriteLogString(LogStr);
  return Error;
 }
 if(0==Error)
 {
  //创建侦听端口
  ListenSocket=WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
  if(ListenSocket==INVALID_SOCKET)
  {
   Error = WSAGetLastError();
   LogStr.Format("CreateSocket faild with Error: %d",Error);
   WriteLogString(LogStr);
   return Error;
  }
 }
 //绑定到目标地址
 if(0==Error)
 {
  sockaddr_in InternetAddr;
  InternetAddr.sin_family=AF_INET;
  InternetAddr.sin_addr.S_un.S_addr=htonl(INADDR_ANY);
  InternetAddr.sin_port=htons(SvrPort);
  if(bind(ListenSocket,
       (PSOCKADDR )&InternetAddr,
    sizeof(InternetAddr))==SOCKET_ERROR)
  {
   Error=GetLastError();
   LogStr.Format("bind Socket faild with Error: %d",Error);
   WriteLogString(LogStr);
   return Error;
  }
 }
 //侦听端口上的连接请求
 if(0==Error)
 {
  if( listen(ListenSocket,5)==SOCKET_ERROR)
  {
   Error=GetLastError();
   LogStr.Format("listen Socket faild with Error: %d",Error);
   WriteLogString(LogStr);
   return Error;
  }
 }
 //创建完成端口句柄
 if(0==Error)
 {
  CompletionPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);
  if(CompletionPort==INVALID_HANDLE_VALUE)
  {
   Error=GetLastError();
   LogStr.Format("CreateIoCompletionPort faild with Error: %d",Error);
   WriteLogString(LogStr);
   return Error;
  }
 }
 //启动工作线程,线程数为CPU处理器数量*2+2
 if(0==Error)
 { 
  SYSTEM_INFO sys_Info;
  GetSystemInfo(&sys_Info);
  for(int i=0;i<sys_Info.dwNumberOfProcessors*2+2;i++)
  {
   HANDLE ThreadHandle;
   DWORD ThreadID;
   
   ThreadHandle=CreateThread(NULL,
    0,
    ServerWorkerProc,
    this,
    0,
    &ThreadID);
   if(ThreadHandle==NULL)
   {
    Error = WSAGetLastError();
    LogStr.Format("Create Server Work Thread faild with Error: %d",Error);
    WriteLogString(LogStr);
    return Error;
   } 
   CloseHandle(ThreadHandle);
  }
 }
 //启动侦听线程
 if(0==Error)
 {
  DWORD thID;
  ListenThreadHandle=CreateThread(NULL,
                               0,
          ListenProc,
          this,
          0,
          &thID);
  if(ListenThreadHandle==NULL)
  {
   Error = WSAGetLastError();
   LogStr.Format("Create Listen Thread faild with Error: %d",Error);
   WriteLogString(LogStr);
   return Error;  
  }
 }
 return Error;
}

4、退出IOCP

void CIocpModeSvr::UnInit()
{
 if(!IsStart)return;
 //退出工作线程
 SYSTEM_INFO sys_Info;
 GetSystemInfo(&sys_Info);
 for(int i=0;i<sys_Info.dwNumberOfProcessors*2+2;i++)
 {
  //寄出退出消息
  PostQueuedCompletionStatus(CompletionPort, -1, -1,NULL);
 }
 //退出侦听线程
 ::TerminateThread(ListenThreadHandle,1L);
 ::WaitForSingleObject(ListenThreadHandle,10);
 CloseHandle(ListenThreadHandle);
 //关闭网络的侦听
 shutdown(ListenSocket,0);
 closesocket(ListenSocket);
 //切断当前所有连接
 DisConnectAll(); 
 ::DeleteCriticalSection(&cInfoSection);
 m_pProcessRecvData=NULL;
 IsStart=false;
}
原文地址:https://www.cnblogs.com/virtualNatural/p/1918999.html