python的零碎知识

1.Python代码操作git

  • 安装

    pip3 install gitpython
    
  • 操作git

    import os
    from git.repo import Repo # gitpython
    
    def clone():
        """Clone the ``master`` branch of the demo repository into ``codes/fuck``.

        Shell equivalent: ``git clone -b master <url> codes/fuck``. Passing a
        different ``branch`` value (for example ``'v1'``) to ``clone_from``
        clones that branch instead.
        """
        repo_url = 'https://gitee.com/wupeiqi/xxoo.git'
        target_dir = os.path.join('codes', 'fuck')
        Repo.clone_from(repo_url, to_path=target_dir, branch='master')
    
    def pull():
        """Run ``git pull`` inside the local clone at ``codes/fuck``."""
        Repo(os.path.join('codes', 'fuck')).git.pull()
    
    def tag_list():
        """Print the names of all tags found in the local clone at ``codes/fuck``."""
        working_copy = Repo(os.path.join('codes', 'fuck'))
        names = []
        for tag in working_copy.tags:
            names.append(tag.name)
        print(names)
    
    def commits_list():
        """Print the latest 50 commits of the local clone at ``codes/fuck``.

        Each commit is printed as a dict with the keys ``commit`` (abbreviated
        hash), ``author``, ``summary`` (subject line) and ``date`` (committer
        date formatted as ``YYYY-MM-DD HH:MM``).
        """
        local_path = os.path.join('codes', 'fuck')
        repo = Repo(local_path)

        # Fields are separated by the ASCII unit separator (git's %x1f) instead
        # of being embedded in a hand-built JSON template: a subject containing
        # a double quote or backslash would make such a template invalid JSON.
        fields = ('commit', 'author', 'summary', 'date')
        commit_log = repo.git.log(
            '--pretty=%h%x1f%an%x1f%s%x1f%cd',
            date='format:%Y-%m-%d %H:%M',
            max_count=50,
        )

        # One commit per line; skip blank lines so an empty log yields [].
        commit_list = [
            dict(zip(fields, line.split('\x1f')))
            for line in commit_log.split('\n')
            if line
        ]
        print(commit_list)
    
    def branches_list():
        """Return the branch names known on the default remote of ``codes/fuck``.

        Pulls first so the remote-tracking refs are current, then lists every
        remote ref except the symbolic ``HEAD`` entry.

        :return: list of branch names (the ``remote_head`` of each remote ref).
        """
        pull()
        local_path = os.path.join('codes', 'fuck')
        repo = Repo(local_path)

        branches = repo.remote().refs
        branch_list = [item.remote_head for item in branches if item.remote_head != "HEAD"]
        # The list used to be computed and then dropped; hand it to the caller.
        return branch_list
    
    def checkout_to_branch(branch='dev'):
        """Check out ``branch`` in the local clone at ``codes/fuck``.

        :param branch: name handed to ``git checkout``; defaults to ``'dev'``.
        """
        working_copy = Repo(os.path.join('codes', 'fuck'))
        # Tip: print working_copy.git.branch() before and after this call to
        # see the current branch change.
        working_copy.git.checkout(branch)
    
    def checkout_to_commit(commit='ec1d728'):
        """Hard-reset the local clone at ``codes/fuck`` to ``commit``.

        :param commit: commit identifier handed to ``git reset --hard``
            (see ``commits_list()`` for available hashes).
        """
        working_copy = Repo(os.path.join('codes', 'fuck'))
        working_copy.git.reset('--hard', commit)
    
  • 封装到一个类中,以后当做工具。

    import os
    import json
    from git.repo import Repo
    from git.repo.fun import is_git_dir
    
    
    class GitRepository(object):
        """Convenience wrapper around a local clone of a remote git repository.

        On construction the repository at ``repo_url`` is cloned into
        ``local_path`` unless ``local_path`` already contains a ``.git``
        directory, in which case the existing clone is opened instead.
        """

        # ``git log`` is asked to separate fields with the ASCII unit separator
        # (``%x1f``) rather than to emit a hand-built JSON template: a commit
        # subject containing a double quote or backslash would make such a
        # template invalid JSON.
        _LOG_FIELD_SEP = '\x1f'
        _LOG_FIELDS = ('commit', 'author', 'summary', 'date')
        _LOG_PRETTY = '--pretty=%h%x1f%an%x1f%s%x1f%cd'

        def __init__(self, local_path, repo_url, branch='master'):
            """
            :param local_path: local directory that holds (or will hold) the clone.
            :param repo_url: URL of the remote repository.
            :param branch: branch to clone when no local clone exists yet.
            """
            self.local_path = local_path
            self.repo_url = repo_url
            # Filled in by initial() below.
            self.repo = None
            self.initial(branch)

        def initial(self, branch):
            """
            Open the local clone, cloning it first if it does not exist yet.

            :param branch: branch passed to ``git clone`` on the first run; not
                used when a clone is already present.
            :return: None
            """
            # exist_ok avoids the check-then-create race of a separate exists().
            os.makedirs(self.local_path, exist_ok=True)
            # e.g. codes/luffycity/.git
            git_local_path = os.path.join(self.local_path, '.git')
            if not is_git_dir(git_local_path):
                # Nothing to pull into yet: the first fetch has to be a clone.
                self.repo = Repo.clone_from(self.repo_url, to_path=self.local_path, branch=branch)
            else:
                self.repo = Repo(self.local_path)

        def pull(self):
            """
            Run ``git pull`` in the local clone.

            :return: None
            """
            self.repo.git.pull()

        def branches(self):
            """
            List the branch names known on the default remote.

            :return: list of names, excluding the symbolic ``HEAD`` ref.
            """
            branches = self.repo.remote().refs
            return [item.remote_head for item in branches if item.remote_head not in ['HEAD', ]]

        def commits(self):
            """
            Fetch the latest 50 commits of the current branch.

            :return: list of dicts with the keys ``commit`` (abbreviated hash),
                ``author``, ``summary`` (subject line) and ``date`` (committer
                date formatted as ``YYYY-MM-DD HH:MM``).
            """
            commit_log = self.repo.git.log(self._LOG_PRETTY,
                                           max_count=50,
                                           date='format:%Y-%m-%d %H:%M')
            return self._parse_commit_log(commit_log)

        @classmethod
        def _parse_commit_log(cls, raw_log):
            """
            Turn separator-delimited ``git log`` output into a list of dicts.

            :param raw_log: text with one commit per line, fields separated by
                ``_LOG_FIELD_SEP`` in the order given by ``_LOG_FIELDS``.
            :return: one dict per non-empty line; ``[]`` for empty input.
            """
            return [
                dict(zip(cls._LOG_FIELDS, line.split(cls._LOG_FIELD_SEP)))
                for line in raw_log.split('\n')
                if line
            ]

        def tags(self):
            """
            List all tag names in the local clone.

            :return: list of tag names.
            """
            return [tag.name for tag in self.repo.tags]

        def change_to_branch(self, branch):
            """
            Check out a branch.

            :param branch: branch name passed to ``git checkout``.
            :return: None
            """
            self.repo.git.checkout(branch)

        def change_to_commit(self, branch, commit):
            """
            Check out ``branch`` and hard-reset it to ``commit``.

            :param branch: branch to check out before resetting.
            :param commit: commit identifier passed to ``git reset --hard``.
            :return: None
            """
            self.change_to_branch(branch=branch)
            self.repo.git.reset('--hard', commit)

        def change_to_tag(self, tag):
            """
            Check out a tag.

            :param tag: tag name passed to ``git checkout``.
            :return: None
            """
            self.repo.git.checkout(tag)
    
    
    if __name__ == '__main__':
        # Demo: open (or clone on first run) the sample repository under codes/luffycity.
        local_path = os.path.join('codes', 'luffycity')
        repo_object = GitRepository(
            local_path,
            'https://gitee.com/wupeiqi/xxoo.git',
        )
    

2.解压缩文件

import shutil

# Create an archive (works on Python 2 and 3). The example is kept inside a
# bare string literal, so it is not executed when this file runs.
"""
abs_file_path = shutil.make_archive(
    base_name="files/ww",  # 压缩包文件路劲
    format='tar',  # “zip”, “tar”
    root_dir='codes/luffycity'  # 被压缩的文件目录
)
print(abs_file_path)
"""

# Extract an archive (Python 3). The two calls below are private shutil helpers
# (leading underscore); the public equivalent is
# shutil.unpack_archive(filename, extract_dir), which picks the unpacker from
# the file extension.
# NOTE(review): the second call hands a .zip path to the tar unpacker —
# presumably a copy/paste slip; confirm the intended file name.
# shutil._unpack_zipfile('files/ww.zip', 'xxxxxx/new')
# shutil._unpack_tarfile('files/ww.zip', 'xxxxxx/new')

# Extract with the zipfile / tarfile modules (Python 2 and 3). Also kept inside
# a bare string literal, so it is not executed.
"""
import zipfile
z = zipfile.ZipFile('files/ww.zip', 'r')
z.extractall(path='xxxxxx/luffy')
z.close()

import tarfile
tar = tarfile.TarFile('code/www.tar', 'r')
tar.extractall(path='/code/x1/')  # 可设置解压地址
tar.close()
"""

3.基于paramiko操作远程服务器

import paramiko


class SSHProxy(object):
    """Key-authenticated SSH connection for running commands and uploading files.

    One ``paramiko.Transport`` is opened per proxy and shared by every
    ``command`` and ``upload`` call until ``close`` is called. Usable as a
    context manager: ``with SSHProxy(...) as proxy: ...``.
    """

    def __init__(self, hostname, port, username, private_key_path):
        """
        :param hostname: host name or IP address of the remote server.
        :param port: SSH port on the remote server.
        :param username: account to authenticate as.
        :param private_key_path: path to the RSA private key file used for login.
        """
        self.hostname = hostname
        self.port = port
        self.username = username
        self.private_key_path = private_key_path

        # Set by open(); None while disconnected.
        self.transport = None

    def open(self):
        """Connect to the server and authenticate with the RSA private key."""
        private_key = paramiko.RSAKey.from_private_key_file(self.private_key_path)
        transport = paramiko.Transport((self.hostname, self.port))
        try:
            transport.connect(username=self.username, pkey=private_key)
        except Exception:
            # Do not leave a half-open socket behind when the login fails.
            transport.close()
            raise
        self.transport = transport

    def close(self):
        """Close the connection; does nothing if it was never opened."""
        if self.transport is not None:
            self.transport.close()
            self.transport = None

    def command(self, cmd):
        """
        Run ``cmd`` on the remote server and return everything it wrote to stdout.

        :param cmd: command line to execute remotely.
        :return: the command's stdout, as returned by the channel file's ``read()``.
        """
        # Use a session channel on the shared transport. Wrapping the transport
        # in an SSHClient and closing that client would close the transport
        # itself, breaking every later command/upload on this proxy.
        channel = self.transport.open_session()
        try:
            channel.exec_command(cmd)
            return channel.makefile('r').read()
        finally:
            # Closes only this channel; the transport stays open.
            channel.close()

    def upload(self, local_path, remote_path):
        """
        Copy a local file to the remote server over SFTP.

        :param local_path: path of the file on this machine.
        :param remote_path: destination path on the remote server.
        """
        sftp = paramiko.SFTPClient.from_transport(self.transport)
        try:
            sftp.put(local_path, remote_path)
        finally:
            # Release the SFTP session even if the transfer fails.
            sftp.close()

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


if __name__ == '__main__':
    # Run the same demo sequence against each host in turn:
    # upload, two commands, upload.
    for host in ('10.211.55.25', '10.211.55.26'):
        with SSHProxy(host, 22, 'root', '/Users/wupeiqi/.ssh/id_rsa') as proxy:
            proxy.upload('xx', 'xx')
            proxy.command('ifconfig')
            proxy.command('ifconfig')
            proxy.upload('xx', 'xx')

4.本地执行命令

import subprocess

# Run `dir` through the shell in the given working directory and capture its
# stdout as bytes; check_output raises CalledProcessError on a non-zero exit.
# NOTE(review): the cwd value looks like a Windows path whose backslash
# separators were lost (something like D:\...\codes) — confirm the intended
# directory before running.
result = subprocess.check_output('dir', cwd='D:wupeiqicodeziwencodes', shell=True)
# Print the output decoded as GBK (presumably the console code page of the
# author's Windows machine — verify for other locales) plus the raw result type.
print(result.decode('gbk'), type(result))
原文地址:https://www.cnblogs.com/zhufanyu/p/11954090.html