paramiko-tools

A self-developed SSH/SFTP helper built on paramiko, with a unittest-based self-check.

# coding=utf-8
import paramiko
import os
import   logging
import json
import unittest
from stat import S_ISDIR,S_ISREG
# Root logger configuration. Only ERROR and above are emitted, so the
# log.info() calls in this module stay silent unless the level is lowered.
# The format puts the message on its own line after the location header.
logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s  - %(levelname)s -->%(funcName)s  at line %(lineno)d: \n %(message)s',
)
log = logging.getLogger()


class ParamikoSftp(object):
    """Small helper around paramiko for remote file and shell operations.

    Two independent connections are opened to the same host:

    * ``self.cli`` -- an SFTP client on a raw ``paramiko.Transport``
      (``self.tp``), used by most file-system helpers.
    * ``self.para_cli`` -- a ``paramiko.SSHClient`` used to run shell
      commands, with ``self.para_sftp`` as an SFTP channel opened on it.

    Call :meth:`sftp_close` when done to release all of them.
    """

    def __init__(self, ip, port, user, pwd):
        """Connect to ``ip:port`` with password authentication.

        Args:
            ip: Host name or address of the SSH server.
            port: TCP port of the SSH server.
            user: Login user name.
            pwd: Login password.

        Raises:
            Exception: Whatever paramiko raises when a connection or
                authentication fails; already-opened handles are closed first.
        """
        self.port = port
        self.pwd = pwd
        self.user = user
        self.ip = ip
        try:
            self.cli = self.sftp_client()
            self.para_cli = self.ssh_client()
            self.para_sftp = self.para_cli.open_sftp()
        except Exception:
            # Do not leak the connections that did succeed.
            self.sftp_close()
            raise

    def sftp_client(self):
        """Open a Transport to the host and return an SFTP client on it.

        The transport is stored in ``self.tp`` and the client in ``self.sftp``.
        """
        # Transport takes the address as ONE argument ((host, port) tuple);
        # a second positional argument would be the window size, not the port.
        transport = paramiko.Transport((self.ip, self.port))
        try:
            transport.connect(username=self.user, password=self.pwd)
            sftp = paramiko.SFTPClient.from_transport(transport)
        except Exception:
            transport.close()
            raise
        self.tp = transport
        self.sftp = sftp
        return sftp

    def ssh_client(self):
        """Open and return a connected ``paramiko.SSHClient``.

        Raises:
            Exception: The paramiko/socket error from ``connect``; it is
                logged and re-raised instead of silently returning ``None``.
        """
        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy trusts unknown host keys, which allows
        # man-in-the-middle attacks; acceptable only on trusted networks.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            client.connect(self.ip, self.port, self.user, self.pwd)
        except Exception as e:
            log.error(e)
            client.close()
            raise
        return client

    # paramiko function
    def query_dir(self, dir) -> list:
        """List entry names of a remote directory via ``ls -l | awk``.

        Args:
            dir: Remote directory path. It is interpolated into a shell
                command unquoted, so it must come from a trusted source.

        Returns:
            The non-empty lines printed by the command (one name per line),
            or an empty list when the command printed nothing.

        Raises:
            FileExistsError: When the command produced no stdout but wrote to
                stderr (for example, the directory does not exist).
        """
        shell = "ls -l  %s | awk '{print $9}'" % dir
        _stdin, out, err = self.para_cli.exec_command(shell)
        lines, err_bytes = out.readlines(), err.read()
        if lines:
            # The "total N" header has no 9th column, which yields a blank
            # line; drop blanks and the trailing newline of each name.
            return [line.strip('\n') for line in lines if line.strip('\n')]
        if err_bytes:
            raise FileExistsError(err_bytes.decode("utf-8"))
        return []

    # paramiko  function
    def callback_stdout(self, shell: str):
        """Run a shell command in a pty and return what it printed.

        Args:
            shell: Command line to execute on the remote host.

        Returns:
            The command's stdout as one string; if stdout is empty, the
            decoded stderr; ``None`` when both are empty or when running the
            command raised (the error is logged).
        """
        try:
            _stdin, out, err = self.para_cli.exec_command(shell, get_pty=True)
            lines, err_bytes = out.readlines(), err.read()
            if lines:
                return "".join(lines)
            if err_bytes:
                return err_bytes.decode("utf-8")
        except Exception as f:
            log.error(f)

    def touch(self, filepath):
        """Create an empty remote file, replacing any existing one.

        Args:
            filepath: Remote path of the file to create (interpolated into a
                shell command unquoted).
        """
        existed = self.exists_file(filepath)
        if existed:
            self.removes(filepath)
        _stdin, out, _err = self.para_cli.exec_command("touch %s" % filepath)
        # exec_command returns immediately; block until the remote command
        # has finished so the file exists when this method returns.
        out.channel.recv_exit_status()
        if existed:
            info = {"msg": "File {}  found and have remove it ,success create {}".format(filepath, filepath), "cdde": 200, "status": "success"}
            log.info(json.dumps(info, indent=4, ensure_ascii=False))

    # sftp func
    def mkdir(self, dir):
        """Create a remote directory, recreating it if it already exists.

        An existing directory is removed with ``rmdir`` first, which only
        succeeds when it is empty. Any failure is logged, not raised.

        Args:
            dir: Remote directory path.
        """
        client = self.cli
        try:
            if self.exists_dir(dir):
                client.rmdir(dir)
            client.mkdir(dir)
        except Exception:
            log.error("{'msg': 'mkdir %s failed maybe it have exists same name file or dir ,'code':100,'status': 'failed'}" % dir)

    # sftp  func
    def removes(self, filepath):
        """Delete a remote file if it exists; do nothing otherwise.

        Args:
            filepath: Remote file path.
        """
        try:
            if self.exists_file(filepath):
                self.cli.remove(filepath)
        except FileNotFoundError:
            # The file vanished between the existence check and the delete.
            info = {'msg': 'File %s Not Found Error' % filepath, 'retcode': 100, 'status': 'failed'}
            log.error(json.dumps(info, ensure_ascii=False, indent=4), exc_info=False)

    def list_dir(self, dir):
        """Return the entry names of a remote directory over SFTP.

        Args:
            dir: Remote directory path.

        Returns:
            List of names, or ``None`` (after logging) if ``dir`` is missing.
        """
        try:
            return self.cli.listdir(dir)
        except FileNotFoundError:
            log.error("{'msg': '%s Not Found Error', 'retcode':100,'status': 'failed'}" % dir)

    # sftp function
    def rm(self, absdir):
        """Recursively delete a remote directory and everything under it.

        Errors are logged rather than raised, so a failure partway through
        leaves the remaining entries in place.

        Args:
            absdir: Remote directory path.
        """
        sftp = self.cli
        try:
            for name in sftp.listdir(absdir):
                # os.path.join yields backslashes on Windows clients; remote
                # SFTP paths always use forward slashes.
                child = os.path.join(absdir, name).replace("\\", "/")
                # lstat (not stat) so a symlink to a directory is unlinked
                # instead of having its target's contents deleted.
                if S_ISDIR(sftp.lstat(child).st_mode):
                    self.rm(child)
                else:
                    sftp.remove(child)
                    log.info("{'msg': 'remove file %s success,'retcode': 200}" % (child))
            # rmdir empty dir
            sftp.rmdir(absdir)
        except Exception as e:
            log.error(e)
        else:
            log.info("{'msg': 'finished rm %s del self or its subdir and subfile,'recode':200}" % (absdir))

    # sftp func
    def exists_dir(self, dir):
        """Return True if ``dir`` exists on the remote host and is a directory.

        Unlike :meth:`is_dir`, only "not found" is treated as False; other
        I/O errors (for example, permission denied) propagate.
        """
        try:
            return S_ISDIR(self.cli.stat(dir).st_mode)
        except FileNotFoundError:
            return False

    # sftp func
    def exists_file(self, filepath):
        """Return True if ``filepath`` can be stat'ed on the remote host.

        Note that this is a plain existence check: it is also True for
        directories. Any stat failure is reported as False.
        """
        try:
            self.para_sftp.stat(filepath)
        except Exception:
            return False
        log.info("exists %s file " % filepath)
        return True

    def is_dir(self, dir):
        """Return True if ``dir`` is a remote directory; False on any IOError."""
        try:
            return S_ISDIR(self.cli.stat(dir).st_mode)
        except IOError:  # no such file
            return False

    def is_file(self, filepath):
        """Return True if ``filepath`` is a remote regular file; False on any IOError."""
        try:
            return S_ISREG(self.cli.stat(filepath).st_mode)
        except IOError:  # no such file
            return False

    def makedirs(self, remotedir, mode=0o777):
        """Create ``remotedir`` and any missing parent directories.

        Args:
            remotedir: Remote directory path.
            mode: Permission bits for created directories. Must be an octal
                value such as ``0o755``; the server may further mask it.

        Raises:
            OSError: If a regular file already exists at ``remotedir``.
        """
        if self.is_dir(remotedir):
            return
        if self.is_file(remotedir):
            raise OSError("a file with the same name as the remotedir, "
                          "'%s', already exists." % remotedir)
        head, tail = os.path.split(remotedir)
        if head and not self.is_dir(head):
            self.makedirs(head, mode)
        if tail:
            self.cli.mkdir(remotedir, mode=mode)

    def sftp_close(self):
        """Close every open handle; safe to call on a partly built instance."""
        # Channels go before the connection that carries them.
        for attr in ("para_sftp", "para_cli", "cli", "tp"):
            handle = getattr(self, attr, None)
            if handle is not None:
                handle.close()


class TestSftp(unittest.TestCase):
    """Integration self-check for ParamikoSftp.

    These tests talk to a real SSH server and expect specific files to exist
    on it (for example ``/usr/local/redisdb/logs/redis.log``). The target
    defaults to the original lab host; override it with the environment
    variables SFTP_TEST_HOST, SFTP_TEST_PORT, SFTP_TEST_USER and
    SFTP_TEST_PWD.
    """

    @classmethod
    def setUpClass(cls) -> None:
        """Open one shared connection for the whole test class."""
        ip = os.environ.get("SFTP_TEST_HOST", "192.168.110.151")
        port = int(os.environ.get("SFTP_TEST_PORT", "22"))
        user = os.environ.get("SFTP_TEST_USER", "root")
        # NOTE(review): default password kept for backward compatibility;
        # prefer supplying credentials through the environment.
        pwd = os.environ.get("SFTP_TEST_PWD", "admin")
        cls.client = ParamikoSftp(ip, port, user, pwd)
        log.info("start selfcheck method of sftp ")

    @classmethod
    def tearDownClass(cls) -> None:
        """Close the shared connection."""
        cls.client.sftp_close()
        log.info("finish selfcheck method of sftp ")

    def test_query_dir(self):
        """test listdir"""
        files = self.client.query_dir("/usr/local/listdir")
        self.assertIn('list.txt', files)

    def test_call_backstdout(self):
        shell = "ls -l /usr/local"
        readlines = self.client.callback_stdout(shell)
        self.assertIn("redisdb", readlines)

    def test_exsitdir(self):
        # unittest assertions instead of bare assert, which python -O strips.
        self.assertTrue(self.client.exists_dir("/usr/local"))

    def test_exsistfile(self):
        self.assertTrue(
            self.client.exists_file("/usr/local/redisdb/logs/redis.log"))

    def test_touch(self):
        """create file """
        path = "/usr/local/toutest.txt"
        self.client.touch(path)
        a = self.client.exists_file(path)
        self.assertEqual(a, True)
        import time
        time.sleep(1)
        self.client.removes(path)

    def test_remove(self):
        """remove file """
        path = "/usr/local/tou.txt"
        self.client.touch(path)
        self.client.removes(path)
        a = self.client.exists_file(path)
        self.assertEqual(a, False)

    def test_mkandrm(self):
        """test mkdir exists already and rm dir  """
        path = "/usr/local/test1"
        self.client.mkdir(path)
        # mkdir/rm only log their failures, so verify the outcome explicitly.
        self.assertTrue(self.client.exists_dir(path))
        self.client.rm(path)
        self.assertFalse(self.client.exists_dir(path))

    def test_makedirs(self):
        dir = "/usr/local/makedir1"
        self.client.makedirs(dir)
        r = self.client.exists_dir(dir)
        self.assertEqual(r, True)
        self.client.rm(dir)

    def test_is_dir(self):
        path = "/usr/local"
        r = self.client.is_dir(path)
        self.assertEqual(r, True)
        file = "/usr/local/redisdb/logs/redis.log"
        r2 = self.client.is_dir(file)
        self.assertEqual(r2, False)

    def test_isfile(self):
        file = "/usr/local/redisdb/logs/redis.log"
        r = self.client.is_file(file)
        self.assertEqual(r, True)
        a = "/usr/local"
        b = self.client.is_file(a)
        self.assertEqual(b, False)

if __name__ == "__main__":
    # Run the self-check suite, printing one line per test.
    unittest.main(verbosity=2)

  测试:

[root@hostuser local]# python mutilinux.py
test_call_backstdout (__main__.TestSftp) ... ok
test_exsistfile (__main__.TestSftp) ... ok
test_exsitdir (__main__.TestSftp) ... ok
test_is_dir (__main__.TestSftp) ... ok
test_isfile (__main__.TestSftp) ... ok
test_makedirs (__main__.TestSftp) ... ok
test_mkandrm (__main__.TestSftp)
test mkdir exists already and rm dir ... ok
test_query_dir (__main__.TestSftp)
test listdir ... ok
test_remove (__main__.TestSftp)
remove file ... ok
test_touch (__main__.TestSftp)
create file ... ok

----------------------------------------------------------------------
Ran 10 tests in 1.909s

OK

原文地址:https://www.cnblogs.com/SunshineKimi/p/11837309.html