基于libevent的tcp拆包分包库

         TCP/IP协议虽然方便,但是由于是基于流的传输(UDP是基于数据报的传输),无论什么项目,总少不了解决拆包分包问题。

         以前的项目总是每个程序员自己写一套拆包分包逻辑,实现的方法与稳定性都不太一致。终于有了做基线的机会,自己写了一个基于libevent的拆包分包库。

         本文档黏贴一些核心的内容。

         //回调接口

         

// Callback interface the business layer implements to receive events from
// the packet library. All callbacks are invoked from the library's worker
// threads — presumably not from the caller's thread; confirm before blocking.
class ITcpPacketNotify
{
public:
// A client connection was accepted and bound to a worker.
virtual void OnConnected(int fd) = 0;
// The connection identified by fd was closed and recycled.
virtual void OnDisConnected(int fd) = 0;
// A read/write timeout fired on fd.
virtual void OnTimeOutError(int fd) = 0;
// The per-connection assembly buffer filled up before a packet was parsed.
virtual void OnBuffOverFlow(int fd) = 0;
// Frame extraction hook: given the buffered bytes, the business layer either
// returns true and sets packetlen to the size of one complete packet, or
// returns false — optionally setting ignore to a count of leading garbage
// bytes the library should discard (each project's wire protocol differs).
virtual bool OnAnalyzePacket(int fd,const char* buff,int bufflen,int& packetlen,int &ignore) = 0;
// A complete packet has been extracted. A reply may be written directly to
// respond/respondlen; the reply must not exceed 40960 bytes.
virtual void OnPacketArrived(int fd,const char* packet,int packetlen,char* respond,int& respondlen) = 0;
};

      提供了两种错误情况的通知,跟别的库不太一样的地方是,需要业务层实现拆包逻辑,毕竟每个项目的协议不一样。然后就会收到packet的通知。

//提取包,需要业务层返回解析出包的长度,或者舍弃一些不合格的包,成功解析出包返回true
//1B 1B 2B 4B NB 2B 2B
//帧头 命令字 帧序号 帧长度 帧数据 校验字 帧尾
// HEAD CMD FRAME_SEQ DATA_LEN DATA CRC TAIL

// Example frame parser for a protocol laid out as:
//   1B    1B   2B         4B        NB    2B   2B
//   HEAD  CMD  FRAME_SEQ  DATA_LEN  DATA  CRC  TAIL
// Returns true and sets packetlen when one complete frame is buffered;
// otherwise returns false, optionally setting ignore to the number of
// leading garbage bytes the caller should discard.
// (The `virtual` specifier was dropped: this body is shown outside its
// class, and `virtual` at namespace scope is ill-formed.)
bool OnAnalyzePacket(int fd, const char* buff, int bufflen, int& packetlen, int &ignore)
{
    // Framing overhead is 8B header + 2B CRC + 2B tail = 12 bytes. A frame
    // with an empty payload is exactly 12 bytes, so `< 12` (the original
    // `<= 12` rejected a complete zero-length frame).
    if (bufflen < 12)
    {
        return false;
    }
    if (buff[0] != (char)'!')
    {
        // Resynchronize: skip everything up to the next frame head.
        for (int i = 1; i < bufflen; i++)
        {
            if (buff[i] == (char)'!')
            {
                ignore = i;
                return false;
            }
        }
        // No frame head anywhere — discard the whole buffer. The original
        // left ignore at 0 here, so the garbage was never consumed and the
        // connection buffer would eventually overflow.
        ignore = bufflen;
        return false;
    }

    // DATA_LEN, big-endian on the wire.
    int length = 0;
    memcpy((void*)&length, (void*)&buff[4], 4);
    length = ntohl(length);
    if (length < 0)
    {
        // Corrupt or hostile length field: drop the bogus head byte and
        // let the loop above resync on the next '!'.
        ignore = 1;
        return false;
    }
    // Compare as `bufflen - 12 < length` (bufflen >= 12 is guaranteed) so a
    // huge length cannot overflow the original `length + 12` expression.
    if (bufflen - 12 < length)
    {
        return false; // frame not fully received yet
    }

    packetlen = length + 12;
    return true;
}

      

上面是某种协议的拆包例子。

// Server configuration handed to ITcpPacketManager::Start(). Defaults are
// expressed as in-class member initializers instead of a constructor body;
// the resulting default-constructed values are identical.
typedef struct _TcpPacketConfig
{
    int _port = 8000;             // TCP port the server listens on
    short _workNum = 5;           // number of worker threads
    unsigned int _connNum = 100;  // connections pre-allocated per worker thread
    int _readTimeOut = 120;       // read timeout (units not shown here — confirm)
    int _writeTimeOut = 120;      // write timeout
} TcpPacketConfig;

// Public facade of the packet library; obtained via CreateTcpPacketManager().
class ITcpPacketManager
{
public:
// Start the server described by config; events are delivered through notify.
// Returns true on success. notify must outlive the manager — TODO confirm.
virtual bool Start(_TcpPacketConfig & config, ITcpPacketNotify* notify) = 0;
// Stop the server and release its worker threads and connection pools.
virtual void Stop() = 0;
// Send packetlen bytes of packet on connection fd; true on full send.
virtual bool SendPacket(int fd,const char* packet,int packetlen) = 0;
};

// Factory: create a manager instance; release with DestroyTcpPacketManager.
TCPPACKET_API ITcpPacketManager* CreateTcpPacketManager();

// Destroy a manager previously returned by CreateTcpPacketManager.
TCPPACKET_API void DestroyTcpPacketManager(ITcpPacketManager* manager);

对外的接口方法。

// Forwards configuration to the embedded CLibEvent server.
// NOTE(review): elsewhere in this post CLibEvent::StartServer is *defined*
// with two extra parameters (inbuffmax/outbuffmax); this 6-argument call only
// matches the 6-parameter declaration — confirm which revision ships.
bool CTcpPacketImp::Start(_TcpPacketConfig & config, ITcpPacketNotify* notify)
{
return m_libEvent.StartServer(config._port, config._workNum, config._connNum, config._readTimeOut, config._writeTimeOut,notify);
}

// Shut down the embedded libevent server.
void CTcpPacketImp::Stop()
{
    m_libEvent.StopServer();
}

// Thin pass-through to CLibEvent::SendPacket (synchronous send on fd).
bool CTcpPacketImp::SendPacket(int fd, const char* packet, int packetlen)
{
return m_libEvent.SendPacket(fd,packet,packetlen);
}

转移到m_libEvent实现。

最核心的功能代码如下。

一些数据定义:

#pragma once
#include <event2/bufferevent.h>
#include <event2/bufferevent_compat.h>
#include <event2/buffer.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/event.h>
#include <event2/http.h>
#include <event2/buffer_compat.h>
#include <event2/http_struct.h>
#include <event2/bufferevent.h>
#include <event2/thread.h>
#include <thread>
#include <mutex>

struct _Conn;
struct _Worker;
class CLibEvent;
//服务器属性封装对象
// Aggregated state for the listening server.
struct _Server
{
bool bStart;                      // true once StartServer completed successfully
short nPort;                      // listening port (narrowed from int in StartServer)
short workernum;                  // number of worker threads
unsigned int connnum;             // connections pre-allocated per worker
volatile int nCurrentWorker;      // round-robin cursor used by DoAccept.
                                  // NOTE(review): volatile is not a threading
                                  // primitive; this is only safe while a single
                                  // listener thread increments it — confirm.
int read_timeout;                 // configured read timeout
int write_timeout;                // configured write timeout
struct evconnlistener *pListener; // accept listener registered on pBase
struct event_base *pBase;         // listener thread's event base
std::thread hThread;              // thread running event_base_dispatch(pBase)
_Worker *pWorker;                 // array of workernum workers
};


//连接对象列表
struct _ConnList
{
_ConnList()
{
head=NULL;
tail=NULL;
plistConn=NULL;
}
_Conn *head;
_Conn *tail;
_Conn *plistConn;
};
//连接对象
// Per-connection state; instances are pooled in _ConnList and recycled
// across clients.
struct _Conn
{
    _Conn()
    {
        fd = -1;         // was NULL (0): use the conventional invalid-socket value
        bufev = NULL;
        index = -1;
        in_buf_len = 0;
        //out_buf_len=0;
        in_buff_max = 0;
        //out_buff_max = 0;
        owner = NULL;
        next = NULL;
        pImp = NULL;
        in_buf = NULL;
        //out_buf=NULL;
    }
    ~_Conn()
    {
        delete[] in_buf; // delete[] NULL is well-defined
        //delete[]out_buf;
        // bufferevent_free(NULL) is NOT safe — guard against a connection
        // whose bufferevent was never created (e.g. StartServer failed early).
        if (bufev != NULL)
        {
            bufferevent_free(bufev);
            bufev = NULL;
        }
    }
    struct bufferevent *bufev; // bufferevent bound to this connection
    evutil_socket_t fd;        // client socket; invalid (-1) while pooled
    int index;                 // slot index within the pool array
    char *in_buf;              // assembly buffer for incoming bytes
    int in_buf_len;            // bytes currently held in in_buf
    //char *out_buf;
    //int out_buf_len;
    int in_buff_max;           // capacity of in_buf
    //int out_buff_max;
    _Worker *owner;            // worker that owns/recycles this connection
    _Conn *next;               // free-list link
    CLibEvent* pImp;           // back-pointer to the server facade
};
//工作线程封装对象.
// A worker thread: its own event_base plus a pool of reusable connections.
struct _Worker
{
    _Worker()
    {
        pWokerbase = NULL;
        pListConn = NULL;
    }
    ~_Worker()
    {
    }
    struct event_base *pWokerbase; // event base driven by hThread
    std::thread hThread;           // thread running event_base_dispatch(pWokerbase)
    _ConnList *pListConn;          // free-list of pooled connections
    std::recursive_mutex cs;       // guards pListConn

    // Pop a free connection, or NULL when only the tail sentinel remains.
    // Uses lock_guard (RAII) instead of the original manual lock()/unlock()
    // so the mutex is released even if list access throws.
    _Conn* GetFreeConn()
    {
        std::lock_guard<std::recursive_mutex> lock(cs);
        _Conn* pItem = NULL;
        if (pListConn->head != pListConn->tail)
        {
            pItem = pListConn->head;
            pListConn->head = pListConn->head->next;
        }
        return pItem;
    }
    // Return a recycled connection to the tail of the free list.
    void PutFreeConn(_Conn *pItem)
    {
        std::lock_guard<std::recursive_mutex> lock(cs);
        pListConn->tail->next = pItem;
        pListConn->tail = pItem;
        pItem->next = NULL;
    }
};


typedef struct _Server Server;
typedef struct _Worker Worker;
typedef struct _Conn Conn;
typedef struct _ConnList ConnList;

头文件:

// libevent server wrapper: one listener thread plus N worker threads, each
// with its own event_base and a pre-allocated connection pool.
class CLibEvent
{
public:
    CLibEvent(void);
    ~CLibEvent(void);
private:
    // Current server state.
    Server m_Server;
    // Serializes direct send() calls made by SendPacket.
    // NOTE(review): SendPacket referenced m_cs but it was never declared.
    std::mutex m_cs;
public:
    // Full signature matching the out-of-class definition (which takes the
    // per-connection buffer sizes inbuffmax/outbuffmax). The original
    // declaration had only 6 parameters and did not match the definition.
    bool StartServer(int port, short workernum, unsigned int connnum,
                     int read_timeout, int write_timeout,
                     int inbuffmax, int outbuffmax, ITcpPacketNotify* notify);
    // Backward-compatible 6-argument overload for existing call sites;
    // forwards with 40960-byte buffers (the documented reply-size cap).
    bool StartServer(int port, short workernum, unsigned int connnum,
                     int read_timeout, int write_timeout, ITcpPacketNotify* notify)
    {
        return StartServer(port, workernum, connnum, read_timeout, write_timeout,
                           40960, 40960, notify);
    }
    void StopServer();
    bool SendPacket(int fd, const char* packet, int packetlen);
private:
    static void DoAccept(struct evconnlistener *listener, evutil_socket_t fd,struct sockaddr *sa, int socklen, void *user_data);
    static void DoError(struct bufferevent *bev, short error, void *ctx);
    static void CloseConn(Conn *pConn);
    static void DoRead(struct bufferevent *bev, void *ctx);
    // Thread entry points. Originally declared DWORD WINAPI(LPVOID), which
    // contradicted the portable void(void*) definitions in the .cpp (and
    // does not compile on Linux).
    static void ThreadServer(void* lPVOID);
    static void ThreadWorkers(void* lPVOID);

    // NOTE(review): a static data member also needs an out-of-class
    // definition in the .cpp (not shown in this post) — confirm it exists.
    static ITcpPacketNotify * m_notify;
};

cpp:

#include "StdAfx.h"
#include "LibEvent.h"
#include <string>
#include <iostream>
#include <signal.h>

#ifdef WIN32
#include <WinSock2.h>
#include <WS2tcpip.h>
#else
#include<netinet/in.h>
#include<sys/socket.h>
#include<unistd.h>
#include <sys/types.h>
#include <errno.h>
#include <string.h>
#include <arpa/inet.h>
#endif

using namespace std;

// Initialize server state and (on Windows) the Winsock library.
// The original ZeroMemory/memset over m_Server was undefined behavior:
// _Server contains a non-trivial std::thread member, which must not be
// byte-zeroed. Fields are now initialized explicitly instead.
CLibEvent::CLibEvent(void)
{
#ifdef WIN32
    WSADATA WSAData;
    WSAStartup(0x0201, &WSAData);
#endif // WIN32
    m_Server.bStart = false;
    m_Server.nPort = 0;
    m_Server.workernum = 0;
    m_Server.connnum = 0;
    m_Server.nCurrentWorker = 0;
    m_Server.read_timeout = 0;
    m_Server.write_timeout = 0;
    m_Server.pListener = NULL;
    m_Server.pBase = NULL;
    m_Server.pWorker = NULL;
}

// Tear down Winsock on Windows.
// NOTE(review): does not call StopServer(); callers apparently must stop the
// server before destroying this object — confirm intended ownership.
CLibEvent::~CLibEvent(void)
{
#ifdef WIN32
WSACleanup();
#endif // WIN32
}

// Bring up the listener and all worker threads.
// NOTE(review): this definition takes 8 parameters (adds inbuffmax/outbuffmax)
// while the class declaration shown earlier has only 6 — the post mixes two
// revisions; confirm which signature ships.
// NOTE(review): every early `return false` below leaks what was already
// created (pBase, pListener, earlier workers' bases and pools). Acceptable
// only if the process exits on startup failure — confirm.
bool CLibEvent::StartServer(int port, short workernum, unsigned int connnum, int read_timeout, int write_timeout,
int inbuffmax, int outbuffmax, ITcpPacketNotify* notify)
{
m_notify = notify;
m_Server.bStart=false;
m_Server.nCurrentWorker=0;
m_Server.nPort=port; // int -> short narrowing
m_Server.workernum=workernum;
m_Server.connnum=connnum;
m_Server.read_timeout=read_timeout;
m_Server.write_timeout=write_timeout;
// libevent must be told which threading API to lock with before any
// event_base is created.
#ifdef WIN32
evthread_use_windows_threads();
#else
evthread_use_pthreads();
#endif // WIN32

m_Server.pBase=event_base_new();
if (m_Server.pBase==NULL)
{
return false;
}
// Bind to INADDR_ANY:port (sin_addr deliberately left zeroed by memset).
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(m_Server.nPort);
m_Server.pListener=evconnlistener_new_bind(m_Server.pBase,DoAccept,(void*)this,LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, 1024,(struct sockaddr*)&sin,sizeof(sin));
if (m_Server.pListener==NULL)
{
return false;
}

m_Server.pWorker=new Worker[workernum];
for (int i=0;i<workernum;i++)
{
m_Server.pWorker[i].pWokerbase=event_base_new();
if (m_Server.pWorker[i].pWokerbase== NULL)
{
// NOTE(review): leaves m_Server.pWorker dangling and leaks the
// event bases created on earlier iterations.
delete []m_Server.pWorker;
return false;
}
// Build the free list of connnum+1 nodes; the extra node is the tail
// sentinel, so the pool reads as empty when head == tail.
{
m_Server.pWorker[i].pListConn=new ConnList();
if (m_Server.pWorker[i].pListConn==NULL)
{
return false;
}
m_Server.pWorker[i].pListConn->plistConn=new Conn[m_Server.connnum+1];
m_Server.pWorker[i].pListConn->head=&m_Server.pWorker[i].pListConn->plistConn[0];
m_Server.pWorker[i].pListConn->tail=&m_Server.pWorker[i].pListConn->plistConn[m_Server.connnum];
for (unsigned j=0; j<m_Server.connnum; j++) {
m_Server.pWorker[i].pListConn->plistConn[j].index=j;
m_Server.pWorker[i].pListConn->plistConn[j].next=&m_Server.pWorker[i].pListConn->plistConn[j+1];
}
m_Server.pWorker[i].pListConn->plistConn[m_Server.connnum].index=m_Server.connnum;
m_Server.pWorker[i].pListConn->plistConn[m_Server.connnum].next=NULL;
// Pre-create one bufferevent and input buffer per pooled connection;
// DoAccept later only swaps the fd in, avoiding per-accept allocation.
Conn *p=m_Server.pWorker[i].pListConn->head;
while (p!=NULL)
{
p->bufev=bufferevent_socket_new(m_Server.pWorker[i].pWokerbase,-1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
if (p->bufev==NULL)
{
return false;
}
bufferevent_setcb(p->bufev, DoRead, NULL, DoError, p);
bufferevent_setwatermark(p->bufev, EV_READ, 0, 0); // default high/low watermarks
bufferevent_enable(p->bufev, EV_READ);
p->owner=&m_Server.pWorker[i];
p->pImp = this;
p->in_buff_max = inbuffmax;
p->in_buf = new char[inbuffmax];
p=p->next;
}
}
m_Server.pWorker[i].hThread = std::thread(ThreadWorkers, &m_Server.pWorker[i]);
}
m_Server.hThread= std::thread(ThreadServer,&m_Server);
m_Server.bStart=true;
return true;
}

// Stop the listener and all workers, join their threads, then release the
// connection pools and event bases. Only acts after a successful StartServer.
void CLibEvent::StopServer()
{
    if (m_Server.bStart)
    {
        // Break the accept loop first so no new connections are dispatched.
        // (The original declared a `struct timeval delay = {2,0}` here but
        // never passed it to event_base_loopexit — removed as dead code.)
        event_base_loopexit(m_Server.pBase, NULL);
        m_Server.hThread.join();
        if (m_Server.pWorker)
        {
            // Ask every worker loop to exit, then wait for the threads.
            for (int i = 0; i < m_Server.workernum; i++)
            {
                event_base_loopexit(m_Server.pWorker[i].pWokerbase, NULL);
                m_Server.pWorker[i].hThread.join();
            }
            // Free each pool (the _Conn destructor releases its bufferevent)
            // and then the worker's event base.
            for (int i = 0; i < m_Server.workernum; i++)
            {
                if (m_Server.pWorker[i].pListConn)
                {
                    delete[] m_Server.pWorker[i].pListConn->plistConn;
                    delete m_Server.pWorker[i].pListConn;
                    m_Server.pWorker[i].pListConn = NULL;
                }
                event_base_free(m_Server.pWorker[i].pWokerbase);
            }
            delete[] m_Server.pWorker;
            m_Server.pWorker = NULL;
        }
        evconnlistener_free(m_Server.pListener);
        event_base_free(m_Server.pBase);
    }
    m_Server.bStart = false;
}

// Read callback for a connection's bufferevent: drain libevent's input
// buffer into the connection's own assembly buffer (in_buf), then repeatedly
// ask the business layer to carve complete packets out of it.
void CLibEvent::DoRead(struct bufferevent *bev, void *ctx)
{
struct evbuffer * input=bufferevent_get_input(bev);
if (evbuffer_get_length(input))
{
Conn *c = (Conn*) ctx;
while (evbuffer_get_length(input))
{
// Assembly buffer is full and the business layer still has not consumed
// a packet — nothing more we can do, drop the connection.
// NOTE(review): OnError is not part of the ITcpPacketNotify interface
// shown earlier (which declares OnBuffOverFlow instead) — the post
// appears to mix two interface revisions; confirm.
if (c->in_buf_len >= c->in_buff_max)
{
const char* msg = "no more buff";
c->pImp->m_notify->OnError(c->fd,105,&msg);
CloseConn(c);
return;
}

// Copy from libevent's buffer into in_buf, capped at remaining capacity.
c->in_buf_len += evbuffer_remove(input, c->in_buf + c->in_buf_len, c->in_buff_max - c->in_buf_len);
// Hand the accumulated bytes to the business layer for framing.
while (true)
{
int packlen = 0, ignore = 0;
bool bRet = c->pImp->m_notify->OnAnalyzePacket(c->fd, c->in_buf,c->in_buf_len,packlen, ignore);
if (!bRet) // parser may ask us to discard some garbage bytes
{
if (ignore > 0)
{
// NOTE(review): ignore is not validated against in_buf_len; a
// misbehaving parser could drive in_buf_len negative — confirm.
c->in_buf_len -= ignore; // shrink the buffered length
if (c->in_buf_len>0)
{
memmove(c->in_buf, c->in_buf + ignore, c->in_buf_len);
}

}
else
{
// Parse failed with nothing to discard — usually just not enough
// data yet; go back to draining the socket buffer.
break;
}
}
else
{
if (packlen>c->in_buf_len)
{
// Parser claimed more bytes than are buffered; ignore the claim.
break;
}
// Complete packet: deliver to the business layer.
// NOTE(review): respond is passed as 0 here, so the reply mechanism
// documented on OnPacketArrived is not actually wired up — confirm.
int out_len = 0;
c->pImp->m_notify->OnPacketArrived(c->fd, c->in_buf, packlen,0, out_len);
/*if (c->out_buf_len !=0)
{
// reply packet
SendPacket(c->fd, c->out_buf, c->out_buf_len);
// clear the pending data
c->out_buf_len = 0;
}*/
// Remove the consumed packet from the front of the buffer.
c->in_buf_len -= packlen; // shrink the buffered length
if (c->in_buf_len>0)
{
memmove(c->in_buf, c->in_buf + packlen, c->in_buf_len);
}

}
}

}

}
}

// Recycle a connection: detach the socket from its bufferevent, notify the
// business layer, close the fd and return the object to the worker's free
// list so DoAccept can reuse it.
void CLibEvent::CloseConn(Conn *pConn)
{
pConn->in_buf_len = 0;
//pConn->out_buf_len = 0;
bufferevent_setfd(pConn->bufev, -1);
pConn->pImp->m_notify->OnDisConnected(pConn->fd);
evutil_closesocket(pConn->fd);


// NOTE(review): NULL (0) stored into a socket handle; -1 is the
// conventional invalid value and 0 is a valid fd on POSIX — confirm.
pConn->fd = NULL;

bufferevent_set_timeouts(pConn->bufev, NULL, NULL); // cancel read/write timeouts
bufferevent_enable(pConn->bufev, EV_READ ); // re-arm the bufferevent for the next client

if (pConn->owner) // server-side pooled connection: return it to the pool
{
pConn->owner->PutFreeConn(pConn);
}


}

// Event callback: connect completion, timeouts, socket errors and EOF all
// arrive here. NOTE(review): the EVBUFFER_* flags are legacy libevent-1.x
// names from bufferevent_compat.h; the 2.x equivalents are BEV_EVENT_* —
// they alias the same values, but consider modernizing.
void CLibEvent::DoError(struct bufferevent *bev, short error, void *ctx)
{
Conn *c=(Conn*)ctx;
if (error & BEV_EVENT_CONNECTED)
{
// Outbound connect finished: adopt the fd and start reading/writing.
int fd = bufferevent_getfd(bev);
c->fd = fd;
evutil_make_socket_nonblocking(fd);
bufferevent_setfd(c->bufev, c->fd);
// Notify the business layer of the new connection.
c->pImp->m_notify->OnConnected(c->fd);
bufferevent_enable(c->bufev, EV_READ | EV_WRITE);
return;
}
if (error&EVBUFFER_TIMEOUT)
{
c->pImp->m_notify->OnTimeOutError(c->fd);
}else if (error&EVBUFFER_ERROR)
{
// NOTE(review): OnError is not declared in the ITcpPacketNotify
// interface shown earlier — the post mixes interface revisions; confirm.
int err = EVUTIL_SOCKET_ERROR();
const char* msg = evutil_socket_error_to_string(err);
c->pImp->m_notify->OnError(c->fd,err,&msg);
}
else if (error & EVBUFFER_EOF)
{
const char* msg = "close";
c->pImp->m_notify->OnError(c->fd, 0, &msg);
}
CloseConn(c); // every non-connect event ends by recycling the connection
}

void CLibEvent::DoAccept(struct evconnlistener *listener, evutil_socket_t fd,struct sockaddr *sa, int socklen, void *user_data)
{
//此处为监听线程的event.不做处理.
CLibEvent* pThis = (CLibEvent*)user_data;
Server *pServer = &(pThis->m_Server);
//主线程处做任务分发.
int nCurrent=pServer->nCurrentWorker++%pServer->workernum;
//当前线程所在ID号
Worker &pWorker=pServer->pWorker[nCurrent];
//通知线程开始读取数据,用于分配哪一个线程来处理此处的event事件
Conn *pConn=pWorker.GetFreeConn();
if (pConn==NULL)
{
const char* msg = "!!!!连接数已经被用完!!!!";
pThis->m_notify->OnError(fd, 2,&msg);
return;
}
pConn->fd=fd;

evutil_make_socket_nonblocking(pConn->fd);
bufferevent_setfd(pConn->bufev, pConn->fd);
//转发发送事件
pThis->m_notify->OnConnected(pConn->fd);
struct sockaddr_in *sin = (sockaddr_in*)sa;
char* ClientIP =new char[256]; //客户端的IP地址
memset(ClientIP, 0, 256);
memcpy(ClientIP, inet_ntoa(sin->sin_addr), 20);
pThis->m_notify->OnError(pConn->fd, 0, (const char**)&ClientIP);//临时用下,确认ip
delete[]ClientIP;
}

// Listener-thread entry point: run the accept event loop until StopServer()
// calls event_base_loopexit on pBase.
void CLibEvent::ThreadServer(void* lPVOID)
{
    Server* pServer = reinterpret_cast<Server*>(lPVOID);
    if (pServer != NULL)
    {
        event_base_dispatch(pServer->pBase);
    }
}

// Worker-thread entry point: run this worker's event loop until
// StopServer() calls event_base_loopexit on its base.
void CLibEvent::ThreadWorkers(void* lPVOID)
{
    Worker* pWorker = reinterpret_cast<Worker*>(lPVOID);
    if (pWorker != NULL)
    {
        event_base_dispatch(pWorker->pWokerbase);
    }
}

bool CLibEvent::SendPacket(int fd, const char* packet, int packetlen)
{
int nLeft, nWritten;
const char* pBuf = packet;
nLeft = packetlen;

//加个锁
m_cs.lock();
while (nLeft > 0)
{
nWritten = send(fd, pBuf, nLeft, 0);
if (nWritten == SOCKET_ERROR)
{
int err = EVUTIL_SOCKET_ERROR();
const char* msg = evutil_socket_error_to_string(err);
m_notify->OnError(fd, err, &msg);
m_cs.unlock();
return false;
}
nLeft -= nWritten;
pBuf += nWritten;
}
m_cs.unlock();
return true;

}

makefile:

DBUG=FALSE
SRC_GENERIC=LibEvent.cpp TcpPacketImp.cpp tcpPacket.cpp

HIK_LINUX_LIB=../../../public/lib/x64/linux

GENERIC_SOURCES=$(SRC_GENERIC)
GENERIC_OBJECTS=$(SRC_GENERIC:.cpp=.o)

KERNAL_VER=$(shell uname -r)
KERNAL_ID=$(shell uname -s)

# Linux platform branch
ifeq ($(KERNAL_ID), Linux)

PLATFORM_SPEC=-DLINUX
SYS_USRLIB_DIR=/usr/lib
SYS_LIB_DIR=/lib
LIB_DIR=../../../public/LinuxSys32Lib/
OUTPUT_LIB_DIR=../Release/
BOOST_LIB=/usr/local/lib
BOOST_INCLUDE=/usr/local/include/
LIBEVENT_LIB=/opt/libevent/lib
LIBEVENT_INC=/opt/libevent/include
COMMON_INCLUDE=../../../public/include/

# debug build overrides
ifeq ($(DBUG),TRUE)
LIB_DIR=
endif
# end ifeq ($(DBUG),TRUE)

BUILD_DIR=./
RM =rm -rf
CC=g++ -m64 -std=c++11
#-g -rdynamic
#LDFLAGS =-lc -lm -L$(SYS_USRLIB_DIR) -L$(SYS_LIB_DIR) -L$(BOOST_LIB) -L$(LIBEVENT_LIB)
LDFLAGS =-lc -lm -L$(SYS_LIB_DIR) -L$(BOOST_LIB) -L$(LIBEVENT_LIB)
OBJECT_EXTENSION=o
OBJECT_EXE=libTcpPacket.so
SPECIFIC_CFLAGS=-shared -fPIC

CFLAGS=-w -O2 -fstrength-reduce -finline-functions -ffast-math -DNDEBUG -I$(LIBEVENT_INC) -I$(DY_INCLUDE) -I./ -I$(COMMON_INCLUDE) -I/usr/include/nptl -I/usr/local/include/boost $(PLATFORM_SPEC)

CDFLAGS=-DDEBUG -Wall -g -fstrength-reduce -finline-functions -ffast-math -I/usr/include/nptl -I$(BOOST_INCLUDE) $(COMMON_INCLUDE)
# debug build: use CDFLAGS
ifeq ($(DBUG),TRUE)
CFLAGS =$(CDFLAGS)
endif
# end ifeq ($(DBUG),TRUE)

SPECIFIC_LDFLAGS= -L$(SYS_USRLIB_DIR)/ntpl -L$(LIB_DIR) -ldl -lpthread -levent -levent_pthreads -lboost_system -Wl,-rpath,./ -Wl,-rpath-link,$(LIB_DIR)
endif
# end ifeq ($(KERNAL_ID), Linux)

OBJECTS = $(GENERIC_OBJECTS)
VPATH = $(BUILD_DIR)
#-----------------------------------------------------------------------------
# The default rule
#-----------------------------------------------------------------------------
.SUFFIXES: .$(OBJECT_EXTENSION) .cpp
all: $(OBJECT_EXE)
@echo
@echo "---------------------------------------------------------------"
@echo " $(SHARED_LIB) has been successfully built."
@echo
@echo " * Binaries are currently located in the '$(OUTPUT_LIB_DIR)' directory"
@echo "---------------------------------------------------------------"
@echo "$(BOOST_LIB) "
@echo "$(GENERIC_SOURCES)"
@echo

$(OBJECTS): $(GENERIC_SOURCES)

.cpp.$(OBJECT_EXTENSION):
@echo " C: $(@D)/$(<F)"
@$(CC) -c $(SPECIFIC_CFLAGS) $(CFLAGS) $< -o $(BUILD_DIR)$@

$(OBJECT_EXE): $(BUILD_DIR)$(OBJECTS)
@echo " All object file: $(OBJECTS)"
@echo " L: $(@F)"
@$(CC) $(SPECIFIC_CFLAGS) $(OBJECTS) $(LDFLAGS) $(SPECIFIC_LDFLAGS) -o $(OUTPUT_LIB_DIR)$(OBJECT_EXE)

.PHONY: clean

# Remove object files. The -exec terminator must be escaped as '\;' —
# a bare ';' is consumed by the shell as a command separator, so find
# aborts with "missing argument to -exec".
clean:
	@echo " Clean: object files"
	@find $(BUILD_DIR) -name "*.o" -exec $(RM) {} \;

copy:
@cp $(OUTPUT_LIB_DIR)/*.so $(BOOST_LIB)
@cp $(OUTPUT_LIB_DIR)/*.so $(HIK_LINUX_LIB)

完整源码见:https://github.com/RibbonServyou/tcppacket

原文地址:https://www.cnblogs.com/xuhuajie/p/7435540.html