利用ZeroMQ消息队列实现多端口并发数据处理

这里至于ZeroMQ是什么就不过多赘述了,这是基础的知识,不懂的读者可以先去百度一下,这里也提供一个老哥的详细解释,贴上链接大家可以去看看https://www.cnblogs.com/chenny7/p/6245236.html。

先说一下为什么要这样处理,本人是从事新能源电池检测相关工作的,属于工控的领域,有大量的指令下发(通过上位机的控制),指令下发以后会接收到下位机(电源模块)上传的数据,是一对多的模式(一个上位机,面对着多个电源模块)。这里就有一个并发的问题,同时要接收多个电源模块上传的数据,而且是1秒钟一条数据,这个并发量相对来说比较大,为了性能考虑,防止拥塞这里用到了ZeroMQ消息队列,ZMQ对这一块的处理相对合理。

这里只是一个模拟的程序,相当于一个demo,实际开发工程中要比这复制的多,因为上抛的数据还分很多种,比如记录数据,告警数据,针床的数据等等。

用QT做了一个简单的界面,这里的姓名  年龄  学号模拟的是上位机控制指令,当然实际的情况是上位机下发有相对应的协议,这个协议必须是上位机和下位机定义好。

首先是定义初始化上下文,这是ZeroMQ中必须要做的工作。

void ZeroMQComm::InitCtx()
{
    //1.建上下文
    m_ctx = zmq_init(1);
    if (!m_ctx)
    {
        printf("build zmq_init():%s
", zmq_strerror(errno));
        return;
    }
    else
    {
        printf("创建上下文成功!
");
    }
}

然后初始化socket,其实可以把ZMQ看成是对socket的进一步封装,这里用到了3个端口,5000,5001,5002。为什么要用这三个端口呢?每一个端口代表了不同的信息,比如5000端口是作为的PUSH端,相当于一个分发任务的角色,这个demo中

当点击发送指令的时候就会利用这个端口来给PULL端发送数据。5001端口(作为PULL端)是接收及格数据,5002端口(作为PULL端)是接收不及格数据,这两个端口模式的是在实际的项目中的记录数据和告警数据,记录数据是通过5001端口上抛,告

警数据是由5002端口上抛。下面代码中绑定的IP可以忽略,我是按照我测试的环境中来配置IP,读者要进行的时候需要根据自己的实际情况来!

void ZeroMQComm::InitSockets()
{
    //1.创建发布Socket通讯对象
    //该对象用于下中位机发送控制指令
    if((m_sokt5000 = zmq_socket(m_ctx, ZMQ_PUSH)) == NULL)
    {
        printf("%s
", zmq_strerror(errno));
        zmq_close(m_sokt5000);
        zmq_ctx_term(m_ctx);
        return;
     }
    else
    {
        printf("创建发送端口socket成功!
");
    }


    int tmp_bet = zmq_bind(m_sokt5000, "tcp://10.168.205.73:5000");
    //printf("zmq_bind=%d
", tmp_bet);
    if (tmp_bet < 0)
    {
        printf("bind port %s
", zmq_strerror(errno));
        return;
    }
    else
    {
        printf("bind port success!
");
    }
    

    //设置发送超时时间3秒
    int iSendTimeout = 3000;
    if (zmq_setsockopt(m_sokt5000, ZMQ_SNDTIMEO, &iSendTimeout, sizeof(iSendTimeout)) < 0)
    {
        zmq_close(m_sokt5000);
        zmq_ctx_destroy(m_ctx);
        return;
    }

    //2.创建中位机回复Socket
    m_sokt5001 = zmq_socket(m_ctx, ZMQ_PULL);
    zmq_connect(m_sokt5001, "tcp://10.168.205.73:5001");

    //设置接收超时时间5秒
    int iRcvTimeout = 5000;
    if (zmq_setsockopt(m_sokt5001, ZMQ_RCVTIMEO, &iRcvTimeout, sizeof(iRcvTimeout)) < 0)
    {
        zmq_close(m_sokt5001);
        zmq_ctx_destroy(m_ctx);
        return;
    }


    //3.创建中位机警告信息Socket
    m_sokt5002 = zmq_socket(m_ctx, ZMQ_PULL);
    zmq_connect(m_sokt5002, "tcp://10.168.205.73:5002");

    if (zmq_setsockopt(m_sokt5002, ZMQ_RCVTIMEO, &iRcvTimeout, sizeof(iRcvTimeout)) < 0)
    {
        zmq_close(m_sokt5002);
        zmq_ctx_destroy(m_ctx);
        return;
    }



}

这里是上面界面中"发送指令"按钮的信号和槽,关于Qt的信号和槽也不赘述了,想要了解的自行去百度。

    connect(m_pBtnSendMedCom, SIGNAL(clicked()), this, SLOT(slot_sendmedcomsg()));

槽函数的实现是获取到姓名 年龄 学号然后通过封装的接口下发(从PUSH端到PULL端),对于每一个QLineEdit都了相应容错处理,不能为空,然后通过QJsonObject以json的形式下发数据,m_VirUpComZMQ就是封装的一个ZMQ的类,在最后通过SendCmd5000()

接口下发数据。

void CVirUpComWgt::slot_sendmedcomsg()
{
    QString GetNameStr = m_pLeditName->text();
    if (GetNameStr == "")
    {
        //printf("姓名不能为空!
");
        QMessageBox::warning(NULL, QString::fromLocal8Bit("警告"), QString::fromLocal8Bit("姓名不能为空"));
        return;
    }

    QString GetAgeStr = m_pLeditAge->text();

    if (GetAgeStr == "")
    {
        //printf("姓名不能为空!
");
        QMessageBox::warning(NULL, QString::fromLocal8Bit("警告"), QString::fromLocal8Bit("年龄不能为空"));
        return;
    }

    QString GetStuNoStr = m_pLeditStuno->text();

    if (GetStuNoStr == "")
    {
        //printf("姓名不能为空!
");
        QMessageBox::warning(NULL, QString::fromLocal8Bit("警告"), QString::fromLocal8Bit("学号不能为空"));
        return;
    }

    QJsonObject basemsg_json;
    basemsg_json.insert("name", GetNameStr.toLocal8Bit().data());
    basemsg_json.insert("age", GetAgeStr.toInt());
    basemsg_json.insert("stuno", GetStuNoStr.toInt());

    QJsonDocument document;
    document.setObject(basemsg_json);
    QByteArray basemsg_array = document.toJson(QJsonDocument::Compact);

    QString BaseMsgJsonStr(basemsg_array);
    qDebug() << QString::fromLocal8Bit("简单的QtJson数据:") << BaseMsgJsonStr;

    //发送指令
    m_VirUpComZMQ->SendCmd5000(BaseMsgJsonStr.toLocal8Bit().data());

    
}

在模拟的这个上位机程序中还要对接收模块做处理,当下发了指令以后会接收到上抛的数据,这些数据会通过5001端口和5002端口来接收,前面已经提到过,它们是PULL端。这里要创建两个线程来专门接收上抛的数据。

void ZeroMQComm::CreateCommThread()
{
    DWORD dwThreadID = 0;
    m_handle5001 = CreateThread(NULL, 0, ZeroMQComm::Recv5001Msg, this, 0, &dwThreadID);
    m_handle5002 = CreateThread(NULL, 0, ZeroMQComm::Recv5002Msg, this, 0, &dwThreadID);
}

定时器的处理是为了定时刷新界面上接收到的数据,表示数据一直在发送中,这个就相当于实际项目中,电源模块不停的给上位机上抛数据。

    m_timer = new QTimer(this);
    connect(m_timer, SIGNAL(timeout()), this, SLOT(slot_updaterecordmsg()));
    m_timer->start(50);

到此,上位机的模拟程序就差不多完成了。一些核心的东西没有贴出来,大概的思路已经给读者梳理情况了,其实也没那么难,自己动手去写一下很快就能明白。

模拟下位机的程序是完全可以倒推出来的。

比如下位机肯定也需要做上下文初始化和socket的初始化。这是ZMQ更古不变的原则,请参考上面模拟上位机的操作。

其次,下位机这边肯定也需要一个线程来接收上位机下发的指令

void CVirMedComZMQ::CreateRcvUpComDataThread()
{
    DWORD dwThreadID = 0;
    m_handle5000 = CreateThread(NULL, 0, CVirMedComZMQ::Recv5000Msg, this, 0, &dwThreadID);
}

接收到了上位机的数据以后直接上抛数据给上位机,这里做的相对简单了一些,只是做了指针的判空处理,实际项目中肯定对于接收到了上位机的数据是要做一定处理的。当接收到的数据不为空的情况下,直接通过5001端口和5002端口发送数据到上位机,

上位机的接收端口也是5001(PULL端)和5002(PULL端)。

DWORD WINAPI CVirMedComZMQ::Recv5000Msg(LPVOID para)
{
    CVirMedComZMQ* VirMedCom = (CVirMedComZMQ*)(para);
    if (!VirMedCom)
    {
        return 0;
    }

    while (true)
    {
        bool isAcceptMsg = VirMedCom->Is5000AcceptMsg();
        void* sokt5000 = VirMedCom->Get5000Socket();
        while (isAcceptMsg && sokt5000)
        {
            char* msg = s_recv(sokt5000);
            if (msg != NULL)
            {
                VirMedCom->RcvUpComData(msg);
                while (true)
                {
                    //通过5001端口发送及格门数数据
                    srand(clock());
                    QString PssNumStr = QString::number(rand() % 100);
                    QJsonObject passnum_json;
                    passnum_json.insert("Pass", PssNumStr.toInt());
                    QJsonDocument pass_document;
                    pass_document.setObject(passnum_json);
                    QByteArray passnum_array = pass_document.toJson(QJsonDocument::Compact);
                    QString PassNumJsonStr(passnum_array);
                    qDebug() << QString::fromLocal8Bit("发送及格门数数据:") << PassNumJsonStr;
                    VirMedCom->Send5001BaseData(PassNumJsonStr.toLocal8Bit().data());

                    //通过5002端口发送不及格门数数据
                    QString FailNumStr = QString::number(rand() % 100);
                    QJsonObject failnum_json;
                    failnum_json.insert("Fail", FailNumStr.toInt());
                    QJsonDocument fail_document;
                    fail_document.setObject(failnum_json);
                    QByteArray failnum_array = fail_document.toJson(QJsonDocument::Compact);
                    QString FailNumJsonStr(failnum_array);
                    qDebug() << QString::fromLocal8Bit("发送不及格门数数据:") << FailNumJsonStr;
                    VirMedCom->Send5002WarnData(FailNumJsonStr.toLocal8Bit().data());
                }
            }
            free(msg);
        }
        s_sleep(1);
    }
}

到此,整个模拟的上下位机已经完成,程序中还有很多地方不完善,读者有问题可以一起讨论。

原文地址:https://www.cnblogs.com/joorey/p/14785608.html