Router-Dealer代理模式

属于是在ZMQ指南上的原代码,跑得一直有问题,先贴上Okay的代码。

 1 //
 2 //  Hello world 客户端
 3 //  连接REQ套接字至 tcp://localhost:5559 端点
 4 //  发送Hello给服务端,等待World应答
 5 //
 6 #include "zhelpers.h"
 7 
 8 int main (void)
 9 {
10     void *context = zmq_init (1);
11 
12     //  用于和服务端通信的套接字
13     void *requester = zmq_socket (context, ZMQ_REQ);
14     zmq_connect (requester, "tcp://localhost:5060");
15 
16     int request_nbr;
17     for (request_nbr = 0; request_nbr != 10; request_nbr++) {
18         s_send (requester, "From Request");
19         char *string = s_recv (requester);
20         printf ("收到应答 %d [%s]
", request_nbr, string);
21         free (string);
22     }
23     zmq_close (requester);
24     zmq_term (context);
25     return 0;
26 }
Request 端
 1 //
 2 //  Hello World 服务端
 3 //  连接REP套接字至 tcp://*:5560 端点
 4 //  接收Hello请求,返回World应答
 5 //
 6 #include "zhelpers.h"
 7 
 8 int main (void)
 9 {
10     void *context = zmq_init (1);
11 
12     //  用于何客户端通信的套接字
13     void *responder = zmq_socket (context, ZMQ_REP);
14     zmq_connect (responder, "tcp://localhost:5070");
15 
16     while (1) {
17         //  等待下一个请求
18         char *string = s_recv (responder);
19         printf ("Received request: [%s]
", string);
20         free (string);
21 
22         //  做一些“工作”
23 
24         //  返回应答信息
25         s_send (responder, "World");
26     }
27     //  程序不会运行到这里,不过还是做好清理工作
28     zmq_close (responder);
29     zmq_term (context);
30     return 0;
31 }
Reply端
 1 //
 2 //  简易请求-应答代理
 3 //
 4 #include "zhelpers.h"
 5 
 6 int main(void)
 7 {
 8     //  准备上下文和套接字
 9     void *context = zmq_init(1);
10     void *frontend = zmq_socket(context, ZMQ_ROUTER);
11     void *backend  = zmq_socket(context, ZMQ_DEALER);
12     zmq_bind(frontend, "tcp://*:5060");
13     zmq_bind(backend,  "tcp://*:5070");
14 
15     //  初始化轮询集合
16     zmq_pollitem_t items [] = {
17         { frontend, 0, ZMQ_POLLIN, 0 },
18         { backend,  0, ZMQ_POLLIN, 0 }
19     };
20     //  在套接字间转发消息
21     while(1) {
22         zmq_msg_t message;
23         int64_t more;           //  检测多帧消息
24 
25         zmq_poll(items, 2, -1);
26         if(items [0].revents & ZMQ_POLLIN) {
27             while(1) {
28                 //  处理所有消息帧
29                 zmq_msg_init(&message);
30                 zmq_recvmsg(frontend, &message, 0);
31                 size_t more_size = sizeof(more);
32                 zmq_getsockopt(frontend, ZMQ_RCVMORE, &more, &more_size);
33                 zmq_sendmsg(backend, &message, more? ZMQ_SNDMORE: 0);
34                 zmq_msg_close(&message);
35                 if(!more)
36                     break;      //  最后一帧
37             }
38         }
39         if(items [1].revents & ZMQ_POLLIN) {
40             while(1) {
41                 //  处理所有消息帧
42                 zmq_msg_init(&message);
43                 zmq_recvmsg(backend, &message, 0);
44                 size_t more_size = sizeof(more);
45                 zmq_getsockopt(backend, ZMQ_RCVMORE, &more, &more_size);
46                 zmq_sendmsg(frontend, &message, more? ZMQ_SNDMORE: 0);
47                 zmq_msg_close(&message);
48                 if(!more)
49                     break;      //  最后一帧
50             }
51         }
52     }
53     //  程序不会运行到这里,不过还是做好清理工作
54     zmq_close(frontend);
55     zmq_close(backend);
56     zmq_term(context);
57     return 0;
58 }
Agent 代理

这一切都没有问题,但是如果我们将Agent当中的void* context =  zmq_init(1)挪一个地方,例如以下代码所示:

 1 ZMQHelper::ZMQHelper()
 2 {
 3     this->context = zmq_init(1); //在构造函数里面做init
 4     assert(this->context);
 5 }
 6 
 7 void ZMQHelper::RouteMessage(RouteFunc func, unsigned workerNumber)
 8 {
 9 //    void* ctx = zmq_init(1);
10     void *frontend = zmq_socket(this->context, ZMQ_ROUTER);
11     void *backend  = zmq_socket(this->context, ZMQ_DEALER);
12 
13         // other code ...
14 }
15 
16 //调用
17 ZMQHelper helper;
18 helper.RouteMessage(msgProcess,1);

如果这样使用,Front收到的消息请求在3~4个不等,且backend无法收到任何消息。这个问题折腾了半天,有待进一步发现问题。

原文地址:https://www.cnblogs.com/kevingeek/p/3803436.html