Flask插件

Flask上下文管理

铺垫

# by gaoxin
from functools import partial


def add(x, y, z):
    print(x + y + z)

# 这是最简单的一个函数
# 如果我要实现一个功能,三个数相加,其中一个数必须是6
# 我们就可以使用偏函数来帮着我们传参

new_func = partial(add, 6)

new_func(1,1)
new_func(1,2)
new_func(1,3)
偏函数
# by gaoxin


class A(object):
    def __init__(self):
        # self.x1 = {}
        object.__setattr__(self, "x1", {})

    def __setattr__(self, key, value):
        print(key, value, self.x1)
        if key in self.x1:
            print(key, value)


a = A()
a.xx = 123
__setattr__

Flask上下文管理一

Flask的上下文管理我们可以简单的理解为一个生命周期~

也就是请求进来到请求走一共做了哪些事情~~我们从源码开始走~~

首先我们知道项目启动执行了app.run()方法~~调用了werkzeug的run_simple()方法~

run_simple(host, port, self, **options) 这时候的self就是我们的app~

run_simple会执行self(),也就是app(), 那么app = Flask()  所以会走Flask的__call__方法~

那么__call__做了什么呢~~

environ是我们请求来的原始数据~当成参数传递给了request_context方法~~

进入这个RequestContext对象~

在这里重新封装了request, 以及给session 赋值了 None~~

这是初始化这个类做的一些事情~~也就说~

ctx = RequestContext(app, environ)  

ctx.request 是重新封装的request

ctx.session = None

然后我们继续往下走~~~

ctx.push() 那么这个RequestContext这个类里就一定有push方法~

我们是执行ctx.push()~~所以self应该就是我们这个ctx对象~~

执行了_request_ctx_stack.push(ctx)~也就是说~~_request_ctx_stack它把我们的ctx对象push到了一个地方~~

我们的ctx这个对象~里面有request以及session~~

self._local就是Local()对象~~

我们看这个初始化方法~跟我们上面回忆的知识点是不是很像~~

就是给我们这个Local类初始化了两个属性~~__storage__ = {}  __ident_func__ = get_ident

那我们继续看LocalStark中push方法做了什么~~

回头看我们的wsgi_app, 里的ctx.push()就走完了~接下来就走我们的视图了~~~

那到这里~我们可以通过什么样的方法在我们视图中拿到这个request对象呢~~

request在ctx对象里~能通过ctx.request得到,那我们怎么得到ctx呢~

ctx被LocalStack对象放入到Local中了~~

# by gaoxin
from flask import Flask
from flask import globals

app = Flask(__name__)


@app.route("/")
def index():
    ctx = globals._request_ctx_stack.top
    print(ctx.request.method)
    return "index"



if __name__ == '__main__':
    app.run()
视图中获得request

在这里我们的上下文管理的第一部分就走完了~~

Flask上下文管理 二

我们之前走到ctx.push(), 接下来那个方法就是去走我们的视图函数了~~

在我们的视图函数中上面那种拿request的方法太费劲了~我们需要简单一点的拿到request~~

那么我们导入的request跟我们上面拿到的request一定是一样的~~那导入的这个request做了什么呢~~

reqeust是LocalProxy这个类的实例化对象~参数是一个偏函数~~

那当我们调用request.method 等方法的时候走的是LocalProxy这个类的__getattr__方法~~

这里的_get_current_object()相当于我们偏函数的执行~~

到这~就跟我们上面的取值方式一样了~~也就是说通过LocalStack方法去Local中去ctx对象~~

然后通过getattr 找到ctx.request~~~

也就是说这个LocalProxy就是一个帮助我们取值的代理~让我们的取值变的更加简单~~~

这个代理通过偏函数来绑定参数~~

ctx中封装了request,以及session~只不过到这里我们的session依然是空的~~

session的实现原理

首先请求进来的时候在Local中存放了ctx对象~这个对象里的session是None~~

当我们走完这个_reqeust_cts_stack.push(ctx)后,我们看它走了什么~~

也就是说~请求进来把ctx放入Local中后~~从前端解密了cookie~然后把解密数据好的数据给了self.session~~

然后继续走~

我们可以看到~session的原理是~~

从cookie中获取数据~解密存入session~~请求走的时候~~ 把session中数据取出来,加密, 给响应设置cookie~~

那么我们平时在视图中设置删除session的值~原来跟request是一样的~通过代理去修改Local中的数据~~~

应用上下文管理

应用上下文和请求上下文的原理和源码是一样的~~~

也就是说~我们请求上下文和应用上下文~分别建立了两个Local对象~~

两个Local对象数据结构都是一样的~~那么请求上下文和应用上下文为什么要分开存放呢~~~

以后我们写离线脚本的时候大家就知道了~~~总之~~有用!!!!!

全局对象g

我们说应用上下文里封装了g对象~~那么这个g对象是什么呢~~~

Flask中g的生命周期?  

g和session有什么区别?

g和全局对象有什么区别~~

1--我们讲这么多上下文管理~~我们知道请求进来会为每个请求在Local中建立一个独立空间~~

    也就是在应用上下文的Local对象中建立了一个g对象~~当请求走的时候~~~就会删除~~

   所以g的生命周期是请求进来到走~~~

2--session不一样的是保存在cookie中~所以下次请求来的时候cookie带来了~~~

3--全局变量~~是在项目启动创建的~~

  无论多少请求进来都可以访问全局变量~~

我们的g对象一般情况用于before_request中设置值~只为这一次请求建立全局变量~~~

!!注意在我们重定向的时候还可以取g的值么~~~

SQLAlchemy介绍

SQLAlchemy是一个基于Python的ORM框架。该框架是建立在DB-API之上,使用关系对象映射进行数据库操作。

简而言之就是,将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

补充:什么是DB-API ? 是Python的数据库接口规范。

在没有DB-API之前,各数据库之间的应用接口非常混乱,实现各不相同,

项目需要更换数据库的时候,需要做大量的修改,非常不方便,DB-API就是为了解决这样的问题。

1
pip install sqlalchemy

组成部分:

  -- engine,框架的引擎

  -- connection pooling  数据库连接池

  -- Dialect  选择链接数据库的DB-API种类(实际选择哪个模块链接数据库)

  -- Schema/Types  架构和类型

  -- SQL Expression Language   SQL表达式语言

连接数据库

SQLAlchemy 本身无法操作数据库,其必须依赖遵循DB-API规范的三方模块,

Dialect 用于和数据API进行交互,根据配置的不同调用不同数据库API,从而实现数据库的操作。

# MySQL-PYthon
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

#pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]

# MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>

# cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

# 更多
# http://docs.sqlalchemy.org/en/latest/dialects/index.html
不同的数据库API
from sqlalchemy import create_engine

engine = create_engine(
    "mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接数
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 连接池中没有线程最多等待时间,否则报错
    pool_recycle=-1,  # 多久之后对连接池中的连接进行回收(重置)-1不回收
)
连接数据库

执行原生SQL

# by gaoxin

from sqlalchemy import create_engine
engine = create_engine(
    "mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",
    max_overflow=0,
    pool_size=5,
)

def test():
    cur = engine.execute("select * from Course")
    result = cur.fetchall()
    print(result)
    cur.close()

if __name__ == '__main__':
    test()
# [(1, '生物', 1), (2, '体育', 2), (3, '物理', 1)]
engine.execute

ORM

一、创建表

# by gaoxin

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy import Index, UniqueConstraint
import datetime

ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",)

Base = declarative_base()


class UserInfo(Base):
    __tablename__ = "user_info"

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    email = Column(String(32), unique=True)
    create_time = Column(DateTime, default=datetime.datetime.now)

    __table_args__ = (
        UniqueConstraint("id", "name", name="uni_id_name"),
        Index("name", "email")
    )


def create_db():
    Base.metadata.create_all(ENGINE)


def drop_db():
    Base.metadata.drop_all(ENGINE)



if __name__ == '__main__':
    create_db()
单表的创建
# by gaoxin

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy import Index, UniqueConstraint, ForeignKey
from sqlalchemy.orm import relationship
import datetime


ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",)

Base = declarative_base()


# ======一对多示例=======
class UserInfo(Base):
    __tablename__ = "user_info"

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    email = Column(String(32), unique=True)
    create_time = Column(DateTime, default=datetime.datetime.now)
    # FK字段的建立
    hobby_id = Column(Integer, ForeignKey("hobby.id"))
    # 不生成表结构 方便查询使用
    hobby = relationship("Hobby", backref="user")

    __table_args__ = (
        UniqueConstraint("id", "name", name="uni_id_name"),
        Index("name", "email")
    )


class Hobby(Base):
    __tablename__ = "hobby"

    id = Column(Integer, primary_key=True)
    title = Column(String(32), default="码代码")




def create_db():
    Base.metadata.create_all(ENGINE)


def drop_db():
    Base.metadata.drop_all(ENGINE)



if __name__ == '__main__':
    create_db()
    # drop_db()
一对多的创建
# by gaoxin

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy import Index, UniqueConstraint, ForeignKey
from sqlalchemy.orm import relationship
import datetime


ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",)

Base = declarative_base()


# ======多对多示例=======
class Book(Base):
    __tablename__ = "book"

    id = Column(Integer, primary_key=True)
    title = Column(String(32))
    # 不生成表字段 仅用于查询方便
    tags = relationship("Tag", secondary="book2tag", backref="books")


class Tag(Base):
    __tablename__ = "tag"

    id = Column(Integer, primary_key=True)
    title = Column(String(32))


class Book2Tag(Base):
    __tablename__ = "book2tag"

    id = Column(Integer, primary_key=True)
    book_id = Column(Integer, ForeignKey("book.id"))
    tag_id = Column(Integer, ForeignKey("tag.id"))


def create_db():
    Base.metadata.create_all(ENGINE)

def drop_db():
    Base.metadata.drop_all(ENGINE)

if __name__ == '__main__':
    create_db()
    # drop_db()
多对多的创建

二、对数据库表的操作(增删改查)

# by gaoxin

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from models_demo import Tag


ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",)

Session = sessionmaker(bind=ENGINE)

# 每次执行数据库操作的时候,都需要创建一个session

# 线程安全,基于本地线程实现每个线程用同一个session


session = scoped_session(Session)

# =======执行ORM操作==========
tag_obj = Tag(title="SQLAlchemy")
# 添加
session.add(tag_obj)
# 提交
session.commit()
# 关闭session
session.close()
scoped_session
# by gaoxin

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from models_demo import Tag, UserInfo
import threading


ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",)

Session = sessionmaker(bind=ENGINE)

# 每次执行数据库操作的时候,都需要创建一个session
session = Session()
session = scoped_session(Session)

# ============添加============
# tag_obj = Tag(title="SQLAlchemy")
# # 添加
# session.add(tag_obj)
# session.add_all([
#     Tag(title="Python"),
#     Tag(title="Django"),
# ])
# # 提交
# session.commit()
# # 关闭session
# session.close()

# ============基础查询============
# ret1 = session.query(Tag).all()
# ret2 = session.query(Tag).filter(Tag.title == "Python").all()
# ret3 = session.query(Tag).filter_by(title="Python").all()
# ret4 = session.query(Tag).filter_by(title="Python").first()
# print(ret1, ret2, ret3, ret4)

# ============删除===========
# session.query(Tag).filter_by(id=1).delete()
# session.commit()

# ===========修改===========
session.query(Tag).filter_by(id=22).update({Tag.title: "LOL"})
session.query(Tag).filter_by(id=23).update({"title": "王者毒药"})
session.query(Tag).filter_by(id=24).update({"title": Tag.title + "~"}, synchronize_session=False)
# synchronize_session="evaluate" 默认值进行数字加减
session.commit()
基本的增删改查
# 条件查询
ret1 = session.query(Tag).filter_by(id=22).first()
ret2 = session.query(Tag).filter(Tag.id > 1, Tag.title == "LOL").all()
ret3 = session.query(Tag).filter(Tag.id.between(22, 24)).all()
ret4 = session.query(Tag).filter(~Tag.id.in_([22, 24])).first()
from sqlalchemy import and_, or_
ret5 = session.query(Tag).filter(and_(Tag.id > 1, Tag.title == "LOL")).first()
ret6 = session.query(Tag).filter(or_(Tag.id > 1, Tag.title == "LOL")).first()
ret7 = session.query(Tag).filter(or_(
    Tag.id>1,
    and_(Tag.id>3, Tag.title=="LOL")
)).all()
# 通配符
ret8 = session.query(Tag).filter(Tag.title.like("L%")).all()
ret9 = session.query(Tag).filter(~Tag.title.like("L%")).all()
# 限制
ret10 = session.query(Tag).filter(~Tag.title.like("L%")).all()[1:2]
# 排序
ret11 = session.query(Tag).order_by(Tag.id.desc()).all()  # 倒序
ret12 = session.query(Tag).order_by(Tag.id.asc()).all()  # 正序
# 分组
ret13 = session.query(Tag.test).group_by(Tag.test).all()
# 聚合函数
from sqlalchemy.sql import func
ret14 = session.query(
    func.max(Tag.id),
    func.sum(Tag.test),
    func.min(Tag.id)
).group_by(Tag.title).having(func.max(Tag.id > 22)).all()
# 连表
ret15 = session.query(UserInfo, Hobby).filter(UserInfo.hobby_id == Hobby.id).all()
# print(ret15) 得到一个列表套元组 元组里是两个对象
ret16 = session.query(UserInfo).join(Hobby).all()
# print(ret16) 得到列表里面是前一个对象
# 相当于inner join
# for i in ret16:
#     # print(i[0].name, i[1].title)
#     print(i.hobby.title)
ret17 = session.query(Hobby).join(UserInfo, isouter=True).all()
ret17_1 = session.query(UserInfo).join(Hobby, isouter=True).all()
ret18 = session.query(Hobby).outerjoin(UserInfo).all()
ret18_1 = session.query(UserInfo).outerjoin(Hobby).all()
# 相当于left join
print(ret17)
print(ret17_1)
print(ret18)
print(ret18_1)
常用操作
# 基于relationship的FK
# 添加
user_obj = UserInfo(name="提莫", hobby=Hobby(title="种蘑菇"))
session.add(user_obj)

hobby = Hobby(title="弹奏一曲")
hobby.user = [UserInfo(name="琴女"), UserInfo(name="妹纸")]
session.add(hobby)
session.commit()

# 基于relationship的正向查询
user_obj_1 = session.query(UserInfo).first()
print(user_obj_1.name)
print(user_obj_1.hobby.title)

# 基于relationship的反向查询
hb = session.query(Hobby).first()
print(hb.title)
for i in hb.user:
    print(i.name)

session.close()
基于relationship的FK
# 添加
book_obj = Book(title="Python源码剖析")
tag_obj = Tag(title="Python")
b2t = Book2Tag(book_id=book_obj.id, tag_id=tag_obj.id)
session.add_all([
    book_obj,
    tag_obj,
    b2t,
])
session.commit()

#  上面有坑哦~~~~
book = Book(title="测试")
book.tags = [Tag(title="测试标签1"), Tag(title="测试标签2")]
session.add(book)
session.commit()

tag = Tag(title="LOL")
tag.books = [Book(title="大龙刷新时间"), Book(title="小龙刷新时间")]
session.add(tag)
session.commit()

# 基于relationship的正向查询
book_obj = session.query(Book).filter_by(id=4).first()
print(book_obj.title)
print(book_obj.tags)
# 基于relationship的反向查询
tag_obj = session.query(Tag).first()
print(tag_obj.title)
print(tag_obj.books)
基于relationship的M2M

Flask-session

Flask-session跟框架自带的session有什么区别呢~

框架自带的session是通过请求上下文~放入到Local中的~那如果我们想把session放入别的地方怎么办呢~~

比如redis~或者数据库~等等~~Flask-session就提供了这些功能~~我们看下Flask-session怎么用~~

一、下载安装

pip install flask-session

二、导入并实例化

def create_app():
    app = Flask(__name__)
    app.config.from_object("settings.BaseConfig")

    app.register_blueprint(us)
    # Flask-Session 第二步实例化session
    Session(app)
   
    return app

三、配置文件

class BaseConfig(object):
    # Flask-Session  第三步
    # SESSION_TYPE = 'redis'
    # SESSION_REDIS = Redis(host='192.168.0.94', port='6379')

实现原理

回顾一下session的实现原理~请求进来先把request以及session封装到RequestContext对象中~~

然后调用push方法通过LocalStark放入到Local中~这时候放入到Local中的session还是空的~

然后调用了session_interface中的open_session 以及save_session方法~~

那我们再看下~~Flask-session都做了什么~~

修改了app.session_interface这个类~所以在我们调用open_session以及save_session的时候~调用的是我们配置的类里的方法~

从而实现了session存储地方的不同~

Flask SQLAlchemy

学习Flask-SQLAlchemy之前~大家要先学习一下SQLAlchemy~一个Python的ORM框架~~

点我学习SQLALchemy

接下来是Flask-SQLAlchemy的应用~~

一、下载安装

pip3 install flask-sqlalchemy

二、导入并实例化SQLAlchemy

# 在跟项目同名的文件夹下的 init.py中
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()

from .views.user import us

# !!! 注意事项
#   必须在导入蓝图之前

三、初始化

def create_app():
    app = Flask(__name__)
    app.config.from_object("settings.BaseConfig")

    app.register_blueprint(us)
    # Flask-Session 第二步实例化session
    Session(app)
    # 初始化db
    db.init_app(app)
    return app

四、在配置文件写入配置信息

class BaseConfig(object):
    # Flask-Session  第三步
    # SESSION_TYPE = 'redis'
    # SESSION_REDIS = Redis(host='192.168.0.94', port='6379')

    SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8"
    SQLALCHEMY_POOL_SIZE = 10
    SQLALCHEMY_MAX_OVERFLOW = 5

    # SQLALCHEMY_TRACK_MODIFICATIONS = False
    pass

五、创建model

# by gaoxin
from sqlalchemy import Column, Integer, String
from flask_demo import db


class Users(db.Model):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)

六、生成表(需要使用app上下文)

# by gaoxin
from flask_demo import db, create_app
from flask_demo.models import *
# 一定要导入models 否则找不到表创建不出来 app = create_app() app_ctx = app.app_context() with app_ctx: db.create_all() # db.drop_all()

七、基于ORM对数据库操作

# by gaoxin
from flask import Blueprint
from flask_demo import db
from flask_demo.models import Users

us = Blueprint("us", __name__)


@us.route("/index")
def index():
    # db.session.add(Users(name="gaoxin"))
    # db.session.commit()
    # db.session.remove()
    ret = db.session.query(Users).all()
    print(ret)
    db.session.remove()
    return "Index"

Flask-Script

一、下载安装

pip3 install flask-script

二、增加的功能 runserver

# by gaoxin

from flask_demo import create_app
from flask_script import Manager

app = create_app()
manager = Manager(app)

if __name__ == '__main__':
    # app.run()
    manager.run()
# 启动命令变成
# python3 manager.py runserver -h 127.0.0.1 -p 8000
# Running on http://127.0.0.1:8000/ (Press CTRL+C to quit)

三、自定义命令

# by gaoxin

from flask_demo import create_app
from flask_script import Manager

app = create_app()
manager = Manager(app)


# 位置传参
@manager.command
def custom(arg):
    """
    自定义命令
    python manage.py custom 123
    :param arg:
    :return:
    """
    print(arg)
    
    
# 关键字传参
@manager.option('-n', '--name', dest='name')
@manager.option('-u', '--url', dest='url')
def cmd(name, url):
    """
    自定义命令
    执行: python manage.py  cmd -n gaoxin -u http://www.oldboyedu.com
    :param name:
    :param url:
    :return:
    """
    print(name, url)


if __name__ == '__main__':
    # app.run()
    manager.run()

Flask-migrate

一、下载安装

pip3 install flask-migrate

二、增加的命令  

  !!!! 依赖flask-script  !!!!

# by gaoxin

from flask_demo import create_app, db
from flask_demo.models import *
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand

app = create_app()
manager = Manager(app)
Migrate(app, db)

"""
# 数据库迁移命名
# 依赖 flask-script
python manage.py db init # 初始化
python manage.py db migrate # makemigrations
python manage.py db upgrade # migrate
"""
manager.add_command("db", MigrateCommand)

if __name__ == '__main__':
    # app.run()
    manager.run()

wtforms

类比我们django的Form组件~

Form组件的主要应用是~帮助我们自动生成HTML,以及做表单数据的验证~~

用法跟Form组件大同小异~~

一、下载安装

pip3 install wtforms

二、自动生成HTML

第一步 生成一个Form类

第二步 实例化这个Form类,把这个实例化对象当成参数传递给前端

# 视图页面
from wtforms import Form, widgets, validators
from wtforms.fields import simple
class MyForm(Form):
    name = simple.StringField(
        label="用户名",
        render_kw={"placeholder": "请输入用户名"},
        widget=widgets.TextArea(),
        default="gaoxin"
    )
    pwd = simple.PasswordField()


@ac.route("/login", methods=["GET", "POST"])
def login():
    if request.method == "GET":
        form = MyForm(data={"name": "gao"})
        return render_template("login.html", form=form)
<!--html页面-->
<form action="" novalidate>
    用户名: {{form.name}}
    密码: {{form.pwd}}
    <button type="submit">提交</button>

</form>
<!--循环出来的页面-->
<form action="">
    {% for field in form %}
        {{field.label}}: {{field}}

    {% endfor %}
    <button type="submit">提交</button>

</form>

三、验证

第一步  在Form类中增加验证信息

第二步 在视图中做数据的校验 并且页面展示错误信息

# 视图页面
class MyForm(Form):
    name = simple.StringField(
        label="用户名",
        render_kw={"placeholder": "请输入用户名"},
        # widget=widgets.TextArea(),
        validators=[
            validators.DataRequired(message="用户名不能为空"),
            validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
        ],
        # default="gaoxin"
    )
    pwd = simple.PasswordField(
        label="密码",
        widget=widgets.PasswordInput(),
        validators=[
            validators.DataRequired(message='密码不能为空.'),
            validators.Length(min=8, message='用户名长度必须大于%(min)d'),
            validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
                              message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
        ]
    )


@ac.route("/login", methods=["GET", "POST"])
def login():
    if request.method == "GET":
        form = MyForm(data={"name": "gao"})
        return render_template("login.html", form=form)
    form = MyForm(formdata=request.form)
    if form.validate():
        print(form.data)
    else:
        return render_template("login.html", form=form)
    return "lakdsjlga"
<!--循环出来的页面-->
<form action="" method="post" novalidate>
    {% for field in form %}
        {{field.label}}: {{field}} {{field.errors[0]}}

    {% endfor %}
    <button type="submit">提交</button>

</form>

四、拓展字段

以用户注册为例,输入用户名,密码,重复密码,性别和爱好。

class RegisterForm(Form):
    name = simple.StringField(
        label='用户名',
        validators=[
            validators.DataRequired()
        ],
        widget=widgets.TextInput(),
        # render_kw={'class': 'form-control'},
        default='gaoxin'
    )

    pwd = simple.PasswordField(
        label='密码',
        validators=[
            validators.DataRequired(message='密码不能为空.')
        ],
        widget=widgets.PasswordInput(),
        # render_kw={'class': 'form-control'}
    )

    pwd_confirm = simple.PasswordField(
        label='重复密码',
        validators=[
            validators.DataRequired(message='重复密码不能为空.'),
            validators.EqualTo('pwd', message="两次密码输入不一致")
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

    email = html5.EmailField(
        label='邮箱',
        validators=[
            validators.DataRequired(message='邮箱不能为空.'),
            validators.Email(message='邮箱格式错误')
        ],
        widget=widgets.TextInput(input_type='email'),
        render_kw={'class': 'form-control'}
    )

    gender = core.RadioField(
        label='性别',
        choices=(
            (1, ''),
            (2, ''),
        ),
        coerce=int
    )
    city = core.SelectField(
        label='城市',
        choices=(
            ('bj', '北京'),
            ('sh', '上海'),
        )
    )

    hobby = core.SelectMultipleField(
        label='爱好',
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        coerce=int
    )

    favor = core.SelectMultipleField(
        label='喜好',
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        widget=widgets.ListWidget(prefix_label=False),
        option_widget=widgets.CheckboxInput(),
        coerce=int,
        default=[1, 2]
    )

    def __init__(self, *args, **kwargs):
        super(RegisterForm, self).__init__(*args, **kwargs)
        # 从数据库获取数据 做到实时更新
        self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))


@ac.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'GET':
        form = RegisterForm(data={'gender': 1})
        return render_template('register.html', form=form)
    else:
        form = RegisterForm(formdata=request.form)
        if form.validate():
            return "注册成功"
        else:
            print(form.errors)
            return render_template('register.html', form=form)
视图
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>用户注册</h1>
<form method="post" novalidate style="padding:0  50px">
    {% for item in form %}
    <p>{{item.label}}: {{item}} {{item.errors[0] }}</p>
    {% endfor %}
    <input type="submit" value="提交">
</form>
</body>
</html>
register.html

!!! 注意选项字段需要去数据库取数据 还有就是从数据库取数据的实时更新

Flask-Session

from flask import Flask,views,request,session
from flask_session import Session
from redis import Redis

app=Flask(__name__)
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = Redis(host="127.0.0.1",port=6379)

Session(app)

@app.route("/login", methods=["GET", "POST"])
def login():
    if request.method == "POST":
        session["user"] = "123"
        return "post"
    return "get"
if __name__ == '__main__':
    app.run(debug=True)
View Code

WTForms

from flask import Flask, request, render_template

from wtforms import Form, validators, widgets
from wtforms.fields import simple, core

app = Flask(__name__)


class RegForm(Form):
    username = simple.StringField(
        label="用户名",
        validators=[
            validators.DataRequired(message="用户名不能为空"),
        ],
        render_kw={"class": "my_class"}
    )

    pwd = simple.PasswordField(
        label="密码",
        validators=[
            validators.DataRequired(message="密码不能为空"),
            validators.length(min=6, max=16, message="长度必须大于等于6,小于等于16")
        ],
        render_kw={"class": "form-control"},
        widget = widgets.PasswordInput(),

    )

    pwd_confim = simple.PasswordField(
        label="确认密码",
        validators=[
            validators.EqualTo("pwd", message="两次密码不一致"),
            validators.length(min=6, max=16, message="长度必须大于等于6,小于等于16")
        ],
    )

    gender = core.SelectField(
        label="性别",
        choices=(
            (1, ""),
            (2, "")
        ),
        default=1,
        coerce=int  # 限制是int类型的
    )

    city = core.RadioField(
        label="城市",
        choices=(
            ("1", "北京"),
            ("2", "上海")
        ),
        default=2,
        coerce=str
    )

    email = simple.StringField(
        label="邮箱",
        validators=[
            validators.DataRequired(message="邮箱不能为空"),
            validators.Email(message="格式不对")
        ]
    )

    hobby = core.SelectMultipleField(
        label="爱好",
        choices=(
            (1, "电影"),
            (2, "音乐"),
            (3, "画画"),
            (4, "看书")
        ),
        coerce=int,
        default=(1, 4)

    )

    favor = core.SelectMultipleField(
        label="喜好",
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        widget=widgets.ListWidget(prefix_label=False),
        option_widget=widgets.CheckboxInput(),
        coerce=int,
        default=[1, 2]


    )

    def __init__(self,*args,**kwargs):  #这里的self是一个RegisterForm对象
        '''重写__init__方法'''
        super().__init__(*args, **kwargs)  #继承父类的init方法
        self.favor.choices =((1, '篮球'), (2, '足球'), (3, '羽毛球'))  #吧RegisterForm这个类里面的favor重新赋值

    def validate_pwd_confim(self,field,):
        '''
        自定义pwd_config字段规则,例:与pwd字段是否一致
        :param field:
        :return:
        '''
        # 最开始初始化时,self.data中已经有所有的值
        if field.data != self.data['pwd']:
            # raise validators.ValidationError("密码不一致") # 继续后续验证
            raise validators.StopValidation("密码不一致")  # 不再继续后续验证


class LoginForm(Form):
    username = simple.StringField(
        label="用户名",
        validators=[
            validators.DataRequired(message="用户名不能为空"),
            validators.length(min=4, max=8, message="长度必须大于等于4,小于等于8")
        ]
    )

    password = simple.PasswordField(
        label="密码",
        validators=[
            validators.DataRequired(message="密码不能为空"),
            validators.length(min=4, max=8, message="长度必须大于等于4,小于等于8")
        ]
    )


@app.route('/register',methods=["GET","POST"])
def register():
    if request.method=="GET":
        form = RegForm(data={'gender': 1})  #默认是1,
        return render_template("register.html",form=form)
    else:
        form = RegForm(formdata=request.form)
        if form.validate():  #判断是否验证成功
            print('用户提交数据通过格式验证,提交的值为:', form.data)  #所有的正确信息
        else:
            print(form.errors)  #所有的错误信息
        return render_template('register.html', form=form)



@app.route("/reg", methods=["GET", "POST"])
def reg():
    if request.method == "GET":
        rf = RegForm()
        return render_template("reg.html", rf=rf)
    else:
        rf_data = RegForm(request.form)
        if rf_data.validate():
            print(rf_data.data)
            return "OK"
        else:
            return render_template("reg.html", rf=rf_data)


@app.route("/login", methods=["GET", "POST"])
def login():
    if request.method == "GET":
        login_form = LoginForm()
        return render_template("login.html", lf=login_form)
    else:
        login_form_data = LoginForm(request.form)
        if login_form_data.validate():
            user = login_form_data.data.get("username")
            return str(user)
        else:
            return render_template("login.html", lf=login_form_data)


if __name__ == '__main__':
    app.run(debug=True)
View Code
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>

<form action="" method="post" novalidate>
    {% for field in rf %}

    <p>{{ field.label }}{{ field }}{{ field.errors[0] }}</p>

    {% endfor %}

    <input type="submit" value="submit">
</form>

<form action="" method="post" novalidate>
    <p>{{ lf.username.label }}{{ lf.username }}{{ lf.username.errors.0 }}</p>
    <p>{{ lf.password.label }}{{ lf.password }}{{ lf.password.errors[0] }}</p>
    <input type="submit" value="submit">
</form>

</body>

</body>
</html>
View Code

Flask-请求上下文

# 偏函数partial
from functools import partial

def my_sum(a, b):
    print(a)
    print(b)
    return a + b


# new_my_sum = partial(my_sum, 10)
# new_my_sum此时就是下面这样的
# def my_sum(b):
#     a=10


new_my_sum = partial(my_sum,
                     b=10)  # 位置参数一定要在关键字参数前面  不能写成new_my_sum = partial(my_sum, a=10),如果非要这样写,下面就改成res = new_my_sum(b=20)
# new_my_sum此时就是下面这样的
# def my_sum(b):
#     b=10

res = new_my_sum(20)

print(res)


class My:
    def __call__(self, *args, **kwargs):
        print(666, '__call__')

    def __setattr__(self, key, value):
        print(key, value, '__setattr__')

    def __getattr__(self, item):
        print(item, '__getattr__')

    def __setitem__(self, key, value):
        print(key, value, '__setitem__')

    def __getitem__(self, item):
        print(item, '__getitem__')


a = My()
a()  # 执行__call__
a.name = 'tom'  # 执行__setattr__
a['age'] = 18  # __setitem__
a['name']  # __getitem__
a.qweqwewqe  # __getattr__

# 空间换时间
# 线程安全  Flask上下文机制就是使用的Threading.local
# 线程进来,开辟一个空间给这个线程,线程操作的所有任务,都会复制一份到空间中
from threading import local


class Foo(local):
    pass


print('##############################')

# 栈Stack
class Mystack(object):
    data = []

    def __setattr__(self, key, value):
        self.push(key, value)

    def push(self, key, value):
        self.data.append({key:value})

    def top(self):
        return self.data.pop()


my_stack = Mystack()
my_stack.name = 'tom'
my_stack.name = 'rose'

print(my_stack.data)  #[{'name': 'tom'}, {'name': 'rose'}]


#LocalStack
# import threading
# threading.current_thread().ident #当前线程的ID

import threading
from threading import get_ident #和上面是一样的
class MyLocalStack(object):
    data=[]
    storage={}

    def push(self, item):
        try:
            self.storage[get_ident()].append(item)
        except:
            self.storage[get_ident()]=[item]

    def top(self):
        return self.storage[get_ident()].pop()

my_local_stack=MyLocalStack()

import time
def go(i):
    my_local_stack.push(i)
    time.sleep(1)
    # my_local_stack.top()

for i in range(5):
    t = threading.Thread(target=go,args=(i,))
    t.start()

print(my_local_stack.storage)


###########
from werkzeug.wrappers import Request, Response
from werkzeug.serving import run_simple


@Request.application
def app(req):
    print(req)
    return Response("200ok")


run_simple("0.0.0.0", 9527, app)

"""
run_simple("0.0.0.0",9527,app)   会执行app函数

from flask import Flask
app=Flask(__name__)

if __name__ == '__main__':
    app.run()

"""
铺垫
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from flask import Flask,request
app=Flask(__name__)

if __name__ == '__main__':
    app.run()
    #run_simple(host, port, self, **options)
    #app()=obj()=obj.__call__
    #app.wsgi_app


"""
上文
self._local=={"__storage__":{},"__ident_func__":get_ident}
ctx = requestcontext对象  此对象中有 request/session 
此时  self._local=={"__storage__":{3721:{"stack":[ctx] }},"__ident_func__":get_ident}

_request_ctx_stack == LocalStack()==self._local=={"__storage__":{3721:{"stack":[ctx] }},"__ident_func__":get_ident}
"""

"""
下文源码
request = LocalProxy(partial(_lookup_req_object, "request")#返回的是request对象)
        

class LocalProxy(object):
    __slots__ = ("__local", "__dict__", "__name__", "__wrapped__")

    def __init__(self, local, name=None):  ####local==request偏函数==partial(_lookup_req_object, "request")#返回的是request对象
        object.__setattr__(self, "_LocalProxy__local", local)  #__local=local=request偏函数
        object.__setattr__(self, "__name__", name)
        if callable(local) and not hasattr(local, "__release_local__"):
            object.__setattr__(self, "__wrapped__", local)        
        
      
    def __getattr__(self, name):   ##request.method执行的就是这
        if name == "__members__":
            return dir(self._get_current_object())
        return getattr(self._get_current_object(), name)  #从request对象中取出 name ,name=method
        
        
    def _get_current_object(self):
        if not hasattr(self.__local, "__release_local__"):
            return self.__local()  ##request偏函数的执行,返回的是request对象
        try:
            return getattr(self.__local, self.__name__)
        except AttributeError:
            raise RuntimeError("no object bound to %s" % self.__name__)
      
        

def _lookup_req_object(name):  #name==request
    top = _request_ctx_stack.top   #top==ctx  ctx = requestcontext对象  此对象中有 request/session 
    if top is None:
        raise RuntimeError(_request_ctx_err_msg)
    return getattr(top, name)      #返回的是request对象  ,top是ctx name是request
    
    @property
    def top(self):
        try:
            return self._local.stack[-1]   ##
        except (AttributeError, IndexError):
            return None
    

"""


"""  上文源码 
    def run(self, host=None, port=None, debug=None, load_dotenv=True, **options):   # self==app==Flask
        from werkzeug.serving import run_simple
        try:
            run_simple(host, port, self, **options)  #相当于执行self() 也就是 Flask.__call__()    # self==app==Flask
            Flask.__call__()

                def __call__(self, environ, start_response):  #environ==request
                    return self.wsgi_app(environ, start_response)

                        def wsgi_app(self, environ, start_response):
                            ctx = self.request_context(environ)  # self==app==Flask
                        ##  ctx = requestcontext对象  此对象中有 request/session  ##
                            error = None
                            try:
                                try:
                                    ctx.push()###   ctx执行的push
                                    response = self.full_dispatch_request()
                                except Exception as e:
                                    error = e
                                    response = self.handle_exception(e)
                                except:  # noqa: B001
                                    error = sys.exc_info()[1]
                                    raise
                                return response(environ, start_response)
                            finally:
                                if self.should_ignore_error(error):
                                    error = None
                                ctx.auto_pop(error)



                            def push(self):  ##self==ctx

                                top = _request_ctx_stack.top    ###_request_ctx_stack == LocalStack()=={"__storage__":{},"__ident_func__":get_ident}
                                if top is not None and top.preserved:
                                    top.pop(top._preserved_exc)

                                app_ctx = _app_ctx_stack.top
                                if app_ctx is None or app_ctx.app != self.app:
                                    app_ctx = self.app.app_context()
                                    app_ctx.push()
                                    self._implicit_app_ctx_stack.append(app_ctx)
                                else:
                                    self._implicit_app_ctx_stack.append(None)

                                if hasattr(sys, "exc_clear"):
                                    sys.exc_clear()

                                _request_ctx_stack.push(self)######### self==ctx    ctx = requestcontext对象  此对象中有 request/session

                                ###{"__storage__":{3721:{"stack":[ctx] }},"__ident_func__":get_ident}### 上文



                                def push(self, obj):   #########  obj就是ctx    self==_request_ctx_stack == LocalStack()=={"__storage__":{},"__ident_func__":get_ident}
                                    
                                    rv = getattr(self._local, "stack", None)  ##此时 self._local=={"__storage__":{},"__ident_func__":get_ident}
                                    if rv is None:
                                        self._local.stack = rv = [] ##此时 self.local=={"__storage__":{3721:{"stack":rv=[] }},"__ident_func__":get_ident}                                   
                                    rv.append(obj)  ##obj==ctx  ctx = requestcontext对象  此对象中有 request/session 此时  {"__storage__":{3721:{"stack":[ctx] }},"__ident_func__":get_ident}

                                    return rv  ##rv==ctx

                                
                                
                                class Local(object):
                                            def __setattr__(self, name, value):
                                                ident = self.__ident_func__()
                                                storage = self.__storage__
                                                try:
                                                    storage[ident][name] = value
                                                except KeyError:
                                                    storage[ident] = {name: value}
                                
                                

                                class LocalStack(object):

                                    def __init__(self):
                                        self._local = Local()

                                class Local(object):
                                    __slots__ = ("__storage__", "__ident_func__")

                                    def __init__(self):
                                        object.__setattr__(self, "__storage__", {})
                                        object.__setattr__(self, "__ident_func__", get_ident)


                        def request_context(self, environ):
                            return RequestContext(self, environ)   # self==app==Flask

                        class RequestContext(object):
                            def __init__(self, app, environ, request=None, session=None):  #self==requestcontext对象   app==app==flask
                                self.app = app
                                if request is None:
                                    request = app.request_class(environ)      #request_class = Request
                                self.request = request #######
                                self.url_adapter = None
                                try:
                                    self.url_adapter = app.create_url_adapter(self.request)
                                except HTTPException as e:
                                    self.request.routing_exception = e
                                self.flashes = None
                                self.session = session

                        class Request(RequestBase, JSONMixin):
                            pass

                        class BaseRequest(object):
                            def __init__(self, environ, populate_request=True, shallow=False):
                                self.environ = environ
                                if populate_request and not shallow:
                                    self.environ["werkzeug.request"] = self
                                self.shallow = shallow

"""
View Code

Flask-SQLAlchemy

首先要先安装一下Flask-SQLAlchemy这个模块

pip install Flask-SQLAlchemy

然后你要下载一个干净的Flask项目 点击下载

接下来基于这个Flask项目,我们要加入Flask-SQLAlchemy让项目变得生动起来

1.加入Flask-SQLAlchemy第三方组件

 1 from flask import Flask
 2 
 3 # 导入Flask-SQLAlchemy中的SQLAlchemy
 4 from flask_sqlalchemy import SQLAlchemy
 5 
 6 # 实例化SQLAlchemy
 7 db = SQLAlchemy()
 8 # PS : 实例化SQLAlchemy的代码必须要在引入蓝图之前
 9 
10 from .views.users import user
11 
12 
13 def create_app():
14     app = Flask(__name__)
15 
16     # 初始化App配置 这个app配置就厉害了,专门针对 SQLAlchemy 进行配置
17     # SQLALCHEMY_DATABASE_URI 配置 SQLAlchemy 的链接字符串儿
18     app.config["SQLALCHEMY_DATABASE_URI"] = "mysql+pymysql://root:DragonFire@127.0.0.1:3306/dragon?charset=utf8"
19     # SQLALCHEMY_POOL_SIZE 配置 SQLAlchemy 的连接池大小
20     app.config["SQLALCHEMY_POOL_SIZE"] = 5
21     # SQLALCHEMY_POOL_TIMEOUT 配置 SQLAlchemy 的连接超时时间
22     app.config["SQLALCHEMY_POOL_TIMEOUT"] = 15
23     app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
24 
25     # 初始化SQLAlchemy , 本质就是将以上的配置读取出来
26     db.init_app(app)
27 
28     app.register_blueprint(user)
29 
30     return app
MyApp/__init__.py

2.建立models.py ORM模型文件

 1 from MyApp import db
 2 
 3 Base = db.Model # 这句话你是否还记的?
 4 # from sqlalchemy.ext.declarative import declarative_base
 5 # Base = declarative_base()
 6 # 每一次我们在创建数据表的时候都要做这样一件事
 7 # 然而Flask-SQLAlchemy已经为我们把 Base 封装好了
 8 
 9 # 建立User数据表
10 class Users(Base): # Base实际上就是 db.Model
11     __tablename__ = "users"
12     __table_args__ = {"useexisting": True}
13     # 在SQLAlchemy 中我们是导入了Column和数据类型 Integer 在这里
14     # 就和db.Model一样,已经封装好了
15     id = db.Column(db.Integer,primary_key=True)
16     username = db.Column(db.String(32))
17     password = db.Column(db.String(32))
18 
19 
20 if __name__ == '__main__':
21     from MyApp import create_app
22     app = create_app()
23     # 这里你要回顾一下Flask应该上下文管理了
24     # 离线脚本:
25     with app.app_context():
26         db.drop_all()
27         db.create_all()
MyApp/models.py

3.登录视图函数的应用

 1 from flask import Blueprint, request, render_template
 2 
 3 user = Blueprint("user", __name__)
 4 
 5 from MyApp.models import Users
 6 from MyApp import db
 7 
 8 @user.route("/login",methods=["POST","GET"])
 9 def user_login():
10     if request.method == "POST":
11         username = request.form.get("username")
12         password = request.form.get("password")
13 
14         # 还记不记得我们的
15         # from sqlalchemy.orm import sessionmaker
16         # Session = sessionmaker(engine)
17         # db_sesson = Session()
18         # 现在不用了,因为 Flask-SQLAlchemy 也已经为我们做好会话打开的工作
19         # 我们在这里做个弊:
20         db.session.add(Users(username=username,password=password))
21         db.session.commit()
22 
23         # 然后再查询,捏哈哈哈哈哈
24         user_info = Users.query.filter(Users.username == username and User.password == password).first()
25         print(user_info.username)
26         if user_info:
27             return f"登录成功{user_info.username}"
28 
29     return render_template("login.html")
MyApp/views/user.py

Flask-Script

Flask-Script 从字面意思上来看就是 Flask 的脚本

是的,熟悉Django的同学是否还记得Django的启动命令呢? python manager.py runserver 大概是这样对吧

其实Flask也可以做到,基于 Flask-Script 就可以了 - 但是你还是得有一个项目 点击下载

1.安装 Flask-Script

pip install Flask-Script

2.将 Flask-Script 加入到 Flask 项目中

 1 import MyApp
 2 # 导入 Flask-Script 中的 Manager
 3 from flask_script import Manager
 4 
 5 app = MyApp.create_app()
 6 # 让app支持 Manager
 7 manager = Manager(app)
 8 
 9 if __name__ == '__main__':
10     #app.run()
11     # 替换原有的app.run(),然后大功告成了
12     manager.run()
MyApp/manager.py

3.使用命令启动 Flask 项目

python manager.py runserver

4.启动项目并更改配置参数(监听IP地址,监听端口)

python manager.py runserver -h 0.0.0.0 -p 9527

5.高级操作 - 自定制脚本命令

5.1.方式一 : @manager.command

 1 import MyApp
 2 # 导入 Flask-Script 中的 Manager
 3 from flask_script import Manager
 4 
 5 app = MyApp.create_app()
 6 # 让app支持 Manager
 7 manager = Manager(app) # type:Manager
 8 
 9 @manager.command
10 def DragonFire(arg):
11     print(arg)
12 
13 if __name__ == '__main__':
14     #app.run()
15     # 替换原有的app.run(),然后大功告成了
16     manager.run()
MyApp/manager.py
python manager.py DragonFire 666

5.2.方式二 : @manager.opation("-短指令","--长指令",dest="变量名")

 1 import MyApp
 2 # 导入 Flask-Script 中的 Manager
 3 from flask_script import Manager
 4 
 5 app = MyApp.create_app()
 6 # 让app支持 Manager
 7 manager = Manager(app) # type:Manager
 8 
 9 @manager.command
10 def DragonFire(arg):
11     print(arg)
12 
13 @manager.option("-n","--name",dest="name")
14 @manager.option("-s","--say",dest="say")
15 def talk(name,say):
16     print(f"{name}你可真{say}")
17 
18 if __name__ == '__main__':
19     #app.run()
20     # 替换原有的app.run(),然后大功告成了
21     manager.run()
MyApp/manager.py
python manager.py talk -n 赵丽颖 -s 漂亮
python manager.py talk --name DragonFire --say NB-Class

Flask-Migrate

终于到了Flask-Migrate,之前在学习Flask-SQLAlchemy的时候,有的同学就提过类似的问题,Flask支持 makemigration / migrate 吗?

答案在这里该诉你,如果你同时拥有两个三方组件 Flask-Script 和 Flask-Migrate 那么就支持这样的动作

首先你要有几个准备工作

项目下载

1.安装 Flask-Migrate

pip install Flask-Migrate

2.将 Flask-Migrate 加入到 Flask 项目中 - PS: 注意了 Flask-Migrate 是要依赖 Flask-Script 组件的

 1 import MyApp
 2 # 导入 Flask-Script 中的 Manager
 3 from flask_script import Manager
 4 
 5 # 导入 Flask-Migrate 中的 Migrate 和 MigrateCommand
 6 # 这两个东西说白了就是想在 Flask-Script 中添加几个命令和指令而已
 7 from flask_migrate import Migrate,MigrateCommand
 8 
 9 app = MyApp.create_app()
10 # 让app支持 Manager
11 manager = Manager(app) # type:Manager
12 
13 # Migrate 既然是数据库迁移,那么就得告诉他数据库在哪里
14 # 并且告诉他要支持那个app
15 Migrate(app,MyApp.db)
16 # 现在就要告诉manager 有新的指令了,这个新指令在MigrateCommand 中存着呢
17 manager.add_command("db",MigrateCommand) # 当你的命令中出现 db 指令,则去MigrateCommand中寻找对应关系
18 """
19 数据库迁移指令:
20 python manager.py db init 
21 python manager.py db migrate   # Django中的 makemigration
22 python manager.py db upgrade  # Django中的 migrate
23 """
24 
25 
26 @manager.command
27 def DragonFire(arg):
28     print(arg)
29 
30 @manager.option("-n","--name",dest="name")
31 @manager.option("-s","--say",dest="say")
32 def talk(name,say):
33     print(f"{name}你可真{say}")
34 
35 if __name__ == '__main__':
36     #app.run()
37     # 替换原有的app.run(),然后大功告成了
38     manager.run()
MyApp/manager.py

3.执行数据库初始化指令

python manager.py db init

此时你会发现你的项目目录中出现了一个好玩儿的东西

原文地址:https://www.cnblogs.com/bubu99/p/11229544.html