python rabbitMQ 实现RPC

客户端:

'''
RPC: remote procedure call

远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
有多种 RPC模式和执行。最初由 Sun 公司提出。IETF ONC 宪章重新修订了 Sun 版本,使得 ONC RPC 协议成为 IETF 标准协议。现在使用最普遍的模式和执行是开放式软件基础的分布式计算环境(DCE)

简单来说, 一个服务既是生产者又是消费者
'''
import  pika
import uuid
import time

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        #声明接收的queue
        self.channel.basic_consume(self.on_response,no_ack=True,queue=self.callback_queue)

    def on_response(self,ch,method,props,body):
        #只有接收的id是自己生成的id时才会将body 赋值给 response  ,防止接收到其他客户端的结果
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self,n):
        self.response=None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(reply_to=self.callback_queue,#告诉服务器 客户端监听返回值的queue 的name
                                                                   correlation_id=self.corr_id),
                                   body=str(n))

        while self.response is None:
            #接收消息
           # self.channel.start_consuming() 与self.connection.process_data_events() 的区别是前者会阻塞 而后者不会阻塞
            self.connection.process_data_events()
            print("no message")
            time.sleep(0.5)
        return int(self.response)

fibonacci_rpc  = FibonacciRpcClient()

print("request fib(30)")
response = fibonacci_rpc.call(6)
print("result:",response)

服务器:

'''
接收client 消息, 处理 , 返回

'''

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n==0:
        return 0
    elif n==1:
        return 1
    else:
        return fib(n-1)+fib(n-2)

def on_request(ch,method,props,body):
    n = int(body)
    print("fib : ",n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to, #服务器端将消息发送到客户端监听的queue中, 这个参数是客户端传递过来的
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),body = str(response))

    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,queue='rpc_queue')
print('wait rpc request')
channel.start_consuming()
原文地址:https://www.cnblogs.com/gaizhongfeng/p/8099310.html