完成端口模型

完成端口模型的使用过程如下:

1.调用CreateIoCompletionPort函数创建完成端口。

HANDLE CompletionPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);

2.创建和处理器数目相等的工作线程

SYSTEM_INFO SysInfo;
GetSystemInfo(&SysInfo);
for(int i=0;i<(SysInfo.dwNumberOfProcessors);i++)
{
    HANDLE ThreadHandle=(HANDLE)_beginthreadex(NULL,0,CompletionPortProcessor,CompletionPort,0,NULL);
    CloseHandle(ThreadHandle);
}

3.接受客户端连接请求,创建单句柄数据,调用CreateIoCompletionPort将客户端套接字绑定到完成端口上。

单句柄数据结构可以自定义字段:

struct PTR_HANDLE_DATA
{//字段可以任意定义
      SOCKET s;
      int i;
};

将套接字绑定到完成端口上:

CreateIoCompletionPort((HANDLE)sClient,CompletionPort,(ULONG_PTR)PerHandleData,0);

4.创建单I/O数据,并将单I/O数据作为参数传递给重叠I/O函数:WSARecv、WSASend。

创建单I/O数据,该结构除了第一个字段必须为重叠结构OVERLAPPED外,其它字段可以自定义:

struct PER_IO_DATA
{
	OVERLAPPED Overlapped;
	WSABUF DataBuf;
	char Buffer[DATA_BUFFER];
	int OperationType;
};

调用重叠I/O函数:

WSARecv(PerHandleData->socket, &PerIoData->DataBuf, 1, &dwRecv, &Flags, &PerIoData->Overlapped, NULL);

5.在工作线程中,调用GetQueuedCompletionStatus函数等待完成端口的完成请求。

GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (PULONG_PTR)&PerHandleData, (LPOVERLAPPED*)&PerIoData, INFINITE);

6.等待成功后,对请求进行处理,如果需要则再次投递一个重叠I/O。


一个简单的例子如下:

#include <WinSock2.h>
#include <stdio.h>
#include <string.h>
#include <ws2tcpip.h>
#include <process.h>
#pragma comment(lib, "ws2_32.lib")  // link against the Winsock 2 import library
// Size in bytes of each per-I/O receive buffer. Parenthesized so the macro
// expands safely inside larger expressions (e.g. `x / DATA_BUFFER`).
#define DATA_BUFFER (4*1024)
// Values stored in PER_IO_DATA::OperationType to tag the kind of overlapped call.
#define RECV_OPERATION	1
#define SEND_OPERATION  2

// Per-connection ("per-handle") context. main() allocates one for every accepted
// client and hands its address to CreateIoCompletionPort as the completion key;
// the worker thread gets the same pointer back from GetQueuedCompletionStatus.
struct PTR_HANDLE_DATA
{
	// Client socket returned by accept().
	SOCKET socket;
	// Sequence number assigned by main() (1 for the first client, incremented per
	// accept); the worker prints it in front of the received data.
	int Location;
};
// Per-operation ("per-I/O") context passed to WSARecv through its OVERLAPPED
// argument. The worker thread casts the LPOVERLAPPED it dequeues straight back
// to PER_IO_DATA*, so Overlapped has to remain the first member.
struct PER_IO_DATA
{
	// Overlapped structure handed to WSARecv; must stay first (see above).
	OVERLAPPED Overlapped;
	// Buffer descriptor given to WSARecv; the code points it at Buffer below.
	WSABUF DataBuf;
	// Storage that receives the incoming bytes.
	char Buffer[DATA_BUFFER];
	// RECV_OPERATION or SEND_OPERATION; only RECV_OPERATION is set in this file.
	int OperationType;
};
unsigned int WINAPI CompletionPortProcessor(PVOID lParam)
{
    HANDLE CompletionPort = (HANDLE)lParam;
    DWORD BytesTransferred;
    PTR_HANDLE_DATA *PerHandleData;
    PER_IO_DATA *PerIoData;

    while(true)
    {

        if(0 == GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (LPDWORD)&PerHandleData, (LPOVERLAPPED*)&PerIoData, INFINITE))
        {
            if( (GetLastError() == WAIT_TIMEOUT) || (GetLastError() == ERROR_NETNAME_DELETED) )
            {
				closesocket(PerHandleData->socket);

                delete PerIoData;
                delete PerHandleData;
                continue;
            }
            return 0;
        }

        // 说明client已经退出
        if(BytesTransferred == 0)
        {
            closesocket(PerHandleData->socket);
            delete PerIoData;
            delete PerHandleData;
            continue;
        }
		if(PerIoData->OperationType==RECV_OPERATION)
		{
			printf("%d:%s
",PerHandleData->Location,PerIoData->DataBuf.buf);
			// 继续向 socket 投递WSARecv操作
			DWORD Flags = 0;
			DWORD dwRecv = 0;
			ZeroMemory(PerIoData, sizeof(PER_IO_DATA));
			PerIoData->DataBuf.buf = PerIoData->Buffer;
			PerIoData->DataBuf.len = DATA_BUFFER;
			PerIoData->OperationType=RECV_OPERATION;
			WSARecv(PerHandleData->socket, &PerIoData->DataBuf, 1, &dwRecv, &Flags, &PerIoData->Overlapped, NULL); 
		}
    }

    return 0;
}

void main()
{
	HANDLE ComplPort;
	ComplPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);

	SYSTEM_INFO sysInfo;
	memset(&sysInfo,0,sizeof(sysInfo));
	GetSystemInfo(&sysInfo);
	HANDLE *handleArray=(HANDLE *)malloc(sysInfo.dwNumberOfProcessors *sizeof(HANDLE));
	for(int i=0;i<(sysInfo.dwNumberOfProcessors);i++)
	{
		handleArray[i]=(HANDLE)_beginthreadex(NULL,0,CompletionPortProcessor,ComplPort,0,NULL);
	}
	WSADATA  wsaData;
	WSAStartup(MAKEWORD(2,2),&wsaData);
	
	
	SOCKET s,sClient;
	struct addrinfo hints,*result;
	int rc;
	memset(&hints,0,sizeof(hints));
	hints.ai_flags=AI_NUMERICHOST;//nodename is ip address;if set AI_PASSIVE ,it is computer name;if set AI_CANONNAME ,it is ..
	hints.ai_family=AF_UNSPEC;//IPv4 OR IPv6;if IPv4,Set AF_INET; if IPv6,Set AF_INET6
	hints.ai_socktype=SOCK_STREAM;//SOCK_DRGAM
	hints.ai_protocol=IPPROTO_TCP;
	rc=getaddrinfo("127.0.0.1","5001",&hints,&result);
	s=socket(result->ai_family,result->ai_socktype,result->ai_protocol);
	bind(s,result->ai_addr,result->ai_addrlen);
	listen(s,5);
	PER_IO_DATA *PerIoData;
	PTR_HANDLE_DATA *PerHandleData;
	int i=1;
    while(true)
    {
        sClient = accept(s, 0,0);
        PerHandleData = new PTR_HANDLE_DATA();
		PerHandleData->socket = sClient;
		PerHandleData->Location=i;
        CreateIoCompletionPort((HANDLE)PerHandleData->socket, ComplPort, (DWORD)PerHandleData, 0);

        PerIoData = new PER_IO_DATA();
        ZeroMemory(PerIoData, sizeof(PER_IO_DATA));
        PerIoData->DataBuf.buf = PerIoData->Buffer;
        PerIoData->DataBuf.len =DATA_BUFFER;
		PerIoData->OperationType=RECV_OPERATION;

        DWORD Flags = 0;
        DWORD dwRecv = 0;
        WSARecv(PerHandleData->socket, &PerIoData->DataBuf, 1, &dwRecv, &Flags, &PerIoData->Overlapped, NULL);
		i++;
    }

    DWORD dwByteTrans;
    PostQueuedCompletionStatus(ComplPort, dwByteTrans, 0, 0);
    closesocket(s);
}






原文地址:https://www.cnblogs.com/cynchanpin/p/6924427.html