ZeroMQ使用汇总

ZeroMQ,史上最快的消息队列 —– ZMQ的学习和研究

 ZeroMQ 的模式

[架构] ZeroMQ 深度探索(一)

 消息队列ZeroMQ

 

服务端使用流程:

  

void* m_Context;
void* m_sktMsgVideoFrame;

m_sktMsgVideoFrame = zmq_socket(m_Context,ZMQ_PUB);
    int ret = zmq_bind(m_sktMsgVideoFrame,bytesMsgVideoFrameAddress.data());

 zmq_msg_t msg;

   zmq_msg_init_size(&msg,frameBuffSize+MIN_MSG_LEN);
   memcpy(zmq_msg_data(&msg),MSG_VIDEO_FRAME,5);

   memcpy((char*)zmq_msg_data(&msg)+5, frameBuff, frameBuffSize); //

   int iRet = zmq_msg_send(&msg, (zmq_msg_t*)m_sktMsgVideoFrame, 0);


   zmq_msg_close(&msg);


zmq_close(m_sktMsgVideoFrame);
zmq_ctx_shutdown(m_Context);

  

客户端流程:

QByteArray bytesMsgControlAddress = g_strMsgControlAddress.toUtf8();
    QByteArray bytesMsgReqCaptureRetAddress = g_strMsgReqCaptureRetAddress.toUtf8();
    QByteArray bytesMsgVideoFrameAddress = g_strMsgVideoFrameAddress.toUtf8();


    void* context = zmq_ctx_new();

    void* m_pSktMsgControl = zmq_socket(context, ZMQ_SUB);
    int ret = zmq_connect(m_pSktMsgControl,bytesMsgControlAddress.data());
    qDebug()<<"MSG_CONTROL PULL 连接地址:"<<g_strMsgControlAddress;
    qDebug()<<"MSG_CONTROL PULL 连接结果:"<<ret;
    ret = zmq_setsockopt(m_pSktMsgControl, ZMQ_SUBSCRIBE, "", 0);/// 必须添加该语句对消息滤波,否则接受不到消息


    void* m_pSktMsgReqCaptureRet = zmq_socket(context, ZMQ_SUB);
    ret = zmq_connect(m_pSktMsgReqCaptureRet, bytesMsgReqCaptureRetAddress.data());
    qDebug()<<"MSG_REQ_CAPUTRE_RET PULL 连接地址:"<<g_strMsgReqCaptureRetAddress;
    qDebug()<<"MSG_REQ_CAPUTRE_RET PULL 连接结果:"<<ret;

    ret = zmq_setsockopt(m_pSktMsgReqCaptureRet, ZMQ_SUBSCRIBE, "", 0);/// 必须添加该语句对消息滤波,否则接受不到消息

    void* m_pSktVideoFrame = zmq_socket(context, ZMQ_SUB);
    ret = zmq_connect(m_pSktVideoFrame, bytesMsgVideoFrameAddress.data());    

    qDebug()<<"MSG_VIDEO_FRAME PULL 连接地址:"<<g_strMsgVideoFrameAddress;
    qDebug()<<"MSG_VIDEO_FRAME PULL 连接结果:"<<ret;

    ret = zmq_setsockopt(m_pSktVideoFrame, ZMQ_SUBSCRIBE, "", 0);/// 必须添加该语句对消息滤波,否则接受不到消息

    while (true)
    {
        //一直监听来自其他模块的PUSH消息,采用非阻塞模式
         zmq_msg_t msg;
        int responseLen = zmq_msg_init(&msg);
        responseLen = zmq_msg_recv(&msg,receiver, ZMQ_DONTWAIT);
        if(responseLen<MIN_MSG_LEN)
        {
             //qDebug()<<"收到消息长度小于最小消息长度,本次消息无效.";
             zmq_msg_close(&msg);
             return;
        }

        BYTE* pStr = (BYTE*)malloc(responseLen);
        memcpy(pStr, zmq_msg_data(&msg), responseLen);

        char head[MIN_MSG_LEN+1] = {0};
        memcpy(head, pStr, MIN_MSG_LEN);

        QString headStr = QString::fromUtf8(head);
        if (headStr.indexOf(MSG_GPS) != -1)
        {
             //dosomething.....
        }

        zmq_msg_close(&msg);

        QThread::msleep(20);
    }

    zmq_close(m_pSktMsgControl);
    zmq_close(m_pSktMsgReqCaptureRet);
    zmq_close(m_pSktVideoFrame);

    zmq_ctx_shutdown(context);
原文地址:https://www.cnblogs.com/zhehan54/p/7245824.html