python pymysql sqlalchemy

python 操作数据库有两种方法。

1. pymysql

方法1. pymysql 模块
	import pymysql

	# Minimal PyMySQL usage. The connection parameters are placeholders and
	# `sql` stands for the statement you want to run.
	db = pymysql.connect(user='root', password='password', host='127.0.0.1', database='tmpdb')
	try:
		with db.cursor() as cursor:
			cursor.execute(sql)  # run the concrete SQL statement
			db.commit()  # needed whenever the statement inserts, updates or deletes
	finally:
		# Always release the connection, even if the statement failed.
		db.close()

2. 调用 sqlalchemy 模块。数据库的增删改查,可参考:简书、权威指南、sqlalchemy 指南、sqlalchemy 底层示意图。

数据库表是个二维表,包含多行多列,每一行其实可以认为是一个 object。这就是传说中的 ORM 技术:Object-Relational Mapping,把关系数据库的表结构映射到对象上。使用前首先通过 easy_install 或者 pip 安装 SQLAlchemy。下面先用一个普通的类示意"一行数据对应一个对象":

class User:
    """Plain object standing for one table row with an id and a name."""

    def __init__(self, id, name):
        # Store both column values on the instance unchanged.
        self.id, self.name = id, name

# Illustration only: three table rows, each represented by one User object.
[User('1', 'Michael'), User('2', 'Bob'), User('3', 'Adam')]
from sqlalchemy import create_engine, Column, String, Integer, DateTime, Float #  sqlalchemy 中数据类型
from sqlalchemy.ext.declarative import declarative_base 
from sqlalchemy.orm import sessionmaker # sessionmaker生成的是数据库会话类,这个类的实例session可用于操作数据库 

from deal.conf import MYSQL


# Connection URL layout: mysql+pymysql://<user>:<password>@<host>/<database>
# echo=False: do not log every SQL statement that is executed (set it to True
# to see them). create_engine() returns an Engine instance, the core interface
# to the database.
# pool_recycle is given in seconds (7 hours here) and makes the pool recycle
# connections once that much time has passed; see
# https://blog.csdn.net/u013673976/article/details/45939297
engine = create_engine(
    "mysql+pymysql://{}:{}@{}/{}".format(MYSQL.user, MYSQL.pw, MYSQL.host, MYSQL.db),
    echo=False,
    pool_recycle=60 * 60 * 7,
)

# Session factory bound to the engine; instances of it are used to talk to
# the database.
Session = sessionmaker(bind=engine)

# Declarative base class: the mapped classes are created on top of it.
Base = declarative_base()

'''
在使用 ORM 技术时,配置过程分两步:
	1. 首先描述要处理的数据库表;
	2. 然后定义用来映射这些表的类。

	其中第 1、2 步一般一起执行:通过使用 Declarative 方法,我们可以创建映射类,类中同时包含对所映射的实际数据库表的描述。使用 Declarative 方法定义的映射类都继承自一个基类,这个基类维护着类与数据表之间关系的目录——也就是我们所说的 Declarative base class。在一个普通的模块入口中,应用通常只需要一个 base 实例。我们通过 declarative_base() 函数创建这个基类:
	from sqlalchemy.ext.declarative import declarative_base
	Base = declarative_base()
'''

class RecordFile(Base):
    """ORM mapping for the `sr_recordfile` table.

    NOTE(review): the query helpers elsewhere in this file also read
    attributes such as `fileid`, `filename` and `process_state` from this
    model, so the column list below looks abbreviated -- confirm against the
    real table schema.
    """

    __tablename__ = "sr_recordfile"
    
    # Integer primary key.
    id = Column(Integer, primary_key=True)
    person_type = Column(Integer)
    personid = Column(String(20))
    remotefileid = Column(String)
    update_isblack_time = Column(DateTime)
    dayoffile = Column(String)
    process_state_ctime = Column(DateTime)

class RecordFileDia(Base):
    """ORM mapping for the `sr_recordfile_dia` table."""

    __tablename__ = "sr_recordfile_dia"

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    # The remaining columns are omitted here; they are declared the same way
    # as in the RecordFile class.


class AlarmScores(Base):
    """ORM mapping for the `alarm_scores` table."""

    __tablename__ = "alarm_scores"

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    insert_time = Column(DateTime)
    combined_id = Column(String)

    # Each of the next two pairs has a `test_` and an `enroll_` column.
    # NOTE(review): presumably the two sides that were compared to produce
    # `score` -- confirm with the code that fills this table.
    test_spk2utt = Column(String)
    enroll_spk2utt = Column(String)
    test_telnumber = Column(String)
    enroll_telnumber = Column(String)
    score = Column(Float)
    dayofidx = Column(String)

    

前面是用sqlalchemy定义了连接,然后接下来是最直接的操作了。其中contextmanager使用方法---> blog

"""
Most queries in this module are based on the assumption that
    `filename` is unique.
Thus, `filename` is used as the condition to fetch the target record(s).
"""

from contextlib import contextmanager
from functools import wraps
from os import path

from pymysql import err
from sqlalchemy import exc

from .tasktype import TaskType
from .models import Session, RecordFile as RF, AlarmScores, RecordFileDia, RecordFileDia as RFD, MultiTel
from .errors import AudioRecordNotFound, NoMatchedInfoFoundInDia

def try_again(fn):
    """Decorator that calls `fn` a second time after a MySQL connection error.

    If the first call raises one of the SQLAlchemy / PyMySQL operational,
    internal or interface errors listed below, a message is printed and `fn`
    is called once more with the same arguments. An exception raised by the
    second call, or any other exception type, propagates to the caller.
    """
    # functools.wraps: see https://blog.csdn.net/yuyexiaohan/article/details/82860807
    @wraps(fn)
    def retry_once(*args, **kwargs):
        try:
            result = fn(*args, **kwargs)
        except (
            exc.OperationalError,
            exc.InternalError,
            err.InterfaceError,
            err.OperationalError,
            err.InternalError,
        ):
            print("We met a unexpected Error concerning mysql connection! Trying again...")
            result = fn(*args, **kwargs)
        return result

    return retry_once

'''
# 创建Query查询,filter是where条件,最后调用one()返回唯一行,如果调用all()则返回所有行:
user = session.query(User).filter(User.id=='5').one()

'''

@contextmanager
def session_scope():
    """Yield a new Session and finish the transaction when the block ends.

    The session is committed if the `with` body finishes normally, rolled
    back (and the exception re-raised) if it raises, and closed in both cases.
    """
    session = Session()
    try:
        yield session  # hand the session to the `with` body
        session.commit()
    except BaseException:
        # Same reach as a bare `except:`: undo the pending work, then
        # propagate whatever was raised.
        session.rollback()
        raise
    finally:
        session.close()


@try_again
def select_all(task_type: TaskType):  # Python 3 parameter annotation (type hint)
    """Map "personid-filename" to fileid for the unprocessed records.

    Only `TaskType.am` and `TaskType.cu` are supported. Records are taken
    from RF where `process_state` is 0 and `fileid` is not empty, narrowed
    further by the task type.

    Raises:
        TypeError: if `task_type` is neither `TaskType.am` nor `TaskType.cu`.
    """
    with session_scope() as session:
        pending = (
            session.query(RF.fileid, RF.personid, RF.filename)
            .filter(RF.process_state == 0)
            .filter(RF.fileid != "")
        )
        if task_type == TaskType.am:
            pending = pending.filter(RF.person_type == 2).filter(RF.checkflag == 1)
        elif task_type == TaskType.cu:
            pending = pending.filter(RF.person_type == 4)
        else:
            raise TypeError("task type: %r is not supported here" % task_type)

        utt2fileid = {}
        for row in pending:
            utt2fileid["{}-{}".format(row.personid, row.filename)] = row.fileid
        return utt2fileid


@try_again
def select_black_list(task_type: TaskType):
    """Return "telnumber-fileid" strings for the blacklisted RFD records.

    Only `TaskType.fq` is supported. The selected records have
    `process_state` 0, `person_type` 1 and `isblack` 1.

    Raises:
        ValueError: if `task_type` is not `TaskType.fq`.
    """
    with session_scope() as session:
        if not task_type == TaskType.fq:
            msg = "person_type of {} has not yet declared with blacklist".format(task_type)
            raise ValueError(msg)

        # NOTE(review): `filename` is selected but the result is built from
        # `fileid` -- confirm that "telnumber-fileid" is the intended format.
        blacklisted = (
            session.query(RFD.telnumber, RFD.filename, RFD.fileid)
            .filter(RFD.process_state == 0)
            .filter(RFD.person_type == 1)
            .filter(RFD.isblack == 1)
        )
        return ['{}-{}'.format(row.telnumber, row.fileid) for row in blacklisted]



@try_again
def fileid2utt(fileid, task_type):
    """Look up a record by `fileid` and return its "<owner>-<filename>" string.

    For `TaskType.fq` the record is read from RFD and the owner part is
    `telnumber`; for `TaskType.am` / `TaskType.cu` it is read from RF and the
    owner part is `personid`. Only the first matching row is used.

    Raises:
        TypeError: if `task_type` is none of fq, am, cu.
        ValueError: if no record with this `fileid` exists. The message names
            the table that was actually queried.
    """
    with session_scope() as s:
        if task_type == TaskType.fq:
            model, owner_col = RFD, RFD.telnumber
        elif task_type == TaskType.am or task_type == TaskType.cu:
            model, owner_col = RF, RF.personid
        else:
            raise TypeError("task type error")

        row = s.query(owner_col, model.filename).filter(model.fileid == fileid).first()
        if row is None:
            # Report the table of the model that was queried, not always
            # `sr_recordfile`.
            msg = "fileid `{}` does not exist in db `{}`".format(fileid, model.__tablename__)
            raise ValueError(msg)

        owner, filename = row
        return "{}-{}".format(owner, filename)


@try_again
def update_state(filename, state, dealfiletodir=None):
    """Set `process_state` on the first RF record with this `filename`.

    `dealfiletodir` is written too, but only when a truthy value is passed.

    Returns:
        True once the record has been updated.

    Raises:
        AudioRecordNotFound: if no record has this `filename`.
    """
    with session_scope() as session:
        record = session.query(RF).filter(RF.filename == filename).first()
        if not record:
            msg = "There is no such record whose filename is `{}`".format(filename)
            raise AudioRecordNotFound(msg)

        record.process_state = state
        if dealfiletodir:
            record.dealfiletodir = dealfiletodir
        return True


@try_again
def update_by_filename(filename, name, value):
    """Find the first RF record with this `filename` and set one attribute.

    The attribute called `name` is assigned `value` via the built-in
    setattr() (http://www.runoob.com/python/python-func-setattr.html).

    Returns:
        True once the attribute has been set.

    Raises:
        AudioRecordNotFound: if no record has this `filename`.
    """
    with session_scope() as session:
        record = session.query(RF).filter(RF.filename == filename).first()
        if not record:
            msg = "There is no such record whose filename is `{}`".format(filename)
            raise AudioRecordNotFound(msg)

        setattr(record, name, value)
        return True
            

@try_again
def find_unscored(task_type: TaskType):
    """Return the unscored RF records and mark them as being scored.

    NOTICE: besides finding the unscored records (`registerflag` 1,
    `scoredflag` 0, `process_state` 0), every returned record gets its
    `scoredflag` set to -1.

    Returns:
        A list of `(telnumber, filename)` tuples for `TaskType.fq`, or of
        `(personid, filename)` tuples for `TaskType.am` / `TaskType.cu`.
        The list can be empty, so the caller should check for that before
        further processing.

    Raises:
        TypeError: if `task_type` is none of fq, am, cu.
    """
    with session_scope() as session:
        # Pick the person_type filter value and the attribute that goes
        # first in each result tuple.
        if task_type == TaskType.fq:
            person_type, owner_attr = 1, "telnumber"
        elif task_type == TaskType.am or task_type == TaskType.cu:
            person_type, owner_attr = task_type.person_type, "personid"
        else:
            raise TypeError("Unsupported task type %r" % task_type)

        unscored = (
            session.query(RF)
            .filter(RF.person_type == person_type)
            .filter(RF.registerflag == 1)
            .filter(RF.scoredflag == 0)
            .filter(RF.process_state == 0)
        )
        found = []
        for record in unscored:
            record.scoredflag = -1
            found.append((getattr(record, owner_attr), record.filename))
        return found


@try_again
def find_fq_tasks():
    """List `(downloadtodir, filename, telnumber)` for downloaded RF records.

    Selected records have `person_type` 1, `process_state` 10, a non-empty
    `downloadtodir` and `downloaded` equal to 1.
    """
    with session_scope() as session:
        tasks = (
            session.query(RF.downloadtodir, RF.filename, RF.telnumber)
            .filter(RF.person_type == 1)
            .filter(RF.process_state == 10)
            .filter(RF.downloadtodir != "")
            .filter(RF.downloaded == 1)
        )
        return [(row.downloadtodir, row.filename, row.telnumber) for row in tasks]


@try_again
def update_pushed_flag(remote_fileid, value):
    """Set `pushedflag` on the first RF record with this `remotefileid`.

    Raises:
        AudioRecordNotFound: if no record has this `remote_fileid`.
    """
    with session_scope() as session:
        record = session.query(RF).filter(RF.remotefileid == remote_fileid).first()
        if not record:
            msg = "There is no such record whose remote_fileid is `{}`".format(remote_fileid)
            raise AudioRecordNotFound(msg)

        record.pushedflag = value  # the change is committed by session_scope


@try_again
def insert_into_alarm(items):
    """Bulk-insert rows into the AlarmScores table.

    `items` is a list in which every element is one row given as a dict of
    keyword arguments for AlarmScores.
    """
    rows = [AlarmScores(**fields) for fields in items]
    with session_scope() as session:
        # Batch insert; see https://www.jianshu.com/p/87ac06124fe1
        session.bulk_save_objects(rows)


# Comma-separated attribute names that are split into `required_fields` below.
# NOTE(review): the last entry of the string is a placeholder standing for
# "and many more attributes", not a real attribute name -- replace it with the
# actual field list before use.
required_field_str = """person_type, personid, num, callid, caller,
called, telnumber, recordtime, skilltime, piecesid,等众多属性"""
# One attribute name per entry, surrounding whitespace removed.
required_fields = list(map(str.strip, required_field_str.split(',')))

# `Field` needs to be modified: fileid, fileid_parent, process_state 
    
@try_again
def fetch_from_old_tab(filename):
    """Copy the `required_fields` attributes of one RF record into a dict.

    Returns:
        A dict mapping each name in `required_fields` to the value of that
        attribute on the first RF record with this `filename`.

    Raises:
        AudioRecordNotFound: if no record has this `filename`.
    """
    with session_scope() as session:
        record = session.query(RF).filter(RF.filename == filename).first()
        if not record:
            msg = "There is no such record whose filename is `{}`".format(filename)
            raise AudioRecordNotFound(msg)

        return {key: getattr(record, key) for key in required_fields}

@try_again
def insert_into_recordfile_dia(rfd_list):
    """Bulk-insert rows into the RecordFileDia table.

    `rfd_list` is a list in which every element is one row given as a dict
    of keyword arguments for RecordFileDia.
    """
    rows = [RecordFileDia(**fields) for fields in rfd_list]
    with session_scope() as session:
        # Batch insert; see https://blog.csdn.net/dongyouyuan/article/details/79236673
        session.bulk_save_objects(rows)


@try_again
def fullfill_alarm_score_by_filename(filename):
    """Return person_type, fileid and telnumber of the RFD record for `filename`.

    Returns:
        A dict with the keys `person_type`, `fileid` and `telnumber`, taken
        from the first matching row.

    Raises:
        NoMatchedInfoFoundInDia: if no RFD record has this `filename`.
    """
    with session_scope() as session:
        row = (
            session.query(RFD.person_type, RFD.fileid, RFD.telnumber)
            .filter(RFD.filename == filename)
            .first()
        )
        if not row:
            msg = 'In `sr_recordfile_dia`, no such record with filename as: `{}`'.format(filename)
            raise NoMatchedInfoFoundInDia(msg)

        return {
            "person_type": row.person_type,
            "fileid": row.fileid,
            "telnumber": row.telnumber,
        }


@try_again
def fetch_info_by_fileid(fileid):
    """Return selected columns of the RFD record with this `fileid`.

    Returns:
        A dict with the keys `person_type`, `filename`, `telnumber`,
        `piecesid` and `caller`, taken from the first matching row.

    Raises:
        NoMatchedInfoFoundInDia: if no RFD record has this `fileid`.
    """
    with session_scope() as session:
        row = (
            session.query(RFD.person_type, RFD.filename, RFD.telnumber, RFD.piecesid, RFD.caller)
            .filter(RFD.fileid == fileid)
            .first()
        )
        if not row:
            msg = 'In `sr_recordfile_dia`, no such record with fileid as: `{}`'.format(fileid)
            raise NoMatchedInfoFoundInDia(msg)

        return {
            "person_type": row.person_type,
            "filename": row.filename,
            "telnumber": row.telnumber,
            "piecesid": row.piecesid,
            "caller": row.caller,
        }


@try_again
def fetch_info_by_filename(filename):
    """Return fileid and telnumber of the RFD record with this `filename`.

    Returns:
        A dict with the keys `fileid` and `telnumber`, taken from the first
        matching row.

    Raises:
        NoMatchedInfoFoundInDia: if no RFD record has this `filename`.
    """
    with session_scope() as session:
        row = session.query(RFD.telnumber, RFD.fileid).filter(RFD.filename == filename).first()
        if not row:
            msg = 'In `sr_recordfile_dia`, no such record with filename as: `{}`'.format(filename)
            raise NoMatchedInfoFoundInDia(msg)

        return {"fileid": row.fileid, "telnumber": row.telnumber}


@try_again
def fetch_fp_by_fileid(fileid):
    """Build the file path of the RFD record with this `fileid`.

    Returns:
        A tuple `(file_path, telnumber)` where `file_path` joins the
        record's `dealfiletodir` and `filename`.

    Raises:
        ValueError: if no RFD record has this `fileid`.
    """
    with session_scope() as session:
        row = (
            session.query(RFD.dealfiletodir, RFD.filename, RFD.telnumber)
            .filter(RFD.fileid == fileid)
            .first()
        )
        if not row:
            raise ValueError("fileid: %s does not exist" % fileid)

        file_path = path.join(row.dealfiletodir, row.filename)
        return file_path, row.telnumber


@try_again
def get_number_pairs():
    """Fetch all number pairs that belong to the same person.

    Like the other accessors in this module, the call is retried once on a
    MySQL connection error.

    Returns:
        A list of `(numbera, numberb)` tuples read from MultiTel, or None
        when the query yields no rows -- callers must handle the None case.
    """
    with session_scope() as s:
        rows = s.query(MultiTel.numbera, MultiTel.numberb).all()  # load every row at once
        if not rows:
            return None
        return [(row.numbera, row.numberb) for row in rows]


########## For testing ###################
def get_by_filename(filename, name):
    """Return attribute `name` of the first RF record with this `filename`.

    Raises:
        AudioRecordNotFound: if no record has this `filename`.
    """
    with session_scope() as session:
        record = session.query(RF).filter(RF.filename == filename).first()
        if not record:
            raise AudioRecordNotFound
        return getattr(record, name)
                    
关注公众号 海量干货等你
原文地址:https://www.cnblogs.com/sowhat1412/p/12734309.html