sqlachelmy的使用

一、增删改查的使用

数据库表的初始化

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column
from sqlalchemy import Integer,String,Text,Date,DateTime
from sqlalchemy import create_engine


Base = declarative_base()

class Users(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    depart_id = Column(Integer)

def create_all():
    engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.create_all(engine)

def drop_all():
    engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    Base.metadata.drop_all(engine)

if __name__ == '__main__':
    drop_all()
    create_all()
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/mytest?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) # 根据Users类对users表进行增删改查 session = SessionFactory() # 创建表结构,只有继承了Base的类才会被初始化 Base.metadata.create_all(engine)

增删改查

1. 增加
obj = Users(name='alex')
session.add(obj)
session.commit()

session.add_all([
        Users(name='小东北'),
        Users(name='龙泰')
])
session.commit()
session.close()
增加
session.query(Users).filter(Users.id >= 2).delete()
session.commit()
删除
session.query(Users).filter(Users.id == 4).update({Users.name:'二郎神'})
session.query(Users).filter(Users.id == 4).update({'name':'孙悟空'})
session.query(Users).filter(Users.id == 4).update({'name':Users.name+"DSB"},synchronize_session=False)
session.commit()

# synchronize_session代表以字符串的形式更新,如果不添加此参数,默认以数字的形式进行更新。
更改
result = session.query(Users).all()
for row in result:
        print(row.id,row.name)

result = session.query(Users).filter(Users.id >= 2)
for row in result:
        print(row.id,row.name)

result = session.query(Users).filter(Users.id >= 2).first()
print(result)
查看

指定查询字段(列)

# 原生sql
select id,name as cname from users;

# sqlachelmy语句
result = session.query(Users.id,Users.name.label('cname')).all()
for item in result:
        print(item[0],item.id,item.cname)

and查询(默认)

session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()

between查询

session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()

in查询

session.query(Users).filter(Users.id.in_([1,3,4])).all()
session.query(Users).filter(~Users.id.in_([1,3,4])).all()

# in查询要使用in_
# ~代表反向查询

子查询

session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='eric'))).all()

and 和 or查询

from sqlalchemy import and_, or_
session.query(Users).filter(Users.id > 3, Users.name == 'eric').all()
# and_为默认的使用方法
session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.extra != ""
    )).all()

filter_by

session.query(Users).filter_by(name='alex').all()

# filter_by与filter功能相同,只是filter传入的是一个值

通配符查询

ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()

# 相当于原生sql中的 like 'e%'

切片

result = session.query(Users)[1:2]

排序

ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

group_by

from sqlalchemy.sql import func

ret = session.query(
        Users.depart_id,
        func.count(Users.id),
).group_by(Users.depart_id).all()
for item in ret:
        print(item)

from sqlalchemy.sql import func

ret = session.query(
        Users.depart_id,
        func.count(Users.id),
).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all()
for item in ret:
        print(item)

# func中含有sql计算方法
# 一旦使用了func方法,再想过滤查询只能使用having方法

union 和 union_all

"""
select id,name from users
UNION
select id,name from users;
"""
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()

# 二者的区别在于union会合并重复的查询结果,而union_all不会,它让然会将重复的结果累加到结果下面。

# union查询类似于left join或者right join查询

 二、外键查询

1.ForeignKey

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column
from sqlalchemy import Integer,String,Text,Date,DateTime,ForeignKey,UniqueConstraint, Index
from sqlalchemy import create_engine
from sqlalchemy.orm import relationship


Base = declarative_base()

class Depart(Base):
    __tablename__ = 'depart'
    id = Column(Integer, primary_key=True)
    title = Column(String(32), index=True, nullable=False)

class Users(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    depart_id = Column(Integer,ForeignKey("depart.id"))

    dp = relationship("Depart", backref='pers')
数据库模型

使用

# 1.直接查询
ret = session.query(Users).all()
for row in ret:
    print(row.id, row.name, row.depart_id)
    
session.close()

# 2.联表查询
ret = session.query(Users.id,Users.name,Depart.title).join(Depart).all()
# 二者结果相同,外键自动匹配主键,默认为inner查询,添加isouter=True后变为left join
ret = session.query(Users.id,Users.name,Depart.title).join(Depart, Users.depart_id==Depart.id,isouter=True).all()
for row in ret:
    # print(row)    (1, 'alex', '研发')
    print(row.id, row.name, row.title)  # 1 alex 研发

session.close()

# 3.外键关联查询
ret = session.query(Users).all()
for row in ret:
    print(row.id, row.name, row.dp.title)

session.close()

# 4.反向查询
ret = session.query(Depart).filter(Depart.title=="运维").first()
for row in ret.pers:
    print(row.id,row.name,ret.title)

session.close()

# 5.外键表中创建一个值,引用外键的表中创建多个数据
# 方式一:各自设置自己的表
d1=Depart(title="前端")
session.add(d1)
session.commit()

u1=Users(name="小强",depart_id=d1.id)
session.add(u1)
session.commit()

session.close()

# 方式2:引用外键的表通过relationship正向设置外键表的值
u1=Users(name="小红",dp=Depart(title="java开发"))
session.add(u1)
session.commit()

session.close()

# 方式3:外键表通过backref反向设置引用外键的表
d1=Depart(title="大数据")
d1.pers=[Users(name="AAA"),Users(name="BBB"),Users(name="CCC")]
session.add(d1)
session.commit()

session.close()

2.m2m查询

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column
from sqlalchemy import Integer,String,Text,Date,DateTime,ForeignKey,UniqueConstraint, Index
from sqlalchemy import create_engine
from sqlalchemy.orm import relationship


Base = declarative_base()

class Student(Base):
    __tablename__ = 'student'
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)

    course_list = relationship('Course', secondary='student2course', backref='student_list')

class Course(Base):
    __tablename__ = 'course'
    id = Column(Integer, primary_key=True)
    title = Column(String(32), index=True, nullable=False)

class Student2Course(Base):
    __tablename__ = 'student2course'
    id = Column(Integer, primary_key=True, autoincrement=True)
    student_id = Column(Integer, ForeignKey('student.id'))
    course_id = Column(Integer, ForeignKey('course.id'))

    __table_args__ = (
        UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'), # 联合唯一索引
        # Index('ix_id_name', 'name', 'extra'),                          # 联合索引
    )
数据库模型
# 1.录入数据
session.add_all([
    Student(name='张三'),
    Student(name='李四'),
    Course(title='物理'),
    Course(title='化学'),
])

session.commit()

session.add_all([
    Student2Course(student_id=1,course_id=1),
    Student2Course(student_id=1,course_id=2),
    Student2Course(student_id=2,course_id=1),
])

session.commit()
session.close()

# 2.三张表关联
ret = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id).join(Course,Student2Course.course_id==Course.id).order_by(Student2Course.id.asc())
for row in ret:
    print(row)
    
session.close()

# 3.三张表关联后,筛选数据
ret = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id).join(Course,Student2Course.course_id==Course.id).filter(Student.name=="张三").order_by(Student2Course.id.asc())
for row in ret:
    print(row)

session.close()

# 4.使用relationship后,跨表多对多查询
ret = session.query(Student).filter(Student.name=="张三").first()
# 在设置了relationship的表中,通过设置的字段名称来进行m2m查询
for row in ret.course_list:
    print(row.title)

ret2 = session.query(Course).filter(Course.title=="物理").first()
# 在没有设置relationship的表中,通过设置了relationship表中的backref的值来进行m2m查询
for row in ret2.student_list:
    print(row.name)
    
session.close()

# 5.多对多关系插入数据
obj = Course(title="英语")
obj.student_list = [Student(name="小红"), Student(name="小强")]

session.add(obj)
session.commit()

session.close()

三、sqlalchemy的两种连接方式

方式一:每连接一次,需要指定生成一个线程

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Student,Course,Student2Course

engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/mywork?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)

def task():
    # 去连接池中获取一个连接
    session = SessionFactory()

    ret = session.query(Student).all()

    # 将连接交还给连接池
    session.close()


from threading import Thread

for i in range(20):
    t = Thread(target=task)
    t.start()
View Code

方式二:每一次连接自动生成新的线程

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Student,Course,Student2Course

engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/mywork?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)
session = scoped_session(SessionFactory)


def task():
    ret = session.query(Student).all()
    # 将连接交还给连接池
    session.remove()


from threading import Thread

for i in range(20):
    t = Thread(target=task)
    t.start()
View Code

四、sqlalchemy执行原生sql

方式一:

# 查询
cursor = session.execute('select * from users')
result = cursor.fetchall()

# 添加
cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'alex'})
session.commit()
print(cursor.lastrowid)

# 这里使用变量赋值需要主意冒号":value"

方式二:

conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(
    "select * from t1"
)
result = cursor.fetchall()
cursor.close()
conn.close()

# 不使用sessionmaker,直接使用连接池
原文地址:https://www.cnblogs.com/ttyypjt/p/11084132.html