连接mysql数据库

封装pymysql, 代码展示:

# encoding:utf-8
import pymysql.cursors




class MysqlOperation(object):
    """Small convenience wrapper around a single pymysql connection.

    Each statement helper opens a short-lived cursor, runs one statement and
    takes care of commit/rollback itself.  Failures are not raised: the
    transaction is rolled back, the error is printed and the helper returns
    ``None``.

    The instance can be used as a context manager; the connection is closed
    when the ``with`` block exits.
    """

    def __init__(self, config):
        """Open the connection described by *config*.

        Args:
            config: Mapping providing ``mysql_host``, ``mysql_port``,
                ``mysql_user``, ``mysql_passwd`` and ``mysql_db``.
        """
        self.connection = pymysql.connect(
            host=config['mysql_host'],
            port=config['mysql_port'],
            user=config['mysql_user'],
            # Author's note: a direct pymysql connection takes ``passwd``,
            # whereas connecting through a connection pool takes ``password``.
            passwd=config['mysql_passwd'],
            db=config['mysql_db'],
            charset='utf8',
            cursorclass=pymysql.cursors.DictCursor,
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying connection."""
        self.connection.close()

    def _run_query(self, sql, args, fetch_all):
        """Execute a read statement and fetch all rows or only the first."""
        with self.connection.cursor() as cursor:
            try:
                cursor.execute(sql, args)
                if fetch_all:
                    return cursor.fetchall()
                return cursor.fetchone()
            except Exception as e:
                self.connection.rollback()
                print('事务失败', e)
                return None

    def _run_write(self, sql, args):
        """Execute a write statement, committing on success."""
        with self.connection.cursor() as cursor:
            try:
                affected = cursor.execute(sql, args)
                self.connection.commit()
                return affected
            except Exception as e:
                self.connection.rollback()
                print('事务失败', e)
                return None

    def read_sql(self, sql, args=None):
        """Run a query and return every row.

        Args:
            sql: Statement text; may contain ``%s`` placeholders.
            args: Optional values bound to the placeholders by the driver.
                Prefer this over formatting values into *sql* yourself.

        Returns:
            The result of ``cursor.fetchall()``, or ``None`` if the
            statement failed.
        """
        return self._run_query(sql, args, True)

    def read_one(self, sql, args=None):
        """Run a query and return only the first row.

        Args:
            sql: Statement text; may contain ``%s`` placeholders.
            args: Optional values bound to the placeholders by the driver.

        Returns:
            The result of ``cursor.fetchone()``, or ``None`` if the
            statement failed.
        """
        return self._run_query(sql, args, False)

    def insert_sql(self, sql, args=None):
        """Run an INSERT statement and commit it.

        Args:
            sql: Statement text; may contain ``%s`` placeholders.
            args: Optional values bound to the placeholders by the driver.

        Returns:
            The value returned by ``cursor.execute`` (the affected row
            count), or ``None`` if the statement failed and was rolled back.
        """
        return self._run_write(sql, args)

    def update_sql(self, sql, args=None):
        """Run an UPDATE statement and commit it.

        Example::

            op.update_sql("update user set username = %s where id = %s",
                          ("alice", 3))

        Args:
            sql: Statement text; may contain ``%s`` placeholders.
            args: Optional values bound to the placeholders by the driver.

        Returns:
            The value returned by ``cursor.execute`` (the affected row
            count), or ``None`` if the statement failed and was rolled back.
        """
        return self._run_write(sql, args)

    def delect_sql(self, sql_delete, args=None):
        """Run a DELETE statement and commit it.

        The misspelled name is kept for existing callers; ``delete_sql`` is
        an alias for the same method.

        Args:
            sql_delete: Statement text; may contain ``%s`` placeholders.
            args: Optional values bound to the placeholders by the driver.

        Returns:
            The value returned by ``cursor.execute`` (the affected row
            count), or ``None`` if the statement failed and was rolled back.
        """
        return self._run_write(sql_delete, args)

    # Correctly spelled alias for ``delect_sql``.
    delete_sql = delect_sql

    def reConnect(self):
        """Make sure the connection is usable, reconnecting if it is not.

        The connection is pinged without automatic reconnection; if that
        fails for any reason the same connection object is re-opened.  An
        error raised by the reconnect attempt itself propagates to the
        caller.
        """
        try:
            self.connection.ping(reconnect=False)
        except Exception:
            self.connection.connect()

    def callpro_sql(self, proc_name, *args):
        """Call a stored procedure and commit.

        Args:
            proc_name: Name of the stored procedure.
            *args: Positional arguments handed to the procedure.
        """
        with self.connection.cursor() as cursor:
            try:
                cursor.callproc(proc_name, args)
                self.connection.commit()
                print('调用存储过程结束')
            except Exception as e:
                self.connection.rollback()
                print('事务失败', e)
原文地址:https://www.cnblogs.com/qianslup/p/13176072.html