[mysql]pymysql插入500万数据

pymysql插入500万数据

需求说明

吸取mysql怎么快速插入500万行数据?(效率要高)

分析

  • (1)使用pymysql多行插入(提高效率)

  • (2)使用python协程(遇到I/O操作就切换任务,无需等待--提高效率)

建表

# 建库
create database mydb01;
# 使用库
use mydb01;
# 建表
create table tbl_students(
    id int primary key auto_increment,
    name varchar(20) unique,
    gender varchar(6),
    email varchar(40)
); 

image-20201109175156874

代码耗时130秒

#!/usr/bin/env python
# coding: utf-8
import pymysql
import gevent
import time


class DBUtil:
    def __init__(self, host, port, username, password, db, charset='utf8'):
        self.host = host  # mysql主机地址
        self.port = port  # mysql端口
        self.username = username  # mysql远程连接用户名
        self.password = password  # mysql远程连接密码
        self.db = db  # mysql使用的数据库名
        self.charset = charset  # mysql使用的字符编码,默认为utf8
        self.connect()  # __init__初始化之后,执行的函数

    def connect(self):
        # pymysql连接mysql数据库
        # 需要的参数host,port,user,password,db,charset
        self.conn = pymysql.connect(host=self.host,
                                    port=self.port,
                                    user=self.username,
                                    password=self.password,
                                    db=self.db,
                                    charset=self.charset
                                    )
        # 连接mysql后执行协程方法
        self.asynchronous()

    #线程
    def run(self, nmin, nmax):
        # 创建游标
        self.cur = self.conn.cursor()

        # 定义sql语句,插入数据id,name,gender,email
        sql = "insert into tbl_students(id,name,gender,email) values (%s,%s,%s,%s)"

        # 定义总插入行数为一个空列表
        data_list = []
        for i in range(nmin, nmax):
            # 添加所有任务到总的任务列表
            result = (i, 'mayun' + str(i), 'male', 'mayun' + str(i) + '@qq.com')
            data_list.append(result)

        # 执行多行插入,executemany(sql语句,数据(需一个元组类型))
        content = self.cur.executemany(sql, data_list)
        if content:
            print(f'成功插入第{nmax - 1}条数据')

        # 提交数据,必须提交,不然数据不会保存
        self.conn.commit()

    def asynchronous(self):
        # g_l 任务列表
        # 定义了异步的函数: 这里用到了一个gevent.spawn方法
        max_line = 10000  # 定义每次最大插入行数(max_line=10000,即一次插入10000行)
        g_l = [gevent.spawn(self.run, i, i + max_line) for i in range(1, 5000001, max_line)]

        # gevent.joinall 等待所以操作都执行完毕
        gevent.joinall(g_l)
        self.cur.close()  # 关闭游标
        self.conn.close()  # 关闭pymysql连接


if __name__ == '__main__':
    start_time = time.time()  # 计算程序开始时间
    st = DBUtil('127.0.0.1', 3306,'root','root','mydb01','utf8')  # 实例化类,传入必要参数
    print('程序耗时{:.2f}'.format(time.time() - start_time))  # 计算程序总耗时
  • 耗时

image-20201109174806833

  • 结果查询

image-20201109174911228

500万数据耗时130秒

更多交流,请加QQ:390351113.请备注好友来自博客园<IT自学吧>
原文地址:https://www.cnblogs.com/itzixueba/p/14034724.html