Python 程序:基于RabbitMQ实现主机管理

Python 程序:基于RabbitMQ实现主机管理


1、需求

2、代码

3、测试样图


一、需求

1、可以异步地执行多个命令

2、可以对多台机器执行命令

举例

>>:run "df -h" --hosts 192.168.3.55 10.4.3.4 
task id: 45334
>>: check_task 45334 
>>:

二、代码

 1 import random
 2 import pika
 3 
 4 class rpc_cilent(object):
 5     def connect(self):
 6         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 7     def connect_1(self):
 8         self.credentials = pika.PlainCredentials('zz', '123456')
 9         self.connection = pika.BlockingConnection(pika.ConnectionParameters(
10         '192.168.43.165',5672,'/',self.credentials))
11     def on_response(self, ch, method, props, body):
12         if self.corr_id == props.correlation_id:
13             self.response = body
14     def call(self, command, host,tmp_dict):
15         if host == "127.0.0.1":
16             self.connect()
17         elif host == "192.168.43.165":
18             self.connect_1()
19         self.channel = self.connection.channel()
20         result = self.channel.queue_declare(exclusive=True)
21         self.callback_queue = result.method.queue
22         self.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue)
23         self.response = None
24         self.corr_id = str(random.randint(10000, 99999))
25         ack = self.channel.basic_publish(exchange='',
26                                          routing_key= "127.0.0.1",
27                                          properties=pika.BasicProperties(
28                                              reply_to=self.callback_queue,
29                                              correlation_id=self.corr_id),
30                                          body=str(command))
31         while self.response is None:
32             self.connection.process_data_events()
33         task_id = self.corr_id
34         res = self.response.decode()
35         tmp_dict[task_id] = res
36         print('33[42;0mtask_id: %s host: %s cmd: %s33[0m' % (self.corr_id, host, command))
class rpc_start(object):
    """Interactive shell that dispatches commands to remote hosts over RPC.

    Supported commands are ``run``, ``check_task``, ``check_all`` and
    ``help``. Results of ``run`` are kept in ``self.tmp_dict`` (task id ->
    output) until fetched with ``check_task``.
    """

    # Hosts that the module-level ``rpc_cilent`` instance is called for.
    KNOWN_HOSTS = ("127.0.0.1", "192.168.43.165")
    # Only these names may be invoked from the prompt; dispatching on any
    # attribute would let the user call e.g. ``start`` or ``tmp_dict``.
    COMMANDS = ("run", "check_task", "check_all", "help")

    def __init__(self):
        self.tmp_dict = {}
        self.start()

    def check_all(self, *args):
        """Print the index and id of every task whose result is still stored."""
        for index, key in enumerate(self.tmp_dict):
            print(index, '\033[42;0mtask_id: %s\033[0m' % key)

    def check_task(self, user_cmd):
        """Print and discard the stored result for ``check_task <task_id>``.

        Shows the usage text when no task id is given, and an error line when
        the id is unknown.
        """
        command_list = user_cmd.split()
        if len(command_list) < 2:
            self.help()
            return
        task_id = command_list[1]
        try:
            print(self.tmp_dict.pop(task_id))
        except KeyError:
            print("\033[31;0mtask not found: %s\033[0m" % task_id)

    @staticmethod
    def _parse_run(user_cmd):
        """Split ``run "<command>" hosts <h1> <h2> ...`` into (command, hosts).

        The quoted command is taken first and the ``hosts`` keyword is looked
        for only after the closing quote, so a command that itself contains
        the text "hosts" is parsed correctly.

        Raises:
            IndexError: If the quoted command, the ``hosts`` keyword or the
                host list is missing.
        """
        pieces = user_cmd.split('"')
        command = pieces[1]
        tail = '"'.join(pieces[2:])
        hosts = tail.split('hosts', 1)[1].split()
        if not hosts:
            raise IndexError("no hosts given")
        return command, hosts

    def run(self, user_cmd):
        """Execute the quoted command on every listed host.

        Unknown hosts are reported and skipped. The loop stops at the first
        host whose call raises ``TypeError`` or ``AssertionError``.
        """
        try:
            command, hosts = self._parse_run(user_cmd)
        except IndexError:
            print("\033[31;0mcommand not found\033[0m")
            self.help()
            return

        for host in hosts:
            if host not in self.KNOWN_HOSTS:
                print("\033[41;0mno host:%s\033[0m" % host)
                continue
            try:
                # ``rpc_cilent`` is the module-level client instance.
                rpc_cilent.call(command, host, self.tmp_dict)
            except (TypeError, AssertionError) as err:
                print("\033[31;0mcall failed on %s: %s\033[0m" % (host, err))
                break

    def help(self, *args):
        """Print the usage text. Extra arguments (the raw input) are ignored."""
        print('\033[34;0mUsage: run "df -h" hosts 127.0.0.1 192.168.43.165 \033[0m')
        print('\033[34;0m       check_task 97128\033[0m')
        print('\033[34;0m       check_all\033[0m')

    def start(self):
        """Run the prompt loop until end of input or Ctrl-C."""
        self.help()
        while True:
            try:
                user_cmd = input("->>").strip()
            except (EOFError, KeyboardInterrupt):
                print()
                break
            if not user_cmd:
                continue
            self.cmd = user_cmd.split()[0]
            if self.cmd in self.COMMANDS:
                getattr(self, self.cmd)(user_cmd)
            else:
                print("\033[31;0mcommand not found!\033[0m")
                self.help()
84 
if __name__ == "__main__":
    # The class name is rebound to a single shared instance on purpose:
    # rpc_start.run calls ``rpc_cilent.call(...)`` on this module-level object.
    rpc_cilent = rpc_cilent()
    # Constructing rpc_start enters the interactive prompt loop, because its
    # __init__ calls start(); the guard keeps that from running on import.
    start = rpc_start()
rpc_client
 1 import pika
 2 import subprocess
 3 
 4 class rpc_server(object):
 5     def __init__(self,host,queue):
 6         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
 7         self.channel = self.connection.channel()
 8         self.channel.queue_declare(queue=queue)
 9         self.channel.basic_qos(prefetch_count=1)
10         self.channel.basic_consume(self.on_request, queue=queue)
11         print(" [x] Awaiting RPC requests")
12         self.channel.start_consuming()
13     def command(self,cmd):
14         res = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
15         msg = res.stdout.read().decode('gbk')
16         if len(msg) == 0:
17             msg = res.stderr.read().decode('gbk')
18         print(msg)
19         return msg
20     def on_request(self,ch, method, props, body):
21         cmd = body.decode()
22         print(cmd)
23         respone = self.command(cmd)
24         ch.basic_publish(exchange='',
25                          routing_key=props.reply_to,
26                          properties=pika.BasicProperties(correlation_id=props.correlation_id),
27                          body=respone)
28         ch.basic_ack(delivery_tag=method.delivery_tag)
29 
if __name__ == "__main__":
    # Constructing rpc_server blocks in start_consuming(), so only do it when
    # this file is run as a script. The broker is on localhost and the request
    # queue is named "127.0.0.1".
    server = rpc_server("localhost", "127.0.0.1")
rpc_server

三、测试样图

启动客户端

windows端服务器启动

linux端服务器启动

client端测试:

原文地址:https://www.cnblogs.com/hy0822/p/9284507.html