简单谈谈消息发送缓冲区

在服务器的网络层中,发送缓冲区是一个不可绕过的课题

目前我遇到了主要有两种处理方式

方式一:队列处理

1、在逻辑线程里面有一个总的发送队列,然后服务器每帧都会处理这个队列

2、每一个cLink里面有一个消息队列,当cLink的状态变化时候会去处理这个消息队列

3、在每一帧处理的时候

              

 1     switch(pLink->m_eStat)
 2         {
 3         case EF_IO_EAGAIN:
 4             pLink->m_send_q.push(msg);
 5             continue;
 6             break;
 7         case EF_IO_NORMAL:
 8         {
 9             switch(send_message_process(&msg))
10             {
11                 case EF_EVR_TX_EAGAIN:
12                 {
13                     pLink->m_send_q.push(msg);
14                     pLink->m_eStat = EF_IO_EAGAIN;
15 
16                     uint32 type = EF_EVT_READ | EF_EVT_WRITE;
17                     ModIoHandle(pLink->_fd, pLink->_sn, type);
18 
19                 //    cout << "mod_io_handler " << pLink->_fd << ":" << pLink->_sn << " " << type << endl;
20                 }
21                     break;

    如果该链接状态可以发送则直接发送出去,否则该消息从总的消息队列转移到了cLink的消息队列,

4、在epoll中,当cLink的网络状态转换为可发送时候,再针对每一个cLink的发送队列进行处理,发送消息

优点:

  处理方式简单,处理流程清晰明了

缺点:

  因为每一个队列里面都是一个消息包,发送的时候,每一个包发送一次,调用一次send(),可能会造成太多小包的发送

方式二:总的缓冲区处理

         1、不使用队列,而是维护自定义的一个消息缓冲区 cSendMsg

1、每一次都把消息先放进消息缓冲区,自己缓冲内存,缓冲大小可扩展(有最大值限制)

2、每一次发送的时候把缓冲区的内容取出,整块内存发送

缺点:

     维护这个缓冲区需要自己处理,容易出错

优点:

     可以避免多次发送小包,可以多个消息包作为一个打包一起发送,提高IO效率

static const uint32 DEFAUTL_BUFFER = 1024 * 40; //10k

class CEncrypt;
static const uint32  c_wMaxSendMsg = 5120000;
class cSendMsg
{
public:
	~cSendMsg();
	cSendMsg(uint32 sz = DEFAUTL_BUFFER);
	bool AddMsg(const void* pMsg, uint32 nSize);
	bool AddQueueMsg(const char* pBuf, uint32 nSize);
	bool Init(uint32 sz);
	bool Release();
	bool AddBuffer(const void* pMsg, uint32 sz);
	bool AddQueueBuffer(const char* pBuf, uint32 sz);
	bool AddQueueBufferFront(const char* pBuf, uint32 sz);
	bool AddQueueMsgFront(const char* pBuf, uint32 nSize);
	//bool AddQueueBufferA(const char* pBuf, uint32 nSize);
	bool DoubleBuf(uint32 dwSize = 0);
	void ResetBuf();
	void ResetBufLen(uint32 nLen, uint32 off);
	const char* GetGroupBuffer(uint32& nLen, uint32& off) const;
	const char* GetQueueBuffer(uint32& nLen, uint32& off) const;
	uint32& GetLength(){return *m_pLength;}
	bool IsEmpty(){
		if (!m_pBuffer)
			return true;
		return GetLength() == 0;
	}
	void ResetBufLen(uint32 nLen);

	const char* GetBuffer(uint32& len) const;

	//////////////////////////////////////////////////////////////////////////
	//加密 、 解密
	void Encode(const char* buffer, uint32 size);
	void Decode(const char* buffer, uint32 size);
	bool SetEncode(int32 id, const char* KeyStr) ;
	void GenerateRandEncodeID();
	void GenerateRandKey();
	void InitEncode();
	void GetNewEncode(int32& EncodeID, char* KeyStr);
	bool EncodeData(char* srcBuffer,char* objBuffer, uint32 size);
	bool DecodeData(char* srcBuffer,char* objBuffer, uint32 size);
private:
	char* m_pBuffer;                      //带长度
	char* m_pBuf;                         //不带长度
	uint32* m_pLength;
	uint32 m_dwSize;

	CEncrypt* m_pEncode;			//加密类成员
	int32 m_EncodeID;					//当前的加密算法编号
	int32 m_NewEncodeID;				//准备更换的加密算法编号
	char m_KeyValue[17];			//当前的密钥
	char m_NewKeyValue[17];			//准备更换的密钥
};

  

static const uint32  g_nMaxDoubleSize = 1024 * 1024 * 10;   //扩展最大10M

cSendMsg::cSendMsg(uint32 sz)
{
	if (sz != 0)
		Init(sz);
	else
		m_pBuffer = NULL;
	m_pEncode = mempool_NEW(CEncrypt);
	m_NewEncodeID = 0;
	m_EncodeID = 0;
	InitEncode();

}

cSendMsg::~cSendMsg()
{
	Release();
	m_pBuf = NULL;
	mempool_DELETE(m_pEncode);
}

bool cSendMsg::Init(uint32 sz)
{
	m_dwSize = sz;
	uint32 nLen = sizeof(uint32);
	m_pBuffer = new char[sz + nLen];
	memset(m_pBuffer, 0, sizeof(sz + nLen));
	m_pLength = (uint32*)m_pBuffer;
	m_pBuf = m_pBuffer + nLen;
	if (m_pBuffer)
		GetLength() = 0;
	return true;
}

bool cSendMsg::DoubleBuf(uint32 dwSize)
{
	m_dwSize = max(m_dwSize * 2, dwSize);
	uint32 wLen = sizeof(uint32);
	char* pBuf = new char[m_dwSize + wLen];
	memset(pBuf,0, m_dwSize + wLen);
	memcpy(pBuf, m_pBuffer, GetLength() + wLen);
	delete[] m_pBuffer;
	m_pBuffer = pBuf;
	m_pLength = (uint32*)m_pBuffer;
	m_pBuf = m_pBuffer + wLen;
	return true;
}

bool cSendMsg::Release()
{
	if (m_pBuffer)
	{
		*m_pLength = 0;
		delete m_pBuffer;
		m_pBuffer = NULL;
	}
	return true;
}

bool cSendMsg::AddBuffer(const void* pMsg, uint32 nSize)
{
	//uint32 dwLen = sizeof(uint32);
	uint32 dwCurLen = *m_pLength;
	if (dwCurLen  + nSize > m_dwSize)
		return false;
	//memcpy(m_pBuf + dwCurLen, &nSize, dwLen);
	memcpy(m_pBuf + dwCurLen, pMsg, nSize);
	*m_pLength += nSize;
	return true;
}

//bool cSendMsg::AddQueueBufferA(const char* pBuf, uint32 nSize)
//{
//	uint32 dwCurLen = *m_pLength;
//	if (dwCurLen + nSize > m_dwSize)
//		return false;
//	memcpy(m_pBuf + dwCurLen, pBuf, nSize);
//	*m_pLength += nSize;
//	return true;
//}

bool cSendMsg::AddQueueBufferFront(const char* pBuf, uint32 sz)
{
	uint32 dwCurLen = *m_pLength;
	if (dwCurLen + sz > m_dwSize)
		return false;
	char* pTemp = new char[dwCurLen];
	memcpy(pTemp, m_pBuf, dwCurLen);
	memcpy(m_pBuf, pBuf, sz);
	memcpy(m_pBuf + sz, pTemp,dwCurLen);
	*m_pLength = sz + dwCurLen;
	delete[]pTemp;
	return true;
}

bool cSendMsg::AddQueueBuffer(const char* pBuf, uint32 nSize)
{
	while(!AddBuffer(pBuf,nSize))
	{
		std::cout << __FUNCTION__ <<"AllSize: "<<m_dwSize <<"  size: " << nSize << std::endl;
		if(GetLength() >= g_nMaxDoubleSize)
			return false;
		DoubleBuf();
	}
	return true;
}


void cSendMsg::ResetBuf()
{
	if (m_pBuffer)
	{
		*m_pLength = 0;
	}
}

void cSendMsg::ResetBufLen(uint32 nLen, uint32 off)
{
	if (*m_pLength != (off + nLen))
	{
		std::cout << __FUNCTION__ << "buffer长度出错了"<<std::endl;
	}
	if (m_pBuffer)
	{
		if (off)
		{
			memmove(m_pBuf, m_pBuf + nLen, off);
		}
		*m_pLength = off;
	}
}


bool cSendMsg::AddMsg(const void* pMsg, uint32 nSize)
{
	int32 i = 0;
	while(!AddBuffer(pMsg, nSize))
	{
		//std::cout << __FUNCTION__ <<" AllSize: "<<m_dwSize <<"  size: " << nSize << std::endl;
		if(GetLength() >= g_nMaxDoubleSize)
		{
			std::cout << __FUNCTION__ <<" 内存超过最大值"<< std::endl;
			return false;
		}
		DoubleBuf();
	}
	return true;
}

bool cSendMsg::AddQueueMsg(const char* pBuf, uint32 nSize)
{
	while(!AddQueueBuffer(pBuf, nSize))
	{
		if(GetLength() >= g_nMaxDoubleSize)
		{
			std::cout << __FUNCTION__ <<" 内存超过最大值"<< std::endl;
			return false;
		}
		DoubleBuf();
	}
	return true;
}



bool cSendMsg::AddQueueMsgFront(const char* pBuf, uint32 nSize)
{
	while(!AddQueueBufferFront(pBuf, nSize))
	{
		if(GetLength() >= g_nMaxDoubleSize)
		{
			std::cout << __FUNCTION__ <<" 内存超过最大值"<< std::endl;
			return false;
		}
		DoubleBuf();
	}
	return true;
}

const char* cSendMsg::GetQueueBuffer(uint32& nLen, uint32& off) const
{
	//uint32 wLen = *m_pLength;
	//if (wLen > c_wMaxSendMsg)
	//{
	//	static char sChar[c_wMaxSendMsg];                   //注意 不是安全的 调用的地方需要枷锁
	//	memcpy(sChar, m_pBuf, c_wMaxSendMsg);
	//	nLen = c_wMaxSendMsg;
	//	*m_pLength -= nLen;
	//	memmove(m_pBuf,m_pBuf + c_wMaxSendMsg, *m_pLength);
	//	std::cout << __FUNCTION__ <<" still" << wLen - nLen << std::endl; 
	//	return sChar;
	//}
	//else
	//{
	//	nLen = wLen;
	//	*m_pLength = 0;
	//	return m_pBuf;
	//}
	uint32 wLen = *m_pLength;
	if (wLen > c_wMaxSendMsg)
	{
		nLen = c_wMaxSendMsg;
		off = wLen - nLen;
		std::cout << __FUNCTION__ <<" still size: " << off << std::endl; 
	}
	else
	{
		nLen = wLen;
		off = 0;
	}
	return m_pBuf;
}

const char* cSendMsg::GetGroupBuffer(uint32& nLen, uint32& off) const
{
	uint32 wLen = *m_pLength;
	if (wLen > c_wMaxSendMsg)
	{
		nLen = c_wMaxSendMsg;
		off = wLen - nLen;
		std::cout << __FUNCTION__ <<" all size:"<< wLen <<" still: " << off << std::endl; 
	}
	else
	{
		nLen = wLen;
		off = 0;
	}
	return m_pBuf;
}

const char* cSendMsg::GetBuffer(uint32& len) const
{
	len = *m_pLength;
	return m_pBuf;	
}

/////////////////////////////////////////////////////////////////////////
//   以下代码为处理消息加密操作
//

//消息加密方法
void cSendMsg::Encode(const char* buffer, uint32 size)
{
	while(size > m_dwSize)
		DoubleBuf(size);
	Assert(size <= m_dwSize);
	//ENCODE_START 
	GetLength() = size;
	//encode(buffer,size,m_buffer);
	m_pEncode->Encrypt((char*)buffer,m_pBuf,size);
	//ENCODE_END
}

//消息解密方法
void cSendMsg::Decode(const char* buffer, uint32 size)
{
	while(size > m_dwSize)
		DoubleBuf(size);
	Assert(size <= m_dwSize);
	GetLength() = size;
	//decode(buffer,size,m_buffer);
	m_pEncode->Decrypt((char*)buffer,m_pBuf,size);
}

//设置加密算法
bool cSendMsg::SetEncode(int32 id, const char* KeyStr) 
{
	m_EncodeID = id;
	//ZeroMemory(m_KeyValue,17);
	memcpy(m_KeyValue,KeyStr,17);
	return m_pEncode->MakeKey((CEncrypt::EncryptEnum)m_EncodeID,m_KeyValue);
};

//随机生成加密算法编号
void cSendMsg::GenerateRandEncodeID()
{
	//m_NewEncodeID = nsCommon::RandomAInt()% CEncrypt::Encrypt_MaxNum;
	m_NewEncodeID = CEncrypt::Encrypt_XOR256;
}

//随机生成加密密钥字符串(16个字符,128位)
void cSendMsg::GenerateRandKey()
{
	for(int32 i=0; i<16; i++)
		m_NewKeyValue[i] = (char)nsCommon::RandomInt(32, 126);		//从space至~字符[32-126]
	m_NewKeyValue[16] = '';
}

//初始化加密算法编号与密钥,用于首次使用
void cSendMsg::InitEncode()
{
	m_pEncode->MakeKey(CEncrypt::Encrypt_XOR256,"[WWW-7zgame-COM]");
}

//获取准备更换的加密算法编号和加密密钥字符串
void cSendMsg::GetNewEncode(int32& EncodeID, char* KeyStr)
{
	EncodeID = m_NewEncodeID;
	memcpy(KeyStr,m_NewKeyValue,17);
}

//以下两个加密解密方法,专门为Msg_Net_QueryEncodeLogin和Msg_Net_ChangeEncode特别定制的
bool cSendMsg::EncodeData(char* srcBuffer,char* objBuffer, uint32 size)
{
	return m_pEncode->Encrypt(srcBuffer,objBuffer,size);
}

bool cSendMsg::DecodeData(char* srcBuffer,char* objBuffer, uint32 size)
{
	return m_pEncode->Decrypt(srcBuffer,objBuffer,size);
}

  

      

原文地址:https://www.cnblogs.com/Jimmy104/p/5310405.html