[Module]: PyMySQL

PyMySQL

1. Installation

[root@localhost ~]# pip install PyMySQL

2. Getting started

Create the table structure:

mysql> CREATE TABLE `users` (
    ->     `id` int(11) NOT NULL AUTO_INCREMENT,
    ->     `email` varchar(255) COLLATE utf8_bin NOT NULL,
    ->     `password` varchar(255) COLLATE utf8_bin NOT NULL,
    ->     PRIMARY KEY (`id`)
    -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin
    -> AUTO_INCREMENT=1 ;
Query OK, 0 rows affected (0.07 sec)

Usage example:

import pymysql.cursors

# Connect to the database
connection = pymysql.connect(host='192.168.1.134',
                             port=3306,
                             user='remote',
                             password='tx_1234abc',
                             db='Jefrey',
                             charset='utf8mb4',
                             cursorclass=pymysql.cursors.DictCursor)

try:
    with connection.cursor() as cursor:   # insert
        # Create a new record
        sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
        cursor.execute(sql, ('webmaster@python.org', 'very-secret'))

    # connection is not autocommit by default. So you must commit to save your changes.
    connection.commit()   # commit, otherwise the insert is not persisted

    with connection.cursor() as cursor:   # query
        # Read a single record
        sql = "SELECT `id`, `password` FROM `users` WHERE `email`=%s"
        cursor.execute(sql, ('webmaster@python.org',))
        result = cursor.fetchone()
        print(result)
finally:
    connection.close()

# {'id':1, 'password': 'very-secret'}
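The commit rule above can be tried without a MySQL server. The sketch below uses the standard-library sqlite3 module as a stand-in, on the assumption that it behaves like PyMySQL here because both follow DB API 2.0. The table and values are made up for illustration, and sqlite3 uses ? placeholders where PyMySQL uses %s.

```python
import sqlite3  # stand-in for PyMySQL; both implement DB API 2.0


def count_users(conn):
    """Return the number of rows currently visible in the users table."""
    return conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT, password TEXT)")
conn.commit()

# An INSERT that is rolled back instead of committed leaves no trace.
conn.execute("INSERT INTO users (email, password) VALUES (?, ?)",
             ("webmaster@python.org", "very-secret"))
conn.rollback()
after_rollback = count_users(conn)

# The same INSERT followed by commit() is persisted.
conn.execute("INSERT INTO users (email, password) VALUES (?, ?)",
             ("webmaster@python.org", "very-secret"))
conn.commit()
after_commit = count_users(conn)

# A later rollback cannot undo work that was already committed.
conn.rollback()
after_second_rollback = count_users(conn)

print(after_rollback, after_commit, after_second_rollback)
conn.close()
```

With PyMySQL the same pattern applies to connection.commit() and connection.rollback(), unless autocommit=True is passed to connect().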

3. The connection object

Representation of a socket with a mysql server.

The proper way to get an instance of this class is to call connect().

Establish a connection to the MySQL database. Accepts several arguments:

Parameters:

class pymysql.connections.Connection(host=None, user=None, password='', database=None, port=0, unix_socket=None, charset='', sql_mode=None, read_default_file=None, conv=None, use_unicode=None, client_flag=0, cursorclass=<class 'pymysql.cursors.Cursor'>, init_command=None, connect_timeout=10, ssl=None, read_default_group=None, compress=None, named_pipe=None, no_delay=None, autocommit=False, db=None, passwd=None, local_infile=False, max_allowed_packet=16777216, defer_connect=False, auth_plugin_map={}, read_timeout=None, write_timeout=None, bind_address=None) 

•host – Host where the database server is located
•user – Username to log in as
•password – Password to use.
•database – Database to use, None to not use a particular one.
•port – MySQL port to use, default is usually OK. (default: 3306)
•bind_address – When the client has multiple network interfaces, specify the interface from which to connect to the host. Argument can be a hostname or an IP address.
•unix_socket – Optionally, you can use a unix socket rather than TCP/IP.
•charset – Charset you want to use.
•sql_mode – Default SQL_MODE to use.
•read_default_file – Specifies my.cnf file to read these parameters from under the [client] section.
•conv – Conversion dictionary to use instead of the default one. This is used to provide custom marshalling and unmarshaling of types. See converters.
•use_unicode – Whether or not to default to unicode strings. This option defaults to true for Py3k.
•client_flag – Custom flags to send to MySQL. Find potential values in constants.CLIENT.
•cursorclass – Custom cursor class to use.
•init_command – Initial SQL statement to run when connection is established.
•connect_timeout – Timeout before throwing an exception when connecting. (default: 10, min: 1, max: 31536000)
•ssl – A dict of arguments similar to mysql_ssl_set()’s parameters. For now the capath and cipher arguments are not supported.
•read_default_group – Group to read from in the configuration file.
•compress – Not supported
•named_pipe – Not supported
•autocommit – Autocommit mode. None means use server default. (default: False)
•local_infile – Boolean to enable the use of LOAD DATA LOCAL command. (default: False)
•max_allowed_packet – Max size of packet sent to server in bytes. (default: 16MB) Only used to limit size of “LOAD LOCAL INFILE” data packet smaller than default (16KB).
•defer_connect – Don’t explicitly connect on construction - wait for connect call. (default: False)
•auth_plugin_map – A dict of plugin names to a class that processes that plugin. The class will take the Connection object as the argument to the constructor. The class needs an authenticate method taking an authentication packet as an argument. For the dialog plugin, a prompt(echo, prompt) method can be used (if no authenticate method) for returning a string from the user. (experimental)
•db – Alias for database. (for compatibility to MySQLdb)
•passwd – Alias for password. (for compatibility to MySQLdb)

Methods:

autocommit_mode= None 
  specified autocommit mode. None means use server default.
begin() 
  Begin transaction.
close() 
  Send the quit message and close the socket
commit() 
  Commit changes to stable storage
cursor(cursor=None) 
  Create a new cursor to execute queries with
ping(reconnect=True) 
  Check if the server is alive
rollback() 
  Roll back the current transaction
select_db(db) 
  Set current db
show_warnings() 
  SHOW WARNINGS

 

4. The cursor object

class pymysql.cursors.Cursor(connection)

This is the object you use to interact with the database.

Do not create an instance of a Cursor yourself. Call connections.Connection.cursor().

Methods:

callproc(procname, args=()) 
Execute stored procedure procname with args
procname – string, name of procedure to execute on server
args – Sequence of parameters to use with procedure
Returns the original args.

close()
    Closing a cursor just exhausts all remaining data.

execute(query, args=None)  # run a SQL statement
    Execute a query  

    Parameters:
        •query (str) – Query to execute.  
        •args (tuple, list or dict) – parameters used with query. (optional)
 
    Returns:    Number of affected rows   # for a SELECT, the number of matched rows
    Return type:    int
    If args is a list or tuple, %s can be used as a placeholder in the query. If args is a dict, %(name)s can be used as a placeholder in the query.

executemany(query, args) 
    Run several data against one query

    Parameters:
        •query – query to execute on server
        •args – Sequence of sequences or mappings. It is used as parameter.
 
    Returns:    Number of rows affected, if any.
    This method improves performance on multiple-row INSERT and REPLACE. Otherwise it is equivalent to looping over args with execute().

fetchall() 
    Fetch all the rows

fetchmany(size=None) 
    Fetch several rows

fetchone() 
    Fetch the next row

max_stmt_length= 1024000 
    Max statement size which executemany() generates.
    Max size of allowed statement is max_allowed_packet - packet_header_size. Default value of max_allowed_packet is 1048576.

mogrify(query, args=None) 
    Returns the exact string that is sent to the database by calling the execute() method.
    This method follows the extension to the DB API 2.0 followed by Psycopg.

setinputsizes(*args) 
    Does nothing, required by DB API.
setoutputsizes(*args) 
    Does nothing, required by DB API.
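The two placeholder styles that execute() accepts (%s for a list or tuple, %(name)s for a dict) boil down to Python's % operator applied to escaped values, which is also what mogrify() returns. Here is a rough sketch of that idea. The quote() and render() helpers are hypothetical and heavily simplified, so they are not PyMySQL's real escaping code, which handles many more types and the connection charset.

```python
def quote(value):
    """Hypothetical, simplified literal quoting (not PyMySQL's real escaping)."""
    if value is None:
        return "NULL"
    if isinstance(value, (int, float)):
        return str(value)
    text = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return "'" + text + "'"


def render(query, args=None):
    """Interpolate quoted args into the query, similar in spirit to mogrify()."""
    if args is None:
        return query
    if isinstance(args, dict):
        # dict args pair with %(name)s placeholders
        return query % {key: quote(val) for key, val in args.items()}
    # list/tuple args pair with positional %s placeholders
    return query % tuple(quote(val) for val in args)


positional = render("SELECT * FROM users WHERE email=%s AND id=%s",
                    ("o'brien@example.org", 7))
named = render("SELECT * FROM users WHERE email=%(email)s",
               {"email": "a@example.org"})

print(positional)
print(named)
```

Because the quoting is done by the driver, the placeholders in the SQL string should not be wrapped in quotes by hand, and values should never be pasted into the SQL with string concatenation.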

Others

class pymysql.cursors.SSCursor(connection) 
Unbuffered Cursor, mainly useful for queries that return a lot of data, or for connections to remote servers over a slow network.

Instead of copying every row of data into a buffer, this will fetch rows as needed. The upside of this is the client uses much less memory, and rows are returned much faster when traveling over a slow network or if the result set is very big.

There are limitations, though. The MySQL protocol doesn’t support returning the total number of rows, so the only way to tell how many rows there are is to iterate over every row returned. Also, it currently isn’t possible to scroll backwards, as only the current row is held in memory.
fetchall() 
Fetch all, as per MySQLdb. Pretty useless for large queries, as it is buffered. See fetchall_unbuffered(), if you want an unbuffered generator version of this method.
fetchall_unbuffered() 
Fetch all, implemented as a generator, which isn’t to standard, however, it doesn’t make sense to return everything in a list, as that would use ridiculous memory for large result sets.
fetchmany(size=None) 
Fetch many
fetchone() 
Fetch next row
read_next() 
Read next row

class pymysql.cursors.DictCursor(connection) 
A cursor which returns results as a dictionary

class pymysql.cursors.SSDictCursor(connection) 
An unbuffered cursor, which returns results as a dictionary
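The difference between the buffered cursors (Cursor, DictCursor) and the unbuffered ones (SSCursor, SSDictCursor) can be pictured with plain Python. In this sketch, server_rows() is a hypothetical stand-in for rows arriving over the network. It does not talk to MySQL and only illustrates when rows get pulled.

```python
def server_rows(n, log):
    """Hypothetical stand-in for rows streamed from the server."""
    for i in range(n):
        log.append(i)      # record that row i has been transferred
        yield {"id": i}


# Buffered style: the whole result set is pulled before the first fetch.
buffered_log = []
buffered = list(server_rows(5, buffered_log))
first_buffered = buffered[0]

# Unbuffered style: rows are pulled one at a time, only when asked for.
unbuffered_log = []
stream = server_rows(5, unbuffered_log)
first_unbuffered = next(stream)

print(len(buffered_log), len(unbuffered_log))
```

Both styles hand back the same first row, but the buffered one has already transferred everything. That is why the unbuffered cursors use less memory on large results, and also why they cannot report the total row count up front or scroll backwards.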

5. execute and executemany: usage and performance comparison

① execute: runs a single SQL statement

Source:

def execute(self, query, args=None):
    """Execute a query
    """
    while self.nextset():
        pass

    query = self.mogrify(query,
                         args)  # interpolates the escaped, quoted args, e.g. select * from users WHERE email='webmaster@python.org' and password='very-secret'

    result = self._query(query)  # calls the connection's query method and returns the row count
    self._executed = query  # remember the executed statement; the fetch methods check it
    return result

Example:

# cursor
try:
    with connection.cursor() as cursor:
        # Query the matching records
        sql = " select * from users WHERE email=%s and password=%s "
        rows_count = cursor.execute(sql, ('webmaster@python.org', 'very-secret'))
        print(rows_count)
        result = cursor.fetchone()
        print(result)
finally:
    connection.close()

# 12   (0 when nothing matches)
# {'id': 4, 'email': 'webmaster@python.org', 'password': 'very-secret'}

② executemany: runs one statement against several parameter sets

Source:

def executemany(self, query, args):
    # type: (str, list) -> int
    """Run several data against one query """
    if not args:
        return

    m = RE_INSERT_VALUES.match(query)  # regex match for INSERT ... VALUES; ignore for now
    if m:
        q_prefix = m.group(1) % ()
        q_values = m.group(2).rstrip()
        q_postfix = m.group(3) or ''
        assert q_values[0] == '(' and q_values[-1] == ')'
        return self._do_execute_many(q_prefix, q_values, q_postfix, args,
                                     self.max_stmt_length,
                                     self._get_db().encoding)

    self.rowcount = sum(self.execute(query, arg) for arg in args)  # key point: otherwise it still loops over execute()
    return self.rowcount

Example:

# executemany
try:
    with connection.cursor() as cursor:
        # Run the same query with two parameter sets
        sql = " select * from users WHERE email=%s and password=%s "
        rows_count = cursor.executemany(sql, [('webmaster@python.org', 'very-secret'),('webmaster@python.org', 'test')])
        print(rows_count)
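        result = cursor.fetchone()   # first row of the last statement (fetchone is examined below); each execute() replaces the stored rows, so fetchall would also return only the last statement's rows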
        print(result)

finally:
    connection.close()

# 26
# {'id': 16, 'email': 'webmaster@python.org', 'password': 'test'}

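Insert 10,000 rows with each method and print the elapsed time.

Both methods share the same underlying code, so I expected little difference (I was wrong).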

# Performance comparison (environment: Python 3.6)
import time

try:
    # execute: one statement per row
    with connection.cursor() as cursor:
        start = time.time()
        sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
        for i in range(10000):
            rows_count = cursor.execute(sql, ('webmaster@python.org', 'test' + str(i)))
        connection.commit()
        print('execute insert 10000 rows cost %s' % (time.time() - start))

    # executemany with a generator
    with connection.cursor() as cursor:
        start = time.time()
        sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
        rows_count = cursor.executemany(sql, (('webmaster@python.org', 'test' + str(i)) for i in range(10000, 20000)))
        connection.commit()
        print('executemany insert 10000 rows cost %s' % (time.time() - start))

    # executemany with a list
    with connection.cursor() as cursor:
        start = time.time()
        sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
        li = []
        for i in range(20000, 30000):
            li.append(('webmaster@python.org', 'test' + str(i)))
        rows_count = cursor.executemany(sql, li)
        connection.commit()
        print('executemany insert 10000 rows cost %s' % (time.time() - start))

finally:
    connection.close()

# execute insert 10000 rows cost 5.356306791305542
# executemany insert 10000 rows cost 0.09076881408691406
# executemany insert 10000 rows cost 0.08979249000549316

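Note: the difference is more than 50x. For INSERT ... VALUES statements, executemany matches RE_INSERT_VALUES and sends the rows in multi-row batches through _do_execute_many instead of one statement per row; for any other statement it still loops over execute().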
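To make the batching idea concrete, here is a simplified sketch of packing many rows into a few multi-row INSERT statements under a length limit. The function batch_insert_statements() is a hypothetical illustration and not PyMySQL's actual _do_execute_many. It assumes the row values are already quoted, and it uses a tiny max_len so that the splitting is visible.

```python
def batch_insert_statements(prefix, values_template, rows, max_len):
    """Pack rows into as few 'prefix (..),(..)' statements as fit in max_len."""
    statements = []
    current = prefix
    for row in rows:
        rendered = values_template % row   # row values assumed already quoted
        if current != prefix and len(current) + 1 + len(rendered) > max_len:
            statements.append(current)     # statement is full, start a new one
            current = prefix
        separator = "" if current == prefix else ","
        current += separator + rendered
    if current != prefix:
        statements.append(current)         # flush the last partial statement
    return statements


prefix = "INSERT INTO users (email, password) VALUES "
rows = [("'a@example.org'", "'test%d'" % i) for i in range(5)]
stmts = batch_insert_statements(prefix, "(%s, %s)", rows, max_len=120)

for stmt in stmts:
    print(len(stmt), stmt)
```

Five rows end up in three statements here. With the real default max_stmt_length of 1024000, thousands of rows fit into a single statement, so the number of round trips to the server drops sharply.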

6. A closer look at fetchone, fetchmany, and fetchall

fetchone: fetches one row; returns None when no rows remain

Source:

def fetchone(self):
    """Fetch the next row"""
    self._check_executed()       # make sure a statement has been executed
    if self._rows is None or self.rownumber >= len(self._rows):
        return None      # _rows holds all matching rows as a sequence
    result = self._rows[self.rownumber]   # return the row at index rownumber
    self.rownumber += 1
    return result

Example:

# fetchone
try:
    with connection.cursor() as cursor:   # query
        # Read a single record
        sql = "SELECT `id`, `password` FROM `users` WHERE `email`=%s AND `password`=%s"
        cursor.execute(sql, ('webmaster@python.org','test1'))
        result = cursor.fetchone()
        print(result)
        result = cursor.fetchone()
        print(result)
finally:
    connection.close()

# {'id': 31, 'password': 'test1'}
# {'id': 10031, 'password': 'test1'}

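fetchmany: fetches the given number of rows; returns () when there is no result set (and an empty sequence once the rows are used up)

Source: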

def fetchmany(self, size=None):
    """Fetch several rows"""
    self._check_executed()
    if self._rows is None:
        return ()
    end = self.rownumber + (size or self.arraysize)
    result = self._rows[self.rownumber:end]
    self.rownumber = min(end, len(self._rows))
    return result

Example:

# fetchmany
try:
    with connection.cursor() as cursor:   # query
        # Read two records at a time
        sql = "SELECT `id`, `password` FROM `users` WHERE `email`=%s AND `password`=%s"
        cursor.execute(sql, ('webmaster@python.org','test1'))
        result = cursor.fetchmany(2)
        print(result)
        result = cursor.fetchmany(2)
        print(result)
finally:
    connection.close()

# [{'id': 31, 'password': 'test1'}, {'id': 10031, 'password': 'test1'}]
# [{'id': 20031, 'password': 'test1'}, {'id': 30031, 'password': 'test1'}]

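fetchall: fetches all remaining matching rows; returns () when there is no result set (and an empty sequence once the rows are used up)

Source: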

def fetchall(self):
    """Fetch all the rows"""
    self._check_executed()
    if self._rows is None:
        return ()
    if self.rownumber:
        result = self._rows[self.rownumber:]
    else:
        result = self._rows
    self.rownumber = len(self._rows)
    return result

Example:

# fetchall
try:
    with connection.cursor() as cursor:   # query
        # Read all matching records
        sql = "SELECT `id`, `password` FROM `users` WHERE `email`=%s AND `password`=%s"
        cursor.execute(sql, ('webmaster@python.org','test10000'))
        result = cursor.fetchall()
        print(result)
        result = cursor.fetchall()
        print(result)
finally:
    connection.close()

# [{'id': 70030, 'password': 'test10000'}, {'id': 120030, 'password': 'test10000'}, {'id': 150030, 'password': 'test10000'}]
# []

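Analysis: with the default buffered cursor, all three methods work on the complete result set that execute() has already fetched and simply slice it, so their performance is similar.

Summary: insert, delete, update, and select all work the same way; pass in the corresponding SQL statement to get the result you want.

A complete module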
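The slicing behaviour described in the analysis can be reproduced in a few lines of plain Python. BufferedRows below is a hypothetical stand-in that mirrors the source shown earlier (a row list plus a rownumber index). It is not the PyMySQL class itself.

```python
class BufferedRows:
    """Hypothetical stand-in for a buffered cursor's row handling."""

    def __init__(self, rows, arraysize=1):
        self._rows = list(rows)   # the complete result set, already in memory
        self.rownumber = 0        # index of the next row to hand out
        self.arraysize = arraysize

    def fetchone(self):
        if self.rownumber >= len(self._rows):
            return None
        row = self._rows[self.rownumber]
        self.rownumber += 1
        return row

    def fetchmany(self, size=None):
        end = self.rownumber + (size or self.arraysize)
        rows = self._rows[self.rownumber:end]
        self.rownumber = min(end, len(self._rows))
        return rows

    def fetchall(self):
        rows = self._rows[self.rownumber:]
        self.rownumber = len(self._rows)
        return rows


cur = BufferedRows([{"id": i} for i in range(1, 6)])
first = cur.fetchone()     # first row
pair = cur.fetchmany(2)    # next two rows
rest = cur.fetchall()      # whatever is left
empty = cur.fetchall()     # nothing left, so an empty list
nothing = cur.fetchone()   # nothing left, so None
print(first, pair, rest, empty, nothing)
```

All three methods share one position counter, so calls can be mixed freely and each picks up where the previous one stopped.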

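import logging
import threading
import traceback

import pymysql.cursors
from pymysql.err import InterfaceError, OperationalError

# util.singleton and PBX_DB_CONFIG come from the author's own project and are not shown here.

@util.singleton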
class PbxDB(object):
    '''
    Database operations
    '''
    def __init__(self):
        self.mutex = threading.Lock()
        self.conn = None
        self.DB_CONFIG = PBX_DB_CONFIG

    def connect(self):
        '''
        Connect to the database
        :return:
        '''
        try:
            self.conn = pymysql.connect(host=self.DB_CONFIG['host'],
                                        port=self.DB_CONFIG['port'],
                                        db=self.DB_CONFIG['db'],
                                        user=self.DB_CONFIG['user'],
                                        password=self.DB_CONFIG['passwd'],
                                        cursorclass=pymysql.cursors.DictCursor,             # return rows as dicts
                                        charset='utf8')
        except Exception as e:
            exe = traceback.format_exc()
            logging.error(exe)

    def reconnect(self):
        '''
        Reconnect
        :return:
        '''
        try:
            self.conn.close()
        except Exception as e:
            pass
        finally:
            self.conn = None
            self.connect()

    def find(self, sql, *args):
        '''
        Fetch all matching rows
        :param sql:
        :param args:
        :return:    list of dicts, or an empty tuple when nothing matches
        '''
        database = []
        count = 0
        while count < 3:
            try:
                self.mutex.acquire()
                if not self.conn:
                    self.connect()
                with self.conn.cursor() as cursor:
                    cursor.execute(sql, args)
                    database = cursor.fetchall()
                    self.conn.commit()
                break
            except OperationalError as e:
                self.reconnect()
                count += 1
            except InterfaceError as  e:
                self.reconnect()
                count += 1
            except Exception as e:
                logging.error('error[%s],sql[%s],*args[%s]', e, sql, args)
                break
            finally:
                self.mutex.release()
        return database

    def query(self, sql, *args):
        '''
        Fetch the first matching row
        :param sql:
        :param args:
        :return:  dict or None
        '''
        database = None
        count = 0
        while count < 3:
            try:
                self.mutex.acquire()
                if not self.conn:
                    self.connect()
                with self.conn.cursor() as cursor:
                    cursor.execute(sql, args)
                    database = cursor.fetchone()
                    self.conn.commit()
                break
            except OperationalError as e:
                self.reconnect()
                count += 1
            except InterfaceError as  e:
                self.reconnect()
                count += 1
            except Exception as e:
                logging.error('error[%s],sql[%s],*args[%s]', e, sql, args)
                break
            finally:
                self.mutex.release()
        return database

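    def __getattr__(self, item):
        # update, insert, and delete all share the same implementation
        if item in ('update', 'insert', 'delete'):
            setattr(self, item, self.__execute)
            return getattr(self, item)
        # any other missing attribute should fail loudly instead of returning None
        raise AttributeError(item)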

    def __execute(self, sql, *args):
        '''
        Run an update, insert, or delete statement
        :param sql:
        :param args:
        :return:
        '''
        count = 0
        while count < 3:
            try:
                self.mutex.acquire()
                if not self.conn:
                    self.connect()
                with self.conn.cursor() as cursor:
                    cursor.execute(sql, args)
                    self.conn.commit()
                break
            except OperationalError as e:
                self.reconnect()
                count += 1
            except InterfaceError as  e:
                self.reconnect()
                count += 1
            except Exception as e:
                logging.error('error[%s],sql[%s],*args[%s]', e, sql, args)
                break
            finally:
                self.mutex.release()
        return
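The retry loop that find, query, and __execute all repeat (take the lock, run the statement, reconnect and retry on a connection error, give up after three attempts) can be isolated and tested without a database. In the sketch below, ConnectionLost and FlakyBackend are hypothetical stand-ins for pymysql's OperationalError/InterfaceError and for a real connection. The helper run_with_retry is likewise an illustration and not part of the module above.

```python
import threading


class ConnectionLost(Exception):
    """Hypothetical stand-in for pymysql's OperationalError / InterfaceError."""


class FlakyBackend:
    """Hypothetical connection that fails a fixed number of times."""

    def __init__(self, failures):
        self.failures = failures
        self.connects = 0

    def connect(self):
        self.connects += 1

    def run(self, sql):
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionLost("server has gone away")
        return "ok: " + sql


def run_with_retry(backend, lock, sql, attempts=3):
    """Run sql under the lock, reconnecting and retrying on connection loss."""
    for _ in range(attempts):
        with lock:                     # released even if run() raises
            try:
                return backend.run(sql)
            except ConnectionLost:
                backend.connect()      # reconnect, then try again
    return None                        # every attempt failed


lock = threading.Lock()

recovering = FlakyBackend(failures=2)
result = run_with_retry(recovering, lock, "SELECT 1")

broken = FlakyBackend(failures=5)
gave_up = run_with_retry(broken, lock, "SELECT 1")

print(result, recovering.connects, gave_up, broken.connects)
```

Factoring the loop out like this would let the three methods in the module differ only in what they do with the cursor, and the with statement replaces the manual acquire()/release() pair.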

  

Original article: https://www.cnblogs.com/lianzhilei/p/7267648.html