python封装mysq操作,进行数据库的增删改

python操作mysql进行封装,封装的好处我就不提了,以下是我做项目时的一个封装,大家可以根据实际需要进行自己的一个封装

我封装的内容:

 1.数据库的配置文件

2.获取数据配置文件的地址

3.连接数据库的操作

4.操作mysql的语句

5.调用mysql的语句--执行

 封装1:数据库配置文件

config中db_conf.ini

里面主要是一些mysql的配置文件

[mysqlconf]
host=127.0.0.1
port=3306
user=root
password=123456
db_name=数据库名

封装2:读取配置文件地址public_data.py

import os
# 整个项目的根目录绝对路劲
baseDir = os.path.dirname(os.path.dirname(__file__))

# 数据库配置你文件绝对路径
config_path = baseDir + "/config/db_config.ini"
print(config_path)

  打印目录地址:

 封装3:数据库的连接 config_handler.py

# -*- coding:utf-8 -*-
#@Time : 2020/6/14 11:01
#@Author: 张君
#@File : config_handler.py

from configparser import ConfigParser
from config.public_data import config_path

class ConfigParse(object):
    def __init__(self):
        pass

    @classmethod
    def get_db_config(cls):
        #cls使用的类方法,cls就是指定本身

        cls.cfp = ConfigParser()
        cls.cfp.read(config_path)
        host = cls.cfp.get("mysqlconf", "host")
        port = cls.cfp.get("mysqlconf", "port")
        user = cls.cfp.get("mysqlconf", "user")
        password = cls.cfp.get("mysqlconf", "password")
        db = cls.cfp.get("mysqlconf", "db_name")
        return {"host":host, "port":port, "user":user, "password":password,"db":db}


if __name__ == '__main__':
    cp=ConfigParse()
    result=cp.get_db_config()
    print(result)

  

 封装4:sql语句的封装  db_handle.py

import datetime

import pymysql
from utils.config_handler import ConfigParse

class DB(object):
    def __init__(self):
        #获取mysql连接信息
        self.db_conf = ConfigParse.get_db_config()
        #获取连接对象
        self.conn = pymysql.connect(
            host = self.db_conf["host"],
            port = int(self.db_conf["port"]),
            user = self.db_conf["user"],
            password = self.db_conf["password"],
            database = self.db_conf["db"],
            charset = "utf8"
        )
        #获取数据的游标
        self.cur = self.conn.cursor()

    def close_connect(self):
        # 关闭数据连接
        #提交,物理存储
        self.conn.commit()
        #游标关闭
        self.cur.close()
        #连接对象关闭
        self.conn.close()

    def get_api_list(self):
        """获取所有的对象数据"""
        sqlStr = "select * from interface_api where status=1"
        self.cur.execute(sqlStr)
        data = self.cur.fetchall()
        #转成一个list
        apiList = list(data)
        return apiList

    def get_api_case(self, api_id):
        """获取某一条数据"""
        sqlStr = "select * from interface_test_case where api_id=%s and status=1" %api_id
        self.cur.execute(sqlStr)
        api_case_list = list(self.cur.fetchall())
        return api_case_list

    def get_rely_data(self, api_id, case_id):
        """获取所有数据库中的某一条"""
        sqlStr = "select data_store from interface_data_store where api_id=%s and case_id=%s" %(api_id, case_id)
        self.cur.execute(sqlStr)
        rely_data = eval(self.cur.fetchall()[0][0])
        return rely_data

    def write_check_result(self, case_id, errorInfo, res_data):
        """更新数据库表"""
        sqlStr = "update interface_test_case set error_info="%s", res_data="%s" where id=%s" %(errorInfo, res_data, case_id)
        self.cur.execute(sqlStr)
        self.conn.commit()
    def insert_dab(self):
        sqlStr="INSERT INTO `interface_api` VALUES (4, '修改博文', 'http://39.106.41.11:8080/getBlogContent/', 'get', 'url','0', '2018-07-27 22:13:30')"
        self.cur.execute(sqlStr)

    def get_api_case(self, api_id):
        """获取表中id"""
        sqlStr = "select * from interface_test_case where api_id=%s and status=1" % api_id
        self.cur.execute(sqlStr)
        api_case_list = list(self.cur.fetchall())
        return api_case_list

    def get_api_id(self, api_name):
        """获取表中id"""
        sqlStr = "select api_id from interface_api where api_name='%s'" % api_name
        self.cur.execute(sqlStr)
        api_id = self.cur.fetchall()[0][0]
        return api_id

    def update_store_data(self, api_id, case_id, store_data):
        
        sqlStr = "select data_store from interface_data_store where api_id=%s and case_id=%s" % (api_id, case_id)
        self.cur.execute(sqlStr)
        if self.cur.fetchall():
            sqlStr = "update interface_data_store set data_store="%s" where api_id=%s and case_id=%s" % (
            store_data, api_id, case_id)
            print(sqlStr)
            self.cur.execute(sqlStr)
            self.conn.commit()
        else:
            sqlStr = "insert into interface_data_store values(%s, %s, "%s", '%s')" % (
            api_id, case_id, store_data, datetime.now())
            self.cur.execute(sqlStr)
            self.conn.commit()

if __name__ == '__main__':
    db=DB()
    print(db.get_api_list())
    print(db.get_api_case(1))
    # print(db.get_rely_data(1,1))
    #print(db.insert_dab())

  

 使用ing,我新建了一个test.py文件

from utils.db_handler import DB
from action.get_rely import GetRely
from utils.HttpClient import HttpClient

def main():
    #连接数据库,获取连接实例对象
    db=DB()
    #从数据库中获取需要执行的api执行集合

    api_list=db.get_api_list()
    print(api_list)

main()

  

 至此一整套封装,与使用就完成了,为什么要分为几个文件夹,我遵循的一个原则, 使用、配置、公共方法是用不的文件夹整理,其实最终也就是你使用怎么方便,维护比较好,就怎么用

原文地址:https://www.cnblogs.com/chongyou/p/13124269.html