python zmq(ZeorMQ)

pip install pyzmq

ZeroMQ位于OSI模型的表示层,使用后台异步线程完成消息的接收和发送,大大简化了编程的复杂度。

传统的TCP Socket连接时1-1的,可以认为"1个socket=1个连接",每个线程独立维护一个socket,但在zmq中实现了1-n,m-n的连接模式,一个zmq socket维护一组连接,用户只可以操作socket,而不可以操作这些连接。zmq socket特殊的机制去区分多个连接,用户不需要关心。

另外,因为zmq socket使用后台异步线程,因此zmq不允许在线程之间共享socket,不然会报错

zmq.error.ZMQError: Operation cannot be accomplished in current state

三种模式:

一、Request-Reply模式

一问一答,客户端request,服务器reply

哪方先先启动都可以,客户端中途断掉和服务端在reply后断掉都无所谓。

server:

import zmq

context = zmq.Context()
socket = context.socket(zmq.REP) # 设置socket的类型
socket.bind('tcp://*:15000') # 端口绑定

message = socket.recv() # 收到的是byte类型
print(message)

socket.send_string('copy!')

client:

import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect('tcp://localhost:15000')
socket.send_string('request')

message = socket.recv()
print(message)

二、Publisher-Subscriber模式

一对多,一个发布者,若干订阅者。订阅者端可以通过设置过滤器过滤数据。

Publisher

import zmq
from random import randrange
 
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:15000")
 
while True:
    socket.send_string("message")

Subscriber

import sys
import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:15000")
 
# 过滤器
zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10002"
socket.setsockopt(zmq.SUBSCRIBE, zip_filter)

for i in range(5):
    msg = socket.recv()
    print msg 
 
print(msg)

三、Push-Pull模式

服务端push,所有连接在服务端的客户端pull,不同的是只有一个客户端可以pull,它们之间存在竞争,具体机制不需要了解,此模式类似于负载均衡。

Server

import zmq

context = zmq.Context()
server = context.socket(zmq.PUSH)
server.bind('tcp://*:15000')

while True:
    server.send_string('Push')

Client

import zmq

context = zmq.Context()
client = context.socket(zmq.PULL)
client.connect('tcp://localhost:15000')
 
while True:
    msg = client.recv()

问题1:如果客户端既需要pull模式 又需要subscriber模式的socket

import zmq
 
context = zmq.Context()
 
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
 
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")
 
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)
 
while True:
    try:
        socks = dict(poller.poll())
    except KeyboardInterrupt:
        break
 
    if receiver in socks:
        message = receiver.recv()
 
    if subscriber in socks:
        message = subscriber.recv()

问题2:在Request-Reply模式下,如果服务端压力过大,如何给服务端负载均衡

 可以通过增加中间代理,来自动分摊来自客户端的Request。

Server

import zmq
 
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.connect("tcp://localhost:15000")
 
while True:
    message = socket.recv()
    socket.send_string("msg")

Urgent

import zmq
 
# Prepare our context and sockets
context = zmq.Context()
 
frontend = context.socket(zmq.ROUTER)
backend = context.socket(zmq.DEALER)
frontend.bind("tcp://*:15001")
backend.bind("tcp://*:15002")
 
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)
 
while True:
    socks = dict(poller.poll())
    
    if socks.get(frontend) == zmq.POLLIN:
        message = frontend.recv_multipart()
        backend.send_multipart(message)
    
    if socks.get(backend) == zmq.POLLIN:
        message = backend.recv_multipart()
        frontend.send_multipart(message)

Client

import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:15001")
 
for request in range(1,11):
    socket.send_string("Hello")
    message = socket.recv()
socket.close()
context.term()
原文地址:https://www.cnblogs.com/LMIx/p/12677787.html