Python开发【项目】:RPC异步执行命令(RabbitMQ双向通信)

RPC异步执行命令
需求:
  • 利用RibbitMQ进行数据交互
  • 可以对多台服务器进行操作
  • 执行命令后不等待命令的执行结果,而是直接让输入下一条命令,结果出来后自动打印
  • 实现异步操作

本节涉及最多的还是rabbitmq通信原理知识,要求安装rabbitmq服务

程序用广播topic模式做更好 

程序目录结构:

程序简介:

复制代码
# 异步rpc程序


## 1、需求
- [ ] 利用RibbitMQ进行数据交互
- [ ] 可以对多台服务器进行操作
- [ ] 执行命令后不等待命令的执行结果,而是直接让输入下一条命令,结果出来后自动打印
- [ ] 实现异步操作

## 备注

- [ ] RabbitMQ队列名:
                    ①执行命令时,队列名为服务器端的IP
                    ②查询数据时,用的是回调时随机生成的callback_queue名
- [ ] threading多线程:
                    实现命令执行后不等待执行结果,依然可以输入新的指令

- [ ] 执行命令格式:
                 -->>run "dir" host 192.168.5.107 127.0.0.1
                        dir     server端要执行的命令
                        host    host后可跟一个或多个可以通过rabbitMQ的服务器地址

- [ ] 查看后台所有的TASK_ID信息:
                 -->>check_all
     显示结果样式:TASK_ID【76786】    HOST【192.168.5.107】    COMMAND【dir】
                  TASK_ID【10307】    HOST【127.0.0.1】    COMMAND【dir】

- [ ] 查看TASK_ID对应的执行结果:
                 -->>check_task 10307
                         10307 为check_all查到的TASK_ID
复制代码

程序流程图:

服务器端:

复制代码
#!/usr/bin/env python
# -*- coding:utf-8 -*-

# !/usr/bin/env python
# -*- coding:utf-8 -*-


import pika
import os

class Server(object):
    def __init__(self,rabbitmq,queue_name):
        self.queue_name = queue_name
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=rabbitmq))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.queue_name)

    def handle(self,command):
        command = command.decode()
        print(command,type(command))
        message = os.popen(command).read()
        if not message:
            message = "Wrong Command"
        return message

    def on_request(self,ch, method, props, body):
        response = self.handle(body)
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,  # 回信息队列名
                         properties=pika.BasicProperties(correlation_id=
                                                         props.correlation_id),
                         body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def start(self):
        self.channel.basic_consume(self.on_request,
                                   queue=self.queue_name)

        print(" [x] Awaiting RPC requests")
        self.channel.start_consuming()


if __name__ == "__main__":
    rabbitmq = "localhost"      #rabbitmq服务器地址
    queue_name = "192.168.20.22"    #queue_name为本地ip地址
    server = Server(rabbitmq,queue_name)
    server.start()
复制代码

客户端:

bin目录:

#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import os
import platform


#添加BASE_DIR,添加顶级目录到路径中,方便调用其他目录模块
if platform.system() == 'Windows':
    print(os.path.abspath(os.path.dirname(__file__)).split('\')[:-1])
    BASE_DIR = '\'.join(os.path.abspath(os.path.dirname(__file__)).split('\')[:-1])
else:
    BASE_DIR = '/'.join(os.path.abspath(os.path.dirname(__file__)).split('/')[:-1])


#加载环境变量
sys.path.append(BASE_DIR)
from conf import settings
from core import main

if __name__ == '__main__':
    obj = main.Handler()
    obj.start()

conf目录:

#!/usr/bin/env python
#-*- coding:utf-8 -*-


import os
import sys
import platform


if platform.system() == 'Windows':
    BASE_DIR = '\'.join(os.path.abspath(os.path.dirname(__file__)).split('\')[:-1])
    school_dbpaths = os.path.join(BASE_DIR,'school_db')

else:
    BASE_DIR = '/'.join(os.path.abspath(os.path.dirname(__file__)).split('/')[:-1])
    school_dbpaths =os.path.join(BASE_DIR, 'school_db')


#rabbitmq服务地址ip
RabbitMQ_IP = 'localhost'

core目录

from 作业.rpc_client.conf import settings
from 作业.rpc_client.modules.client import Client
import random,time
import threading

class Handler(object):
    def __init__(self):
        self.information = {}   # 后台进程信息

    def check_all(self,*args):
        '''查看所有task_id信息'''
        time.sleep(2)
        for key in self.information:
            print("TASK_ID【%s】	HOST【%s】	COMMAND【%s】"%(key,self.information[key][0],
                                                                    self.information[key][1]))

    def check_task(self,user_cmd):
        '''查看task_id执行结果'''
        time.sleep(2)
        try:
            task_id = user_cmd.split()[1]
            task_id = int(task_id)
            callback_queue=self.information[task_id][2]
            callback_id=self.information[task_id][3]
            client = Client()
            response = client.get_response(callback_queue, callback_id)
            print(response.decode())
            del self.information[task_id]

        except KeyError  as e :
            print("33[31;0mWrong id[%s]33[0m"%e)
        except IndexError as e:
            print("33[31;0mWrong id[%s]33[0m"%e)

    def run(self,user_cmd):
        '''执行命令'''
        try:
            time.sleep(2)
            #print("--->>",user_cmd)
            command = user_cmd.split(""")[1]
            hosts = user_cmd.split()[3:]
            for host in hosts:
                task_id = random.randint(10000, 99999)
                client = Client()
                response = client.call(host, command)
                # print(response)
                self.information[task_id] = [host, command, response[0],response[1]]
        except IndexError as e:
            print("33[31;0mError:%s33[0m"%e)

    def reflect(self,str,user_cmd):
        '''反射'''
        if hasattr(self, str):
            getattr(self, str)(user_cmd)
        # else:
        #     setattr(self, str, self.foo)
        #     getattr(self, str)()

    def start(self):
        while True:
            user_cmd = input("->>").strip()
            if not user_cmd:continue
            str = user_cmd.split()[0]
            t1 = threading.Thread(target=self.reflect,args=(str,user_cmd))  #多线程
            t1.start()

modules目录

import pika
import uuid
from 作业.rpc_client.conf import settings

class Client(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=settings.RabbitMQ_IP))
        self.channel = self.connection.channel()

    def on_response(self, ch, method, props, body):
        '''获取命令执行结果的回调函数'''
        # print("验证码核对",self.callback_id,props.correlation_id)
        if self.callback_id == props.correlation_id:  # 验证码核对
            self.response = body
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def get_response(self,callback_queue,callback_id):
        '''取队列里的值,获取callback_queued的执行结果'''
        self.callback_id = callback_id
        self.response = None
        self.channel.basic_consume(self.on_response,  # 只要收到消息就执行on_response
                                   queue=callback_queue)
        while self.response is None:
            self.connection.process_data_events()  # 非阻塞版的start_consuming
        return self.response

    def call(self,queue_name,command):
        '''队列里发送数据'''
        result = self.channel.queue_declare(exclusive=False) #exclusive=False 必须这样写
        self.callback_queue = result.method.queue
        self.corr_id = str(uuid.uuid4())
        # print(self.corr_id)
        self.channel.basic_publish(exchange='',
                                   routing_key=queue_name,
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,  # 发送返回信息的队列name
                                       correlation_id=self.corr_id,  # 发送uuid 相当于验证码
                                   ),
                                   body=command)

        return self.callback_queue,self.corr_id

运行示例图

原文地址:https://www.cnblogs.com/fuyuteng/p/9283343.html