【Python之路Day13】网络篇之Paramiko

Paramiko

paramiko模块,基于SSH用于连接远程服务器并执行相关操作。

一、安装

pip3 install paramiko

二、使用

1. SSHClient

用于连接远程服务器并执行基本命令

基于用户名密码连接:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: DBQ(Du Baoqiang)

import paramiko

#创建ssh对象

ssh = paramiko.SSHClient()

#允许连接不在Know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

#连接服务器
ssh.connect(hostname='172.16.30.162',port=22,username='root',password='123456')

#执行命令
stdin,stdout,stderr = ssh.exec_command('ls /tmp/')

#获取执行结果
result = stdout.read()

print(result.decode()) #返回的是bytes

#关闭连接
ssh.close()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: DBQ(Du Baoqiang)

import paramiko

transport = paramiko.Transport(('172.16.30.162',22))
transport.connect(username='root',password='123456')


ssh = paramiko.SSHClient()
ssh._transport = transport

stdin,stdout,stderr = ssh.exec_command('df -Th')

res = stdout.read()
print(res.decode())

transport.close()
SSHClient 封装 Transport

基于公钥密钥连接:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: DBQ(Du Baoqiang)

import paramiko

private_key = paramiko.RSAKey.from_private_key_file('/Users/daniel/.ssh/id_rsa')

# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='172.16.30.162', port=22, username='root', pkey=private_key)

# 执行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 获取命令结果
result = stdout.read()
print(result.decode())
# 关闭连接
ssh.close()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: DBQ(Du Baoqiang)

import paramiko

private_key = paramiko.RSAKey.from_private_key_file('/Users/daniel/.ssh/id_rsa')

transport = paramiko.Transport(('172.16.30.162',22))
transport.connect(username='root',pkey=private_key)

ssh = paramiko.SSHClient()
ssh._transport = transport

stdin,stdout,stderr = ssh.exec_command('df -Th')

print(stdout.read().decode())

transport.close()
SSHClient 封装 Transport

SFTPClient

用于连接远程服务器并执行上传下载

基于用户名密码 上传/下载

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: DBQ(Du Baoqiang)

import paramiko


transport = paramiko.Transport(('172.16.30.162',22))
transport.connect(username='root',password='123456')

sftp = paramiko.SFTPClient.from_transport(transport)

#上传本地文件/Users/daniel/test/zhoulibo.mp4 上传到服务器的 /tmp/zhoulibo.mp4
sftp.put('/Users/daniel/test/zhoulibo.mp4','/tmp/zhoulibo.mp4')

#将远端/tmp/zhoulibo.mp4 下载到 本地 /Users/daniel/zhoulibo.mp4
sftp.get('/tmp/zhoulibo.mp4','/Users/daniel/zhoulibo.mp4')

transport.close()

基于公钥/密钥的上传/下载:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: DBQ(Du Baoqiang)

import paramiko

private_key = paramiko.RSAKey.from_private_key_file('/Users/daniel/.ssh/id_rsa')

transport = paramiko.Transport(('172.16.30.162',22))
transport.connect(username='root',pkey=private_key)

sftp = paramiko.SFTPClient.from_transport(transport)

#上传本地文件/Users/daniel/test/zhoulibo.mp4 上传到服务器的 /tmp/zhoulibo.mp4
sftp.put('/Users/daniel/test/zhoulibo.mp4','/tmp/zhoulibo.mp4')

#将远端/tmp/zhoulibo.mp4 下载到 本地 /Users/daniel/zhoulibo.mp4
sftp.get('/tmp/zhoulibo.mp4','/Users/daniel/zhoulibo.mp4')

transport.close()
View Code
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: DBQ(Du Baoqiang)

import paramiko,uuid

class SSHConnection:

    def __init__(self,host='172.16.30.162',port=22,username='root',pwd='123456'):
        self.host = host
        self.port = port
        self.username = username
        self.pwd = pwd
        self.__k = None

    def create_file(self):
        file_name = str(uuid.uuid4())   #生成一个'46cad739-5937-4312-9156-00753b272581'的文件名 uuid并转换成str
        with open(file_name,'w') as f:  #打开文件,往里面写一个sb
            f.write('sb')
        return file_name

    def run(self):
        '''
        主方法
        :return:
        '''
        self.connect()
        self.upload('/tmp/test.py')   #调用upload方法,上传到服务器的/tmp/test.py
        self.rename('/tmp/test.py','/tmp/sb.py')  #改名为sb.py
        self.close()

    def connect(self):
        '''
        建立连接
        :return:
        '''
        transport = paramiko.Transport((self.host,self.port))
        transport.connect(username=self.username,password=self.pwd)
        self.__transport = transport

    def close(self):
        '''
        关闭连接
        :return:
        '''
        self.__transport.close()

    def upload(self,target_path):
        '''
        连接,上传
        :param target_path:
        :return:
        '''
        file_name = self.create_file()
        sftp = paramiko.SFTPClient.from_transport(self.__transport)
        #上传本地文件 到服务器的 test.py
        sftp.put(file_name, target_path)

    def rename(self, old_path, new_path):
        '''
        改名, 把/tmp/test.py 改名为 sb.py
        :param old_path:
        :param new_path:
        :return:
        '''
        ssh = paramiko.SSHClient()
        ssh._transport = self.__transport
        #执行命令
        cmd = "mv %s %s" %(old_path,new_path)
        stdin,stdout,stderr = ssh.exec_command(cmd)
        res = stdout.read()

    def cmd(self,command):
        '''
        可以执行其他命令
        :param command:
        :return:
        '''
        self.connect()
        ssh = paramiko.SSHClient()
        ssh._transport = self.__transport
        #执行命令
        stdin, stdout, stderr = ssh.exec_command(command)
        res = stdout.read()
        self.close()
        return res

ha = SSHConnection()

ha.run()
res = ha.cmd('ls')
print(res.decode())
Demo
# 对于更多限制命令,需要在系统中设置
/etc/sudoers
  
Defaults    requiretty
Defaults:cmdb    !requiretty

堡垒机

堡垒机执行流程:

  1. 管理员为用户在服务器上创建账号(将公钥放置服务器,或者使用用户名密码)
  2. 用户登陆堡垒机,输入堡垒机用户名密码,现实当前用户管理的服务器列表
  3. 用户选择服务器,并自动登陆
  4. 执行操作并同时将用户操作记录

PS:配置.brashrc实现ssh登陆后自动执行脚本,如:/usr/bin/python /home/daniel/menu.py

实现过程

1. 前奏

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: DBQ(Du Baoqiang)

import paramiko
import sys
import os,socket,select,getpass

transport = paramiko.Transport(('172.16.30.162',22))
transport.start_client()
transport.auth_password('root','123456')

#打开一个通道
channel = transport.open_session()

#获取一个终端
channel.get_pty()

#激活器
channel.invoke_shell()

#########
# 利用sys.stdin,肆意妄为执行操作
# 用户在终端输入内容,并将内容发送至远程服务器
# 远程服务器执行命令,并将结果返回
# 用户终端显示内容
#########

channel.close()
transport.close()

2. 肆意妄为(一)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: DBQ(Du Baoqiang)

import paramiko
import sys
import os,socket,select,getpass
from paramiko.py3compat import u


transport = paramiko.Transport(('172.16.30.162',22))
transport.start_client()
transport.auth_password('root','123456')

#打开一个通道
channel = transport.open_session()

#获取一个终端
channel.get_pty()

#激活器
channel.invoke_shell()

while True:
    #监视用户输入和服务器的返回的数据
    # sys.stdin处理用户输入
    # channel 是之前创建的通道, 用户接受服务器返回的信息
    readable, writeable, error = select.select([channel, sys.stdin,],[],[],1)
    if channel in readable:
        try:
            x = u(channel.recv(1024))
            if len(x) == 0:
                print('
**** EOF
')
                break
            sys.stdout.write(x)
            sys.stdout.flush()
        except socket.timeout:
            pass
    if sys.stdin in readable:
        inp = sys.stdin.readline()
        channel.sendall(bytes(inp,encoding='utf-8'))

channel.close()
transport.close()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: DBQ(Du Baoqiang)

import paramiko
import sys
import os,socket,select,getpass
from paramiko.py3compat import u


default_username = getpass.getuser()  #获取当前用户
username = input('Usersname[%s]:'%default_username).strip()   #提示用户输入用户名

if len(username) == 0:   #如果用户输入为空, 则用户名为默认当前系统用户名
    username = default_username

hostname = input('Hostname: ').strip()   #提示输入主机名
if len(hostname) == 0:            #如果为空,错误退出程序
    print('*** Hostname required.')
    sys.exit(1)

port = input('Port: [22] ').strip()   #提示输入端口
if len(port) == 0:            #如果为空, 端口为默认的22
    port = 22

transport = paramiko.Transport((hostname,port))
transport.start_client()


default_auth = "p"
auth = input('Auth by (p)assword or (r)sa key [%s]' %default_auth).strip()  #让用户选择是密码认证还是证书认证

if len(auth) == 0:
    auth = default_auth

if auth == 'r':
    default_path = os.path.join(os.environ['HOME'],'.ssh','id_rsa')  #拼接用户家目录/.ssh/id_rsa
    path = input('RSA key [%s]: '%default_path).strip()
    if len(path) == 0:
        path = default_path
    try:
        key = paramiko.RSAKey.from_private_key_file(path)  #加载用户key
    except paramiko.PasswordRequiredException:
        password = getpass.getpass('RSA key password: ')
        key = paramiko.RSAKey.from_private_key_file(path,password)
    transport.auth_publickey(username,key)

elif auth == 'p':
    password = getpass.getpass('Password for %s@%s '%(username, hostname))
    try:
        transport.auth_password(username,password)
    except paramiko.ssh_exception.AuthenticationException:
        print('33[31;1mAuthentication failed.33[0m')
        exit(1)




#打开一个通道
channel = transport.open_session()

#获取一个终端
channel.get_pty()

#激活器
channel.invoke_shell()

while True:
    #监视用户输入和服务器的返回的数据
    # sys.stdin处理用户输入
    # channel 是之前创建的通道, 用户接受服务器返回的信息
    readable, writeable, error = select.select([channel, sys.stdin,],[],[],1)
    if channel in readable:
        try:
            x = u(channel.recv(1024))
            if len(x) == 0:
                print('
**** EOF
')
                break
            sys.stdout.write(x)
            sys.stdout.flush()
        except socket.timeout:
            pass
    if sys.stdin in readable:
        inp = sys.stdin.readline()
        channel.sendall(bytes(inp,encoding='utf-8'))

channel.close()
transport.close()
完整Demo

3. 肆意妄为(二)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: DBQ(Du Baoqiang)

import paramiko
import sys
import termios,tty
import os,socket,select,getpass
from paramiko.py3compat import u


transport = paramiko.Transport(('172.16.30.162',22))
transport.start_client()
transport.auth_password('root','123456')

#打开一个通道
channel = transport.open_session()

#获取一个终端
channel.get_pty()

#激活器
channel.invoke_shell()

#########
#获取原 tty属性

oldtty = termios.tcgetattr(sys.stdin)

try:
    # 为tty设置新属性
    # 默认当前tty设备属性:
    #   输入一行回车,执行
    #   CTRL+C 进程退出,遇到特殊字符,特殊处理。

    # 这是为原始模式,不认识所有特殊符号
    # 放置特殊字符应用在当前终端,如此设置,将所有的用户输入均发送到远程服务器
    tty.setraw(sys.stdin.fileno())
    channel.settimeout(0.0)
    while True:
        #监视 用户输入 和远程服务器返回的数据 socket
        #阻塞, 直到句柄可读
        r,w,e = select.select([channel, sys.stdin],[],[],1)
        if channel in r:
            try:
                x = u(channel.recv(2048))
                if len(x) == 0:
                    print('
***EOF***
')
                    break

                sys.stdout.write(x)
                sys.stdout.flush()
            except socket.timeout:
                pass
        if sys.stdin in r:
            x = sys.stdin.read(1)
            if len(x) == 0:
                break

            channel.sendall(bytes(x,encoding='utf-8'))

finally:
    #重新设置终端属性
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)

channel.close()
transport.close()

PS: 注意上面代码无法再Pycharm中运行, 因为无法获取tty,在终端中运行可用

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: DBQ(Du Baoqiang)

import paramiko
import sys
import os,socket,select,getpass
import termios
import tty
from paramiko.py3compat import u


default_username = getpass.getuser()  #获取当前用户
username = input('Usersname[%s]:'%default_username).strip()   #提示用户输入用户名

if len(username) == 0:   #如果用户输入为空, 则用户名为默认当前系统用户名
    username = default_username

hostname = input('Hostname: ').strip()   #提示输入主机名
if len(hostname) == 0:            #如果为空,错误退出程序
    print('*** Hostname required.')
    sys.exit(1)

port = input('Port: [22] ').strip()   #提示输入端口
if len(port) == 0:            #如果为空, 端口为默认的22
    port = 22

transport = paramiko.Transport((hostname,port))
transport.start_client()


default_auth = "p"
auth = input('Auth by (p)assword or (r)sa key [%s]' %default_auth).strip()  #让用户选择是密码认证还是证书认证

if len(auth) == 0:
    auth = default_auth

if auth == 'r':
    default_path = os.path.join(os.environ['HOME'],'.ssh','id_rsa')  #拼接用户家目录/.ssh/id_rsa
    path = input('RSA key [%s]: '%default_path).strip()
    if len(path) == 0:
        path = default_path
    try:
        key = paramiko.RSAKey.from_private_key_file(path)  #加载用户key
    except paramiko.PasswordRequiredException:
        password = getpass.getpass('RSA key password: ')
        key = paramiko.RSAKey.from_private_key_file(path,password)
    transport.auth_publickey(username,key)

elif auth == 'p':
    password = getpass.getpass('Password for %s@%s '%(username, hostname))
    try:
        transport.auth_password(username,password)
    except paramiko.ssh_exception.AuthenticationException:
        print('33[31;1mAuthentication failed.33[0m')
        exit(1)




#打开一个通道
channel = transport.open_session()

#获取一个终端
channel.get_pty()

#激活器
channel.invoke_shell()

#获取原 tty属性
oldtty = termios.tcgetattr(sys.stdin)

try:
    # 为tty设置新属性
    # 默认当前tty设备属性:
    #   输入一行回车,执行
    #   CTRL+C 进程退出,遇到特殊字符,特殊处理。

    # 这是为原始模式,不认识所有特殊符号
    # 放置特殊字符应用在当前终端,如此设置,将所有的用户输入均发送到远程服务器
    tty.setraw(sys.stdin.fileno())
    channel.settimeout(0.0)

    while True:
        #监视 用户输入 和远程服务器返回的数据 socket
        #阻塞, 直到句柄可读
        r,w,e = select.select([channel, sys.stdin],[],[],1)
        if channel in r:
            try:
                x = u(channel.recv(2048))
                if len(x) == 0:
                    print('
***EOF***
')
                    break

                sys.stdout.write(x)
                sys.stdout.flush()
            except socket.timeout:
                pass
        if sys.stdin in r:
            x = sys.stdin.read(1)
            if len(x) == 0:
                break

            channel.sendall(bytes(x,encoding='utf-8'))

finally:
    #重新设置终端属性
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)

channel.close()
transport.close()
版本1
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: DBQ(Du Baoqiang)
import paramiko
import sys,os
import socket,getpass

from paramiko.py3compat import u

#windows 没有termios

try:
    import termios
    import tty
    has_termios = True
except ImportError:
    has_termios = False

def interactive_shell(channel):
    if has_termios:
        posix_shell(channel)  #如果标志为真,执行*inux方法
    else:
        windows_shell(channel) #否则, 执行windows_shell方法

def posix_shell(channel):
    import select
    oldtty = termios.tcgetattr(sys.stdin)
    try:
        tty.setraw(sys.stdin.fileno())
        tty.setcbreak(sys.stdin.fileno())
        channel.settimeout(0.0)
        log = open('handle.log','a+',encoding='utf-8')
        flag = False
        temp_list = []
        while True :
            r,w,e = select.select([channel,sys.stdin],[],[])
            if channel in r:
                try:
                    x = u(channel.recv(2048))
                    if len(x) == 0:
                        sys.stdout.write('
***EOF***
')
                        break
                    if flag:
                        if x.startswith('
'):
                            pass
                        else:
                            temp_list.append(x)
                        flag = False
                    sys.stdout.write(x)
                    sys.stdout.flush()

                except socket.timeout:
                    pass
            if sys.stdin in r:
                x = sys.stdin.read(1)
                import json

                if len(x) == 0:
                    break

                if x == '	':
                    flag = True
                else:
                    temp_list.append(x)

                if x == '
':
                    log.write(''.join((temp_list)))
                    log.flush()
                    temp_list.clear()

                channel.sendall(bytes(x,encoding='utf-8'))
    finally:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)


def windows_shell(channel):
    import threading

    sys.stdout.write('Line-buffered terminal emulation. Please F6 or ^Z to send EOF.

')

    def writeall(sock):
        while True:
            data = sock.recv(2048)
            if not data:
                sys.stdout.write('
***EOF***

')
                sys.stdout.flush()
                break
            sys.stdout.write(data)
            sys.stdout.flush()

    writer = threading.Thread(target=writeall, args=(channel,))
    writer.start()

    try:
        while True:
            d = sys.stdin.read(1)
            if not d:
                break
            channel.sendall(bytes(d,encoding='utf-8'))

    except EOFError:
        pass

def run():
    transport = paramiko.Transport(('172.16.30.162',22))
    transport.start_client()
    transport.auth_password('root','123456')

    #打开一个通道
    channel = transport.open_session()
    #获取终端
    channel.get_pty()
    #激活器
    channel.invoke_shell()

    interactive_shell(channel)

    channel.close()
    transport.close()

if __name__ == '__main__':
    run()
版本二
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: DBQ(Du Baoqiang)
import paramiko
import sys,os
import socket,getpass

from paramiko.py3compat import u

#windows 没有termios

try:
    import termios
    import tty
    has_termios = True
except ImportError:
    has_termios = False

def interactive_shell(channel):
    if has_termios:
        posix_shell(channel)  #如果标志为真,执行*inux方法
    else:
        windows_shell(channel) #否则, 执行windows_shell方法

def posix_shell(channel):
    import select
    oldtty = termios.tcgetattr(sys.stdin)
    try:
        tty.setraw(sys.stdin.fileno())
        tty.setcbreak(sys.stdin.fileno())
        channel.settimeout(0.0)
        log = open('handle.log','a+',encoding='utf-8')
        flag = False
        temp_list = []
        while True :
            r,w,e = select.select([channel,sys.stdin],[],[])
            if channel in r:
                try:
                    x = u(channel.recv(2048))
                    if len(x) == 0:
                        sys.stdout.write('
***EOF***
')
                        break
                    if flag:
                        if x.startswith('
'):
                            pass
                        else:
                            temp_list.append(x)
                        flag = False
                    sys.stdout.write(x)
                    sys.stdout.flush()

                except socket.timeout:
                    pass
            if sys.stdin in r:
                x = sys.stdin.read(1)
                import json

                if len(x) == 0:
                    break

                if x == '	':
                    flag = True
                else:
                    temp_list.append(x)

                if x == '
':
                    log.write(''.join((temp_list)))
                    log.flush()
                    temp_list.clear()

                channel.sendall(bytes(x,encoding='utf-8'))
    finally:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)


def windows_shell(channel):
    import threading

    sys.stdout.write('Line-buffered terminal emulation. Please F6 or ^Z to send EOF.

')

    def writeall(sock):
        while True:
            data = sock.recv(2048)
            if not data:
                sys.stdout.write('
***EOF***

')
                sys.stdout.flush()
                break
            sys.stdout.write(data)
            sys.stdout.flush()

    writer = threading.Thread(target=writeall, args=(channel,))
    writer.start()

    try:
        while True:
            d = sys.stdin.read(1)
            if not d:
                break
            channel.sendall(bytes(d,encoding='utf-8'))

    except EOFError:
        pass

def run():
    default_username = getpass.getuser()  #获取当前用户
    username = input('Usersname[%s]:'%default_username).strip()   #提示用户输入用户名

    if len(username) == 0:   #如果用户输入为空, 则用户名为默认当前系统用户名
        username = default_username

    hostname = input('Hostname: ').strip()   #提示输入主机名
    if len(hostname) == 0:            #如果为空,错误退出程序
        print('*** Hostname required.')
        sys.exit(1)

    port = input('Port: [22] ').strip()   #提示输入端口
    if len(port) == 0:            #如果为空, 端口为默认的22
        port = 22

    transport = paramiko.Transport((hostname,port))
    transport.start_client()

    default_auth = "p"
    auth = input('Auth by (p)assword or (r)sa key [%s]' %default_auth).strip()  #让用户选择是密码认证还是证书认证
    
    if len(auth) == 0:
        auth = default_auth

    if auth == 'r':
        default_path = os.path.join(os.environ['HOME'],'.ssh','id_rsa')  #拼接用户家目录/.ssh/id_rsa
        path = input('RSA key [%s]: '%default_path).strip()
        if len(path) == 0:
            path = default_path
        try:
            key = paramiko.RSAKey.from_private_key_file(path)  #加载用户key
        except paramiko.PasswordRequiredException:
            password = getpass.getpass('RSA key password: ')
            key = paramiko.RSAKey.from_private_key_file(path,password)
        transport.auth_publickey(username,key)

    elif auth == 'p':
        password = getpass.getpass('Password for %s@%s '%(username, hostname))
        try:
            transport.auth_password(username,password)
        except paramiko.ssh_exception.AuthenticationException:
            print('33[31;1mAuthentication failed.33[0m')
            exit(1)


    #打开一个通道
    channel = transport.open_session()
    #获取终端
    channel.get_pty()
    #激活器
    channel.invoke_shell()

    interactive_shell(channel)

    channel.close()
    transport.close()

if __name__ == '__main__':
    run()
终极版本

更多参见:paramoko源码 https://github.com/paramiko/paramiko 

参考博文:

  • http://www.cnblogs.com/wupeiqi/articles/5699254.html
原文地址:https://www.cnblogs.com/dubq/p/5707580.html