MySQL4-SQLAlchemy框架实现

参考         http://www.cnblogs.com/wupeiqi/articles/8259356.html

概述

作用

1,提供简单的规则
2,自动将类转换成SQL语句并执行

两种设计模型

DBfirst:手动创建数据库以及表 --> ORM框架 --> 自动生成表
CODEfirst:手动创建类、数据库 --> ORM框架 --> 自动生成表  --->  SQLAlchemy

连接数据库

conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"
ngine = create_engine(conn, max_overflow=5)    # 连接mysql

表操作

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,CHAR,VARCHAR
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

Base = declarative_base()


class Type(Base):
    __tablename__ = 'type'
    tid = Column(Integer, primary_key=True, autoincrement=True)
    tname = Column(String(32))


class User(Base):
    __tablename__ = 'users'                                     # 表名
    id = Column(Integer, primary_key=True)                      # 主键
    name = Column(String(32))
    email = Column(String(32))
    type_id = Column(Integer, ForeignKey('type.tid'))           # 外键

    # __table_args__ = (                                        # 联合
    #     UniqueConstraint('id', 'name', name='uix_id_name'),   # 联合唯一
    #
    #     Index('ix_id_name', 'name', 'extra')                  # 联合索引
    # )


conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"


def create_table(conn):
    engine = create_engine(conn, max_overflow=5)    # 连接mysql
    Base.metadata.create_all(engine)                # 找到继承了Base的所有的类,创建表 --> 全建表


def drop_table(conn):
    engine = create_engine(conn, max_overflow=5)    # 连接mysql
    Base.metadata.drop_all(engine)                  # 找到继承了Base的所有的类,删除表 --> 全删表


create_table(conn)
SQLAlchemy操作表.py
autuincrement = True    # 自增
nullable = True            # 不为空
default = '  '            # 默认值
index = True            # 索引
unique = True            # 唯一索引
Column可传参数

数据行操作

增删改查

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,CHAR,VARCHAR
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

Base = declarative_base()


class Type(Base):
    __tablename__ = 'type'
    tid = Column(Integer, primary_key=True, autoincrement=True)
    tname = Column(String(32))


class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    email = Column(String(32))
    type_id = Column(Integer, ForeignKey('type.tid'))


conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"


def create_table(conn):
    engine = create_engine(conn, max_overflow=5)    # 连接mysql
    Base.metadata.create_all(engine)                # 找到继承了Base的所有的类,创建表


def drop_table(conn):
    engine = create_engine(conn, max_overflow=5)    # 连接mysql
    Base.metadata.drop_all(engine)                # 找到继承了Base的所有的类,删除表


# create_table(conn)

engine = create_engine(conn, max_overflow=5)
Session = sessionmaker(bind=engine)
session = Session()     # 获取一个连接

"""增加一条用户"""
# obj1 = Type(tname='普通用户')
# session.add(obj1)

"""增加多条用户"""
# objs = [
#     Type(tname='超级用户'),
#     Type(tname='白金用户'),
#     Type(tname='黑金用户'),
# ]
# session.add_all(objs)

"""查全部"""
# type_list = session.query(Type).all()
# for row in type_list:
#     print(row.tid, row.tname)

"""查符合条件的"""
# type_list = session.query(Type).filter(Type.tid > 2)
# for row in type_list:
#     print(row.tid, row.tname)

"""查某个字段[列]"""
# type_list = session.query(Type.tname).all()
# for row in type_list:
#     print(row.tname)

"""删【先查到再删】"""
# session.query(Type).filter(Type.tid > 2).delete()

"""普通改【先查到再改】"""
# session.query(Type).filter(Type.tid > 0).update({'tname': '白金'})

"""在原有基础上改【字符串类型】"""
session.query(Type).filter(Type.tid > 0).update({Type.tname: Type.tname + 'X'}, synchronize_session = False)

"""在原有基础上改【数字类型类型】"""
# session.query(Type).filter(Type.tid > 0).update({'num': Type.num+1}, synchronize_session = 'evaluate')  # 字段没有创建num字段,无法测试

session.commit()    # 提交
session.close()     # 关闭连接
具体代码实现.py
- 增:
    """增加一条用户"""
    obj1 = Type(tname='普通用户')
    session.add(obj1)
    
    """增加多条用户"""
    objs = [
        Type(tname='超级用户'),
        Type(tname='白金用户'),
        Type(tname='黑金用户'),
    ]
    session.add_all(objs)
    

- 删:
    """删【先查到再删】"""
    session.query(Type).filter(Type.tid > 2).delete()
    
- 改:
    """普通改【先查到再改】"""
    session.query(Type).filter(Type.tid > 0).update({'tname':'白金'})

    """在原有基础上改【字符串类型】"""
    session.query(Type).filter(Type.tid > 0).update({Type.tname: Type.tname + 'X'}, synchronize_session = False)

    """在原有基础上改【数字类型类型】"""
    session.query(Type).filter(Type.tid > 0).update({'num': Type.num+1}, synchronize_session = 'evaluate')  
        # 字段没有创建num字段,无法测试

- 查:
    """查全部"""
    type_list = session.query(Type)
    for row in type_list:
        print(row.tid, row.tname)
        
    """查符合条件的"""
    type_list = session.query(Type).filter(Type.tid > 0 )
    for row in type_list:
        print(row.tid, row.tname)
        
    """查某个字段[列]"""
    # type_list = session.query(Type.tname)
    for row in type_list:
        print(row.tname)
分解代码

查的扩展

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,CHAR,VARCHAR
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy import and_, or_
Base = declarative_base()


class Type(Base):
    __tablename__ = 'type'
    tid = Column(Integer, primary_key=True, autoincrement=True)
    tname = Column(String(32))


class Users(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    email = Column(String(32))
    type_id = Column(Integer, ForeignKey('type.tid'))


conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"


def create_table(conn):
    engine = create_engine(conn, max_overflow=5)    # 连接mysql
    Base.metadata.create_all(engine)                # 找到继承了Base的所有的类,创建表


def drop_table(conn):
    engine = create_engine(conn, max_overflow=5)    # 连接mysql
    Base.metadata.drop_all(engine)                # 找到继承了Base的所有的类,删除表


engine = create_engine(conn, max_overflow=5)
Session = sessionmaker(bind=engine)
session = Session()     # 获取一个连接

"""查全部"""
# type_list = session.query(Type)
# for row in type_list:
#     print(row.tid, row.tname)

"""查符合条件的"""
# type_list = session.query(Type).filter(Type.tid > 0 )
# for row in type_list:
#     print(row.tid, row.tname)

"""查某个字段[列]"""
# type_list = session.query(Type.tname)
# for row in type_list:
#     print(row.tname)

"""查询符合条件的第一个"""
# ret = session.query(Users).filter_by(name='alex').first()
# print(ret.name, ret.email, ret.type_id)

"""and"""
# ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric')
# for row in ret:
#     print(row.name)

"""between"""
# ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric')
# for row in ret:
#     print(row.id, row.name)

"""in"""
# ret = session.query(Users).filter(Users.id.in_([1, 3, 4]))
# for row in ret:
#     print(row.id, row.name)

"""not in"""
# ret = session.query(Users).filter(~Users.id.in_([1, 3, 4]))
# for row in ret:
#     print(row.id, row.name)

"""子查询"""
# ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric')))
# for row in ret:
#     print(row.id, row.name)

""" and or """
# from sqlalchemy import and_, or_
# ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric'))
# for row in ret:
#     print(row.id, row.name)

# ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric'))
# for row in ret:
#     print(row.id, row.name)

# ret = session.query(Users).filter(
#     or_(
#         Users.id < 2,
#         and_(Users.name == 'eric', Users.id > 3),
#         Users.email != ""
#     ))
# for row in ret:
#     print(row.id, row.name)

"""通配符"""
# ret = session.query(Users).filter(Users.name.like('e%'))
# for row in ret:
#     print(row.id, row.name)

# ret = session.query(Users).filter(~Users.name.like('e%'))     # not like
# for row in ret:
#     print(row.id, row.name)

"""类似limit,但是通过切片取值"""
# ret = session.query(Users)[1:3]
# for row in ret:
#     print(row.id, row.name)

"""排序"""
# ret = session.query(Users).order_by(Users.id.desc())
# for row in ret:
#     print(row.id, row.name)

# ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc())
# for row in ret:
#     print(row.id, row.name)

"""分组"""
from sqlalchemy.sql import func
# ret = session.query(Users.name).group_by(Users.name)
# for row in ret:
#     # print(row.name)
#     print(row[0])

# ret = session.query(
#     Users.name,
#     func.max(Users.id),
#     func.sum(Users.id),
#     func.min(Users.id)).group_by(Users.name)
# print(ret)
# for row in ret:   # row 是一个元祖
#     print(row[0], row[1], row[2], row[3])

# ret = session.query(
#     func.max(Users.id),
#     func.sum(Users.id),
#     func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) > 2)   # 对聚合函数的返回值做第二次使用
# for row in ret:
#     print(row[0], row[1], row[2])

"""连表"""
# ret = session.query(Users).join(Type)                         # inner join
# print(ret)
# for row in ret:
#     print(row)

# result = session.query(Users).join(Type, isouter=True)        # left join
# print(result)
# for row in result:
#     print(row.name)

"""子查询"""
# 【一】 select * from user where user.id in (子查询)
# q1 = session.query(Users.id).filter_by(name='eric')     # 查找到名字是eric的用户id 的【子查询】
# ret = session.query(Users).filter(Users.id.in_(q1))
# for row in ret:
#     print(row.id, row.name)

# 【二】 select * from (子查询)
# q2 = session.query(Type).filter(Type.tid > 1).subquery()    # 查找到type的tid大于1的所有信息 的【子查询】
# ret = session.query(q2)
# for row in ret:
#     print(row.tid, row.tname)

# 【三*****】 select users.id,users.name,(子查询) from users
q3 = session.query(Type.tname).filter(Users.type_id == Type.tid).as_scalar()   # 选出tid == type_id 的 tname
ret = session.query(Users.id, Users.name, q3)
for row in ret:
    print(row)
查的扩展.py
"""查全部"""
type_list = session.query(Type)
for row in type_list:
    print(row.tid, row.tname)
    
"""查符合条件的"""
type_list = session.query(Type).filter(Type.tid > 0 )
for row in type_list:
    print(row.tid, row.tname)
    
"""查某个字段[列]"""
# type_list = session.query(Type.tname)
for row in type_list:
    print(row.tname)

"""查询符合条件的第一个"""
ret = session.query(Users).filter_by(name='alex').first()
print(ret.name, ret.email, ret.type_id)

"""and"""
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric')
for row in ret:
    print(row.name)

"""between"""
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric')
for row in ret:
    print(row.id, row.name)

"""in"""
ret = session.query(Users).filter(Users.id.in_([1, 3, 4]))
for row in ret:
    print(row.id, row.name)

"""not in"""
ret = session.query(Users).filter(~Users.id.in_([1, 3, 4]))
for row in ret:
    print(row.id, row.name)

"""子查询"""
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric')))
for row in ret:
    print(row.id, row.name)

""" and or """
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric'))
for row in ret:
    print(row.id, row.name)

ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric'))
for row in ret:
    print(row.id, row.name)

ret = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.email != ""
    ))
for row in ret:
    print(row.id, row.name)

"""通配符"""
ret = session.query(Users).filter(Users.name.like('e%'))
for row in ret:
    print(row.id, row.name)

ret = session.query(Users).filter(~Users.name.like('e%'))     not like
for row in ret:
    print(row.id, row.name)

"""类似limit,但是通过切片取值"""
ret = session.query(Users)[1:3]
for row in ret:
    print(row.id, row.name)

"""排序"""
ret = session.query(Users).order_by(Users.id.desc())
for row in ret:
    print(row.id, row.name)

ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc())
for row in ret:
    print(row.id, row.name)

"""分组"""
from sqlalchemy.sql import func
ret = session.query(Users.name).group_by(Users.name)
for row in ret:
    print(row.name)
    print(row[0])

ret = session.query(
    Users.name,
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name)
print(ret)
for row in ret:   # row 是一个元祖
    print(row[0], row[1], row[2], row[3])

ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) > 2)   # 对聚合函数的返回值做第二次使用
for row in ret:
    print(row[0], row[1], row[2])

"""连表"""
ret = session.query(Users).join(Type)                         # inner join
print(ret)
for row in ret:
    print(row)

result = session.query(Users).join(Type, isouter=True)        # left join
print(result)
for row in result:
    print(row.name)

"""子查询"""
# 【一】 select * from user where user.id in (子查询)
q1 = session.query(Users.id).filter_by(name='eric')     # 查找到名字是eric的用户id 的【子查询】
ret = session.query(Users).filter(Users.id.in_(q1))
for row in ret:
    print(row.id, row.name)

# 【二】 select * from (子查询)
q2 = session.query(Type).filter(Type.tid > 1).subquery()    # 查找到type的tid大于1的所有信息 的【子查询】
ret = session.query(q2)
for row in ret:
    print(row.tid, row.tname)

# 【*****三】 select users.id,users.name,(子查询) from users
q3 = session.query(Type.tname).filter(Users.type_id == Type.tid).as_scalar()   # 选出tid == type_id 的 tname
ret = session.query(Users.id, Users.name, q3)
for row in ret:
    print(row)
分解代码

常用操作

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users

"""
# Model结构
Base = declarative_base()


class Users(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    depart_id = Column(Integer)
"""





engine = create_engine(
        "mysql+pymysql://root:@127.0.0.1:3306/s9day120?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)

session = SessionFactory()



# 1 指定列
# result =session.query(Users.id, Users.name.label('cname')).all()
# print(result)
# for i in result:
#     print(i.id, i.cname)

# 2 默认条件and
# ret = session.query(Users).filter(Users.id >= 1, Users.name == 'alexX').all()
# print(ret)

# 3 between
# ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'helloX').all()
# for i in ret:
#     print(i.id)

# 4 in
# ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
# ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
# for i in ret:
#     print(i.id)

# 5 子查询
# ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='helloX'))).all()
# for i in ret:
#     print(i.id)

# 6 and 和 or
from sqlalchemy import and_, or_
# ret = session.query(Users).filter(Users.id < 3, Users.name == 'helloX').all()
# ret = session.query(Users).filter(and_(Users.id < 3, Users.name == 'helloX')).all()
# ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'helloX')).all()

# ret = session.query(Users).filter(
#     or_(
#         Users.id > 2,
#         and_(Users.name == 'alexX', Users.id < 3),
#     )).all()
#
# for i in ret:
#     print(i.id)

# 7 filter_by
# ret = session.query(Users).filter_by(name='alexX').all()
# for i in ret:
#     print(i.id)

# 8 通配符
# ret = session.query(Users).filter(Users.name.like('A%')).all()  # 注意不区分大小写
# ret = session.query(Users).filter(Users.name.like('%x')).all()  # 注意不区分大小写
# ret = session.query(Users).filter(~Users.name.like('A%')).all()  # 注意不区分大小写

# for i in ret:
#     print(i.id)

# 9 切片
# result = session.query(Users)[1:4]  # 取第二个到第五个(不包括第五个)记录
# for i in result:
#     print(i.id)

# 10 排序
# ret = session.query(Users).order_by(Users.name.desc()).all()    # 从大到小
# ret = session.query(Users).order_by(Users.name.asc(), Users.id.desc()).all()    # 从按名字从小到大, 再按id从大到小
# for i in ret:
#     print(i.id, i.name)

# 11 group by
# from sqlalchemy.sql import func
# ret = session.query(
#         Users.depart_id,
#         func.count(Users.id),
# ).group_by(Users.depart_id).all()
#
# for i in ret:
#     print(i)

# from sqlalchemy.sql import func
# ret = session.query(
#         Users.depart_id,
#         func.count(Users.id),
# ).group_by(Users.depart_id).having(func.count(Users.id) > 1).all()
#
# for i in ret:
#     print(i)

# 12.union 和 union all
"""
select id,name from users
UNION
select id,name from users;
"""
# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Users.depart_id).filter(Users.id <= 2)
# ret = q1.union(q2).all()
# for i in ret:
#     print(i)


# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Users.depart_id).filter(Users.id <= 2)
# ret = q1.union_all(q2).all()
# for i in ret:
#     print(i)
常用操作

便利的连表

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,CHAR,VARCHAR
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
Base = declarative_base()


class Type(Base):
    __tablename__ = 'type'
    tid = Column(Integer, primary_key=True, autoincrement=True)
    tname = Column(String(32))


class Users(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    email = Column(String(32))
    type_id = Column(Integer, ForeignKey('type.tid'))

    users_type = relationship('Type', backref='type_users')       # 做关联,更便利的连表


conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"


def create_table(conn):
    engine = create_engine(conn, max_overflow=5)    # 连接mysql
    Base.metadata.create_all(engine)                # 找到继承了Base的所有的类,创建表


def drop_table(conn):
    engine = create_engine(conn, max_overflow=5)    # 连接mysql
    Base.metadata.drop_all(engine)                # 找到继承了Base的所有的类,删除表


engine = create_engine(conn, max_overflow=5)
Session = sessionmaker(bind=engine)
session = Session()     # 获取一个连接

"""正向操作"""
print('----正向操作----')
result_list = session.query(Users)
for row in result_list:
    print(row.name, row.type_id, row.users_type.tname)

"""反向操作"""
print('----反向操作----')
result_list = session.query(Type)
for row in result_list:
    # print(row.tid, row.tname)
    print('会员等级:%s' % row.tname)
    for i in row.type_users:
        print(i.id, i.name, i.type_id)
便利的连表.py
- 正向操作
    result_list = session.query(Users)
    for row in result_list:
        print(row.name, row.type_id, row.users_type.tname)
    
    
- 反向操作
    result_list = session.query(Type)
    for row in result_list:
        # print(row.tid, row.tname)
        print('会员等级:%s' % row.tname)
        for i in row.type_users:
            print(i.id, i.name, i.type_id)    
代码分解
原文地址:https://www.cnblogs.com/sunch/p/9598641.html