pika和rabbitMQ实现rpc代码

客户端向服务端发送命令,然后服务端去执行,服务端代码会保持一种阻塞态,当消息队列中有消息存在的时候,就立刻取到命令并执行,如果消息队列中没有命令,就保持阻塞。

客户端代码:

#!/usr/bin/python3
# _*_ coding: utf-8 _*_

"""
@Software: PyCharm
@File: RPCCmd.py
@Author: 高留柱
@E-mail: liuzhu.gao@foxmail.com
@Time: 2020/7/6 15:49
"""
import pika
import uuid
import json


class CMD:
    def __init__(self):
        """
        初始化函数的时候就建立管道,接收服务端的任务结果
        """
        credentials = pika.PlainCredentials('qpm', 'cljslrl0620')
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
    "it.sucheon.com", credentials=credentials, virtual_host="qpm"))

        self.channel = self.connection.channel()

        # 建立随机管道,用于告诉服务端,任务的结果放在这个随机管道中
        result = self.channel.queue_declare('', exclusive=True)

        self.callback_queue = result.method.queue

        # 从随机管道中取任务
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,  # 回调函数
            auto_ack=True,
        )

    # 收到任务结果的回调函数
    def on_response(self, ch, method, props, body):
        # 如果客户端的随机字符串和服务端发送过来的随机字符串相等,就代表着该结果属于该任务
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, cmd, port):
        """
        :param cmd:
        :return:
        exchange: 交换器
        routing_key: 是管道的名字
        reply_to: 告诉服务端执行完命令把结果丢到哪个管道中
        """
        # TODO: 根据port查询UUID
        self.response = None
        self.corr_id = str(uuid.uuid4())  # 唯一标识符, 用于标识服务端的结果和客户端命令之间的联系,防止服务端和客户端命令和结果不对等
        self.channel.basic_publish(exchange="",
                                   routing_key=str(port),
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(cmd))

        # 最多等待10秒,10秒内有值立刻返回
        self.connection.process_data_events(time_limit=30)  # 检查队列中有没有新消息,没加time_limit代表不会阻塞,加了之后会进入阻塞态
        if self.response is None:
            # 如果服务端没有返回值的话,将删除任务管道,以免积累消息,但会导致服务端脚本停止堵塞态,结束运行
            # 如果不删除任务管道的话,也没啥大问题,就是当服务端重新连接rabbitMQ的时候,会把之前没接收到的命令全部执行一遍,但接收结果的管道并不会积压
            self.channel.queue_delete(queue=str(port))
            return {"status": 1, "stdout": "", "stderr": "连接超时,请重试!".encode('utf-8')}
        return json.loads(self.response)

服务端代码:

服务端可以选择使用supervisor服务加入守护进程中,确保脚本一直运行

import pika
import subprocess
import json
import os
import time


def execute_cmd(cmd):
    '''

        执行终端指令, 并返回输出流

    '''
    timeout = 10
    try:
        p = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, preexec_fn=os.setsid, shell=True, close_fds=True)
        p.wait(timeout)
        out = p.stdout.read().decode("utf-8").strip("	
 ")
        err = p.stderr.read().decode("utf-8").strip("	
 ")
        print(p.returncode)
        return json.dumps({'status': p.returncode, 'stdout': out, 'stderr': err})
    except Exception as e:
        print(e)
        return json.dumps({'status': 1, 'stdout': '', 'stderr': str(e)})
    finally:
        try:
            p.stdout.flush()
            p.stderr.flush()
            p.stdout.close()
            p.stderr.close()
            os.killpg(p.pid, subprocess.signal.SIGKILL)
        except:
            pass


def getPort():
    port = json.loads(execute_cmd("/bin/echo $PORT")).get('stdout')
    if not port:
        port = json.loads(execute_cmd('/bin/hostname')).get('stdout').split('-')[-1]
    return port 


class RPCCmd:

    def __init__(self):
        # 获取端口号
        self.port = getPort()
        print("端口号:", f"4{self.port}4")
        self.credentials = pika.PlainCredentials('qpm', 'cljslrl0620')

    def on_request(self, ch, method, props, body):
        cmd = body.decode('utf-8')
        print(cmd)
        response = execute_cmd(cmd)
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id=props.correlation_id),
                         body=response)
        ch.basic_ack(delivery_tag=method.delivery_tag)


    def start(self):
        try:
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                "it.sucheon.com", credentials=self.credentials, virtual_host="qpm",
            ))
            # 在socket通道之上建立了rabbit协议的通道
            self.channel = self.connection.channel()
            self.channel.queue_declare(queue=self.port)
        except Exception as e:
            
            time.sleep(5)
            self.start()

        self.channel.basic_qos(prefetch_count=4)  # 消费者一次只取一个任务,谁先完成谁继续取,处理不完别来找我
        self.channel.basic_consume(queue=self.port, on_message_callback=self.on_request)
        self.channel.start_consuming()


rpc_obj = RPCCmd()
rpc_obj.start()
原文地址:https://www.cnblogs.com/cnhyk/p/13474326.html