04-ORM

一、ORM

ORM 全拼Object-Relation Mapping,中文意为 对象-关系映射。主要实现模型对象到关系数据库数据的映射

优点 :

  • 只需要面向对象编程, 不需要面向数据库编写代码.

    • 对数据库的操作都转化成对类属性和方法的操作.

    • 不用编写各种数据库的sql语句.

  • 实现了数据模型与数据库的解耦, 屏蔽了不同数据库操作上的差异.

    • 不再需要关注当前项目使用的是哪种数据库。

    • 通过简单的配置就可以轻松更换数据库, 而不需要修改代码.

缺点 :

  • 相比较直接使用SQL语句操作数据库,有性能损失.

  • 根据对象的操作转换成SQL语句,根据查询的结果转化成对象, 在映射过程中有性能损失.

二、Flask-SQLAlchemy

flask默认提供模型操作,但是并没有提供ORM,所以一般开发的时候我们会采用flask-SQLAlchemy模块来实现ORM操作。

SQLAlchemy是一个关系型数据库框架,它提供了高层的 ORM 和底层的原生数据库的操作。flask-sqlalchemy 是一个简化了 SQLAlchemy 操作的flask扩展。

SQLAlchemy: https://www.sqlalchemy.org/

中文文档: https://www.osgeo.cn/sqlalchemy/index.html

 

安装 flask-sqlalchemy【清华源】

pip install flask-sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple

如果连接的是 mysql 数据库,需要安装 mysqldb 驱动

pip install flask-mysqldb -i https://pypi.tuna.tsinghua.edu.cn/simple

安装flask-mysqldb时,注意

安装 flask-mysqldb的时候,python底层依赖于一个底层的模块 mysql-client模块
如果没有这个模块,则会报错如下:
​
Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-install-21hysnd4/mysqlclient/

解决方案:

sudo apt-get install libmysqlclient-dev python3-dev
​
运行上面的安装命令如果再次报错如下:
   dpkg 被中断,您必须手工运行 ‘sudo dpkg --configure -a’ 解决此问题。
​
则根据提示执行命令以下命令,再次安装mysqlclient
    sudo dpkg --configure -a
    apt-get install libmysqlclient-dev python3-dev
​
解决了mysqlclient问题以后,重新安装 flask-mysqldb即可。
pip install flask-mysqldb -i https://pypi.tuna.tsinghua.edu.cn/simple

1、数据库连接设置

  • 在 Flask-SQLAlchemy 中,数据库使用URL指定,而且程序使用的数据库必须保存到Flask配置对象的 SQLALCHEMY_DATABASE_URI 键中

    config.py,配置文件代码:

class Config(object):
    DEBUG = True
    SECRET_KEY = "*(%#4sxcz(^(#$#8423"
    # 数据库链接配置 = 数据库名称://登录账号:登录密码@数据库主机IP:数据库访问端口/数据库名称?charset=编码类型
    SQLALCHEMY_DATABASE_URI = "mysql://root:123456@127.0.0.1:3306/students?charset=utf8mb4"
  • 其他设置:

# 动态追踪修改设置,如未设置只会提示警告
SQLALCHEMY_TRACK_MODIFICATIONS = True
#查询时会显示原始SQL语句
SQLALCHEMY_ECHO = True
  • 配置完成需要去 MySQL 中创建项目所使用的数据库

$ mysql -uroot -p123456
mysql > create database students charset=utf8mb4;

2、常用的SQLAlchemy字段类型

模型字段类型名python中数据类型说明
Integer int 普通整数,一般是32位
SmallInteger int 取值范围小的整数,一般是16位
BigInteger int或long 不限制精度的整数
Float float 浮点数
Numeric decimal.Decimal 普通数值,一般是32位
String str 变长字符串
Text str 变长字符串,对较长或不限长度的字符串做了优化
Unicode unicode 变长Unicode字符串
UnicodeText unicode 变长Unicode字符串,对较长或不限长度的字符串做了优化
Boolean bool 布尔值
Date datetime.date 日期
Time datetime.time 时间
DateTime datetime.datetime 日期和时间
LargeBinary str 二进制文件内容

 

3、常用的SQLAlchemy列约束选项

选项名说明
primary_key 如果为True,代表表的主键
unique 如果为True,代表这列不允许出现重复的值
index 如果为True,为这列创建索引,提高查询效率
nullable 如果为True,允许有空值,如果为False,不允许有空值
default 为这列定义默认值

 4、定义模型类

from flask import Flask,render_template,request
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)

class Config():
    DEBUG = True
    # 数据库链接配置
    # SQLALCHEMY_DATABASE_URI = "mysql://账号:密码@IP/数据库名?编码"
    SQLALCHEMY_DATABASE_URI = "mysql://root:123456@127.0.0.1:3306/students?charset=utf8mb4"
    # 动态追踪修改设置,如未设置只会提示警告
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    # 查询时会显示原始SQL语句
    SQLALCHEMY_ECHO = True

app.config.from_object(Config)

db = SQLAlchemy()
db.init_app(app)

"""创建模型类"""
class Student(db.Model):
    __tablename__ = "tb_student"
    id = db.Column(db.Integer, primary_key=True,comment="主键ID")
    name = db.Column(db.String(250), comment="姓名")
    age = db.Column(db.Integer, comment="年龄")
    sex = db.Column(db.Boolean, default=False, comment="性别")
    money = db.Column(db.DECIMAL(8,2), nullable=True, comment="钱包")

    def __repr__(self):
        return self.name

class Teacher(db.Model):
    __tablename__ = "tb_teacher"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    name = db.Column(db.String(250), comment="姓名")
    sex = db.Column(db.Boolean, default=False, comment="性别")
    option = db.Column(db.Enum("讲师","助教","班主任"), default="讲师", comment="教职")

    def __repr__(self):
        return self.name

class Course(db.Model):
    __tablename__ = "tb_course"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    name = db.Column(db.String(250), unique=True, comment="课程名称")
    price = db.Column(db.Numeric(6, 2))

    def __repr__(self):
        return self.name

@app.route("/")
def index():
    return "Ok"

if __name__ == '__main__':
    # with app.app_context():
    #     db.create_all() # 根据模型创建所有的数据表
    #     # db.drop_all()   # 删除模型对应的所有数据表
    app.run()

5、数据库操作

from flask import Flask,render_template,request
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)

class Config():
    DEBUG = True
    # 数据库链接配置
    # SQLALCHEMY_DATABASE_URI = "mysql://账号:密码@IP/数据库名?编码"
    SQLALCHEMY_DATABASE_URI = "mysql://root:123456@127.0.0.1:3306/students?charset=utf8mb4"
    # 动态追踪修改设置,如未设置只会提示警告
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    # 查询时会显示原始SQL语句
    SQLALCHEMY_ECHO = True

app.config.from_object(Config)

db = SQLAlchemy()
db.init_app(app)

"""创建模型类"""
class Student(db.Model):
    __tablename__ = "tb_student"
    id = db.Column(db.Integer, primary_key=True,comment="主键ID")
    name = db.Column(db.String(250), comment="姓名")
    age = db.Column(db.Integer, comment="年龄")
    sex = db.Column(db.Boolean, default=False, comment="性别")
    money = db.Column(db.DECIMAL(8,2), nullable=True, comment="钱包")

    def __repr__(self):
        return self.name

class Teacher(db.Model):
    __tablename__ = "tb_teacher"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    name = db.Column(db.String(250), comment="姓名")
    sex = db.Column(db.Boolean, default=False, comment="性别")
    option = db.Column(db.Enum("讲师","助教","班主任"), default="讲师", comment="教职")

    def __repr__(self):
        return self.name

class Course(db.Model):
    __tablename__ = "tb_course"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    name = db.Column(db.String(250), unique=True, comment="课程名称")
    price = db.Column(db.Numeric(6, 2))

    def __repr__(self):
        return self.name

@app.route("/")
def index():
    """数据库基本操作"""

    """添加数据"""
    # 添加一条数据
    # student1 = Student(name="xiaohong",age=16,money=100,sex=True)
    # db.session.add(student1)
    # db.session.commit()

    # 批量添加多条数据
    # data_list = [
    #     Student(name="xiaohui1号",age=16,money=1000, sex=True),
    #     Student(name="xiaohui2号",age=16,money=1000, sex=True),
    #     Student(name="xiaohui3号",age=16,money=1000, sex=True),
    #     Student(name="xiaohui4号",age=16,money=1000, sex=True),
    #     Student(name="xiaohui5号",age=16,money=1000, sex=True),
    #     Student(name="xiaohui6号",age=16,money=1000, sex=True),
    # ]
    # db.session.add_all(data_list)
    # db.session.commit()

    """查询数据"""
    # 根据主键ID查询一条数据,如果ID不存在,则返回None不会报错!
    # student = Student.query.get(100)
    # if student is None:
    #     print("当前学生不存在!")
    # else:
    #     print(student)
    #     print(student.name,student.age) # 获取属性

    # 根据查询条件获取一条数据
    # 模型.query.filter(模型.字段==条件值).first()
    # student = Student.query.filter(Student.id==1).first()
    # print(student)
    # print(student.name,student.money)

    # 根据查询条件获取多条数据
    # 模型.query.filter(模型.字段==条件值).all()
    # student_list = Student.query.filter(Student.id < 5).all()
    # print(student_list)
    # """打印效果;
    # [xiaoming, xiaohong, xiaohui1号, xiaohui2号]
    # """
    # for student in student_list:
    #     print(student.name, student.money)

    """更新数据"""
    # 先查询后修改
    # student = Student.query.filter(Student.name=="xiaoming").first()
    # student.money+=1000
    # db.session.commit()

    # 直接根据条件修改
    # Student.query.filter(Student.name=="xiaoming",Student.money==1100).update({Student.money:2000}) # 乐观锁
    # 实现类似django的F函数效果,字段值累加
    # Student.query.filter(Student.name=="xiaoming").update({Student.money:Student.money+500}) # 乐观锁
    # db.session.commit()

    """删除数据"""
    # 先查询后删除
    # student = Student.query.filter(Student.name=="xiaohui6号").first()
    # db.session.delete(student)
    # db.session.commit()

    # 直接根据条件进行删除操作
    Student.query.filter(Student.name=="xiaohui5号").delete()
    db.session.commit()

    return "Ok"

if __name__ == '__main__':
    # with app.app_context():
    #     db.create_all() # 根据模型创建所有的数据表
    #     # db.drop_all()   # 删除模型对应的所有数据表
    app.run()

 三、数据库查询

 1、关于likeand_ or_ not_order_bycountlimitpaginate分页器

from flask import Flask,jsonify,render_template
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)

class Config():
    # DEBUG调试模式
    DEBUG = True
    # json多字节转unicode编码
    JSON_AS_ASCII = False
    # 数据库链接配置
    # SQLALCHEMY_DATABASE_URI = "mysql://账号:密码@IP/数据库名?编码"
    SQLALCHEMY_DATABASE_URI = "mysql://root:123456@127.0.0.1:3306/students?charset=utf8mb4"
    # 动态追踪修改设置,如未设置只会提示警告
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    # 查询时会显示原始SQL语句
    SQLALCHEMY_ECHO = True

app.config.from_object(Config)
db = SQLAlchemy()
db.init_app(app)

"""创建模型类"""
class Student(db.Model):
    __tablename__ = "tb_student"
    id = db.Column(db.Integer, primary_key=True,comment="主键ID")
    name = db.Column(db.String(250), comment="姓名")
    age = db.Column(db.Integer, comment="年龄")
    sex = db.Column(db.Boolean, default=False, comment="性别")
    money = db.Column(db.DECIMAL(8,2), nullable=True, comment="钱包")

    def __repr__(self):
        return self.name

class Teacher(db.Model):
    __tablename__ = "tb_teacher"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    name = db.Column(db.String(250), comment="姓名")
    sex = db.Column(db.Boolean, default=False, comment="性别")
    option = db.Column(db.Enum("讲师","助教","班主任"), default="讲师", comment="教职")

    def __repr__(self):
        return self.name

class Course(db.Model):
    __tablename__ = "tb_course"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    name = db.Column(db.String(250), unique=True, comment="课程名称")
    price = db.Column(db.Numeric(6, 2))

    def __repr__(self):
        return self.name

@app.route("/")
def index():
    """数据库进阶操作"""
    """filter设置判断条件
    ==  判断相等
    >=
    <
    >
    <=
    !=
    """
    # student = Student.query.filter(Student.name=="xiaohui32号").first()
    # if student is None:
    #     return jsonify({"error":"100404","errmsg":"没有该学生信息!"})
    # print(student)
    """filter设置模糊查询"""
    # like模糊条件
    # 模型.字段.like("%值%")  等价于  模型.字段.contains("值")    包含xxx
    # 模型.字段.like("值%")   等价于  模型.字段.startswith("值")  以xxx开头
    # 模型.字段.like("%值")   等价于  模型.字段.endswith("值")    以xxx结尾
    # 模型.字段.like("__")    值长度为2个字符的.几个下划线代表几个字符

    # student_list = Student.query.filter(Student.name.like("%xiaohui%")).all()
    # student_list = Student.query.filter(Student.name.startswith("xiao")).all()
    # student_list = Student.query.filter(Student.name.like("________")).all()

    """filter_by设置精确条件查找数据"""
    # filter_by 只支持一个等号作为判断条件,而且字段左边不需要声明模型类名
    # 可以用于获取一条数据,也可以获取多条数据
    # student = Student.query.filter_by(money=1000).first()
    # print(student)

    """filter多条件查询"""
    # 多条件需要基于逻辑运算来编写,当然,可以其他的声明方式
    """and_ 并且, 与"""
    # from sqlalchemy import and_
    # # 方式1:
    # student_list1 = Student.query.filter(Student.money==1000,Student.sex==True).all()
    # # 方式2:
    # student_list2 = Student.query.filter(and_(Student.money==1000,Student.sex==True)).all()
    #
    # print(student_list1, student_list2)

    """or_ 或者,或"""
    # from sqlalchemy import or_
    # student_list = Student.query.filter( or_(Student.age > 17, Student.age < 15) ).all()
    # print(student_list)

    """not_ 排除,非"""
    from sqlalchemy import not_
    # student_list = Student.query.filter(not_(Student.age > 17)).all()
    # print(student_list)

    """filter值范围查询"""
    # 查询年龄=15或者17或者19的
    # student_list = Student.query.filter(Student.age.in_([15,17,19])).all()
    # print(student_list)

    """order_by结果排序"""
    # order_by(模型.字段.desc())   db.desc(模型.字段)    倒序
    # order_by(模型.字段.asc())    db.asc(模型.字段)     升序
    # student_list = Student.query.order_by(db.desc(Student.money)).all()
    # student_list = Student.query.order_by(Student.money.desc()).all()
    # print(student_list)

    """count 统计结果数量"""
    # ret = Student.query.filter(Student.age>17).count()
    # print(ret)

    """limit 结果数量进行限制"""
    """offse                                            """
    # 对学生的钱包进行从大到小排名,第3-第5名的学生
    # student_list = Student.query.order_by(Student.money.desc()).offset(2).limit(3).all()
    # print(student_list)

    """paginate分页器"""
    # paginate(page=当前页码, per_page=每一页数据量, max_per_page=每一页最大数据量)
    # 当前页码,默认是从request.args["page"],如果当前参数没有值,则默认为1
    # 每一页数据量,默认是100条
    # 因为分页器有提供了一个  request.args.["per_page"]给客户端设置每一页数据量,所以再次限定客户端最多能设置的每一页数据量
    pagination = Student.query.filter(Student.sex==True).paginate(per_page=1)
    print(pagination)
    return render_template("list.html",pagination=pagination)
    # print( pagination.items ) # 获取当前页数据量
    # print( pagination.has_next ) # 如果还有下一页数据,则结果为True
    # print( pagination.has_prev ) # 如果有上一页数据,则结果为True
    # print( pagination.page ) # 当前页页码 request.args.get("page",1)
    # print( pagination.total ) # 本次查询结果的数据总量[被分页的数据量总数]
    # print( pagination.pages )   # 总页码
    # print( pagination.prev() ) # 上一页的分页器对象,如果没有上一页,则默认为None
    # print( pagination.next() ) # 下一页的分页器对象,如果没有下一页,则默认为None
    # if pagination.has_next:
    #     print( pagination.next().items ) # 下一页的数据列表

    return "Ok"

if __name__ == '__main__':
    # with app.app_context():
    #     db.create_all() # 根据模型创建所有的数据表
    #     # db.drop_all()   # 删除模型对应的所有数据表
    app.run()

2、分页查询及执行原生SQL语句

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)

class Config():
    # DEBUG调试模式
    DEBUG = True
    # json多字节转unicode编码
    JSON_AS_ASCII = False
    # 数据库链接配置
    # SQLALCHEMY_DATABASE_URI = "mysql://账号:密码@IP/数据库名?编码"
    SQLALCHEMY_DATABASE_URI = "mysql://root:123@127.0.0.1:3306/students?charset=utf8mb4"
    # 动态追踪修改设置,如未设置只会提示警告
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    # 查询时会显示原始SQL语句
    SQLALCHEMY_ECHO = True

app.config.from_object(Config)
db = SQLAlchemy()
db.init_app(app)

"""创建模型类"""
class Student(db.Model):
    __tablename__ = "tb_student"
    id = db.Column(db.Integer, primary_key=True,comment="主键ID")
    name = db.Column(db.String(250), comment="姓名")
    age = db.Column(db.Integer, comment="年龄")
    sex = db.Column(db.Boolean, default=False, comment="性别")
    money = db.Column(db.DECIMAL(8,2), nullable=True, comment="钱包")

    def __repr__(self):
        return self.name

class Teacher(db.Model):
    __tablename__ = "tb_teacher"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    name = db.Column(db.String(250), comment="姓名")
    sex = db.Column(db.Boolean, default=False, comment="性别")
    option = db.Column(db.Enum("讲师","助教","班主任"), default="讲师", comment="教职")

    def __repr__(self):
        return self.name

class Course(db.Model):
    __tablename__ = "tb_course"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    name = db.Column(db.String(250), unique=True, comment="课程名称")
    price = db.Column(db.Numeric(6, 2))

    def __repr__(self):
        return self.name

@app.route("/")
def index():
    from sqlalchemy import func
    """ group_by 分组查询"""
    # 查询男生和女生的最大年龄
    # ret = db.session.query(Student.sex,func.max(Student.age)).group_by(Student.sex).all()
    # print(ret)

    # 查询出男生和女生年龄大于18的人数
    # having是针对分组的结果进行过滤处理,所以having能调用的字段,必须是分组查询结果中的字段,否则报错!!
    # ret = db.session.query(Student.sex,Student.age, func.count(Student.age)).group_by(Student.sex,Student.age).having(Student.age>18).all()
    # print(ret)

    """执行原生SQL语句,返回结果不是模型对象, 是列表和元祖"""
    # 查询多条
    # ret = db.session.execute("select id,name,age,IF(sex,'男','女') from tb_student").fetchall()
    # print(ret)
    # # 查询单条
    # ret = db.session.execute("select * from tb_student where id = 3").fetchone()
    # print(ret)

    # 添加/修改/删除
    # db.session.execute("UPDATE tb_student SET money=(money + %s) WHERE age = %s" % (200, 22))
    # db.session.commit()

    # 查询出女生和男生中大于18岁的人数
    ret = db.session.execute("SELECT IF(sex,'男','女'), count(id) from (SELECT id,name,age,sex FROM `tb_student` WHERE age>18) as stu group by sex").fetchall()
    print(ret)
    return "Ok"

if __name__ == '__main__':

    app.run()

3、一对一查询

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)

class Config():
    # DEBUG调试模式
    DEBUG = True
    # json多字节转unicode编码
    JSON_AS_ASCII = False
    # 数据库链接配置
    # SQLALCHEMY_DATABASE_URI = "mysql://账号:密码@IP/数据库名?编码"
    SQLALCHEMY_DATABASE_URI = "mysql://root:123456@127.0.0.1:3306/students?charset=utf8mb4"
    # 动态追踪修改设置,如未设置只会提示警告
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    # 查询时会显示原始SQL语句
    SQLALCHEMY_ECHO = True

app.config.from_object(Config)
db = SQLAlchemy()
db.init_app(app)

"""创建模型类"""
class Student(db.Model):
    __tablename__ = "tb_student"
    id = db.Column(db.Integer, primary_key=True,comment="主键ID")
    name = db.Column(db.String(250), comment="姓名")
    age = db.Column(db.Integer, comment="年龄")
    sex = db.Column(db.Boolean, default=False, comment="性别")
    money = db.Column(db.DECIMAL(8,2), nullable=True, comment="钱包")
    # 关联属性,是SQLAlchemy提供给开发者快速引用外键模型的一个对象属性,不存在于mySQL中!!!
    # backref 反向引用,类似django的related,通过外键模型查询主模型数据时的关联属性
    info = db.relationship("StudentInfo", backref="own", uselist=False)
    def __repr__(self):
        return self.name

class StudentInfo(db.Model):
    __tablename__ = "tb_student_info"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    sid= db.Column(db.Integer,db.ForeignKey(Student.id), comment="学生")
    address = db.Column(db.String(255), nullable=True, comment="家庭住址")
    mobile = db.Column(db.String(15), unique=True, comment="紧急联系电话")

    def __repr__(self):
        return self.own.name

class Teacher(db.Model):
    __tablename__ = "tb_teacher"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    name = db.Column(db.String(250), comment="姓名")
    sex = db.Column(db.Boolean, default=False, comment="性别")
    option = db.Column(db.Enum("讲师","助教","班主任"), default="讲师", comment="教职")

    def __repr__(self):
        return self.name

class Course(db.Model):
    __tablename__ = "tb_course"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    name = db.Column(db.String(250), unique=True, comment="课程名称")
    price = db.Column(db.Numeric(6, 2))

    def __repr__(self):
        return self.name

@app.route("/")
def index():
    """添加数据"""
    # student = Student(
    #     name= "xiaohuang",
    #     age = 13,
    #     sex = True,
    #     money = 1000,
    #     info = StudentInfo(
    #         mobile="13312345678",
    #         address="北京市昌平区白沙路103号"
    #     )
    # )
    # db.session.add(student)
    # db.session.commit()

    """查询/读取"""
    
    # student = Student.query.first()
    # print(student.age)
    # print(student.info.mobile)
    #
    # student = StudentInfo.query.filter(StudentInfo.mobile=="13312345678").first()
    # print(student.own.name)

    """修改更新"""
    info = db.relationship("StudentInfo", backref="own", uselist=False)
    # student = Student.query.get(1)
    # student.age = 18
    # student.info.address = "北京市昌平区沙河镇白沙路103号"
    # db.session.commit()

    """删除"""
    # student = Student.query.get(2)
    # db.session.delete(student.info)  # 先删除外键模型,再删主模型
    # db.session.delete(student)
    # db.session.commit()
    return "Ok"

if __name__ == '__main__':
    # with app.app_context():
    #     db.drop_all()
    #     db.create_all()
    app.run()

4、一对多查询

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)

class Config():
    # DEBUG调试模式
    DEBUG = True
    # json多字节转unicode编码
    JSON_AS_ASCII = False
    # 数据库链接配置
    # SQLALCHEMY_DATABASE_URI = "mysql://账号:密码@IP/数据库名?编码"
    SQLALCHEMY_DATABASE_URI = "mysql://root:123456@127.0.0.1:3306/students?charset=utf8mb4"
    # 动态追踪修改设置,如未设置只会提示警告
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    # 查询时会显示原始SQL语句
    SQLALCHEMY_ECHO = True

app.config.from_object(Config)
db = SQLAlchemy()
db.init_app(app)

"""创建模型类"""
class Student(db.Model):
    __tablename__ = "tb_student"
    id = db.Column(db.Integer, primary_key=True,comment="主键ID")
    name = db.Column(db.String(250), comment="姓名")
    age = db.Column(db.Integer, comment="年龄")
    sex = db.Column(db.Boolean, default=False, comment="性别")
    money = db.Column(db.DECIMAL(8,2), nullable=True, comment="钱包")
    # 关联属性,是SQLAlchemy提供给开发者快速引用外键模型的一个对象属性,不存在于mySQL中!!!
    # backref 反向引用,类似django的related,通过外键模型查询主模型数据时的关联属性
    info = db.relationship("StudentInfo", backref="own", uselist=False)
    def __repr__(self):
        return self.name

class StudentInfo(db.Model):
    __tablename__ = "tb_student_info"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    sid= db.Column(db.Integer,db.ForeignKey(Student.id), comment="学生")
    address = db.Column(db.String(255), nullable=True, comment="家庭住址")
    mobile = db.Column(db.String(15), unique=True, comment="紧急联系电话")

    def __repr__(self):
        return self.own.name

class Teacher(db.Model):
    __tablename__ = "tb_teacher"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    name = db.Column(db.String(250), comment="姓名")
    option = db.Column(db.Enum("讲师","助教","班主任"), default="讲师", comment="教职")
    course_list = db.relationship("Course",uselist=True, backref="teacher",lazy="subquery")
    def __repr__(self):
        return self.name

class Course(db.Model):
    __tablename__ = "tb_course"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    name = db.Column(db.String(250), unique=True, comment="课程名称")
    price = db.Column(db.Numeric(6, 2))
    teacher_id = db.Column(db.Integer, db.ForeignKey(Teacher.id),comment="老师")
    def __repr__(self):
        return self.name

@app.route("/")
def index():
    """1对多,多对1"""
    """添加数据"""
    
    # 添加主模型数据,同时也添加外键模型
    # teacher = Teacher(
    #     name="灰太狼",
    #     option="班主任",
    #     course_list=[
    #         Course(name="抓羊",price="9.90"),
    #         Course(name="挨打",price="19.90"),
    #         Course(name="炸房子",price="29.90"),
    #     ]
    # )
    # db.session.add(teacher)
    # db.session.commit()

    # course = Course(
    #     name="平底锅108种用法",
    #     price="99.99",
    #     teacher=Teacher(name="红太狼",option="班主任")
    # )
    # db.session.add(course)
    # db.session.commit()

    Teacher
    course_list = db.relationship("Course",uselist=True, backref="teacher",lazy="subquery")
    Course
    teacher_id = db.Column(db.Integer, db.ForeignKey(Teacher.id),comment="老师")
    """查询数据"""
    # teacher = Teacher.query.filter(Teacher.name=="灰太狼").first()
    # print(teacher.name, teacher.option)
    # print("---------------------------------------------------")
    # print(teacher.course_list)
    # for course in teacher.course_list:
    #     print(course.name)

    # course = Course.query.filter(Course.name=="炸房子").first()
    # print(course)
    # print("%s在教%s" % (course.teacher.name,course.name))

    """更新数据"""
    # teacher = Teacher.query.filter(Teacher.name == "灰太狼").first()
    # teacher.course_list[0].name="抓懒洋洋"
    # db.session.commit()

    """删除数据"""
    # teacher = Teacher.query.filter(Teacher.name=="灰太狼").first()
    # for course in teacher.course_list:
    #     db.session.delete(course)
    # db.session.delete(teacher)
    # db.session.commit()

    

    return "Ok"

if __name__ == '__main__':
    # with app.app_context():
    #     db.drop_all()
    #     db.create_all()
    app.run()

5、多对多

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)

class Config():
    # DEBUG调试模式
    DEBUG = True
    # json多字节转unicode编码
    JSON_AS_ASCII = False
    # 数据库链接配置
    # SQLALCHEMY_DATABASE_URI = "mysql://账号:密码@IP/数据库名?编码"
    SQLALCHEMY_DATABASE_URI = "mysql://root:123456@127.0.0.1:3306/students?charset=utf8mb4"
    # 动态追踪修改设置,如未设置只会提示警告
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    # 查询时会显示原始SQL语句
    SQLALCHEMY_ECHO = True

app.config.from_object(Config)
db = SQLAlchemy()
db.init_app(app)

"""创建模型类"""
# db.Table(
# 表名,
# db.Column("字段名",字段类型,外键声明),
# db.Column("字段名",字段类型,外键声明),
# )
"""以db.Table关系表来确定模型之间的多对多关联"""
achievement = db.Table(
    "tb_achievement",
    db.Column("student_id",db.Integer,db.ForeignKey('tb_student.id')),
    db.Column("course_id",db.Integer,db.ForeignKey('tb_course.id')),

    # 这里的表信息,在主键模型中,仅仅表达的是关联关系,所以中间表的字段,无法通过主模型来获取
    db.Column("created_time",db.DateTime,comment="考试时间"),
    db.Column("score",db.DECIMAL(5,2),comment="成绩")
)

class Student(db.Model):
    __tablename__ = "tb_student"
    id = db.Column(db.Integer, primary_key=True,comment="主键ID")
    name = db.Column(db.String(250), comment="姓名")
    age = db.Column(db.Integer, comment="年龄")
    sex = db.Column(db.Boolean, default=False, comment="性别")
    money = db.Column(db.DECIMAL(8,2), nullable=True, comment="钱包")
    # 关联属性,是SQLAlchemy提供给开发者快速引用外键模型的一个对象属性,不存在于mySQL中!!!
    # backref 反向引用,类似django的related,通过外键模型查询主模型数据时的关联属性
    info = db.relationship("StudentInfo", backref="own", uselist=False)
    # course_list = db.relationship("Course", secondary=achievement,backref="student_list",lazy="dynamic")
    def __repr__(self):
        return self.name

class StudentInfo(db.Model):
    __tablename__ = "tb_student_info"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    sid= db.Column(db.Integer,db.ForeignKey(Student.id), comment="学生")
    address = db.Column(db.String(255), nullable=True, comment="家庭住址")
    mobile = db.Column(db.String(15), unique=True, comment="紧急联系电话")

    def __repr__(self):
        return self.own.name

class Teacher(db.Model):
    __tablename__ = "tb_teacher"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    name = db.Column(db.String(250), comment="姓名")
    option = db.Column(db.Enum("讲师","助教","班主任"), default="讲师", comment="教职")
    course_list = db.relationship("Course",uselist=True, backref="teacher",lazy="subquery")
    def __repr__(self):
        return self.name

class Course(db.Model):
    __tablename__ = "tb_course"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    name = db.Column(db.String(250), unique=True, comment="课程名称")
    price = db.Column(db.Numeric(6, 2))
    teacher_id = db.Column(db.Integer, db.ForeignKey(Teacher.id),comment="老师")
    student_list = db.relationship("Student",secondary=achievement,backref='course_list',lazy="dynamic")

    def __repr__(self):
        return self.name

@app.route("/")
def index():
    """多对多"""
    # course1 = Course(name="坑爹", price="9.99", teacher=Teacher(name="灰太狼", option="讲师"))
    # course2 = Course(name="坑娘", price="9.99", teacher=Teacher(name="灰太狼", option="讲师"))
    # course3 = Course(name="和羊做朋友,一起坑爹", price="99.99", teacher=Teacher(name="喜洋洋", option="讲师"))
    # student = Student(
    #     name="xiaohuihui",
    #     age=5,
    #     sex=False,
    #     money=1000,
    #     info=StudentInfo(
    #         mobile="13066666666",
    #         address="狼村1号别墅",
    #     ),
    #     course_list = [
    #         course1,
    #         course2,
    #         course3,
    #     ]
    # )
    # db.session.add(student)
    # db.session.commit()

    """查询"""
    # student = Student.query.filter(Student.name=="xiaohuihui").first()
    # print(student)
    # print(student.course_list) # [坑爹, 坑娘, 和羊做朋友,一起坑爹]

    # course = Course.query.filter(Course.name=="和羊做朋友,一起坑爹").first()
    # print("-----------------------------------------------")
    # print(course.student_list.all()) # 获取所有学生信息

    """更新"""
    # course = Course.query.filter(Course.name == "和羊做朋友,一起坑爹").first()
    # course.student_list[0].name="小灰灰"
    # db.session.commit()

    return "Ok"

if __name__ == '__main__':
    # with app.app_context():
    #     db.drop_all()
    #     db.create_all()
    app.run()

 四、数据迁移

所谓数据迁移就是数据库记录版本修改历史的一种技术,可以实现通过代码结构的方式帮我们把关于数据库的表结构,字段结构修改的过程进行一一记录,我们可以通过数据迁移提供的命令控制数据库修改版本的更新和迭代

每次修改数据库模型,makemgirations实际上就是记录了一个代码版本,把修改的状态记录下来,migrate只不过是把修改同步到数据库而已,

  • 在开发过程中,需要修改数据库模型,而且还要在修改之后更新数据库。最直接的方式就是删除旧表,但这样会丢失数据。

  • 更好的解决办法是使用数据库迁移框架,它可以追踪数据库模式的变化,然后把变动应用到数据库中。

  • 在Flask中可以使用Flask-Migrate扩展,来实现数据迁移。并且集成到Flask-Script中,所有操作通过命令就能完成。

  • 为了导出数据库迁移命令,Flask-Migrate提供了一个MigrateCommand类,可以附加到flask-script的manager对象上。

首先要在虚拟环境中安装Flask-Migrate。

pip install flask-migrate

依赖Flask-Script这个包

安装完后提供了连个包Migrate,MigrateCommand

Migrate就是核心的功能类

操作数据迁移要有一个命令,命令是基于flask_script编写出来的

注册

# 数据迁移
# 第一个参数是Flask的实例,第二个参数是Sqlalchemy数据库实例
migrate = Migrate(app,db)
#manager是Flask-Script的实例,这条语句在flask-Script中添加一个db命令
manager.add_command('db', MigrateCommand)
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand
app = Flask(__name__)


class Config():
    # DEBUG调试模式
    DEBUG = True
    # json多字节转unicode编码
    JSON_AS_ASCII = False
    # 数据库链接配置
    # SQLALCHEMY_DATABASE_URI = "mysql://账号:密码@IP/数据库名?编码"
    SQLALCHEMY_DATABASE_URI = "mysql://root:123456@127.0.0.1:3306/students?charset=utf8mb4"
    # 动态追踪修改设置,如未设置只会提示警告
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    # 查询时会显示原始SQL语句
    SQLALCHEMY_ECHO = True

app.config.from_object(Config)

manager = Manager()
manager.app = app

db = SQLAlchemy()
db.init_app(app)

# 数据迁移
#第一个参数是Flask的实例,第二个参数是Sqlalchemy数据库实例
migrate = Migrate(app,db)

#manager是Flask-Script的实例,这条语句在flask-Script中添加一个db命令
manager.add_command('db', MigrateCommand)

"""创建模型类"""
from datetime import datetime
class Achievement(db.Model):
    __tablename__ = "tb_achievement"
    id = db.Column(db.Integer, primary_key=True,comment="主键")
    student_id = db.Column(db.Integer, db.ForeignKey("tb_student.id"), comment="学生")
    course_id = db.Column(db.Integer, db.ForeignKey("tb_course.id"), comment="课程")
    score = db.Column(db.DECIMAL(5,2), nullable=True, comment="成绩分数")
    created_time = db.Column(db.DateTime, default=datetime.now(), comment="考试时间")

    def __repr__(self):
        return "[%s],%s进行了一次%s科目考试,成绩:%s" % (self.created_time,self.student.name,self.course.name,self.score)

class Student(db.Model):
    __tablename__ = "tb_student"
    id = db.Column(db.Integer, primary_key=True,comment="主键ID")
    name = db.Column(db.String(250), comment="姓名")
    avatar = db.Column(db.String(250), comment="头像")
    age = db.Column(db.Integer, comment="年龄")
    sex = db.Column(db.Boolean, default=False, comment="性别")
    money = db.Column(db.DECIMAL(8,2), nullable=True, comment="钱包")
    achievement_list = db.relationship("Achievement",uselist=True, backref="student", lazy="select")
    # backref 反向引用,类似django的related,通过外键模型查询主模型数据时的关联属性
    info = db.relationship("StudentInfo", backref="own", uselist=False)
    def __repr__(self):
        return self.name

class StudentInfo(db.Model):
    __tablename__ = "tb_student_info"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    sid= db.Column(db.Integer,db.ForeignKey(Student.id), comment="学生")
    address = db.Column(db.String(255), nullable=True, comment="家庭住址")
    mobile = db.Column(db.String(15), unique=True, comment="紧急联系电话")

    def __repr__(self):
        return self.own.name

class Teacher(db.Model):
    __tablename__ = "tb_teacher"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    name = db.Column(db.String(250), comment="姓名")
    option = db.Column(db.Enum("讲师","助教","班主任"), default="讲师", comment="教职")
    course_list = db.relationship("Course",uselist=True, backref="teacher",lazy="subquery")
    def __repr__(self):
        return self.name

class Course(db.Model):
    __tablename__ = "tb_course"
    id = db.Column(db.Integer, primary_key=True, comment="主键ID")
    name = db.Column(db.String(250), unique=True, comment="课程名称")
    price = db.Column(db.Numeric(6, 2))
    teacher_id = db.Column(db.Integer, db.ForeignKey(Teacher.id),comment="老师")
    achievement_list = db.relationship("Achievement", backref="course", uselist=True, lazy="select")
    def __repr__(self):
        return self.name

@app.route("/")
def index():
    return "Ok"

if __name__ == '__main__':
    manager.run()

"""
init       初始化,  数据迁移必须要有一个数据迁移的目录,第一步就要执行init
revision 重置版本 并不是回滚 例如有 1.0 2.0 3.0 在3.0 输入revision直接到1.0版本 migrate, 生成版本库 edit 修改当前版本的相关信息
merge 合并两个分支 upgrade, 数据库同步 前进同步 创建修改 downgrade 数据库同步 后退同步 删除 回滚
history 历史版本记录
heads 本次的修改过程
"""

执行init

python main.py db init

执行完就会在项目根目录下创建一个migrations目录

执行migrate生成版本

python main.py db migrate -m "four table"

这个文件帮我们记录版本状态

执行upgrade把表写入数据库

python main.py db upgrade

运行upgrade把版本推进到22ca,此时数据库就有表了

增加一个字段在Student类中

count = db.Column(db.Integer)

执行

python main.py db migrate -m "add column count"
python main.py db upgrade

版本库的版本就变为最新的了

history:可以查看版本迁移的历史过程

开始没有版本,生成第一个版本22c, 22c新增了一个字段生成ccd版本

 

文档: https://faker.readthedocs.io/en/master/locales/zh_CN.html

批量生成测试数据: https://github.com/joke2k/faker

五、flask-session

允许设置session到指定存储的空间中, 文档:

安装命令: https://pythonhosted.org/Flask-Session/

pip install flask-Session

使用session之前,必须配置一下配置项:

SECRET_KEY = "*(%#4sxcz(^(#$#8423" # session秘钥

1、redis保存session

from flask import Flask,session
from flask_redis import FlaskRedis
from flask_session import Session
app = Flask(__name__)
redis = FlaskRedis()
session_store = Session()
class Config():
    # DEBUG调试模式
    DEBUG = True
    # json多字节转unicode编码
    JSON_AS_ASCII = False
    # 数据库链接配置
    SECRET_KEY = "*(%#4sxcz(^(#$#8423"
    # session存储方式为redis
    SESSION_TYPE = "redis"
    # session保存数据到redis时启用的链接对象
    SESSION_REDIS = redis
    # 如果设置session的生命周期是否是会话期, 为True,则关闭浏览器session就失效
    SESSION_PERMANENT = True
    # 是否对发送到浏览器上session的cookie值进行加密
    SESSION_USE_SIGNER = True
    # 保存到redis的session数的名称前缀
    SESSION_KEY_PREFIX = "session:"
    # redis的链接配置
    REDIS_URL = "redis://localhost:6379/1"

app.config.from_object(Config)
redis.init_app(app)
session_store.init_app(app)

@app.route("/")
def index():
    session["username"] = "xiaoming"
    return "Ok"

@app.route("/get_session")
def get_session():
    print( session["username"] )
    return "ok"

@app.route("/redis1")
def set_redis():
    redis.set("username","xiaohuihui")
    redis.hset("brother","zhangfei","17")
    return "ok"

@app.route("/redis2")
def get_redis():
    user = redis.get("username").decode()
    print(user)

    brother = redis.hgetall("brother")
    print(brother)
    # 先编码从redis中取到数据,再解码
    print(brother["zhangfei".encode()].decode())

    return "ok"

if __name__ == '__main__':
    app.run()

访问 根

进入redis

说明我们存到redis中了,存储到了redis的1号库

点击redis = FlaskRedis(),

    def init_app(self, app, **kwargs):
        redis_url = app.config.get(
            "{0}_URL".format(self.config_prefix), "redis://localhost:6379/0"

默认0号库

配置就可以

REDIS_URL = "redis://localhost:6379/1"

2、SQLAlchemy存储session

from flask import Flask,session
from flask_redis import FlaskRedis
from flask_session import Session
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 创建对象
db = SQLAlchemy()
redis = FlaskRedis()
session_store = Session()
class Config():
    # DEBUG调试模式
    DEBUG = True
    # json多字节转unicode编码
    JSON_AS_ASCII = False
    SECRET_KEY = "*(%#4sxcz(^(#$#8423"
    # 数据库链接配置
    # SQLALCHEMY_DATABASE_URI = "mysql://账号:密码@IP/数据库名?编码"
    SQLALCHEMY_DATABASE_URI = "mysql://root:123456@127.0.0.1:3306/students?charset=utf8mb4"
    # 动态追踪修改设置,如未设置只会提示警告
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    # 查询时会显示原始SQL语句
    SQLALCHEMY_ECHO = True

    # redis的链接配置
    REDIS_URL = "redis://localhost:6379/1"

    # 数据库保存session
    SESSION_TYPE = 'sqlalchemy'  # session类型为sqlalchemy
    SESSION_SQLALCHEMY = db  # SQLAlchemy对象
    SESSION_SQLALCHEMY_TABLE = 'tb_session'  # session要保存的表名称
    SESSION_PERMANENT = True  # 如果设置为True,则关闭浏览器session就失效。
    SESSION_USE_SIGNER = False  # 是否对发送到浏览器上session的cookie值进行加密
    SESSION_KEY_PREFIX = 'session:'  # 保存到session中的值的前缀
# 数据库初始化
db.init_app(app)
app.config.from_object(Config)
redis.init_app(app)
session_store.init_app(app)

@app.route("/")
def index():
    session["username"] = "xiaohui"
    return "Ok"

@app.route("/get_session")
def get_session():
    return session["username"]

if __name__ == '__main__':
    # with app.app_context():
    #     db.create_all()
    app.run()

先创建表

if __name__ == '__main__':
    with app.app_context():
        db.create_all()
    app.run()

会生成一张session表

# session_id     数据        有效期

访问根路径

"""
mysql> show create table students.tb_sessionG;
*************************** 1. row ***************************
       Table: tb_session
Create Table: CREATE TABLE `tb_session` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `session_id` varchar(255) DEFAULT NULL,
  `data` blob,
  `expiry` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `session_id` (`session_id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4
1 row in set (0.01 sec)

ERROR: 
No query specified

mysql> 
"""

InnoDB: 5.5之后的默认存储引擎支持事务备份恢复行级锁

MyISAM: 表级锁,不支持事务,不支持备份恢复,查询效率比InnoDB高,适合做表优化

memory:数据保存在内存,内存读写非常快,受限mysql自身瓶颈,热点数据现在会放在redis

原文地址:https://www.cnblogs.com/kongxiangqun/p/14015469.html