python连接sqlserver和MySQL实现增删改查

参考python核心编程

编写一个用户洗牌的脚本,根据用户输入,选择连接sqlserver或者MySQL,创建数据库、表,随机生成数据,并实现增删改查。

其中,为了兼容python2和python3,统一了打印函数、录入函数、动态导包等

一些字段声明为全局变量,这样我们在后续的学习中,可以引用这些变量和函数。

ushuffle.py:

#python 3.6
#统一输出函数,使用distutils.log.warn()函数代替版本2中的print语句和版本3中的print函数
from distutils.log import warn as printf
from random import randrange
#检测内置函数中是否包括raw_input(),如果存在说明版本是2(或者1),不存在说明是3。这样的话,无论是哪个版本,最终scanf函数表示了该功能
if isinstance(__builtins__,dict) and 'raw_input' in __builtins__:
    scanf = raw_input
elif hasattr(__builtins__,'raw_input'):
    scanf = raw_input
else:
    scanf = input

FIELDS = ('login','userid','projid')
COLSIZ = 10
RDBMSs = {'s':'sql server','m':'mysql'}
DBNAME = 'test'
DB_EXC = None
DBUSER = 'root'
DBSERVER = '127.0.0.1'
NAMELEN = 10

tformat = lambda s: str(s).title().ljust(COLSIZ)#str().title()将单词首字母大写;ljust()左对齐,并使用空格填充直至达到指定长度
cformat = lambda s: s.upper().ljust(COLSIZ) #

def setup():
    return RDBMSs[scanf('''
    请选择数据库类型:
    (Sql) Server
    (M)ySQL
    
    请键入:''').strip().lower()[0]]

def connect(db,DBNAME):
    global DB_EXC
    dbDir = '%s_%s' % (db,DBNAME)
    if db == 'mysql':
        try:
            #python3.0以前的版本
            import MySQLdb
            import _mysql_exceptions as DB_EXC
            try:
                cxn = MySQLdb.connect(user = DBUSER,password = 'Jwxjs123456',db=DBNAME)
            except DB_EXC.OperationalError:
                try:
                    cxn = MySQLdb.connect(user = DBUSER,password = 'Jwxjs123456',db=DBNAME)
                    cxn.query('create database %s' % DBNAME)
                    cxn.commit()
                    cxn.close()
                    cxn = MySQLdb.connect(db=DBNAME)
                except DB_EXC.OperationalError:
                    return None
        except ModuleNotFoundError: #ImportError
            try:
                #python3.0以上的版本
                import pymysql
                import pymysql.err as DB_EXC
                try:
                    cxn = pymysql.connect(**{'host':DBSERVER,'database':DBNAME,'user':DBUSER,'password':'Jwxjs123456'})
                except DB_EXC.InternalError:
                    #连接成功但是没有指定的数据库
                    cxn = pymysql.connect(**{'host':DBSERVER,'user':DBUSER,'password':'Jwxjs123456'})
                    cur = cxn.cursor()
                    cur.execute('create database %s;' % DBNAME)
                    cxn.commit()
                    cxn = pymysql.connect(**{'host':DBSERVER,'database':DBNAME,'user':DBUSER,'password':'Jwxjs123456'})
            except ImportError:
                return None
    elif db == 'sql server':
        import pymssql
        import _mssql
        #import pymssql.StandardError as DB_EXC
        try:
            cxn = pymssql.connect(**{'server':DBSERVER,'database':DBNAME,'password':'Jwxjs123456','user':'sa'})
            printf('     成功链接数据库%s' % DBNAME)
        except BaseException:
            #这里一定要设置为自动提交模式,否则创建数据库会失败
            cxn = pymssql.connect(**{'server':DBSERVER,'password':'Jwxjs123456','user':'sa','autocommit':True})
            
            cursor = cxn.cursor()
            cxn.commit()
            cursor.execute('create database %s;' % DBNAME)
            #cxn.close()
            cxn = pymssql.connect(**{'server':DBSERVER,'database':DBNAME,'password':'Jwxjs123456','user':'sa','autocommit':True})
            printf('数据库%s 不存在,新建该数据库' % DBNAME)
    return cxn

def create(cur):
    try:
        cur.execute('''
            create table users(login varchar(%s),
                userid int,
                projid int
            )
        ''' % NAMELEN)
        printf('……………新建users表成功')
    except BaseException:
        drop(cur)
        create(cur)
        printf('       已存在表users,删除后并新建此表……………………')
        
drop = lambda cur:cur.execute('drop table users')

NAMES =(
    ('arron',8312),('angle',7603),('dane',7306),
    ('jess',7912),('jim',7512),('larry',7311),
)

def randName():
    pick = set(NAMES)#内置函数set()创建一个无序不重复元素集,可进行关系测试,删除重复数据,还可以计算交集、差集、并集等
    while pick:
        yield pick.pop()

def insert(cur,db):
    cur.executemany('insert into users values(%s,%s,%s)',[(who,uid,randrange(1,5)) for who,uid in randName()])

#返回最后一次操作影响的行数,这里考虑右边对象是不是支持该属性,不支持的话则返回-1
getRC = lambda cur:cur.rowcount if hasattr(cur,'rowcount') else -1
    
    
def update(cur):
    fr = randrange(1,5)
    to = randrange(1,5)
    cur.execute('update users set projid = %s where projid = %s' % (to,fr))
    return fr,to,getRC(cur)
    
def dbDump(cur):
    cur.execute('select * from users')
    printf('
%s' % ''.join(map(cformat,FIELDS)))
    for data in cur.fetchall():
        printf(''.join(map(tformat,data)))#join()将序列中的元素以指定的字符连接生成一个新的字符串。

def delete(cur):
    rm = randrange(1,5)
    cur.execute('delete from users where projid = %s' % rm)
    return rm,getRC(cur)

drop = lambda cur:cur.execute('drop table users')
    
    
def main():
    #用户选择数据库类型
    db = setup()
    printf('
****将要连接到 %s(%s) 数据库****' % (DBNAME,db))
    #创建数据库连接
    cxn = connect(db,DBNAME)
    if not cxn:
        printf('
     连接数据库%s(%s) 失败,程序退出!!!' % (DBNAME,db))
        return
    else:
        printf('
    成功连接数据库%s(%s)    ' % (DBNAME,db))
    cur = cxn.cursor()
    printf('
      创建 users表……')
    create(cur)
    
    printf('
*****初始化users表******')
    insert(cur,db)
    dbDump(cur)
    
    printf('
 ***随机更改projid****')
    fr,to,num = update(cur)
    printf('	(%s 个用户被更改了) from (%s) to(%s)' % (num,fr,to))
    dbDump(cur)
    
    printf('
***随机删除某一组数据***')
    rm,num = delete(cur)
    printf('	(组%s中共%s个用户被删除了)' % (rm,num))
    dbDump(cur)
    
    printf('
*****删除users表*******')
    drop(cur)

    if db == 'mysql':
        cxn.commit() #sqlserver 的链接已经被设置为自动提交
    printf('
    关闭数据库连接     ')
    cxn.close()
    
if __name__ == '__main__':
    main()

运行结果如下:

原文地址:https://www.cnblogs.com/hiwuchong/p/9096542.html