笔记:《ZeroMQ》

消息传递模式

分布式设计需要引入中间件

问题导入

(摘自官网教程:《ZeroMQ:The Dynamic Discovery Problem》):

设计原则:中间层应当是“无状态”(stateless)的

所谓“无状态”,是指通讯在下一次连接或传递消息时,并不知道之前是否已连接,所以需要携带本次通讯所需的全部属性和信息,而不能从之前的连接中继承任何数据。相对的“有状态”诸如TCP协议,当建立连接后,TCP通讯的状态变为xxx,当断开连接时,同样需要从xxx变为yyy。有状态连接,其消息传递是建立在之前的若干操作基础上的,故而可以省略很多重复数据或状态信息,但缺点是,该状态需要维护并在每次的消息积累中更新当前状态。

我们说HTTP协议是无状态的,而它的底层协议TCP却是有状态的:这是指,HTTP通讯双方并不了解之前是否已经完成了连接,每一次传递的消息内容都是作为一次新的连接进行数据封装的。

对于一般的MQ中间件,通常是有状态的,这就意味着,消息中间件需要记录每一个连接的状态——message brokers are greedy things; in their role as central intermediaries, they become too complex, too stateful, and eventually a problem.

ZMQ更倾向于HTTP代理服务器的模式:它存在,但除了转发,并没有任何特殊作用或影响。为了实现无状态的Broker/Dispatcher/Forward/Proxy,ZMQ将连接的状态附着于请求消息中或是某些共享存储中,例如一个数据库中(于是连接的双方可以从中读取到当前状态并实时更新,而这不再需要中间件来维护)。

设计原则:尽量减少静态部件

Too many static pieces are like liquid concrete: knowledge is distributed and the more static pieces you have, the more effort it is to change the topology.

解决方案

自己编写ZMQ中间件

//  Simple request-reply broker

#include "zhelpers.h"

int main (void)
{
    //  Prepare our context and sockets
    void *context = zmq_ctx_new ();
    void *frontend = zmq_socket (context, ZMQ_ROUTER);
    void *backend  = zmq_socket (context, ZMQ_DEALER);
    zmq_bind (frontend, "tcp://*:5559");
    zmq_bind (backend,  "tcp://*:5560");

    //  Initialize poll set
    zmq_pollitem_t items [] = {
        { frontend, 0, ZMQ_POLLIN, 0 },
        { backend,  0, ZMQ_POLLIN, 0 }
    };
    //  Switch messages between sockets
    while (1) {
        zmq_msg_t message;
        zmq_poll (items, 2, -1);
        if (items [0].revents & ZMQ_POLLIN) {
            while (1) {
                //  Process all parts of the message
                zmq_msg_init (&message);
                zmq_msg_recv (&message, frontend, 0);
                int more = zmq_msg_more (&message);
                zmq_msg_send (&message, backend, more? ZMQ_SNDMORE: 0);
                zmq_msg_close (&message);
                if (!more)
                    break;      //  Last message part
            }
        }
        if (items [1].revents & ZMQ_POLLIN) {
            while (1) {
                //  Process all parts of the message
                zmq_msg_init (&message);
                zmq_msg_recv (&message, backend, 0);
                int more = zmq_msg_more (&message);
                zmq_msg_send (&message, frontend, more? ZMQ_SNDMORE: 0);
                zmq_msg_close (&message);
                if (!more)
                    break;      //  Last message part
            }
        }
    }
    //  We never get here, but clean up anyhow
    zmq_close (frontend);
    zmq_close (backend);
    zmq_ctx_destroy (context);
    return 0;
}

使用ZMQ内置的代理功能

import zmq

def main():
    """ main method """

    context = zmq.Context()

    # Socket facing clients
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5559")

    # Socket facing services
    backend  = context.socket(zmq.DEALER)
    backend.bind("tcp://*:5560")

    zmq.proxy(frontend, backend)

    # We never get here…
    frontend.close()
    backend.close()
    context.term()

if __name__ == "__main__":
    main()

负载均衡模式

原文地址:https://www.cnblogs.com/brt3/p/10147941.html