一:服务器代码:
1.ServerDlg.cpp中启动或停止服务器
CIOCPServer *iocp; void CServerDlg::OnBnClickedOk2() { iocp->StartServer(10000, AfxGetMainWnd()); }
2. IOCPServer.h和IOCPServer.cpp
IOCPServer.h:
#pragma once #include<Mswsock.h> #include<windows.h> #define MAX_BUF_SIZE 4096 #define WM_SHOW_MSG WM_USER+100 // I/O类型 enum IO_TYPE { IO_TYPE_ACCEPT, IO_TYPE_READ, IO_TYPE_WRITE, IO_TYPE_UNKNOWN }; //扩展重叠的结构 class COverLappedEX { public: OVERLAPPED m_OLap; IO_TYPE m_IOType; char m_szBuf[MAX_BUF_SIZE]; COverLappedEX(IO_TYPE ioType) { ZeroMemory(&m_OLap, sizeof(OVERLAPPED)); m_IOType = ioType; ZeroMemory(m_szBuf, MAX_BUF_SIZE); } private: }; //单句柄数据, 包含监听Socket和接收的客户端信息 class CPerSocketData { public: SOCKET m_Socket; SOCKET m_AccSocket; CPerSocketData() { m_Socket = INVALID_SOCKET; m_AccSocket = INVALID_SOCKET; } ~CPerSocketData() {} private: }; class CIOCPServer { public: CIOCPServer(void); ~CIOCPServer(void); public: void SetLastErrorMsg(CString strErrorMsg); CString GetLastErrorMsg(); BOOL WinSockInit(); void AssociateWnd(CWnd *pWnd); BOOL StartServer(UINT uListenPort, CWnd *pWnd); //发送异步接收客户端连接请求 BOOL PostAccept(CPerSocketData *pSockData); //发送异步接收客户端数据请求 BOOL PostRecv(CPerSocketData *pSockData); BOOL PostSend(LPCTSTR lpszText, DWORD dwSizeInBytes); //线程池的线程函数 static DWORD WINAPI ThreadPoolProc(LPVOID lpParam); //关联某一个Socket到I/O完成端口队列 CPerSocketData *AssignSockToCompletionPort(SOCKET tSocket); void StopServer(); private: LONG m_ThreadNums; //线程池工作线程的个数 HWND m_pWnd; //窗口句柄 CString m_strErrorMsg; SOCKET m_ListenSocket; //监听Socket HANDLE m_hCompletionPort; //完成端口对象句柄 CList<CPerSocketData *, CPerSocketData *> m_ArrSocketData; CList<COverLappedEX *, COverLappedEX *> m_ArrOverLapEx; };
IOCPServer.cpp:
#include "stdafx.h"
#include "IOCPServer.h"

// NOTE(review): m_ArrSocketData and m_ArrOverLapEx are touched by the worker
// threads and by the thread that calls PostSend without any lock. Fixing that
// needs a lock member in the class declaration, which is outside this file.

namespace
{
    // Unlinks pOverLap from the tracking list and frees it. The list owns every
    // outstanding record, so a record that is no longer listed has already been
    // freed and is left alone.
    void ReleaseOverlapped(CList<COverLappedEX *, COverLappedEX *> &list, COverLappedEX *pOverLap)
    {
        if (pOverLap == NULL)
            return;
        POSITION pos = list.Find(pOverLap);
        if (pos == NULL)
            return;
        list.RemoveAt(pos);
        delete pOverLap;
    }

    // Closes the socket of pSockData, unlinks the record and frees it. Several
    // completions can carry the same key; only the first one that still finds
    // the record in the list releases it.
    void ReleaseSocketData(CList<CPerSocketData *, CPerSocketData *> &list, CPerSocketData *pSockData)
    {
        if (pSockData == NULL)
            return;
        POSITION pos = list.Find(pSockData);
        if (pos == NULL)
            return;
        list.RemoveAt(pos);
        if (pSockData->m_Socket != INVALID_SOCKET)
            closesocket(pSockData->m_Socket);
        delete pSockData;
    }

    // Upper bound for the number of worker threads: one per logical processor.
    LONG MaxWorkerThreads()
    {
        SYSTEM_INFO si;
        ZeroMemory(&si, sizeof(si));
        GetSystemInfo(&si);
        return si.dwNumberOfProcessors > 0 ? (LONG)si.dwNumberOfProcessors : 1;
    }
}

// Every handle starts in its "not open" state so that StopServer (also run by
// the destructor) is safe on a server that was never started.
CIOCPServer::CIOCPServer()
    : m_ThreadNums(0),
      m_pWnd(NULL),
      m_ListenSocket(INVALID_SOCKET),
      m_hCompletionPort(NULL)
{
}

CIOCPServer::~CIOCPServer()
{
    StopServer();
}

// Remembers the window that receives WM_SHOW_MSG; a NULL pWnd clears it.
void CIOCPServer::AssociateWnd(CWnd *pWnd)
{
    m_pWnd = (pWnd != NULL) ? pWnd->GetSafeHwnd() : NULL;
}

void CIOCPServer::SetLastErrorMsg(CString strErrorMsg)
{
    m_strErrorMsg = strErrorMsg;
}

CString CIOCPServer::GetLastErrorMsg()
{
    return m_strErrorMsg;
}

// Initializes Winsock and insists on version 2.2.
// Returns TRUE on success; on FALSE no WSACleanup is owed by the caller.
BOOL CIOCPServer::WinSockInit()
{
    WSADATA data = { 0 };
    if (WSAStartup(MAKEWORD(2, 2), &data) != 0)
        return FALSE;

    if (LOBYTE(data.wVersion) != 2 || HIBYTE(data.wVersion) != 2)
    {
        WSACleanup();
        return FALSE;
    }
    return TRUE;
}

// Associates tSocket with the completion port, using a freshly allocated
// CPerSocketData as the completion key.
// Returns the new record, or NULL when the port does not exist or the
// association fails (the caller keeps ownership of tSocket in that case).
CPerSocketData *CIOCPServer::AssignSockToCompletionPort(SOCKET tSocket)
{
    ASSERT(tSocket != INVALID_SOCKET);
    // With a NULL port CreateIoCompletionPort would create a brand new port.
    if (tSocket == INVALID_SOCKET || m_hCompletionPort == NULL)
        return NULL;

    CPerSocketData *pSockData = new CPerSocketData();
    pSockData->m_Socket = tSocket;

    if (CreateIoCompletionPort((HANDLE)tSocket, m_hCompletionPort, (ULONG_PTR)pSockData, 0) == NULL)
    {
        delete pSockData;
        return NULL;
    }

    m_ArrSocketData.AddTail(pSockData);
    return pSockData;
}

// Posts an asynchronous accept on the listening socket held by pSockData.
// Returns FALSE when the request could not be posted; nothing is leaked then.
BOOL CIOCPServer::PostAccept(CPerSocketData *pSockData)
{
    ASSERT(pSockData != NULL);
    if (pSockData == NULL)
        return FALSE;

    SOCKET sAccept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (sAccept == INVALID_SOCKET)
    {
        pSockData->m_AccSocket = INVALID_SOCKET;
        return FALSE;
    }

    // Track the record and publish the accept socket before the request is
    // posted: the completion may be handled by a worker immediately.
    COverLappedEX *pOverLap = new COverLappedEX(IO_TYPE_ACCEPT);
    m_ArrOverLapEx.AddTail(pOverLap);
    pSockData->m_AccSocket = sAccept;

    DWORD dwBytesRecv = 0;
    const DWORD dwAddrLen = sizeof(sockaddr_in) + 16;
    BOOL bRet = AcceptEx(pSockData->m_Socket, sAccept, pOverLap->m_szBuf, 0,
                         dwAddrLen, dwAddrLen, &dwBytesRecv, &pOverLap->m_OLap);
    if (!bRet && WSAGetLastError() != WSA_IO_PENDING)
    {
        // No completion will be queued for a request that failed outright.
        pSockData->m_AccSocket = INVALID_SOCKET;
        closesocket(sAccept);
        ReleaseOverlapped(m_ArrOverLapEx, pOverLap);
        return FALSE;
    }
    return TRUE;
}

// Posts an asynchronous receive on the client socket held by pSockData.
// Returns FALSE when the request could not be posted.
BOOL CIOCPServer::PostRecv(CPerSocketData *pSockData)
{
    ASSERT(pSockData != NULL);
    if (pSockData == NULL)
        return FALSE;

    COverLappedEX *pOverLap = new COverLappedEX(IO_TYPE_READ);
    m_ArrOverLapEx.AddTail(pOverLap);

    WSABUF wsaBuf = { 0 };
    wsaBuf.buf = pOverLap->m_szBuf;
    // One byte is held back so the zero-filled buffer always stays
    // NUL-terminated for the window that reads it as a string.
    wsaBuf.len = MAX_BUF_SIZE - 1;

    DWORD dwBytesRecv = 0, dwFlags = 0;
    int iRet = WSARecv(pSockData->m_Socket, &wsaBuf, 1, &dwBytesRecv, &dwFlags, &(pOverLap->m_OLap), NULL);
    // WSARecv returns 0 on success and SOCKET_ERROR on failure; the original
    // test was inverted.
    if (iRet == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
    {
        ReleaseOverlapped(m_ArrOverLapEx, pOverLap);
        return FALSE;
    }
    return TRUE;
}

// Posts an asynchronous send of the first dwSizeInBytes bytes of lpszText to
// every connected client.
// Returns FALSE when the arguments are invalid (NULL text, or more than
// MAX_BUF_SIZE bytes) or when at least one send could not be posted.
BOOL CIOCPServer::PostSend(LPCTSTR lpszText, DWORD dwSizeInBytes)
{
    if (lpszText == NULL || dwSizeInBytes > MAX_BUF_SIZE)
        return FALSE;
    // A zero-byte send completes with 0 bytes transferred, which the worker
    // interprets as a disconnect; there is nothing to send anyway.
    if (dwSizeInBytes == 0)
        return TRUE;

    BOOL bAllPosted = TRUE;
    POSITION pos = m_ArrSocketData.GetHeadPosition();
    while (pos != NULL)
    {
        CPerSocketData *pSocketData = m_ArrSocketData.GetNext(pos);
        if (pSocketData->m_Socket == m_ListenSocket)
            continue;   // the listening socket is not a client

        COverLappedEX *pOverLapEx = new COverLappedEX(IO_TYPE_WRITE);
        memcpy_s(pOverLapEx->m_szBuf, MAX_BUF_SIZE, lpszText, dwSizeInBytes);

        WSABUF wsaBuf = { 0 };
        wsaBuf.buf = pOverLapEx->m_szBuf;
        wsaBuf.len = dwSizeInBytes;
        m_ArrOverLapEx.AddTail(pOverLapEx);

        DWORD dwSend = 0;
        int iRet = WSASend(pSocketData->m_Socket, &wsaBuf, 1, &dwSend, 0, &(pOverLapEx->m_OLap), NULL);
        if (iRet == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
        {
            ReleaseOverlapped(m_ArrOverLapEx, pOverLapEx);
            bAllPosted = FALSE;
        }
    }
    return bAllPosted;
}

// Worker routine: dequeues completion packets until a packet without an
// OVERLAPPED arrives (the shutdown signal posted by StopServer) or the port
// itself fails. lpParam is the owning CIOCPServer.
DWORD WINAPI CIOCPServer::ThreadPoolProc(LPVOID lpParam)
{
    CIOCPServer *pThis = (CIOCPServer *)lpParam;
    ASSERT(pThis != NULL);
    if (pThis == NULL)
        return FALSE;

    // One more worker is running.
    InterlockedIncrement(&pThis->m_ThreadNums);

    for (;;)
    {
        DWORD dwTrans = 0;
        ULONG_PTR ulKey = 0;
        LPOVERLAPPED lpOverlapped = NULL;
        BOOL bIORet = GetQueuedCompletionStatus(pThis->m_hCompletionPort, &dwTrans, &ulKey, &lpOverlapped, INFINITE);

        // No OVERLAPPED: shutdown packet or a failure of the port itself. This
        // must be checked before the record is dereferenced.
        if (lpOverlapped == NULL)
            break;

        COverLappedEX *pOverLaps = CONTAINING_RECORD(lpOverlapped, COverLappedEX, m_OLap);
        CPerSocketData *pPerSockData = (CPerSocketData *)ulKey;
        if (pPerSockData == NULL)
        {
            ReleaseOverlapped(pThis->m_ArrOverLapEx, pOverLaps);
            continue;
        }

        const IO_TYPE ioType = pOverLaps->m_IOType;

        // A read or write that failed or moved no bytes means the client left.
        if ((ioType == IO_TYPE_READ || ioType == IO_TYPE_WRITE) && (!bIORet || dwTrans == 0))
        {
            ReleaseOverlapped(pThis->m_ArrOverLapEx, pOverLaps);
            ReleaseSocketData(pThis->m_ArrSocketData, pPerSockData);
            continue;
        }

        switch (ioType)
        {
        case IO_TYPE_READ:
        {
            // Synchronous hand-off: the buffer stays valid until the window
            // procedure returns.
            ::SendMessage(pThis->m_pWnd, WM_SHOW_MSG, (WPARAM)pOverLaps->m_szBuf, (LPARAM)pOverLaps->m_szBuf);
            if (!pThis->PostRecv(pPerSockData))
                ReleaseSocketData(pThis->m_ArrSocketData, pPerSockData);
            ReleaseOverlapped(pThis->m_ArrOverLapEx, pOverLaps);
            break;
        }
        case IO_TYPE_WRITE:
        {
            ReleaseOverlapped(pThis->m_ArrOverLapEx, pOverLaps);
            break;
        }
        case IO_TYPE_ACCEPT:
        {
            // Take over the accepted socket before the next accept replaces it.
            SOCKET sAccepted = pPerSockData->m_AccSocket;
            pPerSockData->m_AccSocket = INVALID_SOCKET;
            ReleaseOverlapped(pThis->m_ArrOverLapEx, pOverLaps);

            if (bIORet && sAccepted != INVALID_SOCKET)
            {
                // Required after AcceptEx so the accepted socket inherits the
                // listening socket's properties.
                setsockopt(sAccepted, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
                           (const char *)&pPerSockData->m_Socket, sizeof(pPerSockData->m_Socket));

                // Associate the new client with the completion port.
                CPerSocketData *pData = pThis->AssignSockToCompletionPort(sAccepted);
                if (pData == NULL)
                {
                    closesocket(sAccepted);
                }
                else
                {
                    // A new client may wake one more worker, up to roughly one
                    // per processor (the count is read without a lock).
                    if (pThis->m_ThreadNums < MaxWorkerThreads())
                        QueueUserWorkItem(ThreadPoolProc, pThis, WT_EXECUTELONGFUNCTION);

                    if (!pThis->PostRecv(pData))
                        ReleaseSocketData(pThis->m_ArrSocketData, pData);
                }
            }
            else if (sAccepted != INVALID_SOCKET)
            {
                closesocket(sAccepted);   // the accept itself failed
            }

            // Keep listening regardless of how this accept ended.
            pThis->PostAccept(pPerSockData);
            break;
        }
        default:
            ReleaseOverlapped(pThis->m_ArrOverLapEx, pOverLaps);
            break;
        }
    }

    InterlockedDecrement(&pThis->m_ThreadNums);
    return TRUE;
}

// Starts the server: initializes Winsock, binds and listens on uListenPort,
// creates the completion port, starts the first worker and posts the first
// accept. pWnd receives WM_SHOW_MSG notifications.
// Returns TRUE on success. On failure returns FALSE, leaves the reason in
// GetLastErrorMsg() and releases everything acquired so far.
BOOL CIOCPServer::StartServer(UINT uListenPort, CWnd *pWnd)
{
    AssociateWnd(pWnd);

    if (m_ListenSocket != INVALID_SOCKET || m_hCompletionPort != NULL)
    {
        SetLastErrorMsg(TEXT("服务器已经在运行!"));
        return FALSE;
    }
    if (uListenPort > 65535)
    {
        // htons would silently truncate the value.
        SetLastErrorMsg(TEXT("绑定端口失败!"));
        return FALSE;
    }
    if (!WinSockInit())
    {
        SetLastErrorMsg(TEXT("Socket库初始化失败!"));
        return FALSE;
    }

    m_ListenSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (m_ListenSocket == INVALID_SOCKET)
    {
        SetLastErrorMsg(TEXT("新建Socket失败!"));
        WSACleanup();   // nothing else to release yet
        return FALSE;
    }

    sockaddr_in service;
    ZeroMemory(&service, sizeof(service));
    service.sin_family = AF_INET;
    service.sin_addr.s_addr = INADDR_ANY;
    service.sin_port = htons((u_short)uListenPort);

    if (bind(m_ListenSocket, (sockaddr *)&service, sizeof(sockaddr_in)) == SOCKET_ERROR)
    {
        SetLastErrorMsg(TEXT("绑定端口失败!"));
        StopServer();
        return FALSE;
    }
    if (listen(m_ListenSocket, SOMAXCONN) == SOCKET_ERROR)
    {
        SetLastErrorMsg(TEXT("监听失败!"));
        StopServer();
        return FALSE;
    }

    m_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (m_hCompletionPort == NULL)
    {
        SetLastErrorMsg(TEXT("完成端口创建失败!"));
        StopServer();
        return FALSE;
    }

    if (!QueueUserWorkItem(ThreadPoolProc, this, WT_EXECUTELONGFUNCTION))
    {
        SetLastErrorMsg(TEXT("工作线程启动失败!"));
        StopServer();
        return FALSE;
    }

    CPerSocketData *pSockData = AssignSockToCompletionPort(m_ListenSocket);
    if (pSockData == NULL)
    {
        SetLastErrorMsg(TEXT("监听Socket关联完成端口失败!"));
        StopServer();
        return FALSE;
    }
    if (!PostAccept(pSockData))
    {
        SetLastErrorMsg(TEXT("投递接收连接请求失败!"));
        StopServer();
        return FALSE;
    }
    return TRUE;
}

// Stops the server and releases every resource. Safe to call on a server that
// was never started and safe to call more than once.
//
// NOTE(review): a worker blocked in SendMessage to a window owned by the
// calling thread cannot return while this function sleeps, so the wait below
// can hang in that situation - confirm how shutdown is triggered.
void CIOCPServer::StopServer()
{
    if (m_hCompletionPort == NULL && m_ListenSocket == INVALID_SOCKET)
        return;   // not running: nothing to release, no WSACleanup owed

    // 1. Stop the workers first so nothing below races with them. Each
    //    OVERLAPPED-less packet releases one worker.
    if (m_hCompletionPort != NULL)
    {
        while (InterlockedCompareExchange(&m_ThreadNums, 0, 0) > 0)
        {
            PostQueuedCompletionStatus(m_hCompletionPort, 0, 0, NULL);
            Sleep(100);
        }
    }

    // 2. Close every socket, including an accept socket still waiting for a
    //    client, and free the per-socket records.
    bool bListenClosed = false;
    POSITION pos = m_ArrSocketData.GetHeadPosition();
    while (pos != NULL)
    {
        CPerSocketData *pSocketData = m_ArrSocketData.GetNext(pos);
        if (pSocketData->m_AccSocket != INVALID_SOCKET)
            closesocket(pSocketData->m_AccSocket);
        if (pSocketData->m_Socket != INVALID_SOCKET)
        {
            if (pSocketData->m_Socket == m_ListenSocket)
                bListenClosed = true;
            closesocket(pSocketData->m_Socket);
        }
        delete pSocketData;
    }
    m_ArrSocketData.RemoveAll();

    // The listening socket is not in the list when start-up failed early.
    if (!bListenClosed && m_ListenSocket != INVALID_SOCKET)
        closesocket(m_ListenSocket);
    m_ListenSocket = INVALID_SOCKET;

    // 3. Close the port before the overlapped records are freed.
    //    NOTE(review): presumably the cancelled requests have completed by
    //    now - verify against the Winsock documentation.
    if (m_hCompletionPort != NULL)
    {
        CloseHandle(m_hCompletionPort);
        m_hCompletionPort = NULL;
    }

    // 4. Free the overlapped records and clear their list (the original
    //    cleared the socket list a second time instead).
    pos = m_ArrOverLapEx.GetHeadPosition();
    while (pos != NULL)
    {
        delete m_ArrOverLapEx.GetNext(pos);
    }
    m_ArrOverLapEx.RemoveAll();

    WSACleanup();
}