Python 连接池 结合 多线程操作 MySQL批量生成数据

Python 连接池 结合 多线程操作 MySQL

代码

import os, threading
from DBUtils.PooledDB import PooledDB
import pymysql, random, time
from queue import Queue


# from twisted.enterprise import adbapi
# from twisted.internet import reactor

# 创建一个有10个连接的mysql连接池.创建并维持10个线程并发写入1000000条随机数据到test.stu表中

class Test(threading.Thread):
    def __init__(self, n):
        super(Test, self).__init__()
        mysql_conf = {
            "host": "127.0.0.1",
            "user": "root",
            "passwd": "123456",
            "charset": "utf8",
            "db": "ep_mall",
            "cursorclass": pymysql.cursors.DictCursor
        }

        # 创建一个连接池,连接池初始最多容纳和创建25个连接,当连接池没有可用连接则阻塞
        # 使用连接池可以进行长连接,无需每次操作mysql时都建立连接,节省了建立连接的时间
        self.n = n
        self.pool = PooledDB(pymysql, maxconnections=25, blocking=True, **mysql_conf)
        self.alpha = list("qwertyuiopasdfghjklzxcvbnm")
        self.sex = ["m", "s"]

    def run(self):
        print("%s号线程开始任务" % self.n)
        sql = "insert into cc_test values (%s,%s,%s,%s,%s)"

        # 获取连接
        conn = self.pool.connection()
        cursor = conn.cursor()

        data_set = []
        try:
            for i in range(100):
                name = "".join(random.sample(self.alpha, 5))
                sex = random.choice(self.sex)
                age = random.randint(10, 60)
                classid = random.randint(1000, 9999)
                data_set.append((None, name, sex, age, classid))

            cursor.executemany(sql, data_set)  # 批量操作,提高效率
            conn.commit()
            print("%s号线程完成任务" % self.n)
        except:
            # 如果出现错误,要回滚
            conn.rollback()
            print("%s号线程任务失败" % self.n)
        finally:
            # 无论插入成功还是失败,记得将连接放回连接池供其他线程使用,否则该线程会一直被占用
            cursor.close()
            conn.close()  # 执行完sql操作后,将连接放回连接池,而不是真的关闭连接.如果不放回连接池,则该连接一直处于占用状态,其他线程就无法使用该连接


# 最多创建10个线程并发执行
start_time = time.time()
thread_list = []  # 创建线程池
for i in range(1000):
    thread_list.append(Test(i + 1))
    if len(thread_list) >= 10:  # 当列表中的线程有10个,就开始执行10个线程
        print(len(thread_list))
        print(i)
        for thread in thread_list:
            thread.start()

        for thread in thread_list:
            thread.join()  # 10个线程都等待执行完,也就是说,10个线程有一个线程没运行完就不能往下执行代码;
            # 这里会阻塞后面的thread_list=[]和print。但是多个线程间的join和join不会阻塞,
            # 也就是说执行完一个join还可以马上执行下一个join,但是执行完最后一个join不能马上执行 thread_list=[]

        thread_list = []  # 当所有线程运行完清空线程池

print("总共用时:" + str(time.time() - start_time))
原文地址:https://www.cnblogs.com/securitybob/p/13614328.html