开发要求:
1. 对主机进行批量管理
2. 可对单台或主机组批量执行命令
3. 可上传文件到对应的主机或组
4. 使用多线程实现
程序:
1. README
# 作者:hkey # 博客地址: # 功能实现: 1. 对主机进行批量管理 2. 可对单台或主机组批量执行命令 3. 可上传文件到对应的主机或组 4. 使用多线程实现 # 目录结构: batch/ ├── batch_server.py # 启动程序 ├── conf/ # 配置文件目录 │ ├── hosts.ini # 主机信息配置文件 │ └── settings.py # 程序配置文件 └── modules/ # 核心模块 ├── batch_hosts.py # 主机信息模块 └── execute.py # 执行命令模块 # 使用说明: 通过序号选择主机或组,'q'表示返回上一级;
2. 程序结构
3. 程序代码
启动程序:
#!/usr/bin/env python # -*- coding:utf-8 -*- import os, configparser from conf import settings from modules import execute from modules import batch_hosts from multiprocessing import Process import threading configfile = settings.configfile conf = configparser.ConfigParser() conf.read(configfile) config = batch_hosts.Config(conf, configfile) if not os.path.isfile(configfile): config.create_conf() if __name__ == '__main__': while True: for index, item in enumerate(conf.sections(), 1): # 循环主机或组信息 print(index, item) choice = input('-->').strip() if not choice: continue if choice == 'q': break if choice.isdigit() == False: print('输入编号错误,请重新输入。') elif int(choice) > len(conf.sections()) or int(choice) < 1: # 输入的序号不在主机列表内 print('编号不在列表中,请重新输入') else: section_host = config.show_host(choice) # 通过choice 作为section索引获取主机信息字典 section_index = int(choice) - 1 print('[%s]'.center(30, '*') % conf.sections()[section_index]) for host in section_host: print('主机IP:', host['ip']) print('[请输入要执行的命令]') while True: command = input('-->').strip() if not command: continue if command == 'q': break process_list = [] # 通过多线程运行,每一台主机任务通过一个线程去执行 for host in section_host: exec_cmd = execute.ExecCommand(host, command) t = threading.Thread(target=exec_cmd.run,) t.start() # 并发执行,这里不需要join阻塞 process_list.append(t) for t in process_list: t.join()
conf /
主机信息文件:
[host1] password = 123456 username = root ip = 192.168.118.15 port = 22 [host2] password = 123456 username = root ip = 192.168.118.16 port = 22 [web group] group = host1,host2
程序环境变量配置:
#!/usr/bin/env python # -*- coding:utf-8 -*- import sys, os import configparser # 程序主目录 BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, BASE_DIR) # 配置目录 CONF_DIR = os.path.join(BASE_DIR, r'conf') # 配置文件目录 configfile = os.path.join(CONF_DIR, 'hosts.ini')
modules /
主机信息处理模块
#!/usr/bin/env python # -*- coding:utf-8 -*- import os import configparser, paramiko from conf import settings class Config(object): '''对配置文件的创建和查看''' def __init__(self, config, configfile): self.configfile = configfile self.config = config def create_conf(self): '''创建配置文件''' self.config['host1'] = { 'ip': '192.168.118.15', 'port': 22, 'username': 'root', 'password': '123456' } self.config['host2'] = { 'ip': '192.168.118.16', 'port': 22, 'username': 'root', 'password': '123456' } self.config['web group'] = { 'group': 'host1,host2' } with open(self.configfile, 'w') as file: self.config.write(file) def show_host(self, choice): '''获取主机信息字典''' print('choice', choice) section_index = int(choice) - 1 section_name = self.config.sections()[section_index] section = self.config[section_name] host_data_list = [] if 'group' in section: host_list = section['group'].split(',') for host in host_list: host_data_list.append(self.config[host]) else: host_data_list.append(section) return host_data_list
命令执行模块
#!/usr/bin/env python # -*- coding:utf-8 -*- import paramiko import os class ExecCommand(object): '''执行命令类''' def __init__(self, host, command): self.hostname = host['ip'] self.port = int(host['port']) self.username = host['username'] self.password = host['password'] self.command = command def run(self): cmd = self.command.split()[0] if cmd.startswith('put') and hasattr(self, cmd): func = getattr(self, cmd) func() else: setattr(self, cmd, self.exec_command) func = getattr(self, cmd) func() def put(self): transport = paramiko.Transport(self.hostname, self.port) transport.connect(username=self.username, password=self.password) sftp = paramiko.SFTPClient.from_transport(transport) sftp.put(self.command.split()[1], self.command.split()[2]) transport.close() print('【%s】上传文件【%s】成功!' % (self.hostname, self.command.split()[1])) def exec_command(self): self.ssh = paramiko.SSHClient() self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.ssh.connect(self.hostname, self.port, self.username, self.password) stdin, stdout, stderr = self.ssh.exec_command(self.command) res, err = stdout.read(), stderr.read() result = res if res else err # 三元运算,默认stdout,错误登录 stderr print('[%s]'.center(80, '*') % self.hostname) print(result.decode())
程序运行如下图: