Flask-SQLAlchemy数据库操作

建表

  1 # -*- coding: UTF-8 -*-
  2 
  3 from . import db
  4 
  5  
  6 
  7 #多对多关系表,两组一对多,即为多对多
  8 
  9 class Boy2Girl(db.Model):
 10 
 11     __tablename__ = 'boy2girl'
 12 
 13     nid = db.Column(db.Integer,primary_key=True)
 14 
 15     #建立一对多关系,ForeignKey传入对应表的__tablename__.id
 16 
 17     boy_id = db.Column(db.Integer,db.ForeignKey('boy.id'))
 18 
 19     # 建立一对多关系,ForeignKey传入对应表的__tablename__.id
 20 
 21     girl_id = db.Column(db.Integer,db.ForeignKey('girl.id'))
 22 
 23  
 24 
 25 class Boy(db.Model):
 26 
 27     __tablename__ = 'boy'
 28 
 29     id = db.Column(db.Integer,primary_key=True)
 30 
 31     name = db.Column(db.String(32),unique=True)
 32 
 33     age = db.Column(db.Integer)
 34 
 35     #并不会在数据库中生成列,作用为ORM调用时可以通过.来调用另一张表
 36 
 37     #例如boy.girl,girl.boy
 38 
 39     #传入的参数为另一张表的类名,关系表的表名,反向查询时的名字,可以为任意名字
 40 
 41     girl = db.relationship('Girl',secondary = Boy2Girl.__tablename__,backref = 'boy')
 42 
 43     car = db.relationship('Car',backref = 'boy')
 44     
 45     #自引用
 46     
 47     father_id = db.Column(db.Integer,db.ForeignKey('boy.id'))
 48     
 49     #自引用时需添加remote_side=[id],一对一添加uselist=Flase
 50     
 51     father = db.relationship('Comment',backref = 'child',remote_side=[id],uselist=Flase)
 52     
 53     def __repr__(self):
 54 
 55         return '<Boy %r>'%self.name
 56 
 57 
 58 #高级多对多,关联表和普通多对多一致,好处是可以直接访问关联表,调用其中的其他字段
 59         
 60 #多对多关系表,两组一对多,即为多对多
 61 
 62 class Girl2Car(db.Model):
 63 
 64     __tablename__ = 'girl2car'
 65 
 66     nid = db.Column(db.Integer,primary_key=True)
 67 
 68     #建立一对多关系,ForeignKey传入对应表的__tablename__.id
 69 
 70     car_id = db.Column(db.Integer,db.ForeignKey('car.id'))
 71 
 72     # 建立一对多关系,ForeignKey传入对应表的__tablename__.id
 73 
 74     girl_id = db.Column(db.Integer,db.ForeignKey('girl.id'))        
 75     
 76     create_time = db.Column(db.DateTime,default=datetime.datetime.utcnow)
 77         
 78 
 79 class Girl(db.Model):
 80 
 81     __tablename__ = 'girl'
 82 
 83     id = db.Column(db.Integer,primary_key=True)
 84 
 85     name = db.Column(db.String(32),unique=True)
 86 
 87     age = db.Column(db.Integer)
 88     
 89     #cascade删除时删除所有关联项
 90     
 91     #高级多对多,需要指定foreign_keys = [Girl2Car.car_id],手工管理外键
 92     
 93     car = db.relationship('Girl2Car',foreign_keys = [Girl2Car.car_id],backref=db.backref('girl',lazy='joined'),
 94                                lazy='dynamic',cascade='all,delete-orphan')
 95                                
 96 
 97     
 98     def __repr__(self):
 99 
100         return '<Girl %r>' % self.name
101 
102  
103 
104 class Car(db.Model):
105 
106     __tablename__ = 'car'
107 
108     id = db.Column(db.Integer,primary_key=True)
109 
110     name = db.Column(db.String(32),unique=True)
111 
112     #建立一对多关系,ForeignKey传入对应表的__tablename__.id,写在多的一对多中多的那边
113 
114     boy_id = db.Column(db.Integer,db.ForeignKey('boy.id'))
115     
116     #cascade删除时删除所有关联项
117     
118     #高级多对多,需要指定foreign_keys = [Girl2Car.girl_id],手工管理外键
119 
120     girl = db.relationship('Girl2Car',foreign_keys = [Girl2Car,girl_id],backref=db.backref('car',lazy='joined'),
121                                lazy='dynamic',cascade='all,delete-orphan')
122 
123     def __repr__(self):
124 
125         return '<Car %r>'%self.name
126         
127         

查询

 1 #查询全部
 2 boys = Boy.query.all()
 3 
 4 #排序
 5 boys = Boy.query.order_by(Boy.age.desc()).all()
 6 
 7 #`filter_by`和`filter`都是过滤条件,只是用法有区别`filter_by`里面能用`=`,不能用`!= `还有`> <` 等等
 8 
 9 #过滤条件查询一个
10 boy1 = Boy.query.filter_by(id=1).first()
11 boy1 = Boy.query.filter(Boy.id=1).first()
12 
13 #过滤条件查多个
14 boys = Boy.query.filter(Boy.id>1).all()
15 
16 #切片查询
17 #限制返回条数
18 boys = Boy.query.filter(Boy.name!='jingqi').limit(2).all()
19 #从第三条开始返回查询结果
20 boys = Boy.query.filter(Boy.name!='jingqi').offset(2).all()
21 #切片返回记录
22 boys = Boy.query.filter(Boy.name!='jingqi').slice(2,3).all()
23 
24 #模糊匹配,是十分耗费时间的,能不用就尽量不要用。
25 boys = Boy.query.filter(Boy.name.like('jingqi')).all()
26 boys = Boy.query.filter(Boy.name.notlike('jingqi')).all()
27         
28 #成员属于
29 boys = Boy.query.filter(Boy.name.in_(['jingqi','jingqi1'])).all()
30 boys = Boy.query.filter(Boy.name.notin_(['jingqi','jingqi1'])).all()
31         
32 #选择条件
33 from flask_sqlalchemy import or_,and_,all_,any_
34 boys = Boy.query.filter(or_(User.name=='jingqi',User.age==12)).all()
35 boys = Boy.query.filter(and_(User.name=='jingqi',User.age==12)).all()
36 

 分页查询

第一个参数表示当前页,第二个参数代表每页显示的数量,error_out=True的情况下如果指定页没有内容将出现404错误,否则返回空的列表

 1 #分页查询,page为第几页
 2 pagination = Boy.query.order_by(Boy.id.desc()).paginate(page, per_page=10, error_out=False)
 3 boys = pagination.items
 4 
 5 has_next :是否还有下一页
 6 has_prev :是否还有上一页
 7 items : 返回当前页的所有内容
 8 next(error_out=False) : 返回下一页的Pagination对象
 9 prev(error_out=False) : 返回上一页的Pagination对象
10 page : 当前页的页码(从1开始)
11 pages : 总页数
12 per_page : 每页显示的数量
13 prev_num : 上一页页码数
14 next_num :下一页页码数
15 query :返回 创建这个Pagination对象的查询对象
16 total :查询返回的记录总数

增删改

#创建删除表,可以在flask shell中操作
db.create_all()
db.drop_all()

#
user1 = User(username='Jim',role=admin_role)
db.session.add(user1)
db.session.commit()


#
db.session.delete(admin_role)
db.session.commit()

#
admin_role.name = 'AD'
db.session.add(admin_role)
db.session.commit()

#回滚,在commit前都可以回滚到上次commit时的状态
db.seesion.rollback()

原生sql语句

1 id = request.args.get('id')
2 #原生sql
3 user1 = db.session.execute("SELECT * from user WHERE id="+id)
4 #防注入,使用占位符
5 conn = db.session.connection()
6 user1 = conn.execute("SELECT * from user WHERE id=%s", [id])

高级部分,以下部分搞清楚后才能更好的做优化

高级查询可以尝试先写原生sql,再转换为orm

 1 #需要原生sqlalchemy中调用func,可以通过func.的方式使用sql中的内置函数
 2 from sqlalchemy import func
 3 
 4 #联合查询,join,左右联合,尽量不使用联合查询,联合查询优于子查询
 5 all_school_count = School.query.join(User).filter(School.leader_id == user1.id).count()
 6 
 7 #联合查询,union,上下联合,两张表用于联合的列名需要一致,不一致时可使用label改成一致
 8 #坑比较多,不能对整个联合表排序(可以排序一个表),不能把外键表的数据加进来(会使数据莫名变多)
 9 user_data = session.query(User.name,User.age)
10 tec_data = session.query(Teacher.tec_name.label('name'), Teacher.tec_age.label('age'))
11 result = user_data.union_all(tec_data).all()
12 
13 
14 #分组查询,使用having限制条件,group_by,having
15 all_school_count = db.session.query(func.count(School.id)).group_by(School.leader_id).having(func.count(School.id)>1).first()[0]
16 
17 #偏移offset,限制数量limit,
18 user1_schools = School.query.filter(School.leader_id==user1.id).offset(5).limit(3).all()
19 
20 #slice, sliece(start, end) 从start取到end.
21 user1_schools = School.query.filter(School.leader_id == user1.id).slice(5, 8).all()
22 
23 
24 
25 #事务,正常时手动commit,异常时手动回滚rollback,全部成功或者全部回滚
26 try:
27    school1 = School(name='transaction'+str(num), leader_id=1)
28    db.session.add(school1)
29    school2= School(name='transaction'+str(int(num)+1), leader_id=1)
30    db.session.add(school2)
31 except:
32    db.session.rollback()
33 else:
34    db.session.commit()
35 
36 #年级
37 class Grade(db.Model):
38     __tablename__ = 'grade'
39     id = db.Column(db.Integer, primary_key=True, autoincrement=True)
40     name = db.Column(db.String(64), nullable=False,index=True)
41 
42     school_id = db.Column(db.Integer, db.ForeignKey('school.id',ondelete='RESTRICT'))
43     school = db.relationship('School', foreign_keys=school_id, backref='grade')
44 
45     from sqlalchemy import text, event
46     #触发器,定义在模型中
47     @staticmethod
48     def chang_all_grade_count(school_id):
49         #子查询
50         user1 = User.query.filter(User.id==(db.session.query(School.leader_id).filter(School.id == school_id)).subquery()).first()
51         all_grade_count = user1.all_grade_count
52         all_grade_count+=1
53         db.session.execute(text(
54             "update `user` set all_grade_count=:all_grade_count where id=:user_id"),
55             {'user_id': user1.id,'all_grade_count':all_grade_count})
56 
57     @staticmethod
58     def on_insert(mapper, connection, target):
59         Grade.chang_all_grade_count(target.school_id)
60 
61     @staticmethod
62     def on_delete(mapper, connection, target):
63         Grade.chang_all_grade_count(target.school_id)
64 
65     def __repr__(self):
66         return '<Grade %r>' % self.name
67 
68 #注册触发器,在增删改时触发
69 db.event.listen(Grade,'after_insert',Grade.on_insert)
70 db.event.listen(Grade,'after_delete',Grade.on_delete)

通过上下文管理器的方式管理事务

'''
集成并在原本的SQLAlchemy类中添加新功能,方便使用
'''
from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy
class SQLAlchemy(_SQLAlchemy):
    #事务上下文管理器
    @contextmanager
    def auto_commit(self):
        try:
            yield
            self.session.commit()
        except Exception as e:
            self.session.rollback()
            raise e

db = SQLAlchemy()

lazy和ondelete

relationship中的lazy属性
select: 首次访问按需加载;
immediate: 源对象加载后就加载;
joined: 加载记录,但使用连接;
subquery: 立即加载记录,使用子查询;
noload: 永不加载;
dynamic: 不加载记录,但提供加载记录的查询。

#直接获取backref的数据,查询结果为数据列表
lazy="select"
#查询结果为查询对象,只可以用在一对多和多对多关系中,不可以用在一对一和多对一中
lazy="dynamic"
#表面看查询结果同dynamic,但sql层面是通过join完成的,所以是一次查询,结果都出来了
#但也查询出了很多冗余数据,且join查询本身较慢,而dynamic需要多次查询
lazy="joined"


使用ondelete指定约束, 外建约束有以下几种:
RESTRICT:删除父表数据时,如果子表有数据在使用该字段的数据时,会阻止删除(默认为此约束)
NO ACTION:在MySQL中,同RESTRICT
CASCADE:级联删除,删除父表的某一条数据时,子表中使用该外建的数据也会被删除
SET NULL:父表数据被删除,删除父表的某一条数据时,子表中使用该外建的数据设置为NULL

#删除父数据,与其关联的子数据全部删除
relationship中设置cascade='all, delete-orphan' 与 passive_deletes = True,
ForeignKey中设置ondelete = 'CASCADE'

乐观锁、悲观锁

乐观锁,假定操作不会修改数据,在查询时正常查询,在修改时做判断
适合查多改少,经常被并发修改的数据可能老是出现版本不一致导致有的线程操作常常失败
悲观锁,假定操作会修改数据,在查询时加锁,修改完后释放锁
适合短事务(长事务导致其它事务一直被阻塞,影响系统性能),查少该多

乐观锁实现:
为数据表新加一列version
1、A 线程准备往小明的账户上加100, 1, 读取到小明 有 1000 元, 1000 + 100 事务未提交 ,读取到的版本号(oversion)为0
2、B线程准备往小明的账户上加100, 1, 读取到小明 有 1000 元, 1000 + 100 事务未提交 ,读取到的版本号(oversion)为0
3、A 线程提交事务,对比版本号,如果数据库该条数据的版本号和线程所持有该条数据的版本号一致,说明数据没有修改过。
更新余额以及版本号 小明账户余额变成1100 版本号(version)变成+ 14、B 线程提交事务 对比版本号,发现说持有的数据和数据中的版本不一致。本次事务回滚. 悲观锁实现: 使用with_for_update加锁,commit后释放锁 addr = Address.query.filter_by(user_id=3).with_for_update().first()
原文地址:https://www.cnblogs.com/cx59244405/p/9910547.html