6.基于ZMQ的游戏网络层基础架构

内网服务器之间的通信采用zmq来进行,和客户端的通信则采用boost的asio来进行。这里先来搭建zmq的基础结构。

zmq相关的知识可以去zmq官方网站查询。

这里使用zmq的push 和pull来进行通信。

先放一张结构图:


其中PushZmq是推管道, PullZmq是拉管道:

对于Push的流程是:

zmq_init()---->zmq_socket()---->zmq_connect()---->zmq_msg_init_size()---->zmq_msg_data()---->zmq_send()---->zmq_msg_close()---->zmq_close()---->zmq_term()

具体见代码:

PushZmq.h

#ifndef __PUSH_ZMQ_H__
#define __PUSH_ZMQ_H__

#include <zmq.h>
#include <string.h>
#include <iostream>
#include <glog/logging.h>

using namespace std;

/**
 * Wrapper around a ZeroMQ PUSH socket connected to a single endpoint.
 *
 * The constructor creates the socket and connects it to the given URL; the
 * destructor closes the socket and terminates the context.
 *
 * Instances are non-copyable: a copy would hold the same raw socket/context
 * handles and both objects would try to release them on destruction.
 */
class PushZmq
{
public:
    /**
     * @param url        Endpoint to connect to, e.g. "tcp://127.0.0.1:5555".
     * @param zmqContext ZeroMQ context to use, or NULL to let the constructor
     *                   create one via zmq_init(1).
     */
    PushZmq(const char* url, void* zmqContext = NULL);
    ~PushZmq();

    /**
     * Copies `length` bytes from `buffer` into a ZeroMQ message and sends it
     * with ZMQ_NOBLOCK.
     *
     * @return the zmq_send() return code on success, (size_t)-1 on failure.
     */
    size_t Send(const char* buffer, size_t length);
private:
    // Declared but intentionally not defined (pre-C++11 non-copyable idiom).
    PushZmq(const PushZmq&);
    PushZmq& operator=(const PushZmq&);

    std::string _strUrl;   // endpoint passed to zmq_connect()
    void* _ctx;            // ZeroMQ context handle
    void* _socket;         // ZMQ_PUSH socket handle
};

#endif


PushZmq.cpp

#include "pushZmq.h"
#include "proto/hello.pb.h"

using namespace hello;

int main()
{
    FLAGS_minloglevel = google::INFO;
    google::InitGoogleLogging("");
    google::SetLogDestination(google::INFO, "../");
    google::SetLogFilenameExtension("log_");
    google::LogToStderr();

    string url = "tcp://127.0.0.1:5555";
    PushZmq* push = new PushZmq(url.c_str());
    //string sendContent = "Hello Pull.I am From Push!";
    PbMsgHello helloMsg;
    helloMsg.set_helloint(123456);
    helloMsg.set_hellostring("ni hao wang peng ");

    int length = helloMsg.ByteSize();
    char* buffer = (char*)malloc(length);
    
    helloMsg.SerializeToArray(buffer, length);
    push->Send(buffer, length);
    free(buffer);
    return 0;
}

/**
 * Creates a ZMQ_PUSH socket and connects it to `url`.
 *
 * @param url        Endpoint to connect to; a NULL pointer is treated as an
 *                   empty string (zmq_connect will then report the error).
 * @param zmqContext Context to use, or NULL to create one with zmq_init(1).
 *
 * Failures are logged and leave the object in a state where `_socket` is
 * either NULL or an unconnected socket; no exception is thrown.
 */
PushZmq::PushZmq( const char* url, void* zmqContext /*= NULL*/ )
:_strUrl(url ? url : "")
,_ctx(zmqContext)
,_socket(NULL)
{
    if(!_ctx)
    {
        _ctx = zmq_init(1);
        if(!_ctx)
        {
            // Report the real cause instead of falling through to a
            // zmq_socket() call on a NULL context.
            LOG(ERROR) << "error in zmq_init:" << zmq_strerror(errno);
            return;
        }
    }

    _socket = zmq_socket(_ctx, ZMQ_PUSH);
    if(!_socket)
    {
        LOG(ERROR) << "Error in zmq_socket:" << zmq_strerror(errno);
        return;
    }

    if(zmq_connect(_socket, _strUrl.c_str()) != 0)
    {
        LOG(ERROR) << "error in zmq_connect:" << zmq_strerror(errno);
    }
}

/**
 * Closes the socket and terminates the context, logging any failure.
 *
 * Handles that are NULL (construction failed part-way) are skipped.
 *
 * NOTE(review): the context is terminated even when it was supplied by the
 * caller through the constructor — confirm that callers do not share it.
 */
PushZmq::~PushZmq()
{
    if(_socket && zmq_close(_socket) != 0)
    {
        LOG(ERROR) << "error in zmq_close:" << zmq_strerror(errno);
    }

    if(_ctx && zmq_term(_ctx) != 0)
    {
        LOG(ERROR) << "error in zmq_term:" << zmq_strerror(errno);
    }
}

size_t PushZmq::Send( const char* buffer, size_t length )
{
    zmq_msg_t msg;
    int rc = zmq_msg_init_size(&msg, length);
    memcpy((char*)zmq_msg_data(&msg), buffer, length);

    rc = zmq_send(_socket, &msg, ZMQ_NOBLOCK);
    if(rc < 0)
    {
        cout << "error in zmq_send:" << zmq_strerror(errno) << endl;
        zmq_msg_close(&msg);
        return -1;
    }
    zmq_msg_close(&msg);

    LOG(INFO) << "Send Hello success: rc=" << rc;  
    return rc;
}


对于Pull的流程是:

zmq_init()--->zmq_socket()--->zmq_bind()--->zmq_poll()--->zmq_msg_init()--->zmq_recv()--->zmq_msg_data()--->zmq_msg_size()--->调用具体处理函数--->zmq_msg_close()--->zmq_close()--->zmq_term()

PullZmq.h

#ifndef __PULL_ZMQ_H__
#define __PULL_ZMQ_H__

#include <zmq.h>
#include <iostream>
#include <string.h>
#include <glog/logging.h>
#include <boost/bind.hpp>
#include <boost/function.hpp>
using namespace std;

/**
 * Wrapper around a ZeroMQ PULL socket bound to a single endpoint.
 *
 * The constructor creates and binds the socket; Run() polls it and passes
 * every received message to the callback; the destructor closes the socket
 * and terminates the context.
 *
 * Instances are non-copyable: a copy would hold the same raw socket/context
 * handles and both objects would try to release them on destruction.
 */
class PullZmq
{
public:
    /**
     * Message callback: receives the payload pointer and its size in bytes.
     * Returning false makes Run() stop; returning true keeps it running.
     */
    typedef boost::function<bool(const char*, size_t)> TypeOnMessage;

    /**
     * @param url           Endpoint to bind, e.g. "tcp://*:5555".
     * @param onPipeMessage Callback invoked by Run() for each message.
     * @param zmqContext    Context to use, or NULL to let the constructor
     *                      create one via zmq_init(1).
     */
    PullZmq(const char* url, TypeOnMessage onPipeMessage, void* zmqContext=NULL);
    ~PullZmq();

    /** Receive loop; returns once the callback returns false. */
    void Run();
private:
    // Declared but intentionally not defined (pre-C++11 non-copyable idiom).
    PullZmq(const PullZmq&);
    PullZmq& operator=(const PullZmq&);

    void* _ctx;               // ZeroMQ context handle
    std::string _strUrl;      // endpoint passed to the constructor
    void* _socket;            // ZMQ_PULL socket handle
    TypeOnMessage _onMessage; // per-message callback
};
#endif


PullZmq.cpp

#include "PullZmq.h"
#include "proto/hello.pb.h"
using namespace hello;

bool TestOnMessage( const char* buffer, size_t length );

/**
 * Pull-side demo: binds a PULL socket on port 5555 and hands every received
 * message to TestOnMessage until that callback returns false.
 */
int main()
{
    FLAGS_minloglevel = google::INFO;
    google::InitGoogleLogging("");
    google::SetLogDestination(google::INFO, "../");
    google::SetLogFilenameExtension("log_");
    google::LogToStderr();

    const std::string url = "tcp://*:5555";

    // Stack object instead of a leaked `new`: the destructor closes the
    // socket and terminates the context once Run() returns.
    PullZmq pull(url.c_str(), boost::bind(TestOnMessage, _1, _2));
    pull.Run();

    return 0;
}

/**
 * Creates a ZMQ_PULL socket and binds it to `url`.
 *
 * @param url           Endpoint to bind; a NULL pointer is treated as an
 *                      empty string (zmq_bind will then report the error).
 * @param onPipeMessage Callback stored for use by Run().
 * @param zmqContext    Context to use, or NULL to create one with zmq_init(1).
 *
 * Failures are logged and no exception is thrown. `_socket` is always
 * initialized (NULL until zmq_socket succeeds) so the destructor never sees
 * an indeterminate handle.
 */
PullZmq::PullZmq( const char* url, TypeOnMessage onPipeMessage, void* zmqContext )
// Initializers listed in member declaration order, which is the order in
// which they actually run.
:_ctx(zmqContext)
,_strUrl(url ? url : "")
,_socket(NULL)
,_onMessage(onPipeMessage)
{
    if(!_ctx)
    {
        _ctx = zmq_init(1);
        if(!_ctx)
        {
            LOG(ERROR) << "error in zmq_init:" << zmq_strerror(errno);
            return;
        }
    }

    _socket = zmq_socket(_ctx, ZMQ_PULL);
    if (!_socket)
    {
        LOG(ERROR) << "Error in zmq_socket:" << zmq_strerror(errno);
        return;
    }

    if(zmq_bind(_socket, _strUrl.c_str()) != 0)
    {
        LOG(ERROR) << "error in zmq_bind:" << zmq_strerror(errno);
    }
}

/**
 * Releases the ZeroMQ handles: closes the socket first, then terminates the
 * context. A non-zero result from either call is logged and otherwise
 * ignored.
 */
PullZmq::~PullZmq()
{
    const bool closeFailed = (zmq_close(_socket) != 0);
    if(closeFailed)
    {
        LOG(ERROR) << "error in zmq_close:" << zmq_strerror(errno);
    }

    const bool termFailed = (zmq_term(_ctx) != 0);
    if(termFailed)
    {
        LOG(ERROR) << "error in zmq_term:" << zmq_strerror(errno);
    }
}

void PullZmq::Run()
{
    zmq_pollitem_t item;
    item.socket = _socket;
    item.events = ZMQ_POLLIN;

    long pollWaitTime = 1000;
    bool bLoop = true;

    while(bLoop)
    {
        int rc = zmq_poll(&item, 1, -1);
        if(rc < 0)
        {
            LOG(ERROR) << "error in zmq_poll:" << zmq_strerror(errno);
        }else if(rc ==0)
        {
            //LOG(ERROR) << "On Idle!";
        }else 
        {
            int msgCount = rc;
            while(msgCount--)
            {
                zmq_msg_t msg;
                rc = zmq_msg_init(&msg);
                if (rc !=0 )
                {
                    LOG(ERROR) << "error in zmq_msg_init:" << zmq_strerror(errno);
                    return;
                }

                rc = zmq_recv(_socket, &msg, 0);
                if(rc != 0)
                {
                    LOG(ERROR) << "error in zmq_recv:" << zmq_strerror(errno);
                    zmq_msg_close(&msg);
                    continue;
                }

                void* buffer = zmq_msg_data(&msg);
                size_t len = zmq_msg_size(&msg);
                bLoop = _onMessage((const char*)buffer, len);
                zmq_msg_close(&msg);
            }
        }
    }

}

bool TestOnMessage( const char* buffer, size_t length )
{
    LOG(INFO) << "TestOnMessage:";
    
    PbMsgHello helloMsg;
    helloMsg.ParseFromArray(buffer, length);
    LOG(INFO) << " helloInt = " << helloMsg.helloint()
        << " helloString = " << helloMsg.hellostring();

    //string content;
    //content.append(buffer);
    //LOG(INFO) << "buffer = " << content << " length = " << length;

    return true;
}


对应Makefile为:

# Builds the ZeroMQ demo programs pullZmq and pushZmq.
# `pull` and `push` are kept as convenience aliases for the two binaries.
.PHONY:	all pull push clean

all:	pull push

pull:	pullZmq

push:	pushZmq

# Rebuild the generated-message object whenever the generated sources change.
hello.o:	proto/hello.pb.cc proto/hello.pb.h
	g++ -c -o hello.o proto/hello.pb.cc

pullZmq:	hello.o PullZmq.cpp
	g++ -o pullZmq hello.o PullZmq.cpp -lzmq -lglog -lboost_filesystem -lprotobuf

pushZmq:	hello.o PushZmq.cpp
	g++ -o pushZmq hello.o PushZmq.cpp -lzmq -lglog -lboost_filesystem -lprotobuf

clean:
	rm -rf *.o
	rm -rf pullZmq
	rm -rf pushZmq


上文的cpp中使用了Protobuf,因此需要引入Protobuf的支持,对应的proto文件

hello.proto为:

// Messages used by the push/pull demo programs.
package hello;

// Greeting payload: one string field and one 32-bit integer field.
message PbMsgHello
{
    required string helloString = 1;
    required int32  helloInt    = 2;
}


运行以上cpp,可以实现:在Push端包装一个Protobuf的Message,序列化之后Push到Pull端;Pull端接收到消息后进行解析,并读取Message中的内容。

结果如下:

pull端:


Push端:



可见在Push端组装的 int 和string 在pull端成功解析。


下一步应该进行Message的包装,以及ProtoBuffer的反射解析。即根据类型来自动生成解析所需的Message类型。


1-6章节对应源码下载:http://download.csdn.net/detail/jcracker/6267125

原文地址:https://www.cnblogs.com/suncoolcat/p/3323112.html