python中RabbitMQ的使用(远程过程调用RPC)

在RabbitMQ消息队列中,往往接收者、发送者不止是一个身份。例如接接收者收到消息并且需要返回给发送者。

此时接收者、发送者的身份不再固定!

我们来模拟该情形:

假设有客户端client,服务端server。

我们需要从客户端发送数据,通过服务端的计算后再返回给客户端。

client.py

 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 import pika
 4 import uuid
 5 
 6 class Client(object):
 7     def __init__(self):
 8         hostname = '192.168.1.133'
 9         parameters = pika.ConnectionParameters(hostname)
10         self.connection = pika.BlockingConnection(parameters)
11 
12         self.channel = self.connection.channel()
13         # 定义接收返回消息的队列
14         result = self.channel.queue_declare(exclusive=True)
15         self.callback_queue = result.method.queue
16 
17         self.channel.basic_consume(self.on_response,
18                                    no_ack=True,
19                                    queue=self.callback_queue)
20         self.response = None
21         self.corr_id = ''
22 
23     # 定义接收到返回消息的处理方法
24     def on_response(self, ch, method, props, body):
25         if self.corr_id == props.correlation_id:
26             self.response = body
27 
28     def request(self, n):
29         # self.response = None
30         self.corr_id = str(uuid.uuid4())
31         # 发送计算请求,并声明返回队列
32         self.channel.basic_publish(exchange='',
33                                    routing_key='count_queue',
34                                    properties=pika.BasicProperties(
35                                        reply_to=self.callback_queue,
36                                        correlation_id=self.corr_id,
37                                    ),
38                                    body=str(n))
39         # 接收返回的数据
40         while self.response is None:
41             self.connection.process_data_events()
42         return int(self.response)
43 
44 client = Client()
45 
46 print " [*] Requesting fib(30)"
47 response = client.request(30)
48 print " [*] Got fib(30)= %r" % response

server.py

 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 import pika
 4 
 5 hostname = '192.168.1.133'
 6 parameters = pika.ConnectionParameters(hostname)
 7 connection = pika.BlockingConnection(parameters)
 8 
 9 # 创建通道
10 channel = connection.channel()
11 # 声明队列
12 channel.queue_declare(queue='count_queue')
13 print ' [*] Waiting for n'
14 
15 # 算法
16 def fib(n):
17     if n == 0:
18         return 0
19     elif n == 1:
20         return 1
21     else:
22         return fib(n - 1) + fib(n - 2)
23 
24 # 设置回调函数
25 def on_request(ch, method, props, body):
26     n = int(body)
27     response = fib(n)
28     print " [*] fib(%s)" % n, '=', response
29     # 将结果反馈
30     ch.basic_publish(exchange='',
31                      routing_key=props.reply_to,
32                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
33                      body=str(response))
34     ch.basic_ack(delivery_tag=method.delivery_tag)
35 
36 #公平调度
37 channel.basic_qos(prefetch_count=1)
38 channel.basic_consume(on_request, queue='count_queue')
39 print " [*] Awaiting count_queue requests"
40 channel.start_consuming()

结果如下:

服务端:                                                                         客户端:

                                 

原文地址:https://www.cnblogs.com/jfl-xx/p/7346464.html