Python 使用 MySQLdb 模块通过 SSH 隧道连接 MySQL

# -*- coding: utf-8 -*-
import os, sys

import MySQLdb
from sshtunnel import SSHTunnelForwarder

from util.read_ini import ReadIni

# Directory one level above the folder that contains this file. Despite its
# name, ``db_name`` is a filesystem path: it is appended to ``sys.path`` and
# reused further down to locate the configuration file.
_this_dir = os.path.dirname(__file__)
db_name = os.path.abspath(os.path.dirname(_this_dir))
sys.path.append(db_name)


# 创建数据工具类
class SSHMySQL:
    """MySQL helper that reaches the database through an SSH tunnel.

    Settings are read with ``ReadIni`` from the ``server`` node of
    ``config/base.ini`` located under the module-level ``db_name`` directory.
    Creating an instance builds the tunnel, starts it, connects to MySQL
    through the tunnel's local port and opens a cursor.

    Use it as a context manager so the cursor, the connection and the tunnel
    are released when the block ends::

        with SSHMySQL() as db:
            rows = db.get_more('SELECT ... WHERE col LIKE %s;', ('abc%',))
    """

    def __init__(self):
        """Load the configuration, then open tunnel, connection and cursor.

        Raises:
            Exception: whatever the tunnel or the MySQL driver raises; the
                tunnel (and the connection, if already open) is released
                before the error is re-raised.
        """
        # Join the components separately. The previous literal
        # 'config\base.ini' contained the escape sequence '\b' (backspace),
        # so the resulting path never named base.ini.
        file_name = os.path.join(db_name, 'config', 'base.ini')
        self.read_ini = ReadIni(file_name=file_name, node='server')
        # SSH tunnel object (not started yet).
        self.server = self.get_server()
        self.conn = None
        self.cur = None
        try:
            # Starts the tunnel and connects to MySQL through it.
            self.conn = self.get_conn()
            # Cursor shared by every query helper below.
            self.cur = self.conn.cursor()
        except Exception:
            # Do not leave a half-open connection or a running tunnel behind.
            try:
                if self.conn is not None:
                    self.conn.close()
            finally:
                self.server.stop()
            raise

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def get_server(self):
        """Build (without starting) the SSH tunnel forwarder.

        Host, user name and password come from the ini values ``ssh_host``,
        ``username`` and ``password``; the SSH port is fixed at 22 and the
        remote end of the tunnel is bound to 127.0.0.1:3306.

        Returns:
            The configured ``SSHTunnelForwarder`` instance.
        """
        ssh_host = self.read_ini.get_value('ssh_host')
        ssh_port = 22
        server = SSHTunnelForwarder(
            (ssh_host, ssh_port),
            ssh_password=self.read_ini.get_value('password'),
            ssh_username=self.read_ini.get_value('username'),
            remote_bind_address=('127.0.0.1', 3306),
        )
        return server

    def get_conn(self):
        """Start the SSH tunnel and connect to MySQL through it.

        Returns:
            The ``MySQLdb`` connection bound to the tunnel's local port.
        """
        self.server.start()
        # Connect to the local end of the tunnel, hence host 127.0.0.1 and
        # the port the forwarder picked locally.
        conn = MySQLdb.connect(host='127.0.0.1',
                               port=self.server.local_bind_port,
                               user='root',
                               passwd='',
                               db=self.read_ini.get_value('database'),
                               charset='utf8')
        return conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the cursor, the connection and the tunnel, in that order.

        Each step runs even if the previous one raised, so a failing
        ``close()`` cannot leak the connection or the tunnel. Exceptions
        from the ``with`` body are not suppressed.
        """
        try:
            self.cur.close()
        finally:
            try:
                self.conn.close()
            finally:
                self.server.stop()

    def _column_names(self):
        """Return the column names of the cursor's current result set."""
        return [column[0] for column in self.cur.description]

    def get_one(self, query, param=None):
        """Run ``query`` and return the first row as a dict.

        Args:
            query: SQL statement, optionally with driver placeholders.
            param: values bound to the placeholders, or ``None``.

        Returns:
            A ``{column_name: value}`` dict, or ``None`` when no row matched.

        Raises:
            Exception: any error from the driver, re-raised after a rollback.
        """
        try:
            self.cur.execute(query, param)
            row = self.cur.fetchone()
            if row is None:
                return None
            return dict(zip(self._column_names(), row))
        except Exception:
            self.conn.rollback()
            raise

    def get_more(self, query, param=None):
        """Run ``query`` and return every row as a list of dicts.

        Args:
            query: SQL statement, optionally with driver placeholders.
            param: values bound to the placeholders, or ``None``.

        Returns:
            A list of ``{column_name: value}`` dicts (empty when nothing
            matched), or ``None`` if the driver returned ``None``.

        Raises:
            Exception: any error from the driver, re-raised after a rollback.
        """
        try:
            self.cur.execute(query, param)
            rows = self.cur.fetchall()
            if rows is None:
                return None
            if not rows:
                # Nothing to convert; avoid touching cursor.description.
                return []
            columns = self._column_names()
            return [dict(zip(columns, row)) for row in rows]
        except Exception:
            self.conn.rollback()
            raise

    def update(self, query, param=None):
        """Execute an UPDATE, commit it and return the affected row count."""
        return self.__edit(query, param)

    def delete(self, query, param=None):
        """Execute a DELETE, commit it and return the affected row count."""
        return self.__edit(query, param)

    def insert_one(self, query, param=None):
        """Execute an INSERT, commit it and return the affected row count."""
        return self.__edit(query, param)

    def __edit(self, query, param):
        """Execute one data-modifying statement inside its own transaction.

        Returns:
            The cursor's ``rowcount`` after the statement ran (the previous
            implementation always returned 0).

        Raises:
            Exception: any error from the driver, re-raised after a rollback.
        """
        try:
            self.cur.execute(query, param)
            count = self.cur.rowcount
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise
        return count

    def insert_many(self, query, param=None):
        """Execute ``query`` once per parameter set and commit the batch.

        Args:
            query: SQL statement with driver placeholders.
            param: sequence of parameter tuples, one per row.

        Returns:
            The cursor's ``rowcount`` after the batch ran.

        Raises:
            Exception: any error from the driver, re-raised after a rollback.
        """
        try:
            self.cur.executemany(query, param)
            count = self.cur.rowcount
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise
        return count


if __name__ == '__main__':
    # Manual smoke test: open the tunnel, run one LIKE query and print the
    # rows; the context manager tears everything down afterwards.
    with SSHMySQL() as db:
        # Alternative example, kept for reference:
        #   sql = "SELECT * FROM backup_manager WHERE `name` LIKE %s;"
        #   args = ('backup%',)
        #   print(db.get_more(sql, args))

        label_pattern = 'lxq_nginx' + '%'
        rows = db.get_more(
            'SELECT UUID FROM virtualmachine WHERE label LIKE %s;',
            (label_pattern,),
        )
        print(rows)
不积跬步，无以至千里；不积小流，无以成江海。
原文地址:https://www.cnblogs.com/xuezhimin-esage-2020/p/14473461.html