ZeroMQ_11 ROUTER-DEALER路由

ROUTER-DEALER是一种最简单的路由方式。将ROUTER和多个DEALER相连接,用一种合适的算法来决定如何分发消息给DEALER。DEALER可以是一个黑洞(只负责处理消息,不给任何返回)、代理(将消息转发给其他节点)或是服务(会发送返回信息)。

如果你要求DEALER能够进行回复,那就要保证只有一个ROUTER连接到DEALER,因为DEALER并不知道哪个特定的节点在联系它,如果有多个节点,它会做负载均衡,将消息分发出去。但如果DEALER是一个黑洞,那就可以连接任何数量的节点。

ROUTER-DEALER路由可以用来做什么呢?如果DEALER会将它完成任务的时间回复给ROUTER,那ROUTER就可以知道这个DEALER的处理速度有多快了。因为ROUTER和DEALER都是异步的套接字,所以我们要用zmq_poll()来处理这种情况。

下面例子中的两个DEALER不会返回消息给ROUTER,我们的路由采用加权随机算法:发送两倍多的信息给其中的一个DEALER。

 rtdealer.c

// 2015-02-27T11:40+08:00
//  ROUTER-to-DEALER example

#include "../zhelpers.h"
#include <pthread.h>
#define NBR_WORKERS 10

static void *
worker_task(void *args)
{
    void *context = zmq_ctx_new();
    void *worker = zmq_socket(context, ZMQ_DEALER);

#if (defined (WIN32))
    s_set_id(worker, (intptr_t)args);
#else
    s_set_id(worker);          //  Set a printable identity
#endif

    zmq_connect (worker, "tcp://localhost:5671");

    int total = 0;
    while (1) {
        //  Tell the broker we're ready for work
        s_sendmore(worker, "");
        s_send(worker, "Hi Boss");

        //  Get workload from broker, until finished
        free(s_recv(worker));     //  Envelope delimiter
        char *workload = s_recv(worker);
        //  .skip
        int finished = (strcmp(workload, "Fired!") == 0);
        free(workload);
        if (finished) {
            printf("Completed: %d tasks
", total);
            break;
        }
        total++;

        //  Do some random work
        s_sleep(randof(500) + 1);
    }
    zmq_close(worker);
    zmq_ctx_destroy(context);
    return NULL;
}

//  .split main task
//  While this example runs in a single process, that is just to make
//  it easier to start and stop the example. Each thread has its own
//  context and conceptually acts as a separate process.

int main(void)
{
    void *context = zmq_ctx_new();
    void *broker = zmq_socket(context, ZMQ_ROUTER);

    zmq_bind(broker, "tcp://*:5671");
    srandom((unsigned)time(NULL));

    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
        pthread_t worker;
        pthread_create(&worker, NULL, worker_task, (void *)(intptr_t)worker_nbr);
    }
    //  Run for five seconds and then tell workers to end
    int64_t end_time = s_clock() + 5000;
    int workers_fired = 0;
    while (1) {
        //  Next message gives us least recently used worker
        char *identity = s_recv(broker);
        s_sendmore(broker, identity);
        free(identity);
        free(s_recv(broker));     //  Envelope delimiter
        free(s_recv(broker));     //  Response from worker
        s_sendmore(broker, "");

        //  Encourage workers until it's time to fire them
        if (s_clock() < end_time)
            s_send(broker, "Work harder");
        else {
            s_send(broker, "Fired!");
            if (++workers_fired == NBR_WORKERS)
                break;
        }
    }
    zmq_close(broker);
    zmq_ctx_destroy(context);
    return 0;
}
//  .until

out:

Completed: 16 tasks
Completed: 20 tasks
Completed: 18 tasks
Completed: 17 tasks
Completed: 21 tasks
Completed: 15 tasks
Completed: 18 tasks
Completed: 21 tasks
Completed: 22 tasks
Completed: 18 tasks
原文地址:https://www.cnblogs.com/vczf/p/12942635.html