python 实现数据库中数据添加、查询与更新

一、前言

  最近做web网站的测试,遇到很多需要批量造数据的功能;比如某个页面展示数据条数需要达到10000条进行测试,此时手动构造数据肯定是不可能的,此时只能通过python脚本进行自动构造数据;本次构造数据主要涉及到在某个表里面批量添加数据、在关联的几个表中同步批量添加数据、批量查询某个表中符合条件的数据、批量更新某个表中符合条件的数据等。  

二、数据添加

  即批量添加数据到某个表中。

insert_data.py

import pymysql
import random
import time
from get_userinfo import get_userinfo
from get_info import get_info
from get_tags import get_tags
from get_tuser_id import get_utag


class DatabaseAccess():
    def __init__(self):
        self.__db_host = "xxxxx"
        self.__db_port = 3307
        self.__db_user = "root"
        self.__db_password = "123456"
        self.__db_database = "xxxxxx"
    # 连接数据库
    def isConnectionOpen(self):
        self.__db = pymysql.connect(
            host=self.__db_host,
            port=self.__db_port,
            user=self.__db_user,
            password=self.__db_password,
            database=self.__db_database,
            charset='utf8'
        )
    
    # 插入数据
    def linesinsert(self,n,user_id,tags_id,created_at):
 
        self.isConnectionOpen()
        # 创建游标
        global cursor
        conn = self.__db.cursor()
        try:
            sql1 = '''
            INSERT INTO `codeforge_new`.`cf_user_tag`(`id`, `user_id`, 
            `tag_id`, `created_at`, `updated_at`) VALUES ({}, {}, 
            {}, '{}', '{}');
            '''.format(n,user_id,tags_id,created_at,created_at)
            
            # 执行SQL   
            conn.execute(sql1,)
        except Exception as e:
            print(e)
        finally:
            # 关闭游标
            conn.close()
            self.__db.commit()
            
            self.__db.close()
    
    def get_data(self):
        
        # 生成对应数据 1000条
        for i in range(0,1001):
            created_at = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
            # print(create_at)
            # 用户id
            tuserids = []
            tuserid_list = get_utag()
            for tuserid in tuserid_list:
                tuserids.append(tuserid[0])
            # print(tuserids)
            userid_list = get_userinfo()
            user_id = random.choice(userid_list)[0]
            if user_id not in tuserids:
                user_id=user_id
            
                # 标签id
                tagsid_list = get_tags()
                tags_id = random.choice(tagsid_list)[0]
                self.linesinsert(i,user_id,tags_id,created_at)


if __name__ == "__main__":
    # 实例化对象
    db=DatabaseAccess()
    db.get_data()

二、数据批量查询

select_data.py


import
pymysql import pandas as pd import numpy as np def get_tags(): # 连接数据库,地址,端口,用户名,密码,数据库名称,数据格式 conn = pymysql.connect(host='xxx.xxx.xxx.xxx',port=3307,user='root',passwd='123456',db='xxxx',charset='utf8') cur = conn.cursor() # 表cf_users中获取所有用户id sql = 'select id from cf_tags where id between 204 and 298' # 将user_id列转成列表输出 df = pd.read_sql(sql,con=conn) # 先使用array()将DataFrame转换一下 df1 = np.array(df) # 再将转换后的数据用tolist()转成列表 df2 = df1.tolist() # cur.execute(sql) # data = cur.fetchone() # print(df) # print(df1) # print(df2) return df2 conn.close()

三、批量更新数据

update_data.py

import
pymysql import random def update_comment(num,code_id): #连接数据库 db = pymysql.connect(host='xxx.xxx.xxx.xxx',port=3307,user='root',passwd='123456',db='xxx',charset='utf8') #定义游标 cursor = db.cursor() try: update = "update cf_codes set score_num={} where id={}".format(num,code_id) # print(update) cursor.execute(update) print('数据更新成功') # 提交数据 db.commit() except: print('数据更新失败') # db.rollback() cursor.close() db.close()
原文地址:https://www.cnblogs.com/lxmtx/p/14089780.html