iocp-socket 服务(借鉴别人的,根据自己的需要改的)未完待续

#pragma once
#include <WinSock2.h>
#include <MSWSock.h>
#include <Windows.h>
#pragma comment(lib,"ws2_32.lib")

#define BUFFER_SIZE (1024*8) // 8KB
#define BUFFER_SIZE_DATA (3*BUFFER_SIZE )

#define NOTIFY_MSG_ACCEPT	0xa1
#define NOTIFY_MSG_CONNECT	0xa2
#define NOTIFY_MSG_DISCONNECT 0xa3
#define NOTIFY_MSG_READ		0xa4
#define NOTIFY_MSG_WRITE	0xa5


struct PER_IO_BUFFER
{
	WSAOVERLAPPED ol;
	SOCKET		sClient ;		//the socket of client use by AcceptEx  
	LPBYTE		lpBuffer;		// the pointer of buffer
	DWORD		dwBufferSize;	// the size of buffer
	DWORD		dwTrans	;		// the size of io-trans
	BYTE		opType;			// opteion type 
#define OP_ACCEPT	6
#define OP_CONNECT	7
#define OP_WRITE	8
#define OP_READ		9
	PER_IO_BUFFER* 	pNext;		// next buffer

};

struct PER_HANDLE_DATA
{
	SOCKET s;
	SOCKADDR_IN saddr;
	BOOL bConnect;
	BYTE readBytes[BUFFER_SIZE_DATA];
	DWORD dwBufferOffSet;
	HANDLE m_hWriteComplete ;
	//DWORD dwBufferSize;
	PER_HANDLE_DATA* pNext;
};

typedef void (__stdcall* PNOTIFYPROC)(PER_HANDLE_DATA*,PER_IO_BUFFER* ,DWORD );
class Ciocp
{
private:
	HANDLE	m_hIocp;			// iocp handle
	SOCKET	m_sListen;			// the socket of listen 
	SOCKET	m_sConnect;			// the socket of connect 
	DWORD	m_dwProt;			// the port of listen 
	DWORD	m_dwMaxConns;		// the max count of connectios
	DWORD	m_dwMaxFreeBuffers;	// the max count of freebuffers
	DWORD	m_dwMaxFreeContexts;// the max count of freecontexts
	DWORD	m_dwInitOp;			// the count of init-op
	ULONGLONG m_ulWriteBytes;	// the total of write-bytes 
	ULONGLONG m_ulReadBytes;	// the total of read-bytes

	DWORD m_dwWorkThreadCount ;	// the count of worker thread
	DWORD m_dwCurWorkThreadCount ;
	CRITICAL_SECTION m_csWorkLock ;
	CRITICAL_SECTION m_csIoLock ;

	CRITICAL_SECTION m_csBuffersListLock;		// the cs-lock of free-buffers-list 
	CRITICAL_SECTION m_csContextsListLock;		// the cs-lock of free-count
	DWORD			m_dwBuffersCount;			// the count of cur buffers
	DWORD			m_dwContextsCount;			// the count of cur contexts

	PER_IO_BUFFER* m_pFreeBuffersList;		// the list of free-buffers
	PER_HANDLE_DATA* m_pFreeContextsList;	// the list of free-contexts

	SOCKADDR_IN		m_siRemoteAddr ;
	bool			m_bStarted ;			// the status of socket server

	LPFN_ACCEPTEX				m_lpfnAcceptEx ;
	LPFN_GETACCEPTEXSOCKADDRS	m_lpfnGetAcceptExSockAddrs;
	LPFN_CONNECTEX				m_lpfnConnectEx ;
	PNOTIFYPROC					m_pNotifyProc;

	BOOL PostAccept();
	BOOL PostWrite(PER_HANDLE_DATA* pContext, LPBYTE lpBuffer, DWORD dwSize);
	BOOL PostRead(PER_HANDLE_DATA* pContext);
	BOOL PostConnect();
	static unsigned int WINAPI _WorkerThreadProc(LPVOID lpParam);
	PER_IO_BUFFER* AllocBuffer(DWORD dwSize = BUFFER_SIZE);
	PER_HANDLE_DATA* AllocContext(SOCKET s);
	void ReleaseContext(PER_HANDLE_DATA* pContext);
	void ReleaseIoBuffer(PER_IO_BUFFER* pBuffer);
	void CreateWokerThreads();
	void CreateIocp();
	void AddSocketToIocp(SOCKET s, PER_HANDLE_DATA* pContext=0);
	void DecCurWorkCount();
	void HandleIoOp(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer,DWORD dwTrans);
	void NotifyMsg(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer, DWORD dwMsg);
	void ProcessIoRead(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);
	BOOL SetKeepAlive(SOCKET s);
	void ProcessIoConnect(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);
	void ProcessIoAccept(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);
public:
	Ciocp(void);
	~Ciocp(void);
	BOOL Start(PNOTIFYPROC pNotifyProc, DWORD dwPort=8080,DWORD dwMaxConns= 2000,DWORD dwMaxFreeBuffers = 100,DWORD dwMaxFreeContexts =100 ,DWORD dwInitOp = 5 );
	void Shutdown();
	BOOL Connect(PNOTIFYPROC pNotifyProc, LPSTR lpstrIp = "127.0.0.1",DWORD dwPort=443 );
	void GetStatisticsData(ULONGLONG* pulRead,ULONGLONG* pulWrite);
	void Send(PER_HANDLE_DATA* pContext,LPBYTE lpBuffer,DWORD dwSize);
	void ProcessIoWrite(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);

};

  

#include "StdAfx.h"
#include "iocp.h"
#include <process.h>
#include <mstcpip.h>


Ciocp::Ciocp(void)
{

	m_hIocp = INVALID_HANDLE_VALUE ;
	m_sListen = INVALID_SOCKET ;
	m_sConnect = INVALID_SOCKET ;
	m_dwInitOp =5;
	m_dwMaxConns = 2000;
	m_dwMaxFreeContexts = 100;
	m_dwMaxFreeBuffers = 100 ;
	m_ulWriteBytes =0;
	m_ulReadBytes = 0;
	// get max worker count
	SYSTEM_INFO sys_info ;
	GetSystemInfo(&sys_info);
	m_dwWorkThreadCount = sys_info.dwNumberOfProcessors *2 ;
	m_dwCurWorkThreadCount = 0;
	InitializeCriticalSection(& m_csBuffersListLock);
	InitializeCriticalSection(& m_csContextsListLock);
	InitializeCriticalSection(& m_csWorkLock);
	InitializeCriticalSection((& m_csIoLock));
	m_dwBuffersCount = 0;
	m_dwContextsCount = 0;

	m_pFreeBuffersList = NULL ;
	m_pFreeContextsList = NULL ;

	m_lpfnAcceptEx = NULL ;
	m_lpfnConnectEx = NULL ;
	m_lpfnGetAcceptExSockAddrs = NULL ;
	m_bStarted = false ;

	m_dwProt = 0;
	WSADATA wsaData ;
	WORD sockVerSion = MAKEWORD(2,2);
	WSAStartup(sockVerSion,&wsaData);
}


Ciocp::~Ciocp(void)
{
	DeleteCriticalSection(& m_csBuffersListLock);
	DeleteCriticalSection(& m_csContextsListLock);
	DeleteCriticalSection(& m_csWorkLock);
	DeleteCriticalSection(& m_csIoLock);
	if (m_sListen != INVALID_SOCKET)
	{
		closesocket(m_sListen);
	}

	if (m_sConnect != INVALID_SOCKET)
	{
		closesocket(m_sConnect);
	}
}

BOOL Ciocp::Start(PNOTIFYPROC pNotifyProc, DWORD dwPort/*=8080*/,DWORD dwMaxConns/*= 2000*/,DWORD dwMaxFreeBuffers /*= 100*/,DWORD dwMaxFreeContexts /*=100 */,DWORD dwInitOp /*= 5 */)
{
	m_pNotifyProc = pNotifyProc ;
	m_dwProt = dwPort;
	m_dwMaxConns = dwMaxConns;
	m_dwMaxFreeBuffers = dwMaxFreeBuffers;
	m_dwMaxFreeContexts = dwMaxFreeContexts;
	m_dwInitOp = dwInitOp ;

	m_sListen = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
	SOCKADDR_IN saddr ;
	saddr.sin_family = AF_INET;
	saddr.sin_port = ntohs(m_dwProt);
	saddr.sin_addr.S_un.S_addr = INADDR_ANY ;
	m_bStarted = true ;

	if (bind(m_sListen,(SOCKADDR*)&saddr,sizeof(saddr)) == SOCKET_ERROR)
	{
		m_bStarted = false ;
		return FALSE ;
	}

	listen(m_sListen,m_dwMaxConns);

	CreateIocp();

	GUID guidAcceptEx = WSAID_ACCEPTEX ;
	DWORD dwBytes ;
	WSAIoctl(m_sListen,SIO_GET_EXTENSION_FUNCTION_POINTER,&guidAcceptEx,sizeof(guidAcceptEx),&m_lpfnAcceptEx,sizeof(m_lpfnAcceptEx),&dwBytes,NULL,NULL);

	GUID guidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS ;

	WSAIoctl(m_sListen,SIO_GET_EXTENSION_FUNCTION_POINTER,&guidGetAcceptExSockAddrs,sizeof(guidGetAcceptExSockAddrs),&m_lpfnGetAcceptExSockAddrs,sizeof(m_lpfnGetAcceptExSockAddrs),&dwBytes,NULL,NULL);

	AddSocketToIocp(m_sListen);
	//create worker
	CreateWokerThreads();
	
	// post accept 
	
	for(int i=0;i < m_dwInitOp;i++)
	{
		
		PostAccept();
	}


	
}

BOOL Ciocp::PostAccept()
{
	// set io type
	PER_IO_BUFFER* pBuffer = NULL ;
	pBuffer = (PER_IO_BUFFER*)AllocBuffer(BUFFER_SIZE);
	pBuffer->opType = OP_ACCEPT ;

	// post io 
	DWORD dwBytes ;
	DWORD dwAddrSize = sizeof(SOCKADDR_IN)+16 ;
	pBuffer->sClient = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
// 	BOOL b = m_lpfnAcceptEx(m_sListen,pBuffer->sClient,
// 		pBuffer->lpBuffer,pBuffer->dwBufferSize-dwAddrSize*2 
// 		,dwAddrSize,dwAddrSize,&dwBytes,&pBuffer->ol);
	BOOL b = m_lpfnAcceptEx(m_sListen,pBuffer->sClient,
		pBuffer->lpBuffer,0 
		,dwAddrSize,dwAddrSize,&dwBytes,&pBuffer->ol);
	if (!b && WSAGetLastError() != WSA_IO_PENDING)
	{
		return FALSE ;
	}

	return TRUE;


}

BOOL Ciocp::PostWrite(PER_HANDLE_DATA* pContext, LPBYTE lpBuffer, DWORD dwSize)
{
	PER_IO_BUFFER* pBuffer = AllocBuffer();
	pBuffer->opType = OP_WRITE; 

	// post i/o 
	DWORD dwBytes;  
	DWORD dwFlags = 0;  
	WSABUF buf;  
	buf.buf = (char*)lpBuffer;  
	buf.len = dwSize  ;  
	if(::WSASend(pContext->s, &buf, 1, &dwBytes, dwFlags, &pBuffer->ol, NULL) != NO_ERROR)  
	{  
		if(::WSAGetLastError() != WSA_IO_PENDING)  
		{  

			return FALSE;  
		}  
	}  

	return TRUE ;
	
}

BOOL Ciocp::PostRead(PER_HANDLE_DATA* pContext)
{
	PER_IO_BUFFER* pBuffer = AllocBuffer();
	pBuffer->opType = OP_READ; 

	// post i/o 
	DWORD dwBytes;  
	DWORD dwFlags = 0;  
	WSABUF buf;  
	buf.buf = (char*)pBuffer->lpBuffer;  
	buf.len = pBuffer->dwBufferSize  ;  
	if(::WSARecv(pContext->s, &buf, 1, &dwBytes, &dwFlags, &pBuffer->ol, NULL) != NO_ERROR)  
	{  
		if(::WSAGetLastError() != WSA_IO_PENDING)  
		{  
			
			return FALSE;  
		}  
	}  

	return TRUE ;
}

BOOL Ciocp::PostConnect()
{
	PER_IO_BUFFER* pBuffer = NULL ;

	pBuffer = (PER_IO_BUFFER*)AllocBuffer(BUFFER_SIZE);

	SOCKADDR_IN saddr ;
	saddr.sin_family = AF_INET;
	saddr.sin_port = htons(0);
	saddr.sin_addr.s_addr = htonl(ADDR_ANY);

	if (m_sConnect != INVALID_SOCKET)
	{
		closesocket(m_sConnect);
		m_sConnect = INVALID_SOCKET ;
	}
	m_sConnect = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);

	if (bind(m_sConnect,(SOCKADDR*)&saddr,sizeof(saddr)) == SOCKET_ERROR)
	{
		m_bStarted = false ;
		return FALSE ;
	}
	
	
	PER_HANDLE_DATA *pContext = AllocContext(m_sConnect);
	pContext->bConnect = TRUE ;
	AddSocketToIocp(m_sConnect,pContext);
	//create worker
	
	if (m_lpfnConnectEx == NULL )
	{
		GUID guidConnectex = WSAID_CONNECTEX ;
		DWORD dwBytes ;
		WSAIoctl(m_sConnect,SIO_GET_EXTENSION_FUNCTION_POINTER,&guidConnectex,sizeof(guidConnectex),&m_lpfnConnectEx,sizeof(m_lpfnConnectEx),&dwBytes,NULL,NULL);
	}


	DWORD dwSend = 0;
	pBuffer->opType = OP_CONNECT ;
	strcpy((LPSTR)pBuffer->lpBuffer,"hello kid");
	pBuffer->dwBufferSize = 5;
	bool b = m_lpfnConnectEx(m_sConnect,(SOCKADDR*)&m_siRemoteAddr,sizeof(m_siRemoteAddr),
		pBuffer->lpBuffer,pBuffer->dwBufferSize,&dwSend,& pBuffer->ol);

	if(!b && ::WSAGetLastError() != WSA_IO_PENDING)  
	{  
		return FALSE;  
	}  

	return TRUE;  
}

PER_IO_BUFFER* Ciocp::AllocBuffer(DWORD dwSize)
{
	OutputDebugString(L"Buffer ++ 
 ");
	PER_IO_BUFFER* pBuffer = NULL;
	if (dwSize > BUFFER_SIZE)
	{
		return NULL ;
	}
	
	EnterCriticalSection(&m_csBuffersListLock);
	if (m_pFreeBuffersList == NULL )
	{
		// 2) HeapAlloc buffer 
		pBuffer = (PER_IO_BUFFER*) HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(PER_IO_BUFFER)+ BUFFER_SIZE );
	}else
	{
		// 1) check pFreeBuffersList
		pBuffer = m_pFreeBuffersList ;
		m_pFreeBuffersList = m_pFreeBuffersList->pNext ;
		pBuffer->pNext = 0;
		m_dwBuffersCount -- ;
	}
	LeaveCriticalSection(&m_csBuffersListLock);
	// 
	if (pBuffer!= NULL )
	{
		pBuffer->dwBufferSize = dwSize ;
		pBuffer->lpBuffer =(LPBYTE) (pBuffer+1 );


	}
	return pBuffer ;
	
}

unsigned int WINAPI Ciocp::_WorkerThreadProc(LPVOID lpParam)
{
	Sleep(1000);
	Ciocp* pThis = (Ciocp*)lpParam ;
	DWORD dwTrans = 0;
	DWORD dwKey = 0;
	LPOVERLAPPED lpol;
	PER_IO_BUFFER* pBuffer = NULL ;
	while (pThis->m_bStarted )
	{
		BOOL bOk = GetQueuedCompletionStatus(pThis->m_hIocp,&dwTrans,&dwKey,&lpol,WSA_INFINITE);
		if (dwTrans == -1 )
		{
			pThis->DecCurWorkCount();
			_endthreadex(0);
			return 0;
		}
		pBuffer = CONTAINING_RECORD(lpol,PER_IO_BUFFER,ol);
		if (!bOk)
		{
			if (pBuffer->opType == OP_CONNECT)
			{
				DWORD dwError = WSAGetLastError();
				if (dwError != ERROR_IO_PENDING)
				{
					
					pThis->PostConnect();
					pThis->ReleaseContext((PER_HANDLE_DATA*)dwKey);
				}else
				{
					OutputDebugString(L"Connect Pending .... ");
				}
			}// (pBuffer->opType == OP_CONNECT)

			pThis->ReleaseIoBuffer(pBuffer);
		}else //(!bOk)
		{
			pThis->HandleIoOp((PER_HANDLE_DATA*)dwKey,pBuffer,dwTrans);
		}
	}
	pThis->DecCurWorkCount();
	return  WSAGetLastError();
}

void Ciocp::CreateWokerThreads()
{
	
	for (int i= 0;i< m_dwWorkThreadCount-m_dwCurWorkThreadCount ;i++)
	{
		unsigned threadid;
		_beginthreadex(NULL,0,_WorkerThreadProc,this,0,&threadid);
		m_dwCurWorkThreadCount ++ ;
	}
}

void Ciocp::CreateIocp()
{
	if (m_hIocp == INVALID_HANDLE_VALUE)
	{
		m_hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,0);
	}
}

void Ciocp::AddSocketToIocp(SOCKET s, PER_HANDLE_DATA* pContext/*=0*/)
{
	CreateIoCompletionPort((HANDLE)s,m_hIocp,(DWORD)pContext,0);
}



BOOL Ciocp::Connect(PNOTIFYPROC pNotifyProc, LPSTR lpstrIp /*= "127.0.0.1"*/,DWORD dwPort/*=443 */)
{
	m_pNotifyProc = pNotifyProc ;

	m_siRemoteAddr.sin_family = AF_INET;
	m_siRemoteAddr.sin_port = htons(dwPort);
	m_siRemoteAddr.sin_addr.S_un.S_addr = inet_addr(lpstrIp);
	
	

	//listen(m_sListen,m_dwMaxConns);

	CreateIocp();

	m_bStarted = true ;
	CreateWokerThreads();

	
	// post accept 
	

	return PostConnect();


}

PER_HANDLE_DATA* Ciocp::AllocContext(SOCKET s)
{
	PER_HANDLE_DATA *pContext = NULL ;
	OutputDebugString(L"Context ++ 
");
	

	EnterCriticalSection(&m_csContextsListLock);
	if (m_pFreeContextsList== NULL )
	{
		pContext = (PER_HANDLE_DATA*)HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(PER_HANDLE_DATA));
	}else
	{
		pContext = m_pFreeContextsList ;
		m_pFreeContextsList = pContext->pNext ;
		pContext->pNext = 0;
		m_dwContextsCount -- ;
	}
	LeaveCriticalSection(& m_csContextsListLock);

	if (pContext != NULL)
	{
		pContext->s = s ;
		pContext->m_hWriteComplete = CreateEvent(NULL,true,TRUE,NULL);
	}
	return pContext ;
}

void Ciocp::DecCurWorkCount()
{
	EnterCriticalSection(&m_csWorkLock);
	m_dwCurWorkThreadCount -- ;
	LeaveCriticalSection(&m_csWorkLock);
}



void Ciocp::HandleIoOp(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer,DWORD dwTrans)
{
	EnterCriticalSection(&m_csIoLock);
	
	switch(pBuffer->opType)
	{
	case OP_ACCEPT:
		//OutputDebugString(L"Accept 
");
		ProcessIoAccept(pContext,pBuffer,dwTrans);
		break;
	case OP_CONNECT:
		
		ProcessIoConnect(pContext,pBuffer,dwTrans);

		break;
	case OP_WRITE:
		//OutputDebugString(L"Write 
");
		ProcessIoWrite(pContext,pBuffer,dwTrans);
		break;
	case OP_READ:
		//OutputDebugString(L"Read 
");
		ProcessIoRead(pContext,pBuffer,dwTrans);
		break;
	default:
		OutputDebugString(L"HandleIoOp Default... 
");
		break;
	}
	// release pBuffer 
	//ReleaseIoBuffer(pBuffer);

	LeaveCriticalSection(&m_csIoLock);
}

void Ciocp::ProcessIoRead(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
{
	if (dwTrans ==0 ) // read error 
	{
		NotifyMsg(pContext,pBuffer,NOTIFY_MSG_DISCONNECT);

		if (pContext->bConnect) // is connect 
		{
			
			PostConnect();
			
		}else
		{

		}
		ReleaseContext(pContext);


	}else
	{
		//  read ok notify main thread  
		pBuffer->dwTrans = dwTrans ;
		if (pContext->dwBufferOffSet+pBuffer->dwTrans > BUFFER_SIZE_DATA )
		{
			closesocket(pContext->s);
			if (pContext->bConnect)
			{
				PostConnect();
			}
			return ;
		}
		memcpy(pContext->readBytes+pContext->dwBufferOffSet,pBuffer->lpBuffer,pBuffer->dwTrans);
		pContext->dwBufferOffSet+= pBuffer->dwTrans ;

		NotifyMsg(pContext,pBuffer,NOTIFY_MSG_READ);
		PostRead(pContext);
		
	}

	ReleaseIoBuffer(pBuffer);
	
}

void Ciocp::NotifyMsg(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer, DWORD dwMsg)
{
	if (m_pNotifyProc == NULL )
	{
		OutputDebugString(L"NotifyMsg m_pNotifyProc is NUll 
");
	}
	if(!IsBadCodePtr((FARPROC)m_pNotifyProc))
	{
		m_pNotifyProc(pContext,pBuffer,dwMsg);
	}else
	{
		OutputDebugString(L" m_pNotifyProc is badcodeptr 
");
	}
}

void Ciocp::ReleaseContext(PER_HANDLE_DATA* pContext)
{
	OutputDebugString(L"Context -- 
");
	EnterCriticalSection(& m_csContextsListLock);
	CloseHandle(pContext->m_hWriteComplete);

	ZeroMemory(pContext,sizeof(PER_HANDLE_DATA));
	if (m_dwContextsCount > m_dwMaxFreeContexts)
	{
		HeapFree(GetProcessHeap(),0,pContext);
		LeaveCriticalSection(& m_csContextsListLock);
		return ;
	}else
	{
		
		pContext->pNext = m_pFreeContextsList;
		m_pFreeContextsList = pContext ;
		m_dwContextsCount ++ ;
		
	}
	LeaveCriticalSection(& m_csContextsListLock);
}

void Ciocp::ReleaseIoBuffer(PER_IO_BUFFER* pBuffer)
{
	OutputDebugString(L"Buffer -- 
");
	EnterCriticalSection(& m_csBuffersListLock);
	ZeroMemory(pBuffer,sizeof(PER_IO_BUFFER)+pBuffer->dwBufferSize);
	if (m_dwBuffersCount > m_dwMaxFreeBuffers)
	{
		HeapFree(GetProcessHeap(),0,pBuffer);
		LeaveCriticalSection(& m_csBuffersListLock);
		return ;
	}else
	{
		pBuffer->pNext = m_pFreeBuffersList ;
		m_pFreeBuffersList =pBuffer ;
		m_dwBuffersCount ++ ;
	}
	LeaveCriticalSection(& m_csBuffersListLock);

}

void Ciocp::ProcessIoConnect(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
{
	if (dwTrans == 0)
	{
		NotifyMsg(pContext,pBuffer,NOTIFY_MSG_DISCONNECT);
		ReleaseContext(pContext);
	}else
	{
		int nAddrLen = sizeof(SOCKADDR_IN);
		getsockname(pContext->s,(SOCKADDR*)&pContext->saddr,&nAddrLen);
		SetKeepAlive(pContext->s);
		NotifyMsg(pContext,pBuffer,NOTIFY_MSG_CONNECT);
		//OutputDebugString(L"Connect 
");
		PostRead(pContext);
	}
	ReleaseIoBuffer(pBuffer);
}

void Ciocp::Shutdown()
{
	m_bStarted = false ;

	if (m_sListen != INVALID_SOCKET )
	{
		closesocket(m_sListen);
		m_sListen = INVALID_SOCKET ;

	}
	if (m_sConnect != INVALID_SOCKET)
	{
		closesocket(m_sConnect);
		m_sConnect = INVALID_SOCKET ;
	}

	while (m_dwCurWorkThreadCount > 0)
	{
		::PostQueuedCompletionStatus(m_hIocp, -1, 0, NULL);  
		Sleep(100);
	}

	PER_IO_BUFFER* pBuffer = m_pFreeBuffersList ;
	while(pBuffer)
	{
		m_pFreeBuffersList = pBuffer->pNext ;
		HeapFree(GetProcessHeap(),0,pBuffer);
		pBuffer = m_pFreeBuffersList ;
	}
	m_dwBuffersCount = 0;
	PER_HANDLE_DATA* pContext = m_pFreeContextsList ;
	while(pContext)
	{
		m_pFreeContextsList = pContext->pNext ;
		HeapFree(GetProcessHeap(),0,pContext);
		pContext = m_pFreeContextsList ;
	}
	m_dwContextsCount = 0 ;
	
	
}

void Ciocp::ProcessIoAccept(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
{

	LPSOCKADDR lpLocalAddr,lpRemoteAddr;
	int	nLocalAddr,nRemoteAddr;
	DWORD dwAddrSize = sizeof(SOCKADDR_IN)+16 ;

	PER_HANDLE_DATA* pContext1 = AllocContext(pBuffer->sClient);

	m_lpfnGetAcceptExSockAddrs(pBuffer->lpBuffer,pBuffer->dwBufferSize- 2*dwAddrSize,dwAddrSize,dwAddrSize,&lpLocalAddr,&nLocalAddr,&lpRemoteAddr,&nRemoteAddr);
	memcpy(& (pContext1->saddr),lpRemoteAddr,nRemoteAddr);


	SetKeepAlive(pBuffer->sClient);

	AddSocketToIocp(pBuffer->sClient,pContext1);

	NotifyMsg(pContext1,pBuffer,NOTIFY_MSG_ACCEPT);

	PostRead(pContext1);

	PostAccept();

	ReleaseIoBuffer(pBuffer);
}

BOOL Ciocp::SetKeepAlive(SOCKET s)
{
	BOOL bKeepAlive = TRUE;  
	int nRet = ::setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, (char*)&bKeepAlive, sizeof(bKeepAlive));  
	if (nRet == SOCKET_ERROR)
	{
		return false ;
	}else
	{
		tcp_keepalive alive_in = {0};
		tcp_keepalive alive_out = {0};

		alive_in.keepalivetime = 5000;
		alive_in.keepaliveinterval = 1000;
		alive_in.onoff = TRUE ;
		unsigned long ulBytesReturn = 0 ;
		nRet = WSAIoctl(s,SIO_KEEPALIVE_VALS,&alive_in,sizeof(alive_in),&alive_out,sizeof(alive_out),&ulBytesReturn,NULL,NULL);
		if (nRet == SOCKET_ERROR)
		{
			return FALSE ;
		}

	}

	return TRUE ;
}

void Ciocp::GetStatisticsData(ULONGLONG* pulRead,ULONGLONG* pulWrite)
{
	*pulWrite = m_ulWriteBytes ;
	*pulRead = m_ulReadBytes ;
}

void Ciocp::Send(PER_HANDLE_DATA* pContext,LPBYTE lpBuffer,DWORD dwSize)
{
	WaitForSingleObject(pContext->m_hWriteComplete,INFINITE);
	PostWrite(pContext,lpBuffer,dwSize);
}

void Ciocp::ProcessIoWrite(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
{
	if (dwTrans == 0)
	{
		NotifyMsg(pContext,pBuffer,NOTIFY_MSG_DISCONNECT);

		if (pContext->bConnect) // is connect 
		{

			PostConnect();

		}else
		{

		}
		ReleaseContext(pContext);
	}
	else
	{
		SetEvent(pContext->m_hWriteComplete);
	}

	ReleaseIoBuffer(pBuffer);
}

  

 IOCP - UDP

//NOTE
// 
//This code taken from Mr. Bob Quinn's article 
//titled 'Internet Multicasting'
//published in Dr. Dobb's Journal dated Oct 1997

//I have modified the original code to illustrate 
//the use I/O completion ports with UDP.

//If you have any comments email me : shapall@hotmail.com

#include "StdAfx.h"
#include <winsock2.h>
#include <ws2tcpip.h>
#include "Stdio.h"

#define BUFSIZE 1024 //max size of incoming data buffer
#define MAXADDRSTR 16

#define DEFAULT_GROUP_ADDRESS "239.254.1.2"
#define DEFAULT_PORT 7125 

LONG nCount = 0;
HANDLE g_hCompletionPort;
DWORD WINAPI WorkerThread( LPVOID WorkContext );

BOOL HandleIncomingData( UCHAR* pBuf);
BOOL CreateNetConnections( VOID );
BOOL CreateWorkers( UINT );
void InitWinsock2();
void UnInitWinsock2();

HANDLE g_hReadEvent;
SOCKET g_hSocket;
UCHAR achInBuf [BUFSIZE];
char achMCAddr[MAXADDRSTR] = DEFAULT_GROUP_ADDRESS;
u_short nPort = DEFAULT_PORT;


OVERLAPPED Overlapped;

//-----------------------------------------------------------------
void InitWinsock2()
{
    WSADATA data;
    WORD version; 
    int ret = 0;  

    version = (MAKEWORD(2, 2)); 
    ret = WSAStartup(version, &data); 
    if (ret != 0) 
    {  
        ret = WSAGetLastError(); 
        
        if (ret == WSANOTINITIALISED) 
        {  
            printf("not initialised"); 
        }
    }
}

//-----------------------------------------------------------------
void UnInitWinsock2()
{ 
    WSACleanup();
}

//-----------------------------------------------------------------
BOOL CreateNetConnections (void)
{ 
    DWORD nbytes; 
    BOOL b; 
    BOOL fFlag = TRUE; 
    int nRet=0; 
    
    SOCKADDR_IN stLclAddr;  
    struct ip_mreq stMreq; // Multicast interface structure  

    // Get a datagram socket  
    g_hSocket = socket(AF_INET, SOCK_DGRAM,0); 
    
    if (g_hSocket == INVALID_SOCKET)  
    { 
        printf ("socket() failed, Err: %d
", WSAGetLastError()); 
        return FALSE;  
    }  

    nRet = setsockopt(g_hSocket,SOL_SOCKET,
               SO_REUSEADDR, (char *)&fFlag, sizeof(fFlag));  
    if (nRet == SOCKET_ERROR)  
    { 
        printf ("setsockopt() SO_REUSEADDR failed, 
                          Err: %d
",WSAGetLastError()); 
    } 

    // Name the socket (assign the local port number to receive on)  
    stLclAddr.sin_family = AF_INET; 
    stLclAddr.sin_addr.s_addr = htonl(INADDR_ANY); 
    stLclAddr.sin_port = htons(nPort); 

    nRet = bind(g_hSocket,(struct sockaddr*) &stLclAddr,sizeof(stLclAddr)); 
    if (nRet == SOCKET_ERROR)  
    { 
        printf ("bind() port: %d failed, Err: %d
", 
                                nPort,WSAGetLastError()); 
    } 
    // Join the multicast group so we can receive from it  
    stMreq.imr_multiaddr.s_addr = inet_addr(achMCAddr); 
    stMreq.imr_interface.s_addr = INADDR_ANY; 
    nRet = setsockopt(g_hSocket,IPPROTO_IP, 
               IP_ADD_MEMBERSHIP,(char *)&stMreq,sizeof(stMreq)); 

    if (nRet == SOCKET_ERROR)  
    { 
        printf("setsockopt() IP_ADD_MEMBERSHIP address %s failed, 
                             Err: %d
",achMCAddr,
                             WSAGetLastError()); 
    }

    //
    //note the 10 says how many concurrent cpu bound threads to allow thru 
    //this should be tunable based on the requests. CPU bound requests will 
    // really really honor this. 
    // 

    g_hCompletionPort = CreateIoCompletionPort (INVALID_HANDLE_VALUE,
                                                              NULL,0,3); 
    if (!g_hCompletionPort) 
    { 
        fprintf (stdout, "g_hCompletionPort Create Failed
"); 
        return FALSE; 
    } 
    //Associate this socket to this I/O completion port 
    CreateIoCompletionPort((HANDLE)g_hSocket,g_hCompletionPort,
                                             (DWORD)g_hSocket,3);  

    //
    // Start off an asynchronous read on the socket.  
    //  
    Overlapped.hEvent = g_hReadEvent;  
    Overlapped.Internal = 0;  
    Overlapped.InternalHigh = 0;  
    Overlapped.Offset = 0;  
    Overlapped.OffsetHigh = 0;  
    b = ReadFile ((HANDLE)g_hSocket,&achInBuf,
               sizeof(achInBuf),&nbytes,&Overlapped);  
    
    if (!b && GetLastError () != ERROR_IO_PENDING)  
    {  
        fprintf (stdout, "ReadFile Failed
");  
        return FALSE;  
    }  
    
    return TRUE;
}
//-----------------------------------------------------------------

BOOL CreateWorkers (UINT dwNumberOfWorkers)
{ 
    DWORD ThreadId; 
    HANDLE ThreadHandle; 
    DWORD i; 

    for (i = 0; i < dwNumberOfWorkers; i++) 
    { 
        ThreadHandle = CreateThread (NULL,0,
                  WorkerThread,NULL,0,&ThreadId); 
        if (!ThreadHandle) 
        { 
            fprintf (stdout, "Create Worker Thread Failed
"); 
            return FALSE;
        } 
               
        CloseHandle (ThreadHandle); 
    } 
    return TRUE;
}

//-----------------------------------------------------------------
DWORD WINAPI WorkerThread (LPVOID WorkContext)
{ 
    DWORD nSocket; 
    BOOL b; 
    OVERLAPPED ovl; 
    LPOVERLAPPED lpo=&ovl; 
    DWORD nBytesRead=0; 
    DWORD nBytesToBeRead; 
    UCHAR ReadBuffer[BUFSIZE]; 
    LPVOID lpMsgBuf; 

    memset(&ReadBuffer,0,BUFSIZE); 
    for (;;) 
    { 
        b = GetQueuedCompletionStatus(g_hCompletionPort,
                    &nBytesToBeRead,&nSocket,&lpo,INFINITE); 
        if (b || lpo) 
        { 
            if (b) 
            { 
                // 
                // Determine how long a response was desired by the client. 
                // 
                
                OVERLAPPED ol; 
                ol.hEvent = g_hReadEvent; 
                ol.Offset = 0; 
                ol.OffsetHigh = 0; 

                b = ReadFile ((HANDLE)nSocket,&ReadBuffer,
                                nBytesToBeRead,&nBytesRead,&ol); 
                if (!b )  
                { 
                    DWORD dwErrCode = GetLastError(); 
                    if( dwErrCode != ERROR_IO_PENDING ) 
                    { 
                        // something has gone wrong here... 
                        printf("Something has gone 
                               wrong:Error code - %d
",dwErrCode ); 

                        FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | 
                               FORMAT_MESSAGE_FROM_SYSTEM | 
                               FORMAT_MESSAGE_IGNORE_INSERTS, 
                               NULL, dwErrCode, 
                               MAKELANGID(LANG_NEUTRAL, 
                                    SUBLANG_DEFAULT),// Default language
                               (LPTSTR) &lpMsgBuf, 0, NULL); 

                        OutputDebugString((LPCTSTR)lpMsgBuf); 
                        //Free the buffer. 

                        LocalFree(lpMsgBuf ); 
                    } 
                    else if( dwErrCode == ERROR_IO_PENDING ) 
                    {
                        // I had to do this for my UDP sample 
                        //Never did for my TCP servers 
                        WaitForSingleObject(ol.hEvent,INFINITE); 

                        HandleIncomingData(ReadBuffer); 
                    } 
                } 
                else 
                { 
                    HandleIncomingData(ReadBuffer); 
                } 
                continue; 
            } 
            else 
            { 
                fprintf (stdout, "WorkThread Wait Failed
"); 
                //exit (1); 
            } 
        } 
        return 1; 
    } 
} 
//-----------------------------------------------------------------
BOOL HandleIncomingData( UCHAR* pBuf)
{ 
    InterlockedIncrement(&nCount); 
    SYSTEMTIME *lpstSysTime; 

    lpstSysTime = (SYSTEMTIME *)(pBuf); 
    printf("[%d]UTC Time %02d:%02d:%02d:%03d on %02d-%02d-%d 
",nCount, 
                lpstSysTime->wHour, lpstSysTime->wMinute, 
                lpstSysTime->wSecond, lpstSysTime->wMilliseconds, 
                lpstSysTime->wMonth, lpstSysTime->wDay, lpstSysTime->wYear); 
                memset(&pBuf,0,BUFSIZE); 
                //just making sure that i am not showing stale data 

    return TRUE; 
} 
//-----------------------------------------------------------------
main () 
{ 
    //You can modify your program to take some arguments for port number 
    //and multicast group address here 
    
    printf("
***************************************
"); 
    printf("Group IP address: %s
",achMCAddr); 
    printf("Port number : %d
",nPort); 
    printf("
***************************************
"); 

    //Initialize winsock 2 
    InitWinsock2(); 

    //We want to keep the main thread running 
    HANDLE hWait2Exit = CreateEvent(NULL,FALSE,TRUE,"MCLIENT"); 
    ResetEvent(hWait2Exit ); 

    //This OVERLAPPED event 
    g_hReadEvent = CreateEvent(NULL,TRUE,TRUE,NULL); 

    // 
    // try to get timing more accurate... Avoid context 
    // switch that could occur when threads are released 
    // 

    SetThreadPriority (GetCurrentThread (), THREAD_PRIORITY_TIME_CRITICAL); 
    if (!CreateNetConnections ()) 
    { 
        printf("Error condition @ CreateNetConnections , exiting
"); 
        return 1; 
    } 

    if (!CreateWorkers (5)) 
    { 
        printf("Error condition @CreateWorkers, exiting
"); 
        return 1; 
    } 
    
    WaitForSingleObject(hWait2Exit,INFINITE); 
    UnInitWinsock2(); 
    return 1; 
}

转 实现UDP IOCP心得-zt

 http://www.cnblogs.com/BeginGame/archive/2011/09/18/2180241.html

签名档: 从事网络安全和编程的我,很希望能找到志同道合的朋友交流。 欢迎cn博客的好友拍砖,留言。
原文地址:https://www.cnblogs.com/M4ster/p/my_socket_iocp.html