【python】-- RabbitMQ RPC模型

RabbitMQ RPC模型

RPC(remote procedure call)模型说通俗一点就是客户端发一个请求给远程服务端,让它去执行,然后服务端端再把执行的结果再返回给客户端。

1、服务端

import pika

#创建socket实例,声明管道,声明queue
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue="rpc_queue")


def fib(n):
    """
    斐波那契数列
    :param n:
    :return:
    """
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):  # props 是客户端发过来的消息
    n = int(body)
    print("fib(%s)" % n)
    response = fib(n)
    # 发布消息
    ch.basic_publish(exchange="",
                     routing_key=props.reply_to,  # props.reply_to从客户端取出双方约定好存放返回结果的queue
                     properties=pika.BasicProperties  # 定义一些基本属性
                     (correlation_id=props.correlation_id),  # props.correlation_id 从客户端取出当前请求的ID返回给客户端做验证
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认消息被消费

channel.basic_qos(prefetch_count=1)  # 每次最多处理一个客户端发过来的消息
# 消费消息
channel.basic_consume(on_request,  # 回调函数
                      queue="rpc_queue")

print("Awaiting RPC requests")
channel.start_consuming()

  

2、客户端

import pika
import uuid
import time


class FibonacciRpcClient(object):
    """
    斐波那契数列rpc客户端
    """

    def __init__(self):
        """
        定义好创建socket实例、声明管道、声明随机产生的唯一queue、消费信息的静态变量
        """
        self.connection = pika.BlockingConnection(pika.ConnectionParameters
                                                  (host="localhost"))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        print("---->", method, props)
        # 当服务端返回的id跟当初请求的id一致时,再去读取服务端发送的信息保持数据的一致性
        if self.corr_id == props.correlation_id:  # 当服务端返回的id跟当初请求的id一致时,保持数据的一致性
           self.response = body


    def call(self,n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.publish(exchange="",
                             routing_key="rpc_queue",  # 双方的request所用的queue
                             properties=pika.BasicProperties(  # 定义基本属性
                                 reply_to=self.callback_queue,  # 定义客户端服务端双方response的所用的Q
                                 correlation_id=self.corr_id),  # 定义这次request的唯一ID
                             body=str(n))
        while self.response is None:
            self.connection.process_data_events()  # 非 阻塞版的start_consumer()
            print("no msg....")
            time.sleep(0.5)
        return int(self.response)

if __name__ == "__main__":
    fibonacci_rpc = FibonacciRpcClient()
    print("Requesting fib(8)")
    response = fibonacci_rpc.call(8)
    print("Got %r" % response)

3、输出

服务端:

 Awaiting RPC requests
 fib(8)

  

客户端:

Requesting fib(8)
no msg....
----> <Basic.Deliver(['consumer_tag=ctag1.cf2e7983c7d840db8c68f4571472c18d', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=amq.gen-ezXgs0tRO5SldZeRH97VPw'])> <BasicProperties(['correlation_id=601e860d-c93d-4c94-959a-3a39be177f7c'])>
no msg....
Got 21

  

原文地址:https://www.cnblogs.com/Keep-Ambition/p/8054285.html