python 数据库连接池

数据库连接池:
from DBUtils.PooledDB import PooledDB
import pymysql

Release_Write_database_setting = {
    "host": '192.168.32.6',
    "port": 31306,
    "user": 'root',
    "password": 'xxx',
    "database": 'tornado',
    "charset": 'utf8',
    "cursorclass": pymysql.cursors.DictCursor
}

Release_Query_database_setting = {
    "host": '192.168.32.8',
    "port": 31306,
    "user": 'root',
    "password": 'xxx',
    "database": 'tornado',
    "charset": 'utf8',
    "cursorclass": pymysql.cursors.DictCursor
}

Development_Write_database_setting = {
    "host" : '192.168.185.4',
    "port" : 31306,
    "user" : 'root',
    "password" : 'xxx',
    "database" :  'tornado20200708quan',
    # "database" :  'tornado',
    "charset" : 'utf8',
    "cursorclass" : pymysql.cursors.DictCursor
}

Development_Query_database_setting = {
    "host" : '192.168.185.5',
    "port" : 31306,
    "user" : 'root',
    "password" : 'xxx',
    "database" :  'tornado20200708quan',
    "charset" : 'utf8',
    "cursorclass" : pymysql.cursors.DictCursor
}
# 发布时更改
Query_database_setting = Development_Query_database_setting
Write_database_setting = Development_Write_database_setting


class Singleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
            return cls._instances[cls]

class DBpool():
    __metaclass__ = Singleton
    _pool = None
    def __init__(self):
        # 密码、 用户名相关 等
        self._pool = PooledDB(
            creator=pymysql,
            mincached=1,
            maxcached=20,
            **Query_database_setting
        )
        self._conn = None
        self._cursor = None
        self.getCoon()

    def getCoon(self):
        return self._pool.connection()


    # def getCoon(self):
    #     self._conn = self._pool.connection()
    #     self._cursor = self._conn.cursor()
    #     # return self._pool.connection()

    # def execut(self, sql, param=()):
    #     count = self._cursor.execute(sql, param)
    #     return count

参考 1:普通数据库连接 对比 数据库连接池
 

不用数据库连接池的写法:

import MySQLdb
conn= MySQLdb.connect(host='localhost',user='root',passwd='pwd',db='myDB',port=3306)
cur=conn.cursor()
SQL="select * from table1"
r=cur.execute(SQL)
r=cur.fetchall()
cur.close()
conn.close()
 

使用 PooledDB

from DBUtils.PooledDB import PooledDB
import pymysql
import xlwt
import os
import threading
import zipfile


class CreateExcel(object):
"""
查询数据库,并生成excel 文档
"""
def __init__(self, mysql_info):
self.mysql_info = mysql_info
self.pool = PooledDB(pymysql, 5, host=self.mysql_info['host'], user=self.mysql_info['user'],
password=self.mysql_info['password'], db=self.mysql_info['db'], port=self.mysql_info['port'],
charset='utf-8')
连接参数定义:

1. mincached,最少的空闲连接数,如果空闲连接数小于这个数,pool会创建一个新的连接
2. maxcached,最大的空闲连接数,如果空闲连接数大于这个数,pool会关闭空闲连接
3. maxconnections,最大的连接数,
4. blocking,当连接数达到最大的连接数时,在请求连接的时候,如果这个值是True,请求连接的程序会一直等待,直到当前连接数小于最大连接数,如果这个值是False,会报错,
5. maxshared 当连接数达到这个数,新请求的连接会分享已经分配出去的连接
 

这里的 5  应该是默认的连接数吧

在uwsgi中,每个http请求都会分发给一个进程,连接池中配置的连接数都是一个进程为单位的(即上面的最大连接数,都是在一个进程中的连接数),而如果业务中,一个http请求中需要的sql连接数不是很多的话(其实大多数都只需要创建一个连接),配置的连接数配置都不需要太大。
连接池对性能的提升表现在:
1.在程序创建连接的时候,可以从一个空闲的连接中获取,不需要重新初始化连接,提升获取连接的速度
2.关闭连接的时候,把连接放回连接池,而不是真正的关闭,所以可以减少频繁地打开和关闭连接
参考 2 : PooledDB参数解释
 

版本环境 python 3.7 DBUtils 1.3 mysqlclient 1.4.6 连接池初始化

pool = PooledDB(creator=MySQLdb, mincached=0, maxcached=0,
maxshared=0, maxconnections=0,
blocking=False,maxusage=None,
setsession=None, reset=True,
failures=None, ping=1,
*args, **kwargs)
参数说明 creator

#creator => 任何符合DB-API 2.0规范的函数或者兼容的数据库模块
mincached

#mincached => 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached

#maxcached => 链接池中最大闲置的链接数(0和None不限制)
maxshared

#maxshared => maximum number of shared connections
(0 or None means all connections are dedicated)
When this maximum number is reached, connections are
shared if they have been requested as shareable
maxconnections

#maxconnections => 允许的最大链接数(0或None表示不限制)
blocking

#blocking => 链接池没有可用链接后,是否阻塞等待。
True表示阻塞等待,直到获取到链接;
False不等待,抛异常退出
maxusage

#maxusage => 同一个链接最多被重复使用的次数(0和None表示无限制)
setsession

#setsession => 可选的会话命令:开始会话前执行的命令列表。
例如["set datestyle to…","set time zone…"]
reset

#reset => 当连接放回池中时,重置连接的方式,默认为True。
False或者None表示使用begin()开启了事务的链接,会执行回滚;
安全起见,建议使用True,当为True时表示所有链接都执行回滚操作
failures

#failures => 当默认的(OperationalError,InternalError)异常不能满足要求时,
可以自定义抛出异常:默认为None;
自定义为传入的为tuple或者issubclass(failures, Exception)
ping

#ping => 检查连接是否仍然处于活动状态的方式
0 = None = never,
1 = default = whenever fetched from the pool,
2 = when a cursor is created,
4 = when a query is executed,
7 = always, and all other bit combinations of these values
args, kwargs

#args, kwargs => 传递给creator的参数
使用

# -*- coding: utf-8 -*-
# @Time : 2020/1/26 0026 20:28
# @Email : lofish@foxmail.com(撸小鱼)

import MySQLdb
import MySQLdb.cursors
from DBUtils.PooledDB import PooledDB
import datetime


class DbManager(object):

def __init__(self, host, port, db_name, user_name, password):
cmds = ["set names utf8mb4;"]
conn_args = {'host': host,
'port': port,
'db': db_name,
'user': user_name,
'passwd': password,
'charset': 'utf8',
'cursorclass': MySQLdb.cursors.DictCursor
}
# 初始化时,链接池中至少创建的空闲的链接,0表示不创建,mincached: 5
# 链接池中最大闲置的链接数(0和None不限制): 20
self._pool = PooledDB(MySQLdb, mincached=5, maxcached=20, setsession=cmds, **conn_args)

def connection(self):
return self._pool.connection()


_db_manager = None


def create_db_manager(host, port, dbname, username, password):
global _db_manager
if _db_manager is None:
_db_manager = DbManager(host, port, dbname, username, password)
return _db_manager
 

参考 3 : 样例代码(采纳)
 

python使用dbutils的PooledDB连接池,操作数据库
1、使用dbutils的PooledDB连接池,操作数据库。

这样就不需要每次执行sql后都关闭数据库连接,频繁的创建连接,消耗时间

2、如果是使用一个连接一直不关闭,多线程下,插入超长字符串到数据库,运行一段时间后很容易出现OperationalError: (2006, ‘MySQL server has gone away’)这个错误。

使用PooledDB解决。

# coding=utf-8
"""
使用DBUtils数据库连接池中的连接,操作数据库
OperationalError: (2006, ‘MySQL server has gone away’)
"""
import json
import pymysql
import datetime
from DBUtils.PooledDB import PooledDB
import pymysql


class MysqlClient(object):
__pool = None;

def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
maxusage=100, setsession=None, reset=True,
host='127.0.0.1', port=3306, db='test',
user='root', passwd='123456', charset='utf8mb4'):
"""

:param mincached:连接池中空闲连接的初始数量
:param maxcached:连接池中空闲连接的最大数量
:param maxshared:共享连接的最大数量
:param maxconnections:创建连接池的最大数量
:param blocking:超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理
:param maxusage:单个连接的最大重复使用次数
:param setsession:optional list of SQL commands that may serve to prepare
the session, e.g. ["set datestyle to ...", "set time zone ..."]
:param reset:how connections should be reset when returned to the pool
(False or None to rollback transcations started with begin(),
True to always issue a rollback for safety's sake)
:param host:数据库ip地址
:param port:数据库端口
:param db:库名
:param user:用户名
:param passwd:密码
:param charset:字符编码
"""

if not self.__pool:
self.__class__.__pool = PooledDB(pymysql,
mincached, maxcached,
maxshared, maxconnections, blocking,
maxusage, setsession, reset,
host=host, port=port, db=db,
user=user, passwd=passwd,
charset=charset,
cursorclass=pymysql.cursors.DictCursor
)
self._conn = None
self._cursor = None
self.__get_conn()

def __get_conn(self):
self._conn = self.__pool.connection();
self._cursor = self._conn.cursor();

def close(self):
try:
self._cursor.close()
self._conn.close()
except Exception as e:
print e

def __execute(self, sql, param=()):
count = self._cursor.execute(sql, param)
print count
return count

@staticmethod
def __dict_datetime_obj_to_str(result_dict):
"""把字典里面的datatime对象转成字符串,使json转换不出错"""
if result_dict:
result_replace = {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime.datetime)}
result_dict.update(result_replace)
return result_dict

def select_one(self, sql, param=()):
"""查询单个结果"""
count = self.__execute(sql, param)
result = self._cursor.fetchone()
""":type result:dict"""
result = self.__dict_datetime_obj_to_str(result)
return count, result

def select_many(self, sql, param=()):
"""
查询多个结果
:param sql: qsl语句
:param param: sql参数
:return: 结果数量和查询结果集
"""
count = self.__execute(sql, param)
result = self._cursor.fetchall()
""":type result:list"""
[self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]
return count, result

def execute(self, sql, param=()):
count = self.__execute(sql, param)
return count

def begin(self):
"""开启事务"""
self._conn.autocommit(0)

def end(self, option='commit'):
"""结束事务"""
if option == 'commit':
self._conn.autocommit()
else:
self._conn.rollback()


if __name__ == "__main__":
mc = MysqlClient()
sql1 = 'SELECT * FROM shiji WHERE id = 1'
result1 = mc.select_one(sql1)
print json.dumps(result1[1], ensure_ascii=False)

sql2 = 'SELECT * FROM shiji WHERE id IN (%s,%s,%s)'
param = (2, 3, 4)
print json.dumps(mc.select_many(sql2, param)[1], ensure_ascii=False)
 

参考 4 : 样例代码
 

#!/usr/bin/env python
#_*_ coding:utf-8_*_

import tornado.ioloop
import tornado.web
import tornado.escape
import pymssql,pymysql
from DBUtils.PooledDB import PooledDB


class Database:
def __init__(self,*db):
if len(db) == 5:
#mysql
self.host = db[0]
self.port = db[1]
self.user = db[2]
self.pwd = db[3]
self.db = db[4]
else:
#mssql
self.host = db[0]
self.port = None
self.user = db[1]
self.pwd = db[2]
self.db = db[3]
self._CreatePool()

def _CreatePool(self):
if not self.db:
raise NameError + '没有设置数据库信息'
if (self.port == None):
self.Pool = PooledDB(creator=pymssql, mincached=2, maxcached=5, maxshared=3, maxconnections=6,
blocking=True, host=self.host, user=self.user,
password=self.pwd, database=self.db, charset="utf8")
else:
self.Pool = PooledDB(creator=pymysql, mincached=2, maxcached=5, maxshared=3, maxconnections=6,
blocking=True, host=self.host, port=self.port,
user=self.user, password=self.pwd, database=self.db, charset="utf8")

def _Getconnect(self):
self.conn = self.Pool.connection()
cur = self.conn.cursor()
if not cur:
raise "数据库连接不上"
else:
return cur
# 查询sql

def ExecQuery(self, sql):
cur = self._Getconnect()
cur.execute(sql)
relist = cur.fetchall()
cur.close()
self.conn.close()
return relist
# 非查询的sql

def ExecNoQuery(self, sql):
cur = self._Getconnect()
cur.execute(sql)
self.conn.commit()
cur.close()
self.conn.close()

gdbp = Database
class MainHadle(tornado.web.RequestHandler):
def get(self,*args):
filename = self.get_argument('filename')
print(filename)
self.set_header('Content-Type', 'application/octet-stream')
self.set_header('Content-Disposition', 'attachment; filename=%s'%filename.encode('utf-8'))
with open(filename,'rb') as f:
while True:
data = f.read(1024)
if not data:
break
self.write(data)

class MainIndex(tornado.web.RequestHandler):
def get(self):
self.write('Hello')

class CheckUser(tornado.web.RequestHandler):
def get(self):
user = self.get_argument('user')
pwd = self.get_argument('passwd')
#print(user)

if user != '' and pwd != '':
lssql = "select usr_code,password from sb_user where usr_code= '%s' " % user
#print(lssql)
rds = gdbp.ExecQuery(lssql)
if rds[0][1] == pwd :
js_str = tornado.escape.json_encode('{"result":"true","msg":""}')
self.write(js_str)
else:
js_str = tornado.escape.json_encode('{"result":"false","msg":"用户或密码错误"}')
self.write(js_str)
#print(rds[0][0])
else:
js_str = tornado.escape.json_encode('{"result":"false","msg":"参数错误"}')
self.write(js_str)




def make_app():
return tornado.web.Application([(r"/download",MainHadle),(r"/",MainIndex),(r"/checkuser",CheckUser)])

def make_dbpool():
global gdbp
gdbp = Database('172.20.1.2','sa','xxx','MPL')


if __name__ == '__main__':
app = make_app()
app.listen(8888)
make_dbpool()
tornado.ioloop.IOLoop.current().start()

————————————————
版权声明:本文为CSDN博主「Lucky@Dong」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/zzddada/article/details/113771780

原文地址:https://www.cnblogs.com/liang715200/p/14787825.html