【网络编程】之十二、WSAEventSelect + 线程池 服务器实现

#include<WinSock2.h>
#include<iostream>
using namespace std;


#pragma comment(lib, "WS2_32.lib")

// Per-connection record. Records are chained into the socket list of the
// thread object that services them.
struct _SOCKET_OBJ
{
	SOCKET s;                // socket handle passed to GetSocketObj
	HANDLE event;            // event created by WSACreateEvent in GetSocketObj
	sockaddr_in addrRemote;  // peer address, filled in by main after accept
	_SOCKET_OBJ *pNext;      // next record in the owning thread's socket list
};
typedef _SOCKET_OBJ SOCKET_OBJ;
typedef _SOCKET_OBJ *PSOCKET_OBJ;

// Per-worker-thread record. Each worker waits on `events` and services the
// sockets linked from pSocketHeader.
struct _THREAD_OBJ
{
	// Slot 0 is the thread's own wake-up event (created in GetThreadObj);
	// slots 1.. are copied from the socket list by RebulidArray.
	HANDLE events[WSA_MAXIMUM_WAIT_EVENTS];
	int nSocketCount;           // number of sockets currently in the list
	PSOCKET_OBJ pSocketHeader;  // first socket in the list (NULL when empty)
	PSOCKET_OBJ pSocketTail;    // last socket in the list
	CRITICAL_SECTION cs;        // taken around socket-list modifications
	_THREAD_OBJ *pNext;         // next record in g_pThreadList
};
typedef _THREAD_OBJ THREAD_OBJ;
typedef _THREAD_OBJ *PTHREAD_OBJ;

// Head of the singly linked list of all thread objects. GetThreadObj pushes
// onto it and FreeThreadObj unlinks from it, both while holding g_cs.
PTHREAD_OBJ g_pThreadList;
// Critical section taken around accesses to g_pThreadList; initialized in main.
CRITICAL_SECTION g_cs;

// Incremented by insertSocketObj for every socket successfully inserted and
// never decremented in this file. ("Tatol" is a misspelling of "Total"; the
// identifier is left as-is.)
LONG g_nTatolConnections;
// Incremented by insertSocketObj and decremented by RemoveSocketObj.
LONG g_nCurrentConnections;

// Worker thread entry point; declared here because AssignToFreeThread passes
// it to CreateThread before its definition appears.
DWORD WINAPI ServerThread(LPVOID lpParam);
//******************************************************************************//
// Allocates a zero-initialized socket object for `s` and creates the event
// that will be associated with it.
//
// Returns NULL when either the allocation or the event creation fails. On
// failure the socket `s` itself is NOT closed; it stays owned by the caller.
// On success the returned object must be released with FreeSocketObj.
PSOCKET_OBJ GetSocketObj(SOCKET s)
{
	// GPTR = fixed memory, zero-initialized, so pNext/addrRemote start cleared.
	PSOCKET_OBJ pSocket = (PSOCKET_OBJ)::GlobalAlloc(GPTR, sizeof(SOCKET_OBJ));
	if(pSocket == NULL)
		return NULL;

	pSocket->s = s;
	pSocket->event = ::WSACreateEvent();
	if(pSocket->event == WSA_INVALID_EVENT)
	{
		// Never hand out an object whose event cannot be waited on.
		::GlobalFree(pSocket);
		return NULL;
	}

	return pSocket;
}

// Releases a socket object: closes its event handle, closes the socket if it
// holds one, then frees the record itself. pSocket must not be NULL.
void FreeSocketObj(PSOCKET_OBJ pSocket)
{
	::CloseHandle(pSocket->event);

	const bool bHasSocket = (pSocket->s != INVALID_SOCKET);
	if(bHasSocket)
		closesocket(pSocket->s);

	::GlobalFree(pSocket);
}

//*************************************************************************//

// Allocates a zero-initialized thread object, creates its wake-up event
// (events[0]) and pushes the object onto the front of g_pThreadList.
//
// Returns NULL when the allocation or the event creation fails; in that case
// nothing is added to the global list. On success the object must eventually
// be released with FreeThreadObj.
PTHREAD_OBJ GetThreadObj()
{
	PTHREAD_OBJ pThread = (PTHREAD_OBJ)::GlobalAlloc(GPTR, sizeof(THREAD_OBJ));
	if(pThread == NULL)
		return NULL;

	::InitializeCriticalSection(&pThread->cs);

	pThread->events[0] = ::WSACreateEvent();
	if(pThread->events[0] == WSA_INVALID_EVENT)
	{
		// Without a valid wake-up event the worker could never be signaled,
		// so undo the partial construction instead of publishing the object.
		::DeleteCriticalSection(&pThread->cs);
		::GlobalFree(pThread);
		return NULL;
	}

	// Publish the fully constructed object at the head of the global list.
	::EnterCriticalSection(&g_cs);
	pThread->pNext = g_pThreadList;
	g_pThreadList = pThread;
	::LeaveCriticalSection(&g_cs);

	return pThread;
}

// Unlinks pThread from g_pThreadList (if it is on the list) and then destroys
// it: closes its wake-up event, deletes its critical section and frees the
// record.
void FreeThreadObj(PTHREAD_OBJ pThread)
{
	::EnterCriticalSection(&g_cs);
	// Walk the links themselves so the head and interior cases are handled
	// by the same assignment.
	for(PTHREAD_OBJ *ppLink = &g_pThreadList; *ppLink != NULL; ppLink = &(*ppLink)->pNext)
	{
		if(*ppLink == pThread)
		{
			*ppLink = pThread->pNext;
			break;
		}
	}
	::LeaveCriticalSection(&g_cs);

	::CloseHandle(pThread->events[0]);
	::DeleteCriticalSection(&pThread->cs);
	::GlobalFree(pThread);
}


// Refreshes pThread->events[1..] from the thread's socket list, in list
// order, while holding the thread's critical section. Slot 0 (the thread's
// own wake-up event) is left untouched.
void RebulidArray(PTHREAD_OBJ pThread)
{
	::EnterCriticalSection(&pThread->cs);

	int nSlot = 0;
	for(PSOCKET_OBJ pCur = pThread->pSocketHeader; pCur != NULL; pCur = pCur->pNext)
	{
		pThread->events[++nSlot] = pCur->event;
	}

	::LeaveCriticalSection(&pThread->cs);
}

//********************************************************************//

// Appends pSocket to the tail of pThread's socket list if the thread still
// has room (one slot of the wait array is reserved for the wake-up event).
//
// Returns TRUE and bumps both connection counters when the socket was
// inserted; returns FALSE, changing nothing, when the thread is full.
BOOL insertSocketObj(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket)
{
	::EnterCriticalSection(&pThread->cs);

	const BOOL bHasRoom = (pThread->nSocketCount < WSA_MAXIMUM_WAIT_EVENTS - 1);
	if(bHasRoom)
	{
		if(pThread->pSocketHeader != NULL)
			pThread->pSocketTail->pNext = pSocket;
		else
			pThread->pSocketHeader = pSocket;
		pThread->pSocketTail = pSocket;
		++pThread->nSocketCount;
	}

	::LeaveCriticalSection(&pThread->cs);

	if(!bHasRoom)
		return FALSE;

	::InterlockedIncrement(&g_nTatolConnections);
	::InterlockedIncrement(&g_nCurrentConnections);
	return TRUE;
}

// Hands pSocket to the first worker thread that still has room; when every
// existing worker is full (or none exists) a new thread object and worker
// thread are created for it. The chosen worker's wake-up event is signaled so
// that it rebuilds its wait array.
//
// Takes ownership of pSocket: if no worker can be obtained (allocation or
// thread creation failure) the socket object is released with FreeSocketObj
// and the connection is dropped.
void AssignToFreeThread(PSOCKET_OBJ pSocket)
{
	pSocket->pNext = NULL;
	::EnterCriticalSection(&g_cs);

	// Try every existing worker in list order.
	PTHREAD_OBJ pThread = g_pThreadList;
	while(pThread != NULL && !insertSocketObj(pThread, pSocket))
	{
		pThread = pThread->pNext;
	}

	if(pThread == NULL)
	{
		// All workers are full: start a new one.
		pThread = GetThreadObj();
		if(pThread == NULL)
		{
			::LeaveCriticalSection(&g_cs);
			FreeSocketObj(pSocket);
			return;
		}

		insertSocketObj(pThread, pSocket);

		HANDLE hThread = ::CreateThread(NULL, 0, ServerThread, pThread, 0, NULL);
		if(hThread == NULL)
		{
			// No thread will ever service this object: undo the insertion
			// (including the counters insertSocketObj bumped) and drop the
			// connection. g_cs is re-entered recursively by FreeThreadObj.
			::InterlockedDecrement(&g_nTatolConnections);
			::InterlockedDecrement(&g_nCurrentConnections);
			FreeThreadObj(pThread);
			::LeaveCriticalSection(&g_cs);
			FreeSocketObj(pSocket);
			return;
		}
		// The handle is never waited on; close it so it is not leaked.
		::CloseHandle(hThread);
	}

	// Signal while still holding g_cs: FreeThreadObj must take g_cs to unlink
	// a thread object before freeing it, so pThread cannot be freed here.
	// NOTE(review): a worker that has already observed nSocketCount == 0 may
	// still exit after this insertion; closing that window needs a change in
	// ServerThread/FreeThreadObj as well.
	::WSASetEvent(pThread->events[0]);

	::LeaveCriticalSection(&g_cs);
}


// Unlinks pSocket from pThread's socket list, decrements the thread's socket
// count, signals the thread's wake-up event and decrements the global
// current-connection counter. The socket object itself is not freed here.
void RemoveSocketObj(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket)
{
	::EnterCriticalSection(&pThread->cs);

	PSOCKET_OBJ pPrev = pThread->pSocketHeader;
	if(pPrev != pSocket)
	{
		// Interior or tail node: locate the predecessor, if any.
		for(; pPrev != NULL; pPrev = pPrev->pNext)
		{
			if(pPrev->pNext == pSocket)
				break;
		}
		if(pPrev != NULL)
		{
			if(pThread->pSocketTail == pSocket)
				pThread->pSocketTail = pPrev;
			pPrev->pNext = pSocket->pNext;
		}
	}
	else
	{
		// Removing the head; when it was also the tail, the tail follows it.
		const bool bOnlyNode = (pThread->pSocketHeader == pThread->pSocketTail);
		pThread->pSocketHeader = pPrev->pNext;
		if(bOnlyNode)
			pThread->pSocketTail = pThread->pSocketHeader;
	}

	// The count is decremented whether or not the socket was found.
	--pThread->nSocketCount;

	::LeaveCriticalSection(&pThread->cs);

	::WSASetEvent(pThread->events[0]);
	::InterlockedDecrement(&g_nCurrentConnections);
}

//********************************************************************//
// Returns the socket object at 1-based position nIndex in pThread's socket
// list (nIndex 1 is the list head), or NULL when the list is shorter than
// that. The list is walked without taking the thread's critical section.
PSOCKET_OBJ FindSocketObj(PTHREAD_OBJ pThread, int nIndex)
{
	PSOCKET_OBJ pCur = pThread->pSocketHeader;
	for(int nSteps = nIndex - 1; nSteps != 0; --nSteps)
	{
		if(pCur == NULL)
			return NULL;
		pCur = pCur->pNext;
	}
	return pCur;
}

// Processes the network events pending on pSocket.
//
// Returns TRUE when the connection stays open. Returns FALSE when the
// connection was closed or an error was reported; in that case the socket has
// already been removed from pThread and freed, and the caller must not touch
// pSocket again.
BOOL HandleIO(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket)
{
	// Zero-initialize so that a failed enumeration cannot leave garbage flags.
	WSANETWORKEVENTS event = {};
	BOOL bKeepOpen =
		(::WSAEnumNetworkEvents(pSocket->s, pSocket->event, &event) != SOCKET_ERROR);

	if(bKeepOpen && (event.lNetworkEvents & FD_READ))
	{
		if(event.iErrorCode[FD_READ_BIT] == 0)
		{
			char szText[256];
			// Leave room for the terminating NUL appended below.
			int nRecv = ::recv(pSocket->s, szText, static_cast<int>(sizeof(szText) - 1), 0);
			if(nRecv > 0)
			{
				szText[nRecv] = '\0';
				cout << "接收到数据:" << szText << endl;
			}
		}
		else
		{
			bKeepOpen = FALSE;
		}
	}

	// FD_WRITE only reports that the socket is writable; nothing is sent by
	// this server, so only its error code matters.
	if(bKeepOpen && (event.lNetworkEvents & FD_WRITE) && event.iErrorCode[FD_WRITE_BIT] != 0)
	{
		bKeepOpen = FALSE;
	}

	// Checked independently of FD_READ: WSAEnumNetworkEvents clears the
	// recorded events, so a close reported together with a read would
	// otherwise be lost.
	if(event.lNetworkEvents & FD_CLOSE)
	{
		bKeepOpen = FALSE;
	}

	if(bKeepOpen)
		return TRUE;

	RemoveSocketObj(pThread, pSocket);
	FreeSocketObj(pSocket);
	return FALSE;
}


// Worker thread entry point. lpParam is the PTHREAD_OBJ this worker services.
//
// The thread waits on its event array; after a wake-up it polls every event
// from the signaled index onward. Slot 0 is the thread's own wake-up event
// (the socket list changed, so the array is rebuilt); other slots map to
// sockets, whose I/O is dispatched to HandleIO. The thread frees its thread
// object and exits once it has no sockets left. Always returns 0.
DWORD WINAPI ServerThread(LPVOID lpParam)
{
	PTHREAD_OBJ pThread = (PTHREAD_OBJ)lpParam;
	while(TRUE)
	{
		DWORD dwWait = ::WSAWaitForMultipleEvents(pThread->nSocketCount + 1, pThread->events, FALSE, WSA_INFINITE, FALSE);
		if(dwWait == WSA_WAIT_FAILED)
		{
			// nSocketCount can be bumped by insertSocketObj before the array
			// has a handle for the new socket, which makes the wait fail.
			// Rebuild the array and retry instead of indexing with the
			// failure code.
			RebulidArray(pThread);
			continue;
		}

		for(int i = (int)(dwWait - WSA_WAIT_EVENT_0); i < pThread->nSocketCount + 1; ++i)
		{
			// Zero timeout: this only polls the event state. Blocking here
			// would stall every signaled socket behind each idle one.
			DWORD dwState = ::WSAWaitForMultipleEvents(1, &pThread->events[i], TRUE, 0, FALSE);
			if(dwState == WSA_WAIT_FAILED || dwState == WSA_WAIT_TIMEOUT)
				continue;

			if(i == 0)
			{
				// Reset before rebuilding so that a signal raised during the
				// rebuild is not wiped out afterwards.
				::WSAResetEvent(pThread->events[0]);
				RebulidArray(pThread);

				if(pThread->nSocketCount == 0)
				{
					FreeThreadObj(pThread);
					return 0;
				}
			}
			else
			{
				PSOCKET_OBJ pSocket = (PSOCKET_OBJ)FindSocketObj(pThread, i);
				if(pSocket != NULL)
				{
					// HandleIO returning FALSE means the socket was removed
					// and freed, so the array no longer matches the list.
					if(!HandleIO(pThread, pSocket))
						RebulidArray(pThread);
				}
				else
					cout << "Unable to find socket object" << endl;
			}
		}
	}
	return 0;
}


//******************************************************************************//

int main(void)
{

	WSADATA wsaData;  
	WORD sockVersion = MAKEWORD(2,0);//指定版本号  
	::WSAStartup(sockVersion, &wsaData);//载入winsock的dll  
	//创建套接字基于TCP  
	SOCKET sListen = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);  
	if(sListen == INVALID_SOCKET)  
	{  
		printf("error");  
		::WSACleanup();//清理,释放资源  
		return 0;  
	}  

	sockaddr_in sin;  
	sin.sin_family = AF_INET;  
	sin.sin_port = htons(8888);//端口号8888  
	sin.sin_addr.S_un.S_addr = INADDR_ANY;//地址全是0,也就是所有的地址  
	//绑定socket  
	if(::bind(sListen, (LPSOCKADDR)&sin, sizeof(sin)) == SOCKET_ERROR)  
	{  
		printf("error");  
		::WSACleanup();//清理释放资源  
		return 0;  
	}  
	//监听socket  
	if(::listen(sListen, 2) == SOCKET_ERROR)  
	{  
		printf("error");  
		::WSACleanup();//释放资源  
		return 0;  
	}  

	WSAEVENT event = ::WSACreateEvent();
	::WSAEventSelect(sListen, event, FD_ACCEPT | FD_CLOSE);
	::InitializeCriticalSection(&g_cs);

	//处理请求
	while(TRUE)
	{
		int nRet = ::WaitForSingleObject(event, 5 * 1000);
		if(nRet == WAIT_FAILED)
		{
			cout << "failed waitforsingleobject" << endl;
			break;
		}
		else if(nRet == WSA_WAIT_TIMEOUT)
		{
			cout << endl;
			cout << " tatolconnections:" << g_nTatolConnections << endl;
			cout << " currentconnections: " << g_nCurrentConnections << endl;
			continue;
		}
		else
		{
			::ResetEvent(event);   //新连接

			while(TRUE)
			{
				sockaddr_in si;
				int nLen = sizeof(si);
				SOCKET sNew = ::accept(sListen, (sockaddr*)&si, &nLen);
				if(sNew == SOCKET_ERROR)
					break;
				PSOCKET_OBJ pSocket = GetSocketObj(sNew);
				pSocket->addrRemote = si;
				::WSAEventSelect(pSocket->s, pSocket->event, FD_READ | FD_CLOSE | FD_WRITE);
				AssignToFreeThread(pSocket);
			}
		}
	}

	::DeleteCriticalSection(&g_cs);
	::WSACleanup();
	return 0;
}

原文地址:https://www.cnblogs.com/java20130723/p/3211385.html