基于 RabbitMQ RPC 实现的主机管理

要求:

文件分布:

流程图:

import pika
import os
import socket

class Server(object):
    """RPC server that runs shell commands received over a RabbitMQ queue.

    Each request body is executed as a shell command; whatever the command
    wrote to stdout is published to the queue named in the request's
    ``reply_to`` property, tagged with the request's correlation id.
    """

    def __init__(self, queuename):
        """Connect to the broker on localhost and declare the request queue.

        :param queuename: name of the queue this server consumes from.
        """
        self.queuename = queuename
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()  # open a channel on the connection
        self.channel.queue_declare(queue=self.queuename)

    def handle(self, command):
        """Run ``command`` in a shell and return what it wrote to stdout.

        :param command: the command line as bytes (the raw message body).
        :return: the captured stdout as ``str``; ``'wrong command'`` when the
            command produced no stdout output at all.
        """
        # SECURITY NOTE(review): the message body is handed to the shell
        # verbatim, so anyone able to publish to this queue can run arbitrary
        # commands on this host. Restrict broker access accordingly.
        # The ``with`` block closes the pipe (and reaps the child process)
        # instead of leaving that to the garbage collector.
        with os.popen(command.decode()) as pipe:
            message = pipe.read()
        if not message:
            message = 'wrong command'
        return message

    def on_requet(self, ch, method, props, body):
        """Handle one request: execute it, publish the reply, ack the request.

        :param ch: channel the request was delivered on.
        :param method: delivery metadata; supplies the delivery tag to ack.
        :param props: request properties; ``reply_to`` and ``correlation_id``
            are read from it.
        :param body: the command to execute, as bytes.
        """
        response = self.handle(body)
        print(response)
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,  # reply queue chosen by the client
                         properties=pika.BasicProperties(correlation_id=props.correlation_id),
                         body=str(response))
        # Ack only after the reply has been published.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def start(self):
        """Register ``on_requet`` as the consumer and block processing requests."""
        # NOTE(review): callback-first positional form looks like the pre-1.0
        # pika signature; pika >= 1.0 appears to expect
        # basic_consume(queue, on_message_callback) -- confirm against the
        # installed pika version.
        self.channel.basic_consume(self.on_requet, queue=self.queuename)
        print(" [x] Awaiting RPC requests")
        self.channel.start_consuming()





if __name__ == "__main__":
    # Resolve this machine's IP address and use it as the request queue name.
    local_ip = socket.gethostbyname(socket.gethostname())
    print(local_ip)
    Server(local_ip).start()
server
  1 import pika
  2 import uuid
  3 import random
  4 import threading
  5 
  6 
  7 class Client(object):
  8     def __init__(self):
  9         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost'))
 10         self.channel = self.connection.channel()
 11 
 12     def on_response(self, ch, method, props, body):
 13         if self.callback_id == props.correlation_id:
 14             self.response = body
 15         ch.basic_ack(delivery_tag=method.delivery_tag)
 16 
 17     def get_response(self, callback_queue, corr_id):
 18         self.response = None
 19         self.callback_id = corr_id
 20         self.channel.basic_consume(self.on_response, queue=callback_queue)
 21         while self.response is None:
 22             self.connection.process_data_events()  # 非阻塞版的start_consuming
 23         return self.response
 24 
 25     def call(self, queuename, n):
 26         # 声明临时的回调队列
 27         result = self.channel.queue_declare(exclusive=False)
 28         self.callback_queue = result.method.queue
 29         self.corr_id = str(uuid.uuid4())
 30         self.channel.basic_publish(exchange='',
 31                                    routing_key=queuename,
 32                                    properties=pika.BasicProperties(
 33                                        reply_to=self.callback_queue,
 34                                        correlation_id = self.corr_id,
 35                                    ),
 36                                    body = n)
 37         return self.callback_queue, self.corr_id
 38 
 39 class Threading(object):
 40     def __init__(self):
 41         self.info={}
 42 
 43     def check_all(self, cmd):
 44         '''
 45         查看已经有的任务id
 46         :param cmd:
 47         :return:
 48         '''
 49         for i in self.info:
 50             print('task id: %s, host: %s, command:%s' % (i, self.info[i][0], self.info[i][1]))
 51 
 52     def check_task(self, cmd_id):
 53         '''
 54         查看运行结果
 55         :param cmd_id:
 56         :return:
 57         '''
 58         try:
 59             id = int(cmd_id.split()[1])
 60             callack_queue = self.info[id][2]
 61             callack_id=self.info[id][3]
 62             client = Client()
 63             res = client.get_response(callack_queue, callack_id)
 64             print(res.decode())
 65             del self.info[id]
 66         except Exception as e:
 67             print(e)
 68 
 69     def run(self, cmd):
 70         comm = cmd.split(""")[1]
 71         hosts = cmd.split("--")
 72         host = hosts[1].split()[1:] #拿ip地址
 73         for i in host:
 74             id = random.randint(10000,99999)
 75             obj = Client()
 76             res = obj.call(i, comm)
 77             self.info[id] = [i,comm,res[0], res[1]]
 78         return self.info
 79 
 80 
 81     def ref(self, cmd):
 82         '''
 83         反射
 84         :param cmd:
 85         :return:
 86         '''
 87         str = cmd.split()[0]
 88         if hasattr(self,str):
 89             func = getattr(self,str)
 90             r = func(cmd)
 91             if r is not None:
 92                 for i in r:
 93                     print('task id: %s, host: %s, command:%s' % (i, r[i][0], r[i][1]))
 94 
 95     def thread(self):
 96         while True:
 97             cmd = input("->>").strip()
 98             if not cmd:continue
 99             t1 = threading.Thread(target=self.ref, args=(cmd, ))
100             t1.start()
101 
102 
103 
104 
if __name__ == "__main__":
    # Start the interactive prompt; thread() loops reading commands and does
    # not return.
    obj = Threading()
    res = obj.thread()
client
原文地址:https://www.cnblogs.com/nikitapp/p/6836949.html