IOCP五:UDP线程退出

我也不多讲么东西了,你个狗一样的人也不用多讲么。


UDP的基本步骤:

        1.创建socket s,绑定到本地地址和端口。

        2.创建IOCP,为s创建PER_HANDLE_DATA,将s关联到IOCP。

        3.在s上投递多个WSARecvFrom,当有数据到来时,WSARecvFrom以投递顺序决定谁接收数据。


异步接收:WSARecvFrom

	DWORD dwRecv = 0;
	DWORD dwFlags = 0;
	int len = sizeof(sockaddr);
	int ret = WSARecvFrom(s, &databuf, 1, &dwRecv, &dwFlags, &ppiod->clientAddr, &len, &ppiod->overlapped, NULL);
	if(SOCKET_ERROR == ret && WSA_IO_PENDING != GetLastError())
	{
		delete ppiod;
		return false;
	}
WSARecvFrom与WSARecv基本相同,仅增加了两个参数:

六参:用来存储数据发送者的地址信息,(IP和端口)

七参:万年不变sizeof(sockaddr)


PER_IO_DATA也相应增加了一个成员

typedef struct PER_IO_DATA
{
	OVERLAPPED overlapped;
	int no;
	char buf[BUF_LEN]; 
	int operationType;
	sockaddr clientAddr;
}PER_IO_DATA, *LPPER_IO_DATA;
WSARecvFrom返回时buf中承载着接收到的数据、clientAddr中承载着这些数据发送者的地址信息。


如何从clientAddr中获取到发送者的IP和Port?

sockaddr和sockaddr_in可以相互转换,但要想提取地址信息,只能从sockaddr_in。所以可以这样

			sockaddr_in sin;
			memcpy(&sin, &ppiod->clientAddr, sizeof(sin));
			char *ip = inet_ntoa(sin.sin_addr);
			int port = sin.sin_port;
ip指针指向的字符串不能长保,得到后需尽快拷贝一份。

异步发送:WSASendTo

	DWORD dwRecv = 0;
	DWORD dwFlags = 0;
	int ret = WSASendTo(s, &databuf, 1, &dwRecv, dwFlags, clientAddr, sizeof(sockaddr), &ppiod->overlapped, NULL);
	if(SOCKET_ERROR == ret && WSA_IO_PENDING != GetLastError())
	{
		delete ppiod;
		return false;
	}
变化也是多了两个参数,

六参:此次发送的目标地址

七参:万年不变的sizeof(sockaddr)


/*********************************************************************************************************/


实验过程:

        1.线程A在socket s上投递WSARecvFrom编号1

        2.Client发送”woyougexiaozhinv,kewoburenshita“

        3.1号WSARecvFrom接收数据

        4.创建线程B,由B投递2号WSARecvFrom

        5.线程A投递3-10号WSARecvFrom

        6.线程B每隔2秒发送"nihaihaoma",发送两次后线程退出

        7.Client接收两次"nihaihaoma",然后发送"woyougexiaozhinv,kewoburenshita"


实验结果:

        线程B退出,由B投递的2号WSARecvFrom马上以ret=false,dwNum=0,GetLastError()=995返回 (995:由于线程退出或应用程序请求,已放弃 I/O 操作)。

        Client最后一次发送的数据由后面的3号WSARecvFrom接收。

实验结论:

        一个线程退出,由该线程投递的所有异步请求马上以ret=fasle, dwNum=0, GetLastError()=995返回。


实验结果图:



实验代码:

#include <WinSock2.h>
#include <Windows.h>
#include <iostream>
#include <process.h>
#include <string>
#include <MSWSock.h>
#include <set>

#pragma comment(lib, "Ws2_32.lib")
#pragma comment(lib, "Kernel32.lib")
#pragma comment(lib, "Mswsock.lib")

#define BUF_LEN 1024

bool flag = true;

enum OperateType
{
	OP_RECV,
	OP_SEND,
};

typedef struct PER_HANDLE_DATA
{
	SOCKET s;
	SOCKADDR_IN addr;
}PER_HANDLE_DATA, *LPPER_HANDLE_DATA;

typedef struct PER_IO_DATA
{
	OVERLAPPED overlapped;
	int no;
	char buf[BUF_LEN]; 
	int operationType;
	sockaddr clientAddr;
}PER_IO_DATA, *LPPER_IO_DATA;

typedef struct Arg
{
	SOCKET s;
	sockaddr addr;
}Arg;

SOCKET SocketInitBind()
{
	SOCKET s = socket(AF_INET, SOCK_DGRAM, 0);
	if(INVALID_SOCKET == s)
	{
		std::cout<<"create socket failed : "<<GetLastError()<<std::endl;
		return INVALID_SOCKET;
	}

	SOCKADDR_IN	addr;
	addr.sin_family = AF_INET;
	addr.sin_addr.S_un.S_addr = INADDR_ANY;
	addr.sin_port = htons(4444);

	int ret = bind(s, (sockaddr*)&addr, sizeof(addr));
	if(SOCKET_ERROR == ret)
	{
		std::cout<<"bind failed : "<<GetLastError()<<std::endl;
		return SOCKET_ERROR;
	}

	return s;
}

bool PostSendTo(SOCKET s, const sockaddr *clientAddr, const char *buf, int len)
{
	LPPER_IO_DATA ppiod = new PER_IO_DATA;
	ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED));
	ppiod->operationType = OP_SEND;
	ppiod->clientAddr = *clientAddr;
	ppiod->no = 128;
	memset(ppiod->buf, 0, BUF_LEN);
	memcpy(ppiod->buf, buf, len);

	WSABUF databuf;
	databuf.buf = ppiod->buf;
	databuf.len = len;

	DWORD dwRecv = 0;
	DWORD dwFlags = 0;
	int ret = WSASendTo(s, &databuf, 1, &dwRecv, dwFlags, clientAddr, sizeof(sockaddr), &ppiod->overlapped, NULL);
	if(SOCKET_ERROR == ret && WSA_IO_PENDING != GetLastError())
		return false;

	return true;
}

bool PostRecvFrom(SOCKET s, int n)
{
	LPPER_IO_DATA ppiod = new PER_IO_DATA;
	ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED));
	ppiod->operationType = OP_RECV;
	ppiod->no = n;
	memset(ppiod->buf, 0, BUF_LEN);

	WSABUF databuf;
	databuf.buf = ppiod->buf;
	databuf.len = BUF_LEN;

	DWORD dwRecv = 0;
	DWORD dwFlags = 0;
	int len = sizeof(sockaddr);
	int ret = WSARecvFrom(s, &databuf, 1, &dwRecv, &dwFlags, &ppiod->clientAddr, &len, &ppiod->overlapped, NULL);
	if(SOCKET_ERROR == ret && WSA_IO_PENDING != GetLastError())
		return false;

	return true;
}

//Client中收发顺序:发-收-收-发
//这个子线程证明了线程A在socket 1上投递异步接收请求后等待数据到来,线程B在socket 1上投递异步发送请求,两厢互不干扰。
unsigned int __stdcall Func(void *param)
{
	Arg *arg = (Arg*)param;
	SOCKET s = arg->s;
	sockaddr addr = arg->addr;

	PostRecvFrom(s, 2);

	for(int i = 0; i < 2; i++)
	{
		Sleep(2000);
		std::string str = "nihaihaoma";
		PostSendTo(s, &addr, str.c_str(), str.length());
	}

	_endthreadex(0);
	return 0;
}

unsigned int __stdcall ThreadFunc(void *arg)
{
	HANDLE hcp = (HANDLE)arg;
	if(NULL == hcp)
	{
		std::cout<<"thread arg error"<<std::endl;
		return -1;
	}

	DWORD dwNum = 0;
	LPPER_HANDLE_DATA pphd;
	LPPER_IO_DATA ppiod;
	while(true)
	{
		bool ret = GetQueuedCompletionStatus(hcp, &dwNum, (LPDWORD)&pphd, (LPOVERLAPPED*)&ppiod, WSA_INFINITE);

		//线程退出控制,没有释放申请的堆空间,还不完善
		if(-1 == dwNum)
		{
			std::cout<<"Thread Exit"<<std::endl;
			_endthreadex(0);
			return 0;
		}

		//错误发生
		if(false == ret || 0 == dwNum)
		{
			std::cout<<"An Error Occurs : "<<GetLastError()<<" no:"<<ppiod->no<<std::endl;
			std::cout<<"ret="<<ret<<" dwNum="<<dwNum<<std::endl;
			delete(ppiod);
			continue;
		}

		int type = ppiod->operationType;
		if(OP_RECV == type)
		{
			//
			std::cout<<"接收完成"<<std::endl;
			//
			sockaddr_in sin;
			memcpy(&sin, &ppiod->clientAddr, sizeof(sin));
			std::cout<<"From:"<<inet_ntoa(sin.sin_addr)<<" Port:"<<sin.sin_port<<std::endl;
			ppiod->buf[dwNum] = '';
			std::cout<<"Receiver : "<<ppiod->no<<"  "<<ppiod->buf<<std::endl;

			//只允许进入一次
			if(flag)
			{
				flag = false;

				Arg arg;
				arg.s = pphd->s;
				arg.addr = ppiod->clientAddr;
				//测试在s等待接收时,另一线程在s上投递发送会怎样
				_beginthreadex(NULL, 0, Func, (void*)&arg, 0, NULL);
				//等待子线程成功读取参数
				Sleep(500);

				//投递3-10号接收请求
				for(int i = 3; i < 11; i++)
				{
					bool ret = PostRecvFrom(pphd->s, i);
					if(false == ret)
					{
						std::cout<<"PostAccept Failed"<<std::endl;
						return -1;
					}
				}
			}

			delete ppiod;

			/*ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED));
			ZeroMemory(ppiod->buf, BUF_LEN);
			ppiod->no = 14;
			WSABUF databuf;
			databuf.buf = ppiod->buf;
			databuf.len = BUF_LEN;

			DWORD dwRecv = 0;
			DWORD dwFlags = 0;
			int len = sizeof(sockaddr);
			WSARecvFrom(pphd->s, &databuf, 1, &dwRecv, &dwFlags, &ppiod->clientAddr, &len, &ppiod->overlapped, NULL);*/
		}
		else if(OP_SEND == type)
		{
			//
			std::cout<<"发送完成"<<std::endl;
			//
			sockaddr_in sin;
			memcpy(&sin, &ppiod->clientAddr, sizeof(sin));
			std::cout<<"To:"<<inet_ntoa(sin.sin_addr)<<" Port:"<<sin.sin_port<<std::endl;

			delete ppiod;
		}
	}

	return 0;
}

int main()
{
	WSADATA ws;
	if(WSAStartup(MAKEWORD(2, 2), &ws) != 0)
	{
		std::cout<<"WSAStartup error : "<<GetLastError()<<std::endl;
		return -1;
	}

	HANDLE hcp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
	if(NULL == hcp)
	{
		std::cout<<"create completion port failed : "<<GetLastError()<<std::endl;
		return -1;
	}

	std::set<HANDLE> setWorkers;
	SYSTEM_INFO si;
	GetSystemInfo(&si);
	for(int i = 0; i < si.dwNumberOfProcessors * 2 + 2; i++)
	{
		HANDLE worker = (HANDLE)_beginthreadex(NULL, 0, ThreadFunc, (LPVOID)hcp, 0, NULL);
		if(NULL == worker)
		{
			std::cout<<"create thread failed : "<<GetLastError()<<std::endl;
			return -1;
		}
		setWorkers.insert(worker);
	}

	SOCKET s = SocketInitBind();
	if(INVALID_SOCKET == s)
	{
		std::cout<<"socket init failed"<<std::endl;
		return -1;
	}

	LPPER_HANDLE_DATA pphd = new PER_HANDLE_DATA;
	pphd->s = s;
	CreateIoCompletionPort((HANDLE)s, hcp, (DWORD)pphd, 0);

	//投递编号1接收请求
	bool ret = PostRecvFrom(s, 1);
	if(false == ret)
	{
		std::cout<<"PostAccept Failed"<<std::endl;
		return -1;
	}

	//退出控制
	/*std::cin.get();
	for(int i = 0; i < setWorkers.size(); i++)
		PostQueuedCompletionStatus(hcp, -1, NULL, NULL);*/

	auto iter = setWorkers.begin();
	for(; iter != setWorkers.end(); iter++)
		WaitForSingleObject(*iter, INFINITE);

	WSACleanup();

	std::cin.get();
	return 0;
}

客户端代码:

#include <stdio.h> 
#include <Winsock2.h> 
#include <iostream>

#pragma comment(lib, "Ws2_32.lib")

int main() 
{ 
	WSADATA ws;

	if(WSAStartup(MAKEWORD(2, 2), &ws) != 0)
		return -1;

	sockaddr_in addr; 
	int len = sizeof(addr); 
	addr.sin_family = AF_INET; 
	addr.sin_port = htons(4444);
	addr.sin_addr.s_addr = inet_addr("192.168.15.14"); 
	SOCKET s = socket(AF_INET, SOCK_DGRAM, 0); 
	
	std::string str = "woyougexiaozhinv,kewoburenshita";
	int ret = sendto(s, str.c_str(), str.length(), 0, (sockaddr*)&addr, len);
	if (SOCKET_ERROR == ret) 
	{
		std::cout<<"Data Send Fail"<<std::endl;
		return -1;
	} 

	char buf[100] = {0};
	ret = recvfrom(s, buf, 100, 0, (sockaddr*)&addr, &len);
	if(SOCKET_ERROR == ret)
	{
		std::cout<<"Data Recv Fail"<<std::endl;
		return -1;
	}
	std::cout<<buf<<std::endl;

	{
		char buf[100] = {0};
		ret = recvfrom(s, buf, 100, 0, (sockaddr*)&addr, &len);
		if(SOCKET_ERROR == ret)
		{
			std::cout<<"Data Recv Fail"<<std::endl;
			return -1;
		}
		std::cout<<buf<<std::endl;

		std::cout<<"Send?"<<std::endl;
		std::cin.get();

		std::string str = "woyougexiaozhinv,kewoburenshita";
		int ret = sendto(s, str.c_str(), str.length(), 0, (sockaddr*)&addr, len);
		if (SOCKET_ERROR == ret) 
		{
			std::cout<<"Data Send Fail"<<std::endl;
			return -1;
		} 
	}

	std::cin.get();
	closesocket(s);
	WSACleanup();   
	return 0;
}


原文地址:https://www.cnblogs.com/chaikefusibushiji/p/6775787.html