跨平台网络编程

  程序员也三年了,工作以来,一直都在写游戏逻辑,刚好最近闲下来,难得有时间,自己研究一些新的东西。因此赶紧看看关于网络方面的东西,总结一下,好记心不如烂键盘。

  其实总的来说,网络也就那么回事,比我原本想像中要简单很多,只有简单的几个接口。

  一开始关于大数据,我还想复杂了,原本以为需要自己分包,再合包,而且我一开始也是这么做的,都已经做好了,再问别人才发现,这是没有必要的。(伤心!!) 

  网络就是一个数据流,发送的时候,可以分多次send,接收时,也可以多次recv , 因为有缓存机制。

  首先,需要做的只是给每条消息加上消息头,定义好包的大小及消息类型等其他业务相关的数据。

  第二步, 发送消息时,先加上包头,后面跟数据。

  第三步,接收消息时,先接收包头,再决定后面接收多大的数据。

以下直接贴代码:

客户端:

SocketClient.h

#ifndef __NET_SOCKET_CLIENT_H__
#define __NET_SOCKET_CLIENT_H__

#ifdef WIN32
#include <WinSock2.h>
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/ioctl.h>
#include <arpa/inet.h>
#include <unistd.h>

#ifndef typedef_socket_
#define typedef_socket_
typedef int SOCKET;
#endif
//typedef unsigned int SOCKET;
#define INVALID_SOCKET -1
#define SOCKET_ERROR -1
#endif

#include "ByteArray.h"
#include <thread>
#include <mutex>
#include <list>
#include <string>


#define  MsgPack_ptr std::shared_ptr<MsgPack>

class SocketClient {
private:
	SOCKET m_sock;
	std::shared_ptr<std::thread> m_thread;
	bool m_connected;
	std::mutex m_mutex;
	std::list<MsgPack*> m_requests;
	std::list<MsgPack_ptr> m_responseList;
	bool	m_running;
	std::string	m_callback;
	int32_t		m_recvTimeout;
public:
	SocketClient();
	~SocketClient();
	bool	_connect_timeout(std::string ip, unsigned short port, int timeout);
	bool	_connect_block(std::string ip, unsigned short port);
	bool	connect(std::string ip, unsigned short port, int timeout);
	int		send(const char* buf, int len);
	int		recv(char* buf, int len);
	int		close();
	void	startRecvThread();
	bool	isConnected() const { return m_connected; }
	void	setRecvTimeout(int32_t ms){ m_recvTimeout = ms; }

	bool sendRequest(MsgPack* reqest);
	MsgPack* recvMessage();

	std::list<MsgPack_ptr>&	getResponseList() { return m_responseList; }
	void lockMutex(); 
	void unlockMutex();

protected:
	void runRecv();
};


#endif

  

SocketClient.cpp

#include "SocketClient.h"
#include <iostream>
#include "bytearray.h"
#include "msg.h"
//#include "../Utils/Utils.h"
#include "cocos2d.h"
USING_NS_CC;
#define NETLOG CCLOG

#ifdef WIN32
#pragma comment(lib, "wsock32")
class SocketIniter {
public:
	SocketIniter() {
		WSADATA wsaData;
		WORD version = MAKEWORD(2, 0);
		int rt = WSAStartup(version, &wsaData);
		if (rt) {
			std::cerr << "Initilize winsock error !" << std::endl;
		}
	}

	~SocketIniter() {
		WSACleanup();
	}
};

static SocketIniter s_socketIniter;
#endif


SocketClient::SocketClient() {
	m_connected = false;
	m_running = true;
	m_recvTimeout = 5;
}

SocketClient::~SocketClient() {
	close();
	m_running = false;
}

int SocketClient::close() {
	m_connected = false;
	::shutdown(m_sock, 2);
#ifdef WIN32
	::closesocket(m_sock);
#else
	::close(m_sock);
#endif

	m_mutex.lock();
//	for (auto i : m_responseList)
//		delete i;
	m_responseList.clear();
	m_mutex.unlock();

	return 0;
}

bool SocketClient::_connect_timeout(std::string ip, unsigned short port, int timeout)
{
	if (m_connected) {
		return true;
	}

	unsigned  long  non_blocking = 1;
	unsigned  long  blocking = 0;
	bool ret = false;

	int tryTimes = timeout / 2;
	for (int i = 0; i<tryTimes; ++i)
	{
		close();

		m_sock = socket(AF_INET, SOCK_STREAM, 0);

		int rcv_size = 1024 * 1024;
		setsockopt(m_sock, SOL_SOCKET, SO_RCVBUF, (char*)&rcv_size, sizeof(int));

		bool bKeppAlive = true;
		setsockopt(m_sock, SOL_SOCKET, SO_KEEPALIVE, (char*)&bKeppAlive, sizeof(bKeppAlive));

#ifdef WIN32
		if (-1 == ioctlsocket(m_sock, FIONBIO, (unsigned long*)&non_blocking))
			ret = false;
#else
		ioctl(m_sock, FIONBIO, &non_blocking);
#endif

		struct sockaddr_in sock_addr;
		sock_addr.sin_family = AF_INET;
		sock_addr.sin_addr.s_addr = inet_addr(ip.c_str());
		sock_addr.sin_port = htons(port);
		int rt = ::connect(m_sock, (struct sockaddr*)&sock_addr, sizeof(sock_addr));
		if (rt != 0) {
			struct timeval tv = { 2, 0 };
			fd_set writefds;
			FD_ZERO(&writefds);
			FD_SET(m_sock, &writefds);
			if (select(m_sock + 1, 0, &writefds, 0, &tv) > 0) {
#ifdef WIN32
				int error;
				int err_len = sizeof(error);
				getsockopt(m_sock, SOL_SOCKET, SO_ERROR, (char*)&error, &err_len);
#else
				int error;
				socklen_t err_len = sizeof(error);
				getsockopt(m_sock, SOL_SOCKET, SO_ERROR, &error, &err_len);
#endif
				if (error == 0) {
					ret = true;
				}
				else {
					ret = false;
				}
			}
			else {
				ret = false;
			}
		}
		else {
			ret = true;
		}
		if (ret)
			break;
	}

	//if (!ret)
	//    return false;

#ifdef WIN32
	ioctlsocket(m_sock, FIONBIO, (unsigned long*)&blocking);
#else
	ioctl(m_sock, FIONBIO, &blocking);
#endif	

#if(CC_TARGET_PLATFORM==CC_PLATFORM_WIN32)
	DWORD tv2 = m_recvTimeout * 1000;
#else
	struct timeval tv2 = { m_recvTimeout, 0 };
#endif
	//接收不设置超时
	//setsockopt(m_sock, SOL_SOCKET, SO_RCVTIMEO, (char*)&tv2, sizeof(tv2));

	m_connected = ret;
	m_requests.clear();

	if (ret)
		startRecvThread();

	MsgPack* msg = new MsgPack(this->isConnected() ? 1 : 2);//conn return
	if (msg) {
		NETLOG("connect msg: %d", msg->getMsgID());
		m_mutex.lock();
		m_responseList.push_back(MsgPack_ptr(msg));
		m_mutex.unlock();
	}
	/*
	cocos2d::Director::getInstance()->getScheduler()->performFunctionInCocosThread([&, this]{
	std::string sout;
	g_pLuaSystem->callLuaFunc(this->m_callback, this->isConnected()?1:0, sout);
	});
	*/

	//int ret = CCLuaEngine::defaultEngine()->getLuaStack()->executeFunctionByHandler(_nStartReconnectHandler, 0);

	return ret;

}

bool SocketClient::connect(std::string ip, unsigned short port, int timeout)
{
	auto t = std::thread(&SocketClient::_connect_timeout, this, ip, port, timeout);
	t.detach();
	return true;
}

bool SocketClient::_connect_block(std::string ip, unsigned short port) {
	//std::lock_guard<std::mutex> lock(m_mutex);
	if (m_connected) {
		return true;
	}
	close();// add close
	m_mutex.lock();
//	for (auto i : m_responseList)
//		delete i;
	m_responseList.clear();
	m_mutex.unlock();

	m_sock = socket(AF_INET, SOCK_STREAM, 0);
	{
		/*
		#if(CC_TARGET_PLATFORM==CC_PLATFORM_WIN32)
		DWORD tv = 10000;
		DWORD tv2 = 10000;
		#else
		struct timeval tv = {10,0};
		struct timeval tv2 = {10,0};
		#endif
		setsockopt(m_sock, SOL_SOCKET, SO_SNDTIMEO, (char*)&tv, sizeof(tv));
		setsockopt(m_sock, SOL_SOCKET, SO_RCVTIMEO, (char*)&tv2, sizeof(tv2));
		*/

		bool bKeppAlive = true;
		setsockopt(m_sock, SOL_SOCKET, SO_KEEPALIVE, (char*)&bKeppAlive, sizeof(bKeppAlive));

		int rcv_size0;
		int optLen = sizeof(rcv_size0);
		//getsockopt(m_sock, SOL_SOCKET, SO_RCVBUF, (char*)&rcv_size0, &optLen);
		//NETLOG("socket-recv-buffer= %d %d", rcv_size0, optLen);

		int rcv_size = 1024 * 1024;
		setsockopt(m_sock, SOL_SOCKET, SO_RCVBUF, (char*)&rcv_size, sizeof(int));

		//getsockopt(m_sock, SOL_SOCKET, SO_RCVBUF, (char*)&rcv_size0, &optLen);
		//NETLOG("socket-recv-buffer= %d %d", rcv_size0, optLen);

	}

	struct sockaddr_in svraddr;
	svraddr.sin_family = AF_INET;
	svraddr.sin_addr.s_addr = inet_addr(ip.c_str());
	svraddr.sin_port = htons(port);
	int rt = ::connect(m_sock, (struct sockaddr*)&svraddr, sizeof(svraddr));
	if (rt == SOCKET_ERROR) {
		return false;
	}
	m_requests.clear();
	m_connected = true;

	startRecvThread();

	return true;
}
/*
static const std::string key = "P%2BViyZLtO^gRT2";
bool encryptByAES(std::string in, std::string& out, bool isEncrypt) {
std::string iv;
iv.resize(AES_BLOCK_SIZE, 0);
AES_KEY aesKey;
int enc(AES_ENCRYPT);
if (isEncrypt) {
AES_set_encrypt_key((const unsigned char*)key.c_str(),
AES_BLOCK_SIZE * 8, &aesKey);
} else {
AES_set_decrypt_key((const unsigned char*)key.c_str(),
AES_BLOCK_SIZE * 8, &aesKey);
enc = AES_DECRYPT;
}
short inLen = in.length();
short block = (inLen + AES_BLOCK_SIZE - 1) / AES_BLOCK_SIZE;
in.resize(AES_BLOCK_SIZE * block);
out.resize(inLen);
short begin = 0;
try {
while (true) {
int len = inLen - begin >= AES_BLOCK_SIZE ?
AES_BLOCK_SIZE : inLen - begin;
AES_cbc_encrypt((unsigned char*)&inStr[begin], (unsigned char*)&outStr[begin], AES_BLOCK_SIZE, &aesKey,
(unsigned char*)&iv[0], enc);
begin += AES_BLOCK_SIZE;
if (begin >= inLen) {
break;
}
}
} catch (...) {
SYLAR_LOG_ERROR(g_logger) << "Bad aes buffer";
return false;
}
out.resize(inLen);
return true;
}*/

int SocketClient::send(const char* buf, int len){
	if (!m_connected) {
		return -1;
	}
	int bytes = 0;
	int count = 0;
	while (count < len) {
		bytes = ::send(m_sock, buf + count, len - count, 0);
		if (bytes == -1 || bytes == 0) {
			m_connected = false;
			NETLOG("socket send false");
			return -1;
		}
		count += bytes;
	}
	return count;
}

int SocketClient::recv(char* buf, int len) {
	if (!m_connected) {
		return -1;
	}
	return ::recv(m_sock, buf, len, MSG_WAITALL);
/*	
	int bytes = 0;
	int count = 0;
	while (count < len) {
		bytes = ::recv(m_sock, buf + count, len - count, MSG_WAITALL);
		if (bytes < 0) {
			m_connected = false;
			NETLOG("socket recv false");
			return count;
		}
		count += bytes;
	}
	return count;
	*/
	
}

void SocketClient::startRecvThread() {
	if (m_thread) {
		//CCLOG("Recv Thread is Running");
		return;
	}
	m_thread.reset(new std::thread(&SocketClient::runRecv, this));
	if (!m_thread->joinable())
		m_thread->join();
}

bool SocketClient::sendRequest(MsgPack* reqest)
{
	MsgPack* buf = reqest;
	int len = buf->getLen();

	return send(buf->getData(), len) > 0;
}

//char recbuff[4096];

MsgPack* SocketClient::recvMessage()
{
	MsgHeader header;

	if (recv((char *)&header, sizeof(MsgHeader)) <= 0)
		return 0;

	MsgPack* buff = new MsgPack(header);
	int lenLeft = recv(buff->getData(), header.nPkgSize);
	NETLOG("-----recv-end   = %d", lenLeft);
	if (lenLeft <= 0)
	{
		delete buff;
		return 0;
	}
	//msg read begin
	buff->setCur(0);
	return buff;

}

void SocketClient::runRecv() {
	while (m_running) {
#ifdef WIN32
		Sleep(1);
#else
		usleep(1);
#endif

		try {
			while (m_connected)
			{
				MsgPack* msg = recvMessage();
				if (msg) {
					NETLOG("is in runRecv msgid:%d msgLen:%d ", msg->getMsgID(), msg->getMax());
					MsgHeader &header = msg->getHeader();
					if (header.nPkgTotall > 1)
					{
						NETLOG("%d / %d", header.nPkgIndex, header.nPkgTotall);
					}
					m_mutex.lock();
					m_responseList.push_back(MsgPack_ptr(msg));
					m_mutex.unlock();
				}
			}
		}
		catch (std::bad_alloc& e) {
			NETLOG("recv-exception %s", e.what());
		}
		catch (...) {
			NETLOG("recv-exception unkown.");
		}
	}
	NETLOG("exit thread runRecv.");
}


void SocketClient::lockMutex()
{
	m_mutex.lock();
}
void SocketClient::unlockMutex()
{
	m_mutex.unlock();
}

  以下我的定义的消息头,当然其实,有的东西其实是没必要的

typedef struct
{
	int32_t nId; //消息ID
	int32_t nPkgTotall; //总包数
	int32_t nPkgIndex;  //当前包
	int32_t nPkgSize; //当前包内容大小
} MsgHeader;

MsgHeader* createMsgHeader(int32_t msgId, int32_t pkgTotal, int32_t pkgIndex, int32_t pkgSz);

  

服务端:

NetServer.h

#pragma once
//服务器

#define _MAX_HEAD 64				//最大消息头大小
#define _MAX_MSGSIZE 8 * 1024		// 暂定一个消息最大为16k
#define BLOCKSECONDS	3			// INIT函数阻塞时间
#define INBUFSIZE	(64*1024)		//?	具体尺寸根据剖面报告调整  接收数据的缓存
#define OUTBUFSIZE	(4*1024-64)		//? 具体尺寸根据剖面报告调整。 发送数据的缓存,当不超过8K时,FLUSH只需要SEND一次

//Server.cpp
#include <iostream>
#include <winsock2.h>

#pragma comment(lib, "ws2_32.lib")
DWORD WINAPI ClientThread(LPVOID lpParameter);
DWORD WINAPI server_main(PVOID pParam);
void CALLBACK startServer(HWND hWnd, UINT nMsg, UINT nTimerid, DWORD dwTime);

bool SendMsg(int msgId ,void* pBuf, int nSize, SOCKET &m_sockClient);

 NetServer.cpp

//Server.cpp
#include "stdafx.h"
#include "Server.h"
#include "WinServerDlg.h"
#include <fstream>
#include <vector>
#include "msg.h"
#include "Business.h"
using namespace std;

#define PORT 9001
#define IP_ADDRESS "127.0.0.1"

bool GetLocalIP(char* ip)
{
	//1.初始化wsa
	WSADATA wsaData;
	int ret = WSAStartup(MAKEWORD(2, 2), &wsaData);
	if (ret != 0)
	{
		return false;
	}
	//2.获取主机名
	char hostname[256];
	ret = gethostname(hostname, sizeof(hostname));
	if (ret == SOCKET_ERROR)
	{
		return false;
	}
	//3.获取主机ip
	HOSTENT* host = gethostbyname(hostname);
	if (host == NULL)
	{
		return false;
	}
	//4.转化为char*并拷贝返回
	strcpy(ip, inet_ntoa(*(in_addr*)*host->h_addr_list));
	return true;
}


DWORD WINAPI ClientThread(LPVOID lpParameter)
{
	SOCKET CientSocket = (SOCKET)lpParameter;
	int Ret = 0;
	char RecvBuffer[_MAX_MSGSIZE];
	int headLen = sizeof(MsgHeader);

	//包队列,出现多包时,合包用
	MsgBigCache* vecMsg[100];
	memset(vecMsg, 0, sizeof(vecMsg));

	while (true)
	{
		memset(RecvBuffer, 0x00, sizeof(RecvBuffer));
		Ret = recv(CientSocket, RecvBuffer, _MAX_MSGSIZE, 0);
		if (Ret == 0 || Ret == SOCKET_ERROR)
		{
			Log("客户端退出!");
			break;
		}

		MsgHeader *header = reinterpret_cast<MsgHeader*> (RecvBuffer);
		if (header->nPkgTotall > 1)
		{
			CString str;
			str.Format("此消息共%d ,当前%d", header->nPkgTotall, header->nPkgIndex);
			Log(str);
		}
//		Log("接收到客户信息为:%s", RecvBuffer+headLen);
		if (1 == header->nPkgTotall)
		{
			progressMsg(RecvBuffer + headLen, CientSocket, header);
		}
		else if (header->nPkgTotall > 1000 || header->nPkgTotall < 0)
		{
			return 0;
		}
		else
		{
			MsgBigCache *msg = new MsgBigCache();
			char *buff = new char[header->nPkgSize];
			memset(buff, 0, header->nPkgSize);
			memcpy(buff, RecvBuffer + headLen, header->nPkgSize);
			msg->header = *header;
			msg->buff = buff;
			vecMsg[header->nPkgIndex] = msg;
			
			bool allRec = true;
			for (int i = 1; i <= header->nPkgTotall; i++)
			{
				if (!vecMsg[i])
				{
					allRec = false;
					break;
				}
			}

			if (allRec)
			{
				ofstream f("1.png" , ios_base::binary);
				for (int i = 1; i <= header->nPkgTotall; i++)
				{
					f.write((char *)vecMsg[i]->buff, vecMsg[i]->header.nPkgSize);
					delete vecMsg[i]->buff;
					delete vecMsg[i];
				}
				memset(vecMsg,0,sizeof(vecMsg));
			}
		}
	}

	return 0;
}

DWORD WINAPI server_main(PVOID pParam)
{
	WSADATA  Ws;
	SOCKET ServerSocket, ClientSocket;
	struct sockaddr_in LocalAddr, ClientAddr;
	int Ret = 0;
	int AddrLen = 0;
	HANDLE hThread = NULL;
	
	char buffIp[128];
	char buffPort[20];
	ifstream f("config.ini");
	if (f.fail())
	{
		GetLocalIP(buffIp);
		strcpy(buffPort, "9001");
	}
	else
	{
		f.getline(buffIp, sizeof(buffIp));
		f.getline(buffPort, sizeof(buffPort));
	}
	int port = atoi(buffPort);
	//Init Windows Socket
	if (WSAStartup(MAKEWORD(2, 2), &Ws) != 0)
	{
		Log("Init Windows Socket Failed::%d" , GetLastError());
		return -1;
	}

	//Create Socket
	ServerSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
	if (ServerSocket == INVALID_SOCKET)
	{
		Log ("Create Socket Failed::%d" ,GetLastError());
		return -1;
	}

	LocalAddr.sin_family = AF_INET;
	LocalAddr.sin_addr.s_addr = inet_addr(buffIp);
	LocalAddr.sin_port = htons(port);
	memset(LocalAddr.sin_zero, 0x00, 8);

	//Bind Socket
	Ret = ::bind(ServerSocket, (struct sockaddr*)&LocalAddr, sizeof(LocalAddr));
	if (Ret != 0)
	{
		Log("Bind Socket Failed::%d", GetLastError());
		return -1;
	}
	//listen
	Ret = listen(ServerSocket, 10);
	if (Ret != 0)
	{
		Log("listen Socket Failed::", GetLastError());
		return -1;
	}

	Log("服务端启动成功!");
	Log(CString("ip:") + buffIp);
	Log(CString("port:") + buffPort);

	while (true)
	{
		AddrLen = sizeof(ClientAddr);
		ClientSocket = accept(ServerSocket, (struct sockaddr*)&ClientAddr, &AddrLen);
		if (ClientSocket == INVALID_SOCKET)
		{
			Log ("Accept Failed::%d" , GetLastError());
			break;
		}
		CString str;
		str.Format("客户端连接:: %s : %d", inet_ntoa(ClientAddr.sin_addr), ClientAddr.sin_port);
		Log(str);
		sendConnSuc(ClientSocket);

		hThread = CreateThread(NULL, 0, ClientThread, (LPVOID)ClientSocket, 0, NULL);
		if (hThread == NULL)
		{
			Log("Create Thread Failed!");
			break;
		}

		CloseHandle(hThread);
	}

	closesocket(ServerSocket);
	closesocket(ClientSocket);
	WSACleanup();

	return 0;
}

bool g_start = false;


void CALLBACK startServer(HWND hWnd, UINT nMsg, UINT nTimerid, DWORD dwTime)
{
	if (g_start)
		return;
//	server_main(0);
	auto hThread = CreateThread(NULL, 0, server_main, 0, 0, NULL);
//	if (hThread == NULL)
//	{
//		Log("Create Thread Failed!");
//	}
//	else
		g_start = true;
	KillTimer(hWnd, nTimerid);
}

////////////////////////////////////////发消息给客户端

bool hasError()
{
#ifdef WIN32
	int err = WSAGetLastError();
	if (err != WSAEWOULDBLOCK) {
#else
	int err = errno;
	if (err != EINPROGRESS && err != EAGAIN) {
#endif
		return true;
	}

	return false;
}

char m_bufOutput[_MAX_MSGSIZE];
int m_nOutbufLen;
bool SendMsg(int msgid, void* pBuf, int nSize, SOCKET &m_sockClient)
{
	if (pBuf == 0 || nSize <= 0) {
		return false;
	}

	if (m_sockClient == INVALID_SOCKET) {
		return false;
	}
	char *contentBuff = (char *)pBuf;
	MsgHeader header;
	header.nId = msgid;
	header.nPkgIndex = 1;
	header.nPkgTotall = 1;// ceil(1.0 * nSize / OUTBUFSIZE);
	int headLen = sizeof(header);

	header.nPkgSize = nSize;
	m_nOutbufLen = nSize + headLen;
	CString str;
	str.Format("回复消息%d !大小 %d 个字节!", header.nId, header.nPkgSize);
	Log(str);
	
	// 发送一段数据
	int	outsize;
	outsize = send(m_sockClient, (char*)&header, headLen, 0);
	if (outsize <= 0)
	{
		if (hasError())
		{
			Log("发送失败,中断!");
			return false;
		}
	}
	outsize = send(m_sockClient, (char*)pBuf, nSize, 0);
	if (outsize <= 0)
	{
		if (hasError())
		{
			Log("发送失败,中断!");
			return false;
		}
	}

	return true;
}

  

Stay hungry, stay foolish!
原文地址:https://www.cnblogs.com/JhonKing/p/5621151.html