pymysql DAO简单封装

import pymysql
import logging

logger = logging.getLogger('app')


class MysqlBase:
    def __init__(self, **args):
        self.host = args.get('host')
        self.user = args.get('user')
        self.pwd = args.get('pwd')
        self.db = args.get('db')
        self.port = int(args.get('port', 3306))
        self.charset = args.get('charset', 'utf8')

    def __enter__(self):
        try:
            self.conn = pymysql.connect(
                host=self.host,
                user=self.user,
                passwd=self.pwd,
                db=self.db,
                charset=self.charset,
                port=self.port
            )
            self.cur = self.conn.cursor()
            return self
        except Exception as e:
            raise ValueError('mysql connect error: {}'.format((self.host, e)))

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cur.close()
        self.conn.close()

    def query(self, sql, args=None):
        try:
            self.cur.execute(sql, args=args)  # 执行sql语句
            res = self.cur.fetchall()  # 获取查询的所有记录
        except Exception as e:
            logger.error("query error: {}".format(e))
            print("query error: {}".format(e))
            raise e
        return res

    def update(self, sql):
        effect_rows = 0
        try:
            effect_rows = self.cur.execute(sql)
            # 提交
            self.conn.commit()
        except Exception as e:
            # 错误回滚
            self.conn.rollback()
            logger.error("update error: {}".format(e))
            print("update error: {}".format(e))
        return effect_rows


if __name__ == "__main__":
    with MysqlBase(host='10.51.1.37', port=3306, user='root', pwd='ysyhl9T!') as db:
        res = db.query(
            "select user,authentication_string from mysql.user where user='whisky2' group by authentication_string;")
        print(res)
#!/usr/bin/env python
# -*-coding:utf-8-*-
'''
role   : mysql操作
'''

import pymysql
from opssdk.logs import Log


class MysqlBase:
    def __init__(self, **args):
        self.host = args.get('host')
        self.user = args.get('user')
        self.pswd = args.get('passwd')
        self.db = args.get('db')
        self.port = int(args.get('port', 3306))
        self.charset = args.get('charset', 'utf8')

        log_path = '/log/yunwei/yunwei_mysql.log'
        self.log_ins = Log('111', log_path)
        try:
            self.conn = pymysql.connect(host=self.host, user=self.user,
                                        password=self.pswd, db=self.db, port=self.port, charset=self.charset)
            self.cur = self.conn.cursor()
        except:
            raise ValueError('mysql connect error {0}'.format(self.host))

    ###释放资源
    def __del__(self):
        self.cur.close()
        self.conn.close()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cur.close()
        self.conn.close()

    ### 查询
    def query(self, sql):
        try:
            self.cur.execute(sql)  # 执行sql语句
            res = self.cur.fetchall()  # 获取查询的所有记录
        except Exception as e:
            self.log_ins.write_log("error", e)
            raise e

        return res

    def change(self, sql):
        resnum = 0
        try:
            resnum = self.cur.execute(sql)
            # 提交
            self.conn.commit()
        except Exception as e:
            # 错误回滚
            self.log_ins.write_log("error", e)
            self.conn.rollback()
        return resnum
#!/usr/bin/env python
# -*-coding:utf-8 -*-
#
#  无法执行多个query,self.conn.close()放在CdbConn类的单独函数中,每次query之后要手动close;否则多次query,会自动关闭
import pymysql


class CdbConn():
    def __init__(self, db_host,  db_user, db_pwd, db_name, db_port=3306):
        self.db_host = db_host
        self.db_port = db_port
        self.db_user = db_user
        self.db_pwd = db_pwd
        self.db_name = db_name
        self.status = True
        self.conn = self.getConnection()


    def getConnection(self):
        try:
            conn =  pymysql.Connect(
                host=self.db_host,  # 设置MYSQL地址
                port=int(self.db_port),  # 设置端口号
                user=self.db_user,  # 设置用户名
                passwd=self.db_pwd,  # 设置密码
                db=self.db_name,  # 数据库名
                charset='utf8',  # 设置编码
                use_unicode=True
            )
            return conn
        except Exception as e:
            self.status = False
            print('数据库连接异常: ', e)

    def query(self, sqlString):
        cursor = self.conn.cursor()
        cursor.execute(sqlString)
        returnData = cursor.fetchall()
        cursor.close()
        # self.conn.close()
        return returnData

    def close(self):
        self.conn.close()

    def update(self, sqlString):
        cursor = self.conn.cursor()
        cursor.execute(sqlString)
        self.conn.commit()
        cursor.close()
        # self.conn.close()

v2改进版本: https://www.cnblogs.com/blueskyyj/p/10039972.html

# !/usr/bin/env python
# -*-coding:utf-8 -*-
# 多次调用query之后,要手动调用close方法关闭db连接
import pymysql


class CdbConn():
    def __init__(self, db_host, db_user, db_pwd, db_name, db_port=3306):
        self.db_host = db_host
        self.db_port = db_port
        self.db_user = db_user
        self.db_pwd = db_pwd
        self.db_name = db_name
        self.status = True
        self.conn = self.getConnection()

    def getConnection(self):
        try:
            conn = pymysql.Connect(
                host=self.db_host,  # 设置MYSQL地址
                port=int(self.db_port),  # 设置端口号
                user=self.db_user,  # 设置用户名
                passwd=self.db_pwd,  # 设置密码
                db=self.db_name,  # 数据库名
                charset='utf8',  # 设置编码
                use_unicode=True
            )
            return conn
        except Exception as e:
            self.status = False
            print('数据库连接异常:{}, detail: {} '.format(
                e, (self.db_host, self.db_name)))
            return None

    def query(self, sqlString):
        cursor = self.conn.cursor()
        try:
            cursor.execute(sqlString)
            returnData = cursor.fetchall()
            return returnData
        except Exception as e:
            print("fetch failed, detail:{}".format(e))
        finally:
            if cursor:
                cursor.close()  # 多个cursor不能同时存在? https://blog.csdn.net/emaste_r/article/details/75068902
            # self.conn.close() # 调用close方法关闭db连接

    def update(self, sqlString):
        cursor = self.conn.cursor()
        try:
            cursor.execute(sqlString)
            self.conn.commit()
        except Exception as e:
            self.conn.rollback()
            print("update failed, detail:{}".format(e))
        finally:
            if cursor:
                cursor.close()
            # self.conn.close() # 需先判断数据库连接是否存在 if self.conn: ,再调用close方法关闭db连接

    def close(self):
        if self.conn:
            self.conn.close()


if __name__ == "__main__":
    db = CdbConn(db_host='10.51.1.37', db_port=3306, db_name='information_schema', db_user='root', db_pwd='ysyhl9T!')
    db2 = CdbConn(db_host='10.51.1.37', db_port=3306, db_name='information_schema', db_user='root', db_pwd='ysyhl9T!')
    tt = db.query("show databases")
    ss = db.query("show databases")
    # dd = db2.query("show tables from mysql")
    print(tt)
    print(ss)
    # print(dd)
# -*- coding: UTF-8 -*- 

import MySQLdb
import MySQLdb.cursors
import logging
from django.db import connection
import traceback
logger = logging.getLogger('default')
import json


class Dao(object):
    _CHART_DAYS = 90

    # 测试指定的mysql实例连接是否成功
    def check_mysql_connect(self, ipaddress, mysqlport, username, password):
        listDb = []
        conn = None
        cursor = None
        connectflag=2

        try:
            conn = MySQLdb.connect(host=ipaddress, port=mysqlport, user=username, passwd=password,
                                   charset='utf8',connect_timeout=10)
            cursor = conn.cursor()
            sql = "show databases"
            n = cursor.execute(sql)
            listDb = [row[0] for row in cursor.fetchall()
                      if row[0] not in ('information_schema', 'performance_schema', 'mysql', 'test')]
            connectflag=1
        except MySQLdb.Warning as w:
            traceback.print_exc()
            #raise Exception(w)
        except MySQLdb.Error as e:
            traceback.print_exc()
            #raise Exception(e)
        finally:
            if cursor is not None:
                cursor.close()
            if conn is not None:
                conn.commit()
                conn.close()
        return connectflag


    # 检查复制同步状态
    def check_slave_status(self, ipaddress, mysqlport, username, password):
        listDb = []
        conn = None
        cursor = None

        master_host='NOHOST'
        master_port=0
        master_user=''
        master_log_file=''
        read_master_log_pos=0
        relay_master_log_file=''
        exec_master_log_pos=0
        slave_io_running='No'
        slave_sql_running='No'
        seconds_behind_master=-1
        count=0
        io_succesed_count=0
        sql_successed_count=0
        try:
            conn = MySQLdb.connect(host=ipaddress, port=mysqlport, user=username, passwd=password,
                                   charset='utf8', connect_timeout=10, cursorclass=MySQLdb.cursors.DictCursor)
            cursor = conn.cursor()
            sql = "show slave status"
            n = cursor.execute(sql)
            for row in cursor.fetchall():
                count = count +1
                master_host=row['Master_Host']
                master_port=row['Master_Port']
                master_user = row['Master_User']
                master_log_file = row['Master_Log_File']
                read_master_log_pos = row['Read_Master_Log_Pos']
                relay_master_log_file = row['Relay_Master_Log_File']
                exec_master_log_pos = row['Exec_Master_Log_Pos']
                cur_slave_io_running = row['Slave_IO_Running']
                cur_slave_sql_running = row['Slave_SQL_Running']
                cur_seconds_behind_master = row['Seconds_Behind_Master']
                if (cur_slave_io_running == 'Yes'):
                    io_succesed_count = io_succesed_count + 1
                if (cur_slave_sql_running == 'Yes'):
                    sql_successed_count = sql_successed_count + 1
                if (cur_seconds_behind_master == 'NULL'):
                    seconds_behind_master = -1
                elif (cur_seconds_behind_master > seconds_behind_master):
                    seconds_behind_master = cur_seconds_behind_master
            if ( io_succesed_count == count ):
                slave_io_running = 'Yes'
            if ( sql_successed_count == count ):
                slave_sql_running = 'Yes'
            if (count == 0 ):
                slave_io_running = 'No'
                slave_sql_running = 'No'
        except MySQLdb.Warning as w:
            traceback.print_exc()
            #raise Exception(w)
        except MySQLdb.Error as e:
            traceback.print_exc()
            #raise Exception(e)
        finally:
            if cursor is not None:
                cursor.close()
            if conn is not None:
                conn.commit()
                conn.close()
        return (master_host,master_port,master_user,master_log_file,read_master_log_pos,relay_master_log_file,exec_master_log_pos,slave_io_running,slave_sql_running,seconds_behind_master)



    # 检查库表数量
    def check_table_num(self, ipaddress, mysqlport, username, password):
        listDb = []
        conn = None
        cursor = None

        dbnum = 0
        tablenum = 0
        try:
            conn = MySQLdb.connect(host=ipaddress, port=mysqlport, user=username, passwd=password,
                                   charset='utf8', cursorclass=MySQLdb.cursors.DictCursor)
            conn.select_db('information_schema')
            cursor = conn.cursor()
            sql = "select count(*) dbnum from SCHEMATA where SCHEMA_NAME not in ('information_schema','mysql','performance_schema','sys')"
            effect_row = cursor.execute(sql)
            row = cursor.fetchone()
            dbnum = row['dbnum']
            sql = "select count(*) tablenum from TABLES where TABLE_SCHEMA not in ('information_schema','mysql','performance_schema','sys')"
            effect_row = cursor.execute(sql)
            row = cursor.fetchone()
            tablenum = row['tablenum']
        except MySQLdb.Warning as w:
            traceback.print_exc()
            # raise Exception(w)
        except MySQLdb.Error as e:
            traceback.print_exc()
            # raise Exception(e)
        finally:
            if cursor is not None:
                cursor.close()
            if conn is not None:
                conn.commit()
                conn.close()
        return (dbnum,tablenum)

    # 连进指定的mysql实例里,读取所有databases并返回
    def getAlldbByCluster(self, masterHost, masterPort, masterUser, masterPassword):
        listDb = []
        conn = None
        cursor = None

        try:
            conn = MySQLdb.connect(host=masterHost, port=masterPort, user=masterUser, passwd=masterPassword,
                                   charset='utf8')
            cursor = conn.cursor()
            sql = "show databases"
            n = cursor.execute(sql)
            listDb = [row[0] for row in cursor.fetchall()
                      if row[0] not in ('information_schema', 'performance_schema', 'mysql', 'test')]
        except MySQLdb.Warning as w:
            raise Exception(w)
        except MySQLdb.Error as e:
            raise Exception(e)
        finally:
            if cursor is not None:
                cursor.close()
            if conn is not None:
                conn.commit()
                conn.close()
        return listDb

    # 连进指定的mysql实例里,读取所有tables并返回
    def getAllTableByDb(self, masterHost, masterPort, masterUser, masterPassword, dbName):
        listTb = []
        conn = None
        cursor = None

        try:
            conn = MySQLdb.connect(host=masterHost, port=masterPort, user=masterUser, passwd=masterPassword, db=dbName,
                                   charset='utf8')
            cursor = conn.cursor()
            sql = "show tables"
            n = cursor.execute(sql)
            listTb = [row[0] for row in cursor.fetchall()
                      if row[0] not in (
                          'test')]
        except MySQLdb.Warning as w:
            raise Exception(w)
        except MySQLdb.Error as e:
            raise Exception(e)
        finally:
            if cursor is not None:
                cursor.close()
            if conn is not None:
                conn.commit()
                conn.close()
        return listTb

    # 连进指定的mysql实例里,读取所有Columns并返回
    def getAllColumnsByTb(self, masterHost, masterPort, masterUser, masterPassword, dbName, tbName):
        listCol = []
        conn = None
        cursor = None

        try:
            conn = MySQLdb.connect(host=masterHost, port=masterPort, user=masterUser, passwd=masterPassword, db=dbName,
                                   charset='utf8')
            cursor = conn.cursor()
            sql = "SELECT COLUMN_NAME FROM information_schema.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s';" % (
                dbName, tbName)
            n = cursor.execute(sql)
            listCol = [row[0] for row in cursor.fetchall()]
        except MySQLdb.Warning as w:
            raise Exception(w)
        except MySQLdb.Error as e:
            raise Exception(e)
        finally:
            if cursor is not None:
                cursor.close()
            if conn is not None:
                conn.commit()
                conn.close()
        return listCol

    # 连进指定的mysql实例里,执行sql并返回
    def mysql_query(self, masterHost, masterPort, masterUser, masterPassword, dbName, sql, limit_num=0):
        result = {'column_list': [], 'rows': [], 'effect_row': 0}
        conn = None
        cursor = None

        try:
            conn = MySQLdb.connect(host=masterHost, port=masterPort, user=masterUser, passwd=masterPassword, db=dbName,
                                   charset='utf8')
            cursor = conn.cursor()
            effect_row = cursor.execute(sql)
            if int(limit_num) > 0:
                rows = cursor.fetchmany(size=int(limit_num))
            else:
                rows = cursor.fetchall()
            fields = cursor.description

            column_list = []
            if fields:
                for i in fields:
                    column_list.append(i[0])
            result = {}
            result['column_list'] = column_list
            result['rows'] = rows
            result['effect_row'] = effect_row

        except MySQLdb.Warning as w:
            logger.warning(str(w))
            result['Warning'] = str(w)
        except MySQLdb.Error as e:
            logger.error(str(e))
            result['Error'] = str(e)
        finally:
            if cursor is not None:
                cursor.close()
            if conn is not None:
                try:
                    conn.rollback()
                    conn.close()
                except:
                    conn.close()
        return result

    # 连进指定的mysql实例里,执行sql并返回
    def mysql_execute(self, masterHost, masterPort, masterUser, masterPassword, dbName, sql):
        result = {}
        conn = None
        cursor = None

        try:
            conn = MySQLdb.connect(host=masterHost, port=masterPort, user=masterUser, passwd=masterPassword, db=dbName,
                                   charset='utf8')
            cursor = conn.cursor()
            effect_row = cursor.execute(sql)
            # result = {}
            # result['effect_row'] = effect_row
            conn.commit()
        except MySQLdb.Warning as w:
            logger.warning(str(w))
            result['Warning'] = str(w)
        except MySQLdb.Error as e:
            logger.error(str(e))
            result['Error'] = str(e)
        finally:
            if result.get('Error') or result.get('Warning'):
                conn.close()
            elif cursor is not None:
                cursor.close()
                conn.close()
        return result

    def getWorkChartsByMonth(self):
        cursor = connection.cursor()
        sql = "select date_format(create_time, '%%m-%%d'),count(*) from sql_workflow where create_time>=date_add(now(),interval -%s day) group by date_format(create_time, '%%m-%%d') order by 1 asc;" % (
            Dao._CHART_DAYS)
        cursor.execute(sql)
        result = cursor.fetchall()
        return result

    def getWorkChartsByPerson(self):
        cursor = connection.cursor()
        sql = "select engineer, count(*) as cnt from sql_workflow where create_time>=date_add(now(),interval -%s day) group by engineer order by cnt desc limit 50;" % (
            Dao._CHART_DAYS)
        cursor.execute(sql)
        result = cursor.fetchall()
        return result


    # 取出otter中的频道、管道、cannal相关信息
    def get_otter_pipeline_infos(self, ipaddress, mysqlport, username, password,dbname):
        otter_pipeline_info_dict_list = []
        conn = None
        cursor = None

        try:
            conn = MySQLdb.connect(host=ipaddress, port=mysqlport, user=username, passwd=password,db=dbname,
                                   charset='utf8', connect_timeout=10, cursorclass=MySQLdb.cursors.DictCursor)
            cursor = conn.cursor()
            sql = """SELECT c.ID channelid ,c.`NAME` channelname ,p.ID pipelineid ,p.`NAME` pipelinename ,p.PARAMETERS pipelineparams
FROM CHANNEL c,PIPELINE p
where c.id = p.CHANNEL_ID
order by c.ID
            """
            n = cursor.execute(sql)
            for row in cursor.fetchall():
                otter_pipeline_info_dict={}
                otter_pipeline_info_dict["channelid"] = row['channelid']
                otter_pipeline_info_dict["channelname"] = row['channelname']
                otter_pipeline_info_dict["pipelineid"] = row['pipelineid']
                otter_pipeline_info_dict["pipelinename"] = row['pipelinename']
                pipelineparams=row['pipelineparams']
                jsonmsg=json.loads(pipelineparams)
                canalname=jsonmsg['destinationName']
                otter_pipeline_info_dict["canalname"] = canalname
                otter_pipeline_info_dict_list.append(otter_pipeline_info_dict)
        except MySQLdb.Warning as w:
            traceback.print_exc()
            #raise Exception(w)
        except MySQLdb.Error as e:
            traceback.print_exc()
            #raise Exception(e)
        finally:
            if cursor is not None:
                cursor.close()
            if conn is not None:
                conn.commit()
                conn.close()
        return otter_pipeline_info_dict_list
MySQLDB dao封装

  

https://www.cnblogs.com/wardensky/p/4783010.html

#coding=utf-8 
#!/usr/bin/python

import pymysql


class MYSQL:
    """
    对pymysql的简单封装
    """
    def __init__(self,host,user,pwd,db):
        self.host = host
        self.user = user
        self.pwd = pwd
        self.db = db

    def __GetConnect(self):
        """
        得到连接信息
        返回: conn.cursor()
        """
        if not self.db:
            raise(NameError,"没有设置数据库信息")
        self.conn = pymysql.connect(host=self.host,user=self.user,password=self.pwd,database=self.db,charset="utf8")
        cur = self.conn.cursor()
        if not cur:
            raise(NameError,"连接数据库失败")
        else:
            return cur

    def ExecQuery(self,sql):
        """
        执行查询语句
        返回的是一个包含tuple的list,list的元素是记录行,tuple的元素是每行记录的字段

        调用示例:
                ms = MYSQL(host="localhost",user="sa",pwd="123456",db="PythonWeiboStatistics")
                resList = ms.ExecQuery("SELECT id,NickName FROM WeiBoUser")
                for (id,NickName) in resList:
                    print str(id),NickName
        """
        cur = self.__GetConnect()
        cur.execute(sql)
        resList = cur.fetchall()

        #查询完毕后必须关闭连接
        self.conn.close()
        return resList

    def ExecNonQuery(self,sql):
        """
        执行非查询语句

        调用示例:
            cur = self.__GetConnect()
            cur.execute(sql)
            self.conn.commit()
            self.conn.close()
        """
        cur = self.__GetConnect()
        cur.execute(sql)
        self.conn.commit()
        self.conn.close()

def main():

    mysql = MYSQL(host="192.168.163.36",user="wisdomhr",pwd="wisdomhr",db="WISDOMHR")
    resList = mysql.ExecQuery("SELECT CITY FROM RES_SCHOOL")
    for inst in resList:
        print(inst)
if __name__ == '__main__':
    main()
# 调用:
#!/usr/bin/python

#version 3.4

import wispymysql

mysql = wispymysql.MYSQL(host="192.168.163.36",user="wisdomhr",pwd="wisdomhr",db="WISDOMHR")
selectsql = "SELECT ID, CITY FROM RES_SCHOOL WHERE CITY LIKE '%
%'"
result = mysql.ExecQuery(selectsql)

for (dbid, city) in result:
    rightcity = city.replace('
','')
    updatesql= "UPDATE RES_SCHOOL SET CITY = '" + rightcity + "' WHERE ID = " + str(dbid)
    print(updatesql)
    mysql.ExecNonQuery(updatesql)

http://justcode.ikeepstudying.com/2019/01/python%EF%BC%9Amysql%E8%BF%9E%E5%BA%93%E5%8F%8A%E7%AE%80%E5%8D%95%E5%B0%81%E8%A3%85%E4%BD%BF%E7%94%A8-python-mysql%E6%93%8D%E4%BD%9C%E7%B1%BB/

原文地址:https://www.cnblogs.com/yum777/p/11676806.html