SQLAlchemy的使用

1. 介绍

  SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

2. orm单表操作

  2.1 orm操作数据表

from coreschema import Integer
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index



Base = declarative_base()

class Users(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False, index=True)
    depart_id = Column(Integer)



# 可以将 engine 提取出来共用
engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8",
        max_overflow=0,     # 超过连接池大小外最多创建的连接
        pool_size=5,        # 连接池大小
        pool_timeout=10,    # 池中没有线程时最多等待的时间,否则报错
        pool_recycle=-1     # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )


def create_tb():
    """
    创建表
    :return: 
    """
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8",
        max_overflow=0,     # 超过连接池大小外最多创建的连接
        pool_size=5,        # 连接池大小
        pool_timeout=10,    # 池中没有线程时最多等待的时间,否则报错
        pool_recycle=-1     # 多久之后对线程池中的线程进行一次连接的回收(重置),-1表示不重置
    )
    Base.metadata.create_all(engine)    # create_all() 必须是这个函数名, 表示创建表


def drop_tb():
    """
    删除表
    :return: 
    """
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8",
        max_overflow=0,     # 超过连接池大小外最多创建的连接
        pool_size=5,        # 连接池大小
        pool_timeout=10,    # 池中没有线程时最多等待的时间,否则报错
        pool_recycle=-1     # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    Base.metadata.drop_all(engine)

# 执行操作
create_tb()
drop_tb()

  2.2 orm操作数据行

from sqlalchemy import create_engine, and_, or_
from sqlalchemy.orm import sessionmaker
from models import Users



engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8",
        max_overflow=0,     # 超过连接池大小外最多创建的连接
        pool_size=5,        # 连接池大小
        pool_timeout=10,    # 池中没有线程时最多等待的时间,否则报错
        pool_recycle=-1     # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 创造一个连接池 SessionFactory
= sessionmaker(bind=engine) # 每次进行数据库操作时都需要创建一个session, 表示从连接池中获取到一个连接 session = SessionFactory() ###################### 单表的 crud 操作 ''' # 1 增加 user = Users(name="张三") session.add(user) session.commit() # 批量增加 session.add_all([ Users(name="李四"), Users(name="王五"), Users(name="赵六"), ]) session.commit() # 2 查找 result = session.query(Users).all() print(result) # [Users object, Users object, Users object, .....] for item in result: print(item.id, "----",item.name) result = session.query(Users).filter(Users.id >= 3) print(result) # sql 语句 for i in result: print(i.name) result = session.query(Users).filter(Users.id >= 3).first() print(result.name) # 3 删除 ret = session.query(Users).filter(Users.id > 3).delete() print(ret) # 1 返回删除的数据量 session.commit() # 4 修改 ret = session.query(Users).filter(Users.id == 1).update({Users.name:"张三三"}) print(ret) # 1 返回修改的数据量 session.commit() session.query(Users).filter(Users.id == 2).update({"name":"李思思"}) session.commit() session.query(Users).filter(Users.id == 3).update({"name":Users.name+"老五"}, synchronize_session=False) # 当要拼接字符串时候, 需要加参数 synchronize_session=False # 数字相加时, 不需要这个参数 session.commit() ###################### 单表的其他操作 # 1 获取指定字段信息 ret = session.query(Users.id,Users.name,Users.name.label("nick_name")).all() print(ret) for i in ret: print(i[0],i.name,i.nick_name) #2 and, 两种方式一样 ret = session.query(Users).filter(Users.id==2,Users.name=="李思思").all() print(ret) ret = session.query(Users).filter(and_(Users.id==2,Users.name=="李思思")).all() print(ret) # 3 or ret = session.query(Users).filter(or_(Users.id==1,Users.id==2)).all() print(ret) # and 和 or 结合使用 ret = session.query(Users).filter(or_( Users.id == 1, and_(Users.name=="李思思", Users.id>=1), )).all() print(ret) # 4 between, 闭区间, 包括前后范围(1,3) ret = session.query(Users).filter(Users.id.between(1,3)).all() print(ret) # 5 in 和 ~in(就是not in) ret = session.query(Users).filter(Users.id.in_([1,2,3,4])).all() print(ret) ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() print(ret) # 6 子查询 ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(or_(Users.id>2, Users.name=="张三三")))).all() print(ret) # 7 filter_by, 里面是参数, 不是语句(比如 name == "李思思") ret = session.query(Users).filter_by(name="李思思").all() print(ret) # 8 通配符 ( % :表示0个或多个, _ :表示一个) ret = session.query(Users).filter(Users.name.like("李%")).all() print(ret) ret = session.query(Users).filter(~Users.name.like("李%")).all() print(ret) ret = session.query(Users).filter(Users.name.like("李__")).all() print(ret) # 9 切片 ret = session.query(Users).filter(Users.id>=1)[1:3] print(ret) # 10 order_by 排序 ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 11 group_by 分组 from sqlalchemy.sql import func ret = session.query( Users.depart_id, func.count(Users.id) ).group_by(Users.depart_id).all() for i in ret: print(i) # group_by 和 having 结合使用 from sqlalchemy.sql import func ret = session.query( Users.id, func.count(Users.id) ).group_by(Users.depart_id).having(func.count(Users.id)>=2).all() print(ret) for i in ret: print(i) # 12 union 和 union_all # select id,name from users UNION select id,name from users; q1 = session.query(Users.name).filter(Users.id > 3) q2 = session.query(Users.name).filter(Users.id >= 1) ret = q1.union(q2).all() # union() 去重 print(ret) # [('赵六',), ('张三三',), ('李思思',), ('王五老五',)] q1 = session.query(Users.name).filter(Users.id > 3) q2 = session.query(Users.name).filter(Users.id >= 1) ret = q1.union_all(q2).all() # union_all() 不去重 print(ret) # [('赵六',), ('张三三',), ('李思思',), ('王五老五',), ('赵六',)] '''

 3. orm多表操作

  3.1 一对多操作

   操作数据表

from sqlalchemy import Column, create_engine, UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Integer, String, ForeignKey
from sqlalchemy.orm import relationship

Base = declarative_base()

class Depart(Base):
    __tablename__ = "depart"

    id = Column(Integer, primary_key=True)
    title = Column(String(32), nullable=True, index=True)


class Users(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=True, index=True)
    dep_id = Column(Integer, ForeignKey("depart.id"))
    dp = relationship("Depart", backref="uuss")


def create_tb():
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=10,  # 池中没有线程时最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    Base.metadata.create_all(engine)


create_tb()

注意: 为什么有了 foreignkey 外键, 还要 relationship ? 这两者什么关系?

  relationship 是为了简化联合查询 join 等, 创建的两个表之间的虚拟关系, 这种关系与标的结构时无关的. 他与外键十分相似, 确实, relationship 必须在外键的基础上才允许使用, 不然会报错.  

  如果上面 User 模型类中只存在 foreignkey, 需要使用手写 join 才能取出数据; 而如果有 relationship 则可以使两张表之间产生关联, 类似于合成一张表, 可以直接取出数据而不需要手写 join 操作, 具体使用如下. 

  操作数据行

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from models import Users, Depart



engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8",
        max_overflow=0,     # 超过连接池大小外最多创建的连接
        pool_size=5,        # 连接池大小
        pool_timeout=10,    # 池中没有线程时最多等待的时间,否则报错
        pool_recycle=-1     # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

SessionFactory = sessionmaker(bind=engine)
session = SessionFactory()

"""
# 1 查询所有用户
ret = session.query(Users).all()
print(ret)


# 2 查询所有用户及对应的部门(不用relationship的操作)
ret = session.query(Users,Depart).join(Depart).all()
for i in ret:
    print(i[0].id, i[0].name, i[1].title)


ret = session.query(Users.id, Users.name, Depart.title).join(Depart, Users.dep_id == Depart.id).all()
                                                            # 默认有 Users.dep_id == Depart.id, 并且是 inner join
for i in ret:
    print(i.id, i.name, i.title)


ret = session.query(Users.id, Users.name, Depart.title).join(Depart, Users.dep_id == Depart.id, isouter=True).all()
                                                            # isouter=True , 表示 左外连接
for i in ret:
    print(i.id, i.name, i.title)


# 3 relationship 使用(有了relationship后就可以不用手写join()连表)
# 正查: 查询每个人及其所在部门名称
ret = session.query(Users).all()
for i in ret:
    print(i.id, i.name, i.dp.title)

# 反查: 查询it部门的人员名称
ret = session.query(Depart).filter(Depart.title=="it").first()
for i in ret.uuss:
    print(ret.title, i.id, i.name)


# 4 创建一个研发部门, 并在该部门中添加一个员工--霸天虎
user = Users(name="霸天虎", dp=Depart(title="研发"))
session.add(user)
session.commit()
"""

# 5 创建一个汽车人的部门, 并添加员工--大黄蜂, 威震天, 小蜜蜂
depart = Depart(title="汽车人")
depart.uuss = [Users(name="威震天"), Users(name="大黄蜂"), Users(name="小蜜蜂")]
session.add(depart)
session.commit()

  3.2 多对多操作

   操作数据表

from sqlalchemy import Column, create_engine, UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Integer, String, ForeignKey
from sqlalchemy.orm import relationship

Base = declarative_base()


class Student(Base):
    __tablename__ = "student"
    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=True, index=True)
    course_lst = relationship("Course", secondary="student2course", backref="student_lst")   # 添加了多对多信息



class Course(Base):
    __tablename__ = "course"
    id = Column(Integer, primary_key=True)
    title = Column(String(32), nullable=True, index=True)



class Student2Course(Base):
    __tablename__ = "student2course"
    id = Column(Integer, primary_key=True)
    sid = Column(Integer, ForeignKey("student.id"))
    cid = Column(Integer, ForeignKey("course.id"))

    __table_args__ = (
        UniqueConstraint('sid', 'cid', name='unix_stu_cou'),        # 联合唯一索引
        # Index('ix_id_name', 'name', 'extra'),                     # 联合索引
    )

   操作数据行

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from models import Users, Depart, Student, Course, Student2Course


engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8",
        max_overflow=0,     # 超过连接池大小外最多创建的连接
        pool_size=5,        # 连接池大小
        pool_timeout=10,    # 池中没有线程时最多等待的时间,否则报错
        pool_recycle=-1     # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

SessionFactory = sessionmaker(bind=engine)
session = SessionFactory()

"""
# 1 录入数据
session.add_all([
    Student(name="猫女"),
    Student(name="薇恩"),
    Course(title="ad"),
    Course(title="ap")
])
session.commit()


session.add_all([
    Student2Course(sid=1,cid=1),
    Student2Course(sid=1,cid=2),
    Student2Course(sid=2,cid=1),
])
session.commit()

# 2 每个学生选的所有课程名 (不用多对多字段)
ret = session.query(Student2Course.id, Student.id, Student.name, Course.title).join(Student,Student2Course.sid==Student.id,isouter=True).join(Course,Student2Course.cid==Course.id,isouter=True).all()
print(ret)
for i in ret:
    print(i)

# 3. 猫女 选的所有课 (不用多对多字段)
ret = session.query(Student2Course.id, Student.id, Student.name, Course.title).join(Student,Student2Course.sid==Student.id,isouter=True).join(Course,Student2Course.cid==Course.id,isouter=True).filter(Student.name=="猫女").all()
for i in ret:
    print(i)

# 3. 猫女 选的所有课 (用了多对多字段)
student = session.query(Student).filter(Student.name=="猫女").first()
for i in student.course_lst:
    print(student.name, i.title)

# 4. 选了 ap课 的所有人
course = session.query(Course).filter(Course.title=="ad").first()
for i in course.student_lst:
    print(course.title, i.name)
"""

# 5. 创建一个课程,创建2学生并选择该课程
course = Course(title="ye")
course.student_lst = [Student(name="小强"), Student(name="狮子狗"), Student(name="斯巴达")]
session.add(course)
session.commit()

 4. 多线程连接数据库连接池

  4.1 方式1: scoped_session ( 推荐使用 )

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Student,Course,Student2Course

engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,     # 连接池大小
        pool_timeout=10, # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)
# scoped_session里面维护着一个 threading.local(), 为每一个线程独立开辟一个空间,在这里就可以放每一个线程的连接
session = scoped_session(SessionFactory)


def task():
    # 进行数据库操作
    ret = session.query(Student).all()

    # 将连接交还给连接池
    session.remove()




from threading import Thread

for i in range(20):
    t = Thread(target=task)
    t.start()

  4.2 方式2: 在执行函数中获取连接

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Student, Course, Student2Course

engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,     # 连接池大小
    pool_timeout=10, # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
SessionFactory = sessionmaker(bind=engine)


def task():
    # 去连接池中获取一个连接
    session = SessionFactory()

    # 数据库操作
    ret = session.query(Student).all()

    # 将连接交还给连接池
    session.close()





from threading import Thread

for i in range(20):
    t = Thread(target=task)
    t.start()
原文地址:https://www.cnblogs.com/bk9527/p/10994509.html