ZeroMQ_12 请求应答---LRU模式

// 2015-02-27T11:40+08:00
//  ROUTER-to-DEALER example

#include "../zhelpers.h"
#include <pthread.h>
#define NBR_WORKERS 10

static void *
worker_task(void *args)
{
    void *context = zmq_ctx_new();
    void *worker = zmq_socket(context, ZMQ_REQ);

#if (defined (WIN32))
    s_set_id(worker, (intptr_t)args);
#else
    s_set_id(worker);          //  Set a printable identity
#endif

    zmq_connect (worker, "ipc://routing.ipc");

    int total = 0;
    while (1) {
        //  Tell the broker we're ready for work
        s_send(worker, "ready");

        char *workload = s_recv(worker);

        //  .skip
        int finished = (strcmp(workload, "END") == 0);
        free(workload);
        if (finished) {
            printf("Completed: %d tasks
", total);
            break;
        }
        total++;

        //  Do some random work
        s_sleep(randof(1000) + 1);
    }
    zmq_close(worker);
    zmq_ctx_destroy(context);
    return NULL;
}

//  .split main task
//  While this example runs in a single process, that is just to make
//  it easier to start and stop the example. Each thread has its own
//  context and conceptually acts as a separate process.

int main(void)
{
    void *context = zmq_ctx_new();
    void *broker = zmq_socket(context, ZMQ_ROUTER);

    zmq_bind(broker, "ipc://routing.ipc");
    srandom((unsigned)time(NULL));

    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
        pthread_t worker;
        pthread_create(&worker, NULL, worker_task, (void *)(intptr_t)worker_nbr);
    }

    int task_nbr;
    for (task_nbr = 0; task_nbr < NBR_WORKERS * 10; task_nbr++) {
        // 最近最少使用的worker就在消息队列中
        char *address = s_recv(broker);
        char *empty = s_recv(broker);
        free(empty);
        char *ready = s_recv(broker);
        free(ready);
 
        s_sendmore(broker, address);
        s_sendmore(broker, "");
        s_send(broker, "This is the workload");
        free(address);
    }
     // 通知所有REQ套接字结束工作
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
        char *address = s_recv(broker);
        char *empty = s_recv(broker);
        free(empty);
        char *ready = s_recv(broker);
        free(ready);
 
        s_sendmore(broker, address);
        s_sendmore(broker, "");
        s_send(broker, "END");
        free(address);
    }
    zmq_close(broker);
    zmq_ctx_destroy(context);
    return 0;
}
//  .until

out:

Completed: 9 tasks
Completed: 10 tasks
Completed: 9 tasks
Completed: 11 tasks
Completed: 9 tasks
Completed: 12 tasks
Completed: 12 tasks
Completed: 10 tasks
Completed: 8 tasks
Completed: 10 tasks

关于以上代码的几点说明:

  • 我们不需要像前一个例子一样等待一段时间,因为REQ套接字会明确告诉ROUTER它已经准备好了。

  • 我们使用了zhelpers.h提供的s_set_id()函数来为套接字生成一个可打印的字符串标识,这是为了让例子简单一些。在现实环境中,REQ套接字都是匿名的,你需要直接调用zmq_recv()和zmq_send()来处理消息,因为s_recv()和s_send()只能处理字符串标识的套接字。

  • 更糟的是,我们使用了随机的标识,不要在现实环境中使用随机标识的持久套接字,这样做会将节点消耗殆尽。

  • 如果你只是将上面的代码拷贝过来,没有充分理解,那你就像是看到蜘蛛人从屋顶上飞下来,你也照着做了,后果自负吧。

在将消息路由给REQ套接字时,需要注意一定的格式,即地址-空帧-消息:

原文地址:https://www.cnblogs.com/vczf/p/12955003.html