IOCP模型示例

IOCPManager.h文件:
#ifndef   _IOCP_MANAGER_H_
#define  _IOCP_MANAGER_H_

#include 
<atlcoll.h>

#define DATA_BUFSIZE   8192 // 把数据缓冲区定义为8K是一个比较被Microsoft推荐的大小
#define  WAIT_TIME_FOR_THREAD  5000   //等待线程结束的超时值

//单句柄数据
typedef struct _SPerIOHandle 
{
    SOCKET  m_hSocket;  
//保存套接字句柄

    
static  _SPerIOHandle*  AllocPerIOHandle( SOCKET  hSocket )
    {
        _SPerIOHandle
*  pPerIOHandle = (_SPerIOHandle*)::GlobalAlloc( GPTR, sizeof( _SPerIOHandle ));
        
if( pPerIOHandle != NULL )
        {
            pPerIOHandle
->m_hSocket   =  hSocket;
        }
        
return pPerIOHandle;
    }

    
static  void  FreePerIOHandle( _SPerIOHandle* pPerIOHandle )
    {
        ::GlobalFree(  pPerIOHandle );
    }

}SPerIOHandle, 
*PSPerIOHandle;


//投送操作类型
enum EOperation
{
    IO_ACCEPT,
    IO_WRITE,
    IO_READ
};

//投送操作数据
typedef struct _SPerIOData
{
    OVERLAPPED  m_overlapped;   
//重叠结构
    WSABUF        m_wsaDataBuf; 
    CHAR             m_szBuffer[DATA_BUFSIZE]; 
//投送操作的数据缓冲区
    SOCKET         m_hAcceptSocket;                 //用于Accept投放操作,保存连接的套接字句柄
    EOperation     m_oprType;          //投送类型

    
static  _SPerIOData*  AllocPerIOData( EOperation oprType )
    {
        _SPerIOData
*  pPerIOData = (_SPerIOData*)::GlobalAlloc( GPTR, sizeof( _SPerIOData ));
        
if( pPerIOData != NULL )
        {
            pPerIOData
->m_oprType             =  oprType;
            
//使用AcceptEx函数需要事先创建连接套件字
            pPerIOData->m_hAcceptSocket   =   ::WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED); ;
            pPerIOData
->m_overlapped.Internal = 0;
            pPerIOData
->m_overlapped.InternalHigh = 0;
            pPerIOData
->m_overlapped.Offset = 0;
            pPerIOData
->m_overlapped.OffsetHigh = 0;
            pPerIOData
->m_overlapped.hEvent = NULL;

            pPerIOData
->m_wsaDataBuf.buf  = pPerIOData->m_szBuffer;
            pPerIOData
->m_wsaDataBuf.len  = sizeof( pPerIOData->m_szBuffer );

            ZeroMemory( pPerIOData
->m_wsaDataBuf.buf,  pPerIOData->m_wsaDataBuf.len );
        }
        
return pPerIOData;
    }

    
static  void  FreePerIOData( _SPerIOData* pPerIOData )
    {
        ::GlobalFree( pPerIOData );
    }
}SPerIOData, 
*PSPerIOData;

/*
*  IOCP管理员
*/
class  CIOCPManager
{
private:
    HANDLE  m_hCompletePort;   
//完成端口句柄
    CAtlArray<HANDLE>  m_arrOprThread;  //保存工作线程句柄
    CAtlArray<HANDLE>  m_arrAcceptThread; //保存接受连接的线程句柄

    
//接受连接线程处理函数
    static DWORD WINAPI sAcceptThread( LPVOID lpParameter );
    
//工作线程处理函数
    static DWORD WINAPI sWorkThread( LPVOID lpParameter );

protected:
    
/* 处理Accept投送结果
    * pPerIOHandle: 保存单句柄数据
    * pPerIOData: 保存投送数据
    * bRelease参数指示在处理函数中是否释放I/O重叠结构
    
*/
    
virtual  int  HandleAccept( PSPerIOHandle pPerIOHandle, PSPerIOData pPerIOData, BOOL& bRelease );  
    
//处理Recv投送结果
    virtual  int  HandleRecvData( PSPerIOHandle pPerIOHandle, PSPerIOData pPerIOData, BOOL& bRelease ); 
    
//处理Send投送结果
    virtual  int  HandleSendData( PSPerIOHandle pPerIOHandle, PSPerIOData pPerIOData, BOOL& bRelease );

public:
    CIOCPManager();
    
    
/*创建完成端口管理
    * nThreadCount: 指定工作线程的数目,为0时默认指定为: CPU个数 * 2
    
*/
    BOOL  Create( 
int nThreadCount = 0  );
    
//销毁完成端口管理
    BOOL  Destroy();

    
/* 开启接收连接操作
    * hSocket:指定套接字,以便在其上进行Accept操作
    
*/
    BOOL  StartAccept( SOCKET hSocket );

    
/*
    * 绑定套接字到完成端口上
    
*/
    BOOL  AssociateSocketToIOCP( SOCKET hSocket );
};



#endif
IOCPManager.cpp文件:
#include "stdafx.h"
#include 
"IOCPManager.h"
#include 
<Mswsock.h>


DWORD WINAPI CIOCPManager::sAcceptThread( LPVOID lpParameter )
{
    SOCKET  hSocket  
=  (SOCKET)lpParameter;
    DWORD dwRecvNumBytes 
= 0;

    HANDLE  hEvent  
=  ::WSACreateEvent();
    
//利用EventSelect模型管理套接字的ACCEPT事件
    int  nRet  = ::WSAEventSelect( hSocket, hEvent, FD_ACCEPT );
    
if ( nRet  !=  0 )
    {
        
return  -1;
    }

    
while( TRUE  )
    {
        ::WSAWaitForMultipleEvents( 
1&hEvent, TRUE, WSA_INFINITE, FALSE );
        ::WSAResetEvent( hEvent );
        PSPerIOData pPerIOData 
= SPerIOData::AllocPerIOData( IO_ACCEPT );
        
int nRet =  ::AcceptEx( hSocket,  pPerIOData->m_hAcceptSocket,
            (LPVOID)( pPerIOData
->m_szBuffer) ,
            DATA_BUFSIZE 
- (2 * (sizeof(sockaddr_in  ) + 16)),
            
sizeof(sockaddr_in  ) + 16sizeof(sockaddr_in  ) + 16,
            
&dwRecvNumBytes, 
            (LPOVERLAPPED) 
&( pPerIOData->m_overlapped ));
        
if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) 
        {
            SPerIOData::FreePerIOData( pPerIOData );
        }
    }

    
return 0;
}

DWORD WINAPI CIOCPManager::sWorkThread( LPVOID lpParameter )
{
    CIOCPManager  
*pIOCPMgr = (CIOCPManager*)lpParameter;
    ATLASSERT( pIOCPMgr 
!= NULL );

    
while ( TRUE )
    {
        DWORD  dwTransferred(
0);
        PSPerIOHandle  pPerIOHandle  
=  NULL;
        PSPerIOData  pIOOperation   
=  NULL;
        BOOL  bRet  
=  ::GetQueuedCompletionStatus( pIOCPMgr->m_hCompletePort, &dwTransferred, (PULONG_PTR)&pPerIOHandle,  (LPOVERLAPPED*)&pIOOperation, INFINITE );

        
if!bRet )
        {
            DWORD  dwError  
=  ::GetLastError();
            
return 0;
        }

        
//结束机制
        if( pPerIOHandle == NULL )
            
return 0;

        ATLASSERT( pPerIOHandle 
!= NULL );
        ATLASSERT( pIOOperation 
!= NULL );

        
//socket退出
        if( dwTransferred == 0 )
        {
            ::closesocket( pPerIOHandle
->m_hSocket );
            SPerIOHandle::FreePerIOHandle( pPerIOHandle );
            SPerIOData::FreePerIOData( pIOOperation );
            
continue;
        }
        
        BOOL  bRelease 
= FALSE;
        
switch( pIOOperation->m_oprType )
        {
        
case IO_ACCEPT:
            {
                pIOCPMgr
->HandleAccept( pPerIOHandle, pIOOperation, bRelease );
            }
            
break;
        
case IO_READ:
            {
                pIOCPMgr
->HandleRecvData( pPerIOHandle, pIOOperation, bRelease );
            }
            
break;
        
case IO_WRITE:
            {
                pIOCPMgr
->HandleSendData( pPerIOHandle, pIOOperation, bRelease );
            }
            
break;
        }
        
if!bRelease )
        {
            
//释放 pIOOperation
            SPerIOData::FreePerIOData( pIOOperation );
        }
    
    }

    
return 0;
}

CIOCPManager::CIOCPManager()
{
    m_hCompletePort   
=  NULL;
    m_arrOprThread.RemoveAll();
    m_arrAcceptThread.RemoveAll();
}

int  CIOCPManager::HandleAccept( PSPerIOHandle pPerIOHandle, PSPerIOData pPerIOData, BOOL& bRelease )
{
    bRelease   
=   TRUE;   //由于需要循环使用PSPerIOData对象,在此将bRelease设为FALSE避免删除

    ATLASSERT( pPerIOHandle 
!= NULL );
    ATLASSERT( pPerIOData 
!= NULL );

    
//绑定到完成端口
    if!this->AssociateSocketToIOCP( pPerIOData->m_hAcceptSocket ) )
        
return -1;

    
//投递接收申请
    pPerIOData->m_oprType  = IO_READ;
    ZeroMemory( pPerIOData
->m_wsaDataBuf.buf,  pPerIOData->m_wsaDataBuf.len );
    DWORD dwIoSize  
=  0;
    DWORD     dwlFlags     
=  MSG_PARTIAL;
    ::WSARecv( pPerIOData
->m_hAcceptSocket, &pPerIOData->m_wsaDataBuf, 1&dwIoSize, &dwlFlags, &pPerIOData->m_overlapped, NULL );
    
    
return 0;
}

int  CIOCPManager::HandleRecvData( PSPerIOHandle pPerIOHandle, PSPerIOData pPerIOData, BOOL& bRelease )
{
    bRelease   
=   TRUE;

    ATLASSERT( pPerIOHandle 
!= NULL );
    ATLASSERT( pPerIOData 
!= NULL );

    
//显示接收数据到输出窗口
    CString  strTrace;
    strTrace.Format( _T(
"recv message: %s \n"), CA2T(pPerIOData->m_szBuffer) );
    AtlTrace( strTrace );


    
//继续投递接收申请
    ZeroMemory( pPerIOData->m_wsaDataBuf.buf,  pPerIOData->m_wsaDataBuf.len );
    DWORD dwIoSize  
=  0;
    DWORD     dwlFlags     
=  0;
    ::WSARecv( pPerIOData
->m_hAcceptSocket, &pPerIOData->m_wsaDataBuf, 1&dwIoSize, &dwlFlags, &pPerIOData->m_overlapped, NULL );

    
return 0;
}

int  CIOCPManager::HandleSendData( PSPerIOHandle pPerIOHandle, PSPerIOData pPerIOData, BOOL& bRelease )
{
    bRelease   
=   FALSE;
    
return 0;
}

BOOL  CIOCPManager::Create( 
int nThreadCount  )
{
    
if( nThreadCount == 0 )
    {
        
//设置为0时,将线程设置为电脑CPU数 * 2
        SYSTEM_INFO  sysInfo;
        ::GetSystemInfo( 
&sysInfo );
        nThreadCount  
=  sysInfo.dwNumberOfProcessors  *  2;
    }
    
//创建完成端口
    m_hCompletePort  =  ::CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, NULL, 0 );
    ATLASSERT( m_hCompletePort  
!=  NULL );
    
if( m_hCompletePort == NULL )return FALSE;

    
//开启工作线程等待完成端口上的投放操作结果
    m_arrOprThread.RemoveAll();
    
for ( int  nIndex = 0; nIndex < nThreadCount; nIndex++ )
    {
        HANDLE  hThread  
=  ::CreateThread( NULL, 0, sWorkThread, (LPVOID)this0, NULL );
        ATLASSERT( hThread 
!= NULL );
        m_arrOprThread.Add( hThread );
    }

    
return TRUE;
}

BOOL  CIOCPManager::Destroy()
{
    
//结束连接线程
    int  nAcceptThrCount = (int)m_arrAcceptThread.GetCount();
    
for ( int nIndex = 0; nIndex < nAcceptThrCount; nIndex++ )
    {  
//这里为简便,所以直接TerminateThread线程,有待完善
        ::TerminateThread( m_arrAcceptThread.GetAt( nIndex ), -1 );
        ::CloseHandle( m_arrAcceptThread.GetAt( nIndex ) );
    }

    m_arrAcceptThread.RemoveAll();

    
//结束工作线程
    int  nWorkThrCount = (int)m_arrOprThread.GetCount();
    
for ( int nIndex = 0; nIndex < nWorkThrCount; nIndex++ )
    {  
//投送结束线程通知(指定单句柄数据为NULL)
        ::PostQueuedCompletionStatus( m_hCompletePort, 0, NULL, NULL );
    }

    
for ( int nIndex = 0; nIndex < nWorkThrCount; nIndex++ )
    {
        
if( WAIT_TIMEOUT == ::WaitForSingleObject( m_arrOprThread.GetAt( nIndex ), WAIT_TIME_FOR_THREAD ) )
        {
            ::TerminateThread( m_arrOprThread.GetAt( nIndex ), 
-1 );
        }
        ::CloseHandle( m_arrOprThread.GetAt( nIndex ) );
    }

    m_arrOprThread.RemoveAll();

    
//删除完成端口对象
    if( m_hCompletePort != NULL )
    {
        ::CloseHandle( m_hCompletePort );
    }

    
return TRUE;
}

BOOL  CIOCPManager::StartAccept( SOCKET hSocket )
{
    
//关联套接字到完成端口
    if!this->AssociateSocketToIOCP( hSocket ) )
        
return FALSE;

    
//创建监听线程
    HANDLE  hThread  =  ::CreateThread( NULL, 0, sAcceptThread, (LPVOID)hSocket, 0, NULL );
    ATLASSERT( hThread 
!= NULL );
    m_arrAcceptThread.Add( hThread );

    
return TRUE;
}


BOOL  CIOCPManager::AssociateSocketToIOCP( SOCKET hSocket )
{
    ATLASSERT(  hSocket 
!= NULL );

    
//分配单句柄数据
    PSPerIOHandle  pPerIOHandle = SPerIOHandle::AllocPerIOHandle( hSocket );
    
if( pPerIOHandle == NULL )
        
return FALSE;

    m_hCompletePort  
=  ::CreateIoCompletionPort( (HANDLE)hSocket, m_hCompletePort, (ULONG_PTR)pPerIOHandle, 0 );

    ATLASSERT( m_hCompletePort  
!=  NULL );
    
if( m_hCompletePort == NULL )return FALSE;

    
return TRUE;
}

使用如:
        SOCKET hSocket = ::WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED); 
        
if( hSocket == INVALID_SOCKET )
        {
            
return 0;
        }

        SOCKADDR_IN        addr;
        addr.sin_family 
= AF_INET;
        addr.sin_addr.s_addr 
= INADDR_ANY;
        addr.sin_port 
= htons((short5969 );
        
int nRetVal = bind( hSocket, (SOCKADDR *)&addr,  sizeof(addr) );
        
if( nRetVal == SOCKET_ERROR ) 
            
return 0;

        nRetVal 
= listen( hSocket, 10 );
        
if( nRetVal == SOCKET_ERROR ) 
            
return 0;

        CIOCPManager iocpMgr;
        iocpMgr.Create();
        
if!iocpMgr.StartAccept( hSocket ) )
        {
            AtlMessageBox( NULL, _T(
"StartAccept Failed") );
        }
        
        Sleep( 
100000 );
        iocpMgr.Destroy();
原文地址:https://www.cnblogs.com/fangkm/p/1524064.html