python RabbitMQ gRPC 实践经验

RabbitMQ

RabbitMQ主要参考 Python RabbitMQ使用示例,例子我仅仅做了微调。摘要如下。

印象中erlang和RabbitMQ是通过apt安装的。

sudo apt install -y rabbitmq-server erlang
pip install pika
  • customer.py 
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
 
import pika
import settings
 
parameters = pika.ConnectionParameters(settings.RabbitMQHostName)
connection = pika.BlockingConnection(parameters)
 
# 创建通道
channel = connection.channel()
channel.queue_declare(queue=settings.QueueName)
 
 
def callback(ch, method, properties, body):
    print('[x] Received %r' % (body.decode(),))
 
# 告诉rabbitmq使用callback来接收信息
channel.basic_consume(queue = settings.QueueName, on_message_callback = callback)
 
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理,按ctrl+c退出
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
  • produce.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
 
import pika
import random
import settings
 
# 新建连接,rabbitmq安装在本地则hostname为'localhost'
parameters = pika.ConnectionParameters(settings.RabbitMQHostName)
connection = pika.BlockingConnection(parameters)
 
# 创建通道
channel = connection.channel()
# 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
channel.queue_declare(queue=settings.QueueName)
 
number = random.randint(1, 1000)
body = 'hello world:%s' % number
# 交换机; 队列名,写明将消息发往哪个队列; 消息内容
# routing_key在使用匿名交换机的时候才需要指定,表示发送到哪个队列
channel.basic_publish(exchange='', routing_key=settings.QueueName, body=body)
print (' [x] Sent %s' % body)
connection.close()
  • settings.py
RabbitMQHostName = 'localhost'
QueueName = 'testQueue'

gRPC

关于proto格式,参见 Protobuf 语法指南

参考 Python gRPC小白使用示例,改动较大,摘录如下

  • 安装gRPC
    pip install grpcio protobuf grpcio-tools

    因为用的是anaconda的python,实际上不安装也没事

生成文件
example/data.proto

syntax = "proto3";
package example;
service FormatData {
  rpc DoFormat(Data) returns (Data){}
}
message Data {
  string text = 1;
}

在目录example下,命令:

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=.  ./data.proto

server.py

#!/usr/bin/env python
#coding = utf-8

import grpc
import time
from concurrent import futures
import sys

sys.path.append('..')
# sys.path

from example import data_pb2, data_pb2_grpc

_ONE_DAY_IN_SECONDS = 60 * 60 * 24
_HOST = 'localhost'
_PORT = '8080'


class FormatData(data_pb2_grpc.FormatDataServicer):
    def DoFormat(self, request, context):
        str = request.text
        return data_pb2.Data(text=str.upper())


def serve():
    grpcServer = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
    data_pb2_grpc.add_FormatDataServicer_to_server(FormatData(), grpcServer)
    grpcServer.add_insecure_port(_HOST + ':' + _PORT)
    grpcServer.start()
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        grpcServer.stop(0)


if __name__ == '__main__':
    serve()

client.py

#!/usr/bin/env python
#coding = utf-8

import grpc
import sys
import sys

sys.path.append('..')

from example import data_pb2, data_pb2_grpc

_HOST = 'localhost'
_PORT = '8080'


def run():
    conn = grpc.insecure_channel(_HOST + ':' + _PORT)
    client = data_pb2_grpc.FormatDataStub(channel=conn)
    response = client.DoFormat(data_pb2.Data(text='hello,world!'))
    print("received: " + response.text)


if __name__ == '__main__':
    run()
  • 测试

依次运行server/client,即可看到效果

原文地址:https://www.cnblogs.com/tangxiaosheng/p/14784793.html