python操作数据库

说说python操作mysql数据库,python要操作数据库首先导入pymysql这个模块,然后跟平时用工具连接数据库差不多,第1步,首先创建连接,调用connect方法传入host,即mysql地址,
port,端口号,然后数据库的用户名和密码,最后再传入,要操作的数据库名称。第二步,获取游标对象,调用创建好的连接来获取游标,游标是什么?游标是用来存储查询数据的结果。
第3步,调用游标里面的execute方法执行sql语句,返回的是执行完sql语句受影响的行数int类型。第4步,fetchone每次只显示一条结果,可以循环遍历查询到的结果,fetchall是获取全部的执行结果,返回的对象是一个元祖。最后是关闭游标和关闭连接。

查询demo:

import pymysql

# TODO SQL注入写法: ' or 1=1 or '1 , ' or '1'='1

class Student():
    def __init__(self):
        self.host = 'localhost'
        self.port = 3306
        self.user = 'root'
        self.password = 'root'
        self.database = 'school'
        self.charset = 'utf8'

    def show_student_info(self):
        '''显示所以学生信息'''
        # todo 创建连接
        conn = pymysql.connect(host=self.host, port=self.port, user=self.user, password=self.password, database=self.database, charset=self.charset)

        # todo 获取游标对象
        cursor = conn.cursor()

        # todo sql查询和执行
        sql = ('select * from student')
        cursor.execute(sql)

        # todo 循环获取数据
        for temp in cursor.fetchall():
            print(temp)

        # todo 关闭游标和连接
        cursor.close()
        conn.close()


    def show_address(self):
        '''查询所有的家庭住址'''
        # todo 创建连接
        conn = pymysql.connect(host=self.host, port=self.port, user=self.user, password=self.password,database=self.database, charset=self.charset)
        # todo 获取游标对象
        obj = conn.cursor()

        # todo sql查询和执行
        sql = ('select address from student')
        obj.execute(sql)

        # todo 循环获取数据
        for temp in obj.fetchall():
            print(temp)

        # todo 关闭游标和连接
        obj.close()
        conn.close()

    def show_id_name(self):
        '''查询所有的家庭住址'''
        # todo 创建连接
        conn = pymysql.connect(host=self.host, port=self.port, user=self.user, password=self.password,database=self.database, charset=self.charset)

        # todo 获取游标对象
        obj = conn.cursor()

        # todo sql查询和执行
        sql = ('select id,name from student')
        obj.execute(sql)

        # todo 循环获取数据
        for temp in obj.fetchall():
            print(temp)

        # todo 关闭游标和连接
        obj.close()
        conn.close()

    def get_name_info(self):
        '''根据学生姓名查询学生信息'''
        conn = pymysql.connect(host=self.host, port=self.port, user=self.user, password=self.password,database=self.database, charset=self.charset)

        # todo 获取游标对象
        obj = conn.cursor()

        # todo 输入学生姓名
        find_name = input('请输入学生姓名:')

        # todo sql查询和执行
        sql = "select * from student where name='{0}'".format(find_name)
        print(sql)
        obj.execute(sql)

        # todo 循环获取数据
        for temp in obj.fetchall():
            print(temp)

        # todo 关闭游标和连接
        obj.close()
        conn.close()

    def print_option(self):
        '''选项信息'''
        print('------学生信息表------')
        print('1: 查询所有的学生信息')
        print('2: 查询所有的家庭住址')
        print('3: 查询所以的学生学号和名字')
        print('4: 根据名字查询学生信息')
        option = input('请输入对应功能的序号:')
        print('------显示如下------')
        return option

    def run(self):
        '''循环调用'''
        while True:
            option = self.print_option()

            if option=="1":
                self.show_student_info()

            elif option=="2":
                self.show_address()

            elif option=="3":
                self.show_id_name()
            elif option=="4":
                self.get_name_info()

            else:
                print('输入有误,请重新输入')

def main():
    # todo 创建一个学生对象
    sd = Student()
    # todo 调用执行
    sd.run()


if __name__ == '__main__':
    main()

 工具类:

#! /usr/bin/env python3
# -*- coding: UTF-8 -*-

"""
@Time: 2019/12/31 19:43
@Author: hengxin
"""

import pymysql
import datetime
import decimal
import json
from tools.Log import Log
from tools.Config import Config


class DataBaseOperate(object):
    L = Log("DataBaseOperate")
    C = Config()

    def operate(self, sql):
        db = pymysql.connect(host=self.C.DB_HOST,
                             port=self.C.DB_PORT,
                             user=self.C.DB_USER,
                             passwd=self.C.DB_PWD)
        db.ping(reconnect=True)
        con = db.cursor(cursor=pymysql.cursors.DictCursor)
        try:
            # 此处新增 单次连接执行多条SQL的功能, 兼容书写时首尾多输入空格的情况
            sql_list = sql.strip().split(";")
            try:
                # 此处兼容以分号结尾的单句SQL仍返回一维列表
                sql_list.remove('')
            except ValueError as e:
                self.L.logger.error(e)
                raise Exception("SQL请以分号 ; 结束")
            if len(sql_list) < 2:
                con.execute(sql)
                self.L.logger.info('\n\t' + sql)
                effect_row = con.rowcount
                if sql.lower().startswith('select'):
                    self.L.logger.info('\n\t' + sql)
                    # if effect_row != 1:
                    #     self.L.logger.info(sql)
                    # else:
                    #     pass
                    self.L.logger.info("影响行数 %s" % effect_row)
                else:
                    pass
                results = con.fetchall()
                db.commit()
                # print(results)
                for result in results:
                    for fields in result:
                        if isinstance(result[fields], datetime.datetime):
                            result[fields] = str(result[fields].strftime('%Y-%m-%d %H:%M:%S'))
                        elif isinstance(result[fields], datetime.date):
                            result[fields] = str(result[fields].strftime('%Y-%m-%d'))
                        elif isinstance(result[fields], decimal.Decimal):
                            result[fields] = float(result[fields])
            else:
                results = []
                for sql in sql_list:
                    if sql != '':
                        con.execute(sql)
                        self.L.logger.info(sql)
                        effect_row = con.rowcount
                        if sql.lower().startswith('select'):
                            self.L.logger.info('\n\t' + sql)
                            # if effect_row != 1:
                            #     self.L.logger.info(sql)
                            # else:
                            #     pass
                        else:
                            pass
                        self.L.logger.info("影响行数 %s" % effect_row)
                        results.append(con.fetchall())
                        db.commit()
                    else:
                        pass
                for result in results:
                    for r in result:
                        for fields in r:
                            if isinstance(r[fields], datetime.datetime):
                                r[fields] = str(r[fields].strftime('%Y-%m-%d %H:%M:%S'))
                            elif isinstance(r[fields], datetime.date):
                                r[fields] = str(r[fields].strftime('%Y-%m-%d'))
                            elif isinstance(r[fields], decimal.Decimal):
                                r[fields] = float(r[fields])
            con.close()
            # if sql.lower().startswith('select'):
            self.L.logger.debug("\n" + json.dumps(results, ensure_ascii=False,
                                                  sort_keys=True, indent=2, separators=(',', ': ')))
            # else:
            #     pass
            return results
        except Exception as e:
            db.rollback()
            self.L.logger.error(e)
            raise KeyError(e)


原文地址:https://www.cnblogs.com/xiamaojjie/p/11375998.html