IOCP模型与网络编程

【服务端源代码】

#include "stdafx.h"

/*
iocp是windows上通讯模型,把socket的阻塞函数,如recv改成完成端口的来完成.
基本的思路,创建一个线程池来作为工作者线程,然后线程的处理函数是接收/转发数据.
server
*/

/*
所用到的函数:
HANDLE WINAPI CreateIoCompletionPort(
__in HANDLE FileHandle, // 已经打开的文件句柄或者空句柄,一般是客户端的句柄
__in HANDLE ExistingCompletionPort, // 已经存在的IOCP句柄
__in ULONG_PTR CompletionKey, // 完成键,包含了指定I/O完成包的指定文件
__in DWORD NumberOfConcurrentThreads );// 真正并发同时执行最大线程数,一般推介是CPU核心数*2+2


BOOL GetQueuedCompletionStatus(
HANDLE CompletionPort, //指定的IOCP
LPDWORD lpNumberOfBytes, //一次完成后的I/O操作所传送数据的字节数
PULONG_PTR lpCompletionKey, //当文件I/O操作完成后,用于存放与之关联的CK
LPOVERLAPPED *lpOverlapped, //为调用IOCP机制所引用的OVERLAPPED结构
DWORD dwMilliseconds); //用于指定调用者等待CP的时间

int WSAAPI WSARecv (
SOCKET s, //socket
LPWSABUF lpBuffers, //存放数据的数组的指针
DWORD dwBufferCount, //数组的数量
LPDWORD lpNumberOfBytesRecvd, //接收字节数的指针
LPINT lpFlags, //指向标志位的指针
LPWSAOVERLAPPED lpOverlapped, //指向重叠结构的指针
LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine);//调用例程的指针


*/

#define _CRT_SECURE_CPP_OVERLOAD_STANDARD_NAMES 1

#include <WinSock2.h>
#include <Windows.h>
#include <vector>
#include <iostream>

using namespace std;


#pragma comment(lib, "Ws2_32.lib") // Socket编程需用的动态链接库
#pragma comment(lib, "Kernel32.lib") // IOCP需要用到的动态链接库

//字符串处理函数
int myStrLen(const char* str)
{
int i = 0;
while (*str != '')
{
i++;
str++;
}
return i;
}

//自定义全局变量
HANDLE CompletionPort = NULL;
HANDLE hMutex = CreateMutex(NULL, FALSE, NULL);
const int g_DataBuffSize = 2 * 1024; //服务器端口
typedef struct
{
OVERLAPPED overlapped;
WSABUF databuff;
char buffer[g_DataBuffSize];
int BufferLen;
int operationType;
}PER_IO_OPERATEION_DATA, *LPPER_IO_OPERATION_DATA, *LPPER_IO_DATA, PER_IO_DATA;
typedef struct //客户端结构
{
SOCKET socket;
SOCKADDR_STORAGE ClientAddr;
char *pszClientName;
}PER_HANDLE_DATA, *LPPER_HANDLE_DATA;
const int g_DefaultPort = 6000;
vector < PER_HANDLE_DATA* > g_clientGroup; // 记录客户端的向量组
SOCKET serverSocket; //服务器的socket

//自定义函数声明
BOOL InitNetwork(WORD port); //初始化网络,参数port是服务器的端口
BOOL InitWorkThread(); //初始化工作者线程
DWORD WINAPI ServerWorkThread(LPVOID CompletionPortID); //工作者线程处理函数
DWORD WINAPI ServerSendThread(LPVOID IpParam); //发送消息的函数
DWORD WINAPI AddClient(LPVOID IpParam); //向服务器添加客户端
BOOL DelClient(vector<PER_HANDLE_DATA*>&clientGroup, SOCKET& clientSocket);//向服务器减少客户端

int _tmain(int argc, _TCHAR* argv[])
{
if (!InitNetwork(g_DefaultPort))return -1;
if (!InitWorkThread())return -1;
HANDLE sendThread = CreateThread(NULL, 0, ServerSendThread, 0, 0, NULL);
HANDLE addThread = CreateThread(NULL, 0, AddClient, 0, 0, NULL);
WaitForSingleObject(sendThread, INFINITE);
WaitForSingleObject(addThread, INFINITE);
system("pause");

return 0;
}

BOOL InitNetwork(WORD port)
//初始化网络
{
//请求2.2的wsa版本的网络库
WORD wVersionRequested = MAKEWORD(2, 2);
WSADATA wsaData;
DWORD err = WSAStartup(wVersionRequested, &wsaData);
if (err != 0)
{
cerr << "请求动态链接库失败 ";
return FALSE;
}
if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2)
{
WSACleanup();
cerr << "请求的版本不是2.2版本 ";
return FALSE;
}

//建立起服务器的socket
serverSocket = socket(AF_INET, SOCK_STREAM, 0); //流式套接字
SOCKADDR_IN serverAddr;
serverAddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(port);
int bindResult = bind(serverSocket, (SOCKADDR*)&serverAddr, sizeof(SOCKADDR));
if (SOCKET_ERROR == bindResult)
{
cerr << "服务器绑定失败:" << GetLastError() << endl;
return FALSE;
}

int listenResult = listen(serverSocket, 10); // 将SOCKET设置为监听模式
if (listenResult == SOCKET_ERROR)
{
cerr << "进入监听模式失败: " << GetLastError() << endl;
return FALSE;
}
cout << " IOCP服务器已准备就绪,正在等待客户端的接入中............ ";
return TRUE;
}

BOOL InitWorkThread()
//初始化工作者线程
{

// 创建IOCP的内核对象
HANDLE completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (completionPort == NULL){ // 创建IO内核对象失败
cerr << "创建IO完成端口发生错误:" << GetLastError() << endl;
return FALSE;
}

// 创建IOCP线程--线程里面创建线程池,并将完成端口传递到该线程
SYSTEM_INFO mySysInfo;
GetSystemInfo(&mySysInfo); // 确定处理器的核心数量
for (DWORD i = 0; i < (mySysInfo.dwNumberOfProcessors * 2 + 2); ++i){// 基于处理器的核心数量创建线程
HANDLE ThreadHandle = CreateThread(NULL, 0, ServerWorkThread, completionPort, 0, NULL);
if (ThreadHandle == NULL){
cerr << "创建线程失败" << GetLastError() << endl;
return FALSE;
}
CloseHandle(ThreadHandle);
}
return TRUE;
}

DWORD WINAPI ServerWorkThread(LPVOID IpParam)
//工作线程的处理函数
{
CompletionPort = (HANDLE)IpParam;
DWORD BytesTransferred;
LPOVERLAPPED IpOverlapped;
LPPER_HANDLE_DATA PerHandleData = NULL;
LPPER_IO_DATA PerIoData = NULL;
DWORD RecvBytes;
DWORD Flags = 0;
BOOL bRet = false;

while (true){
bRet = GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (PULONG_PTR)&PerHandleData, (LPOVERLAPPED*)&IpOverlapped, INFINITE);//
//GetQueuedCompletionStatus参数(完成端口对象,传输完成的状态码,socket的信息,重叠结构信息,等待时间)
if (bRet == 0){
cerr << "获取完成端口状态发生失败: " << GetLastError() << endl;
DelClient(g_clientGroup, PerHandleData->socket);
continue;
}
PerIoData = (LPPER_IO_DATA)CONTAINING_RECORD(IpOverlapped, PER_IO_DATA, overlapped);//有意思,先计算出地址的偏移量,然后根据特定的变量地址找到结构体地址,这样就相当于一个client取一个特定内存区域的址进行读取,当然多个client就有多个区域

// 检查在套接字上是否有错误发生
if (BytesTransferred == 0){
closesocket(PerHandleData->socket);
GlobalFree(PerHandleData);
GlobalFree(PerIoData);
continue;
}

// 开始数据处理,接收来自客户端的数据
WaitForSingleObject(hMutex, INFINITE);
SOCKET clientsockt = NULL;
cout << "A Client says: " << PerIoData->databuff.buf << endl;
ReleaseMutex(hMutex);

// 为下一个重叠调用建立单I/O操作数据
ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED)); // 清空内存
PerIoData->databuff.len = 1024;
PerIoData->databuff.buf = PerIoData->buffer;
PerIoData->operationType = 0; // read
WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL);
}

return 0;
}

DWORD WINAPI ServerSendThread(LPVOID IpParam)
//发送信息的执行函数
{
while (1){
char talk[200];
gets_s(talk);
int len;
for (len = 0; talk[len] != ''; ++len);
talk[len] = ' ';
talk[++len] = '';
printf("I Say:");
cout << talk << g_clientGroup.size();

WaitForSingleObject(hMutex, INFINITE);
for (int i = 0; i < g_clientGroup.size(); ++i){
send(g_clientGroup[i]->socket, talk, 200, 0); // 发送信息
}
ReleaseMutex(hMutex);
}
return 0;
}

DWORD WINAPI AddClient(LPVOID IpParam)
//向服务器添加客户端
{
while (true){
PER_HANDLE_DATA * PerHandleData = NULL;
SOCKADDR_IN saRemote;
int RemoteLen;
SOCKET acceptSocket;

// 接收连接,并分配完成端,这儿可以用AcceptEx()
RemoteLen = sizeof(saRemote);
acceptSocket = accept(serverSocket, (SOCKADDR*)&saRemote, &RemoteLen);
if (SOCKET_ERROR == acceptSocket){ // 接收客户端失败
cerr << "接收客户端失败:" << GetLastError() << endl;
return -1;
}

// 创建用来和套接字关联的单句柄数据信息结构
PerHandleData = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA)); // 在堆中为这个PerHandleData申请指定大小的内存
PerHandleData->socket = acceptSocket;
memcpy(&PerHandleData->ClientAddr, &saRemote, RemoteLen);
g_clientGroup.push_back(PerHandleData); // 将单个客户端数据指针放到客户端组中

// 将接受套接字和完成端口关联(socket/完成端口/传递给处理函数的参数,多少个线程访问完成端口0是和处理器的数量相同)
CreateIoCompletionPort((HANDLE)(PerHandleData->socket), CompletionPort, (DWORD)PerHandleData, 0);

// 开始在接受套接字上处理I/O使用重叠I/O机制
// 在新建的套接字上投递一个或多个异步
// WSARecv或WSASend请求,这些I/O请求完成后,工作者线程会为I/O请求提供服务
// 单I/O操作数据(I/O重叠)
LPPER_IO_OPERATION_DATA PerIoData = NULL;
PerIoData = (LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_OPERATEION_DATA));
ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED));
PerIoData->databuff.len = 1024;
PerIoData->databuff.buf = PerIoData->buffer;
PerIoData->operationType = 0; // read

DWORD RecvBytes;
DWORD Flags = 0;
WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL);
//参数:socket,buff,buff的数目,指向所接收字节数的指针,指向标志位的指针,重叠指针,结束调用例程指针
}
}

BOOL DelClient(vector<PER_HANDLE_DATA*>&clientGroup, SOCKET& clientSocket)
{
for (int i = 0; i < clientGroup.size(); i++)
{
if (clientGroup[i]->socket == clientSocket)
{
clientGroup.erase(clientGroup.begin() + i);
cout << "有客户端发生退出 ";
return TRUE;
}
}
return FALSE;
}

 【客户端源代码】

#include "stdafx.h"

#include <iostream>
#include <cstdio>
#include <string>
#include <cstring>
#include <winsock2.h>
#include <Windows.h>

using namespace std;

#pragma comment(lib, "Ws2_32.lib") // Socket编程需用的动态链接库

SOCKET sockClient; // 连接成功后的套接字
HANDLE bufferMutex; // 令其能互斥成功正常通信的信号量句柄
const int DefaultPort = 6000;

int _tmain(int argc, _TCHAR* argv[])
{
// 加载socket动态链接库(dll)
WORD wVersionRequested;
WSADATA wsaData; // 这结构是用于接收Wjndows Socket的结构信息的
wVersionRequested = MAKEWORD(2, 2); // 请求2.2版本的WinSock库
int err = WSAStartup(wVersionRequested, &wsaData);
if (err != 0) { // 返回值为零的时候是表示成功申请WSAStartup
return -1;
}
if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2) { // 检查版本号是否正确
WSACleanup();
return -1;
}

// 创建socket操作,建立流式套接字,返回套接字号sockClient
sockClient = socket(AF_INET, SOCK_STREAM, 0);
if (sockClient == INVALID_SOCKET) {
printf("Error at socket():%ld ", WSAGetLastError());
WSACleanup();
return -1;
}

// 将套接字sockClient与远程主机相连
// int connect( SOCKET s, const struct sockaddr* name, int namelen);
// 第一个参数:需要进行连接操作的套接字
// 第二个参数:设定所需要连接的地址信息
// 第三个参数:地址的长度
SOCKADDR_IN addrSrv;
addrSrv.sin_addr.S_un.S_addr = inet_addr("127.0.0.1"); // 本地回路地址是127.0.0.1;
addrSrv.sin_family = AF_INET;
addrSrv.sin_port = htons(DefaultPort);
while (SOCKET_ERROR == connect(sockClient, (SOCKADDR*)&addrSrv, sizeof(SOCKADDR))){
// 如果还没连接上服务器则要求重连
cout << "服务器连接失败,是否重新连接?(Y/N):";
char choice;
while (cin >> choice && (!((choice != 'Y' && choice == 'N') || (choice == 'Y' && choice != 'N')))){
cout << "输入错误,请重新输入:";
cin.sync();
cin.clear();
}
if (choice == 'Y'){
continue;
}
else{
cout << "退出系统中...";
system("pause");
return 0;
}
}
cin.sync();
cout << " 客户端用户已准备完毕,可直接输入向服务器反馈消息. ";

send(sockClient, " Attention: A Client has enter... ", 200, 0);

bufferMutex = CreateSemaphore(NULL, 1, 1, NULL);

DWORD WINAPI SendMessageThread(LPVOID IpParameter);
DWORD WINAPI ReceiveMessageThread(LPVOID IpParameter);

HANDLE sendThread = CreateThread(NULL, 0, SendMessageThread, NULL, 0, NULL);
HANDLE receiveThread = CreateThread(NULL, 0, ReceiveMessageThread, NULL, 0, NULL);


WaitForSingleObject(sendThread, INFINITE); // 等待线程结束
closesocket(sockClient);
CloseHandle(sendThread);
CloseHandle(receiveThread);
CloseHandle(bufferMutex);
WSACleanup(); // 终止对套接字库的使用

printf("End linking... ");
printf(" ");
system("pause");

return 0;
}

DWORD WINAPI SendMessageThread(LPVOID IpParameter)
{
while (1){
string talk;
getline(cin, talk);
WaitForSingleObject(bufferMutex, INFINITE); // P(资源未被占用)
if ("quit" == talk){
talk.push_back('');
send(sockClient, talk.c_str(), 200, 0);
break;
}
else{
talk.append(" ");
}
printf(" 客户端数据消息(输入quit)退出程序:");
cout << talk;
send(sockClient, talk.c_str(), 200, 0); // 发送信息
ReleaseSemaphore(bufferMutex, 1, NULL); // V(资源占用完毕)
}
return 0;
}


DWORD WINAPI ReceiveMessageThread(LPVOID IpParameter)
{
while (1){
char recvBuf[300];
recv(sockClient, recvBuf, 200, 0);
WaitForSingleObject(bufferMutex, INFINITE); // P(资源未被占用)

printf("%s Says: %s", "Server", recvBuf); // 接收信息

ReleaseSemaphore(bufferMutex, 1, NULL); // V(资源占用完毕)
}
return 0;
}

【运行效果如下】

原文地址:https://www.cnblogs.com/chinasirius/p/12076756.html