Flask框架

前言: 
    1.flask 是web框架,他和django一样是同步的框架
    2.Flask 基于Werkzeug WSGI工具箱和Jinja2 模板引擎
    安装:flask:  pip install  flask
app.run()
    本质:当执行app.run方法的时候,最终执行run_simplerun_simple(host,port,app,**aptions)
         最后 app(),对象加(),执行__call__方法
flask的4剑客:
    1.直接返回字符串
        return "ok"
     2.返回 html页面
        首先导入render_template
        return render_template("index.html",name="xxx ")
     3.跳转页面
        return redirect('/lobin')
     4.返回json数据
        return jsonify(name_dict)
flask配置文件的方法:
    1.app对象.属性=值 这种方式只能配置这两种 配置
        app.debug=True
        app.secret_key="4324353"
    2.已字典的形式配置
        app.config['DEGUG']=True
        
    3.以文件形式
        新建一个xxx.py文件,内写配置
            eg:DEGUG=True
        app.config.from_pyfile('xxx.py')

    4.以类的形式配置
        新建一个xxx.py文件,内写各种场景下的类
        
        class Config(object):
            DEBUG = False
            TESTING = False
            DATABASE_URI = 'sqlite://:memory:'


        class ProductionConfig(Config):
            DATABASE_URI = 'mysql://user@localhost/foo'


        class DevelopmentConfig(Config):
            DEBUG = True


        class TestingConfig(Config):
            TESTING = True
        
        app.config.from_project('xxx.xxx类 ')
配置方法
flask路由:
    @app.route("/index")本质是 app.add_url_rule("/index",view_func='index')
    route的参数:
        endpoint: 起别名 ,反向解析 即: url_for('名称')   注:不能重复
        methods: 不指定默认GET,允许的请求方式 如:["GET", "POST"]
        view_func:是endpoint 指向的函数:也就是请求该路由要响应的函数
        strict_slashes:该参数是用来设置我们的路由是否为严格模式,默认严格,True严格
        
        redirect_to:重定向的指定的地址
    
    route使用正则:
         我们要用自定义的路由,用正则的话
        1.导入from werkzeug.routing import BaseConverter
        2.要写一个类,然后继承BaseConverter,然后实现__init__;to_python(self, value);to_url(self, value)
        3.app.url_map.converters['谁便'] = RegexConverter
        4.我们在路由里面@app.route('/index/<regex1("d+"):nid>'),regex1='谁便,regex1("正则表达式")
        5.regex1("正则表达式")匹配出来的结果,返回to_python,一定要return
        6.当我们做反向解析的解析的时候,我们的参数,会传递给to_url,return的结果才是我们拼接到我们路由上

        
        from flask import Flask, views, url_for
        from werkzeug.routing import BaseConverter

        app = Flask(import_name=__name__)

        class RegexConverter(BaseConverter):
            """
            自定义URL匹配正则表达式
            """
            def __init__(self, map, regex):
                super(RegexConverter, self).__init__(map)
                self.regex = regex

            def to_python(self, value):
                """
                路由匹配时,匹配成功后传递给视图函数中参数的值
                """
                #value就正则匹配出来的结果
                return "asdasdasd"

            def to_url(self, value):
                """
                使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
                """
                val = super(RegexConverter, self).to_url(value)
                print(val)
                return val

        app.url_map.converters['regex1'] = RegexConverter
        @app.route('/index/<regex1("d+"):nid>',endpoint="sb")
        def index(nid):
            print(url_for('sb', nid='888'))
            return 'Index'

        if __name__ == '__main__':
            app.run()    
flask路由
cbv:
    原始:
        from flask import Flask,views,request

        app = Flask(__name__)
        app.config.from_pyfile('settings.py')

        class IndexView(views.View):
            def dispatch_request(self):
                print(request.args)
                return 'index'

        app.add_url_rule('/index',view_func=IndexView.as_view(name='index'))
        if __name__ == '__main__':
            app.run()    

    注:为什么as_view(name='index')?
        如果不指定,每次都是view,会报错,所有必须指定
    
    要想仿照django,类得继承Method.view
        from flask import Flask, views, request, url_for
        from functools import wraps

        def login_verify(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                user_name = request.args.get('user')
                password = request.args.get('password')
                if user_name == 'mark' and password == '123':
                    return func(*args,**kwargs)
                else:
                    return '请登录'
            return wrapper


        class CBVTest(views.MethodView):

            methods = ['GET','POST']  # 指定可以接收的方法有什么
            decorators = [login_verify,]  # 指定自定义的装饰器

            def get(self):
                print(url_for('cbvtest'))
                return 'cbv_get'
            def post(self):
                return 'cbv_post'
        app.add_url_rule('/cbvtest',view_func=CBVTest.as_view(name='cbvtest'),endpoint='end_demo')
请求与响应:
   请求相关信息
    request.method            提交的方法
    request.args               get请求提及的数据
    request.form               post请求提交的数据
    equest.values              post和get提交的数据总和
    request.cookies            客户端所带的cookie
    request.headers            请求头
    request.path               不带域名,请求路径
    request.full_path          不带域名,带参数的请求路径
    request.script_root
    request.url               带域名带参数的请求路径
    request.base_url          带域名请求路径
    request.url_root          域名
    request.host_url          域名
    request.host              127.0.0.1:500
    request.files           获取传过来的文件

   响应相关信息
     自定义一些响应
     1.导入make_response
     2.response=make_response(4剑客)
     3.操作response
     4.return response
      eg:
        response=make_response(render_template('index.html'))
        #设置cookie
        response.set_cookie('jason', 'sb')
        #删除cookie
        response.delete_cookie('key')
        #设置header 
        response.headers['X-Something'] = 'A value sbwewewe'
        
        obj = request.files['the_file_name']
        obj.save('/var/www/uploads/' + secure_filename(f.filename))
        return response
session:
    1导入session
    2必须设置app.secret_key="askjdaksd"
    3存session['name']="xxx"
    4取session['name']
    app.session_interface
    原理:
        存: 把session中的值加密序列化放到cookie中,返回到浏览器中
            1.调用 save_session,将我们的session 通过加密得到val,读取配置文件中的['SESSION_COOKIE_NAME']得到key
            2.将key与val 存进cookie
            
        取:从cookie中取出值,反解,生成session对象,在视图函数中直接用sessoin就可以
            1.获取request里面的cookies,获取里面的key,这个key就是['SESSION_COOKIE_NAME'],值就是加密的值
            2.对值进行解密
            
闪现:
    what?
        a页面产生信息,传给c页面;
    但是用户访问a页面以后,不是直接跳到c页面, 而是途经其他页面后,才去访问c页面,
    这时拿到a页面给的信息
    
    flash:存信息 
    get_flashed_messages:取信息
    
    from flask import Flask,render_template
    from flask import session,flash,get_flashed_messages

    app = Flask(__name__)
    app.config.from_pyfile('settings.py')

    @app.route('/')
    def home():
        session['name'] = 'wangyanfei'
        flash('普通信息', category='info')
        return render_template('home.html')

    @app.route('/index')
    def index():
        A = get_flashed_messages(with_categories=True)
        B = get_flashed_messages()
        C = get_flashed_messages(with_categories=True)
        print(A)
        print(B,C)
        return "dslkfjsdkjfl"

    if __name__ == '__main__':
        app.run()
    
    注:1.要用闪现 必须设置app.secret_key='dfk';
    
       2.我们可以通过 flash的参数 category 对信息做分类
            eg: flash('xxxx',category="info")
       3.我们可以通过get_flashed_messages的参数
            >:with_categories:以键值对的形式获取我们设置的闪现
            >:category_filter:进行分类信息的过滤
            eg: get_flashed_messages(with_categories=True,category_filter=('error',))
              
       4.同一次请求内,不管取多少次,都可以取到;第二次请求时就取不到了,因为上下文管理会把
            _request_ctx_stack.top.flashes 清空掉
请求扩展:
    1.before_request  类比django中间件中的process_request,在请求收到之前绑定一个函数做一些事情
        #基于它做用户登录认证
        @app.before_request
        def process_request(*args,**kwargs):
            if request.path == '/login':
                return None
            user = session.get('user_info')
            if user:
                return None
            return redirect('/login')
    2.after_request  类比django中间件中的process_response,每一个请求之后绑定一个函数,如果请求没有异常
        
        @app.after_request
        def process_response1(response):
            print('process_response1 走了')
            return response
            
    3.before_first_request 第一次请求时 执行,跟浏览器无关
    
    4.teardown_request  如论有没有异常都会执行,如果没有异常这个参数就是None,有就记录这个异常
    
    5.errorhandler 捕获异常并处理让用户无法感知 路径不存在时404,服务器内部错误500
        @app.errorhandler(404)
        def error_404(arg):
            return "404错误了"
            
        @app.errorhandler(500)
        def error_500(arg):
            print(arg)
            return "500错误了"

    6.template_global  标签 被装饰的函数在不传给模板的情况下,模板可以直接调用    
        @app.template_global()
        def sb(a1, a2):
            return a1 + a2
        #{{sb(1,2)}}
    7.template_filter   过滤器
        @app.template_filter()
        def db(a1, a2, a3):
            return a1 + a2 + a3
        #{{ 1|db(2,3)}}

            
    小结:
        1. 请求来时 多个before_request的话 ,自上而下执行
        2. 如果前面的有返回值,后面就不会执行,包括正真的视图函数,但是after_request依然会执行
        3. response走时 自下而上 的执行(源码中是先反转,再遍历);必须接受response,也必须返回response
        
中间件(了解):

    from flask import Flask

    app = Flask(__name__)

    @app.route('/')
    def index():
        return 'Hello World!'
    # 模拟中间件
    class Md(object):
        def __init__(self,old_wsgi_app):
            self.old_wsgi_app = old_wsgi_app

        def __call__(self,  environ, start_response):
            print('开始之前')
            ret = self.old_wsgi_app(environ, start_response)
            print('结束之后')
            return ret

    if __name__ == '__main__':
        #1我们发现当执行app.run方法的时候,最终执行run_simple,最后执行app(),也就是在执行app.__call__方法    
        #2 在__call__里面,执行的是self.wsgi_app().那我们希望在执行他本身的wsgi之前做点事情。
        #3 所以我们先用Md类中__init__,保存之前的wsgi,然后我们用将app.wsgi转化成Md的对象。
        #4 那执行新的的app.wsgi_app,就是执行Md的__call__方法。
        #把原来的wsgi_app替换为自定义的,
        
        app.wsgi_app = Md(app.wsgi_app)
        app.run()
线程与偏函数:
    1.因为线程是不安全的,所以我们要解决这种线程不安全的问题,有如下两种解决方案

        方案一:加锁,多个线程要真正实现共用一个数据,并且该线程修改了数据之后会影响到其他线程

        方案二:使用threading.local对象把要修改的数据复制一份,使得每个数据互不影响

        思考:
            为什么不用加锁的思路?加锁的思路是多个线程要真正实现共用一个数据,并且该线程修改了数据之后会影响到其他线程,
        更适合类似于12306抢票的应用场景,而我们是要做请求对象的并发,想要实现的是该线程对于请求对象这部分内容有任何修改
        并不影响其他线程。所以使用方案二

    2.请求并发设计
        函数与方法:
            from types import MethodType,FunctionType

            class Foo(object):
                def fetch(self):
                    pass

            print(isinstance(Foo.fetch,MethodType))
            print(isinstance(Foo.fetch,FunctionType)) # True

            obj = Foo()
            print(isinstance(obj.fetch,MethodType)) # True
            print(isinstance(obj.fetch,FunctionType))
        threading.local:
            不用local:
            from threading import Thread
            import time
            cxw = -1
            def task(arg):
                global cxw
                cxw = arg
                # time.sleep(2)
                print(cxw)

            for i in range(10):
                t = Thread(target=task,args=(i,))
                t.start()
                
            使用local:
                from threading import Thread
                from threading import local
                import time
                from threading import get_ident
                # 特殊的对象
                cxw = local()
                def task(arg):
                    # 对象.val = 1/2/3/4/5
                    cxw.value = arg
                    time.sleep(2)
                    print(cxw.value)
                for i in range(10):
                    t = Thread(target=task,args=(i,))
                    t.start()
        通过字典自定义threading.local(函数):
            from threading import get_ident,Thread
            import time
                storage = {}
                def set(k,v):
                    ident = get_ident()
                    if ident in storage:
                        storage[ident][k] = v
                    else:
                        storage[ident] = {k:v}
                def get(k):
                    ident = get_ident()
                    return storage[ident][k]
                def task(arg):
                    set('val',arg)
                    v = get('val')
                    print(v)

                for i in range(10):
                    t = Thread(target=task,args=(i,))
                    t.start()
        面向对象:
            from threading import get_ident,Thread
            import time
            class Local(object):
                storage = {}
                def set(self, k, v):
                    ident = get_ident()
                    if ident in Local.storage:
                        Local.storage[ident][k] = v
                    else:
                        Local.storage[ident] = {k: v}
                def get(self, k):
                    ident = get_ident()
                    return Local.storage[ident][k]
            obj = Local()
            def task(arg):
                obj.set('val',arg) 
                v = obj.get('val')
                print(v)
            for i in range(10):
                t = Thread(target=task,args=(i,))
                t.start()
        通过setattr和getattr实现:
            from threading import get_ident,Thread
            import time
            class Local(object):
                storage = {}
                def __setattr__(self, k, v):
                    ident = get_ident()
                    if ident in Local.storage:
                        Local.storage[ident][k] = v
                    else:
                        Local.storage[ident] = {k: v}
                def __getattr__(self, k):
                    ident = get_ident()
                    return Local.storage[ident][k]
            obj = Local()
            def task(arg):
                obj.val = arg
                print(obj.val)
            for i in range(10):
                t = Thread(target=task,args=(i,))
                t.start()
        每个对象有自己的存储空间(字典):
            from threading import get_ident,Thread
            import time
            class Local(object):
                def __init__(self):
                    object.__setattr__(self,'storage',{})
                    self.storage={}
                def __setattr__(self, k, v):
                    ident = get_ident()
                    if ident in self.storage:
                        self.storage[ident][k] = v
                    else:
                        self.storage[ident] = {k: v}
                def __getattr__(self, k):
                    ident = get_ident()
                    return self.storage[ident][k]
            obj = Local()
            def task(arg):
                obj.val = arg
                obj.xxx = arg
                print(obj.val)
            for i in range(10):
                t = Thread(target=task,args=(i,))
                t.start()
        兼容线程和协程(源码到request中去看,看local的__getattr__,setattr):
            try:
                from greenlet import getcurrent as get_ident
            except Exception as e:
                from threading import get_ident
            from threading import Thread
            import time
            class Local(object):
                def __init__(self):
                    object.__setattr__(self,'storage',{})
                def __setattr__(self, k, v):
                    ident = get_ident()
                    if ident in self.storage:
                        self.storage[ident][k] = v
                    else:
                        self.storage[ident] = {k: v}
                def __getattr__(self, k):
                    ident = get_ident()
                    return self.storage[ident][k]
            obj = Local()
            def task(arg):
                obj.val = arg
                obj.xxx = arg
                print(obj.val)
            for i in range(10):
                t = Thread(target=task,args=(i,))
                t.start()
                       
        情况一:单进程单线程,基于全局变量就可以做
        情况二:单进程多线程,基于threading.local对象做
        情况三:单进程多线程多协程,如何做?
        方法:依赖于底层的werkzeug外部包,werkzeug实现了保证多线程和多协程的安全
        原理:
            就是在最开始导入线程和协程的唯一标识的时候统一命名为get_ident,并且先导入协程模块的时候如果报错说明不支持协程,
        就会去导入线程的get_ident,这样无论是只有线程运行还是协程运行都可以获取唯一标识,并且把这个标识的线程或协程需要设
        置的内容都分类存放于__storage__字典中.
                
        
    3.偏函数
        定义:
            当函数的参数太多,需要简化时,用functools.partial可以创建一个新的函数,这个新函数可以固定住原函数的部分参数,
        从而在调用时更简单.
        #偏函数的第二个部分(可变参数),按原有函数的参数顺序进行补充,参数将作用在原函数上,最后偏函数返回一个新函数
        from functools import partial
        def test(a,b,c,d):
            return a+b+c+d

        tes=partial(test,1,2)
        print(tes(3,4))
线程与偏函数
上下文管理
   1 app.__call__
    2 wsgi_app(environ, start_response) 
        2.1 ctx = self.request_context(environ)        
            2.1.1 return RequestContext(self, environ)
                这里的self是app,environ请求相关
            2.1.2 return RequestContext(self, environ)
                得到了RequestContext的对象,而且有request属性
        2.2  2.1中的ctx就是RequestContext的对象  

        2.3  ctx.push()执行这个,就是RequestContext的对象的push方法
            2.3.1  #执行这个,self-->ctx
                _request_ctx_stack.push(self) 
            2.3.1.1 我们发现_request_ctx_stack = LocalStack()
                他的push方法的源码:
                    def push(self, obj):
                        rv = getattr(self._local, "stack", None)
                        if rv is None:     
                            # self._local=>stack-->storage['线程id']['stack']=[ctx,]
                            self._local.stack = rv = []
                        rv.append(obj)
                        return rv                    
    3在请求中获取request.form
        3.1 request是LocalProxy的对象,当获取属性的时候会走__getattr__
            def __getattr__(self, name):
                if name == "__members__":
                    return dir(self._get_current_object())
                #name-->form,
                #self._get_current_object()===>ctx.request,form
                #_get_current_object()---》self.__local()
                
                return getattr(self._get_current_object(), name)
                
            3.1.1 self._get_current_object():源码:最终:partial(_lookup_req_object, "request")
             def _get_current_object(self):
             
                if not hasattr(self.__local, "__release_local__"):
                        #local==>partial(_lookup_req_object, "request")
                        #def __init__(self, local, name=None):
                            # object.__setattr__(self, "_LocalProxy__local", local)
                    #self.__local()===>local()
                    return self.__local()
                try:
                    return getattr(self.__local, self.__name__)
                except AttributeError:
                    raise RuntimeError("no object bound to %s" % self.__name__)
    4 partial(_lookup_req_object, "request")偏函数的源码
        def _lookup_req_object(name):
            #name是request
            #ctx
            top = _request_ctx_stack.top
            if top is None:
                raise RuntimeError(_request_ctx_err_msg)
            #ctx-->request
            return getattr(top, name)
        4.1中_request_ctx_stack.top
           @property
            def top(self):
            
            try:
                return self._local.stack[-1]
            except (AttributeError, IndexError):
                return None

    总结:其实操作flask的请求上下文就是操作Local中的字典__storage__
        
        1.通过REquestContext类首先实例化ctx请求上下文对象,其内部包含请求对象

        2.入栈,通过请求上下文对象的类的push()方法触发了LocalStack类的push() 方法,从而添加到Local类中的字典里。

        3.观察导入的request源码 ,通过观察LocalProxy的源码,最后触发了LocalStack的top()方得到上下文对象,再得到请求对象
          从而实现reuqest的功能。到请求对象,从而实现reuqest的功能。

        4.出栈通过请求上下文对象的类的方法,触发了LocalStack的的pop()方法从而从字典中删除掉当前线程或当前携程的请求信息。
蓝图:
    基本使用: 创建蓝图-->利用蓝图创建路由关系-->注册蓝图
        -templates
        -static
        -views
            -user.py
            -order.py
        -app.py
            
        user.py/order.py
        
            from flask import Blueprint

            # 1 创建蓝图
            user_bp = Blueprint('user',__name__)

            # 2 利用蓝图创建路由关系
            @user_bp.route('/login/')
            def login():
                return "login"

            @user_bp.route('/logout/')
            def logout():
                return "logout"
                
        app.py    
        
            from flask import Flask
            from views.user import user_bp
            from views.order import order_bp

            app = Flask(__name__)
            # 3 注册蓝图
            app.register_blueprint(user_bp)
            app.register_blueprint(order_bp)


            if __name__ == '__main__':
                app.run()

        注:
            1.user_bp :是用于指向创建出的蓝图对象,可以自由命名。
            2.Blueprint的第一个参数自定义命名的‘user’用于url_for翻转url时使用。
            3.__name__用于寻找蓝图自定义的模板和静态文件使用。
    高级使用:
        1.蓝图中实现path部分的url前缀
            1.创建蓝图的时候填写url_prefix可以为增加url的path部分的前缀,可以更方便的去管理访问视图函数
                from flask import Blueprint
                # 1 创建蓝图
                user_bp = Blueprint('user',__name__,url_prefix='/user')
                # 注意斜杠跟视图函数的url连起来时候不要重复了。
                
            2.url加前缀的时候也可以再注册蓝图的时候加上,更推荐这么做,因为代码的可读性更强。
                app.register_blueprint(user_bp,url_prefix='/order')
        2.蓝图中自定义模板路径    
            创建蓝图的时候填写template_folder可以指定自定义模板路径
                # 1 创建蓝图                                           #所对应的参数路径是相对于蓝图文件的
                user_bp = Blueprint('user',__name__,url_prefix='/user',template_folder='views_templates')
            注:
                1.蓝图虽然指定了自定义的模板查找路径,但是查找顺序还是会先找主app规定的模板路径(templates),
                  找不到再找蓝图自定义的模板路径
                2.Blueprint的template_folder参数指定的自定义模板路径是相对于蓝图文件的路径。
g对象:
    作用:
        专门用来存储用户信息的,g的全称的为global
    使用范围:    
        g对象在一次请求中的所有的代码的地方,都是可以使用的
    特性:
        1.当前请求内你设置了就可以取,必须先设置,后取,当前请求可以取无数次
        2.当前请求内你设置了,没取,其他请求来了也取不到
    
        from  flask import Flask,request,g
        
        app=Flask(__name__)
        
        def set_g():
            g.name='sb'

        @app.route("/")
        def index():
            set_g()

            return redirect("/index")

        @app.route("/index")
        def login():
            print(g.name)
            return "2"

        if __name__ == '__main__':
            app.run()
            
        

    g对象和session的区别:
        session对象是可以跨request的,只要session还未失效,不同的request的请求会获取到同一个session;
        g对象不是,g对象不需要管过期时间,请求一次就g对象就改变了一次,或者重新赋值了一次
信号:
    定义:基于blinker,可以让开发者在flask请求过程中定制一些用户行为    
    安装: pip install blinker
    步骤: 
        1.往信号中注册函数signals.request_started.connect(func) 
        2.触发信号signals.request_started.send()
    自定义信号:
        from flask import Flask
        from flask.signals import _signals

        app = Flask(import_name=__name__)

        # 自定义信号
        xxxxx = _signals.signal('xxxxx')

        def func(sender,a):
            print(sender,a)
            print("我是自定义信号")

        # 自定义信号中注册函数
        xxxxx.connect(func)


        @app.route("/x")
        def index():
            # 触发信号
            xxxxx.send("sb",a="1")
            return 'Index'

        if __name__ == '__main__':
            app.run()
            
flask-session:

    作用:将cookie中的值 保存到 redis/memcached/file/Mongodb/SQLAlchemy中
    安装:pip3 install flask-session
    
    方式1:
        from flask import Flask,session,jsonify
        from flask_session import RedisSessionInterface
        import redis
        app = Flask(__name__)
        app.config.from_pyfile('settings.py')
        conn = redis.Redis(host='127.0.0.1', port=6379)
        app.session_interface=RedisSessionInterface(conn,key_prefix='wyf')
        @app.route('/')
        def home():
            session['token'] = 'wyf2012'
            return 'ojbk'

        def login():
            a = session['token']
            print(a)
            dic = {'a': a}
            return jsonify(dic)

        app.add_url_rule('/login',view_func=login,endpoint='mylog')

        if __name__ == '__main__':
            app.run()
            
    方式2:
        from redis import Redis
        from flask.ext.session import Session
        app.config['SESSION_TYPE'] = 'redis'
        app.config['SESSION_REDIS'] = Redis(host='127.0.0.1',port='6379')
        Session(app)
        
    注:RedisSessionInterface的参数
        use_signer:是否对key签名
        permanent: 关闭浏览器,cookie是否失效
flask-script:自定义一些类似django的命令
    安装: pip install flask-script
    使用:
        from flask import Flask
        from flask_script import Manager
        app = Flask(__name__)
        manager=Manager(app)

        @app.route("/")
        def index():
            return "ok"

        @manager.command
        def custom1(arg,a):
            """
            自定义命令
            python manage.py custom 123
            :param arg:
            :return:
            """
            print(arg,a)


        @manager.option('-n', '--name', dest='name')
        @manager.option('-u', '--url', dest='url')
        def cmd1(name, url):
            """
            自定义命令(-n也可以写成--name)
            执行: python manage.py  cmd -n lqz -u http://www.oldboyedu.com
            执行: python manage.py  cmd --name lqz --url http://www.oldboyedu.com
            :param name:
            :param url:
            :return:
            """
            print(name, url)

        if __name__ == '__main__':
            manager.run()
            
多app应用:(了解)
    使用:
        from werkzeug.wsgi import DispatcherMiddleware
        from werkzeug.serving import run_simple
        from flask import Flask
        app1 = Flask('app01')
        app2 = Flask('app02')

        @app1.route('/index')
        def index():
            return "app01"

        @app2.route('/index')
        def index2():
            return "app2"
        dm = DispatcherMiddleware(app1, {
            '/sec12': app2,
        })
        if __name__ == "__main__":

            run_simple('localhost', 5000, dm)
wtforms:
    安装: pip install wtforms
    使用:
        app.py
            from flask import Flask, render_template, request, redirect
            from wtforms import Form
            from wtforms.fields import simple
            from wtforms import validators
            from wtforms import widgets

            app = Flask(__name__)

            app.debug = True


            class LoginForm(Form):
                # 字段(内部包含正则表达式)
                name = simple.StringField(
                    label='用户名',
                    validators=[
                        validators.DataRequired(message='用户名不能为空.'),
                        validators.Length(min=2, max=6, message='用户名长度必须大于%(min)d且小于%(max)d')
                    ],
                    widget=widgets.TextInput(), # 页面上显示的插件
                    render_kw={'class': 'form-control'}
                )
                # 字段(内部包含正则表达式)
                pwd = simple.PasswordField(
                    label='密码',
                    validators=[
                        validators.DataRequired(message='密码不能为空.'),
                        validators.Length(min=8, message='密码长度必须大于%(min)d'),
                        # validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*d)(?=.*[$@$!%*?&])[A-Za-zd$@$!%*?&]{8,}",
                        #                   message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')

                    ],
                    widget=widgets.PasswordInput(),
                    render_kw={'class': 'form-control'}
                )



            @app.route('/login', methods=['GET', 'POST'])
            def login():
                if request.method == 'GET':
                    form = LoginForm()
                    return render_template('login.html', form=form)
                else:
                    form = LoginForm(formdata=request.form)
                    if form.validate():
                        print('用户提交数据通过格式验证,提交的值为:', form.data)
                    else:
                        print(form.errors)
                    return render_template('login.html', form=form)

            if __name__ == '__main__':
                app.run()
        
        html:
            <!DOCTYPE html>
            <html lang="en">
            <head>
                <meta charset="UTF-8">
                <title>Title</title>
            </head>
            <body>
            <h1>登录</h1>
            <form method="post">
                <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>
                <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
                <input type="submit" value="提交">
            </form>
            </body>
            </html>
    
    所有表单验证:
        app.py
            from flask import Flask, render_template, request, redirect
            from wtforms import Form
            from wtforms.fields import core
            from wtforms.fields import html5
            from wtforms.fields import simple
            from wtforms import validators
            from wtforms import widgets

            app = Flask(__name__, template_folder='templates')
            app.debug = True



            class RegisterForm(Form):

                def validate_pwd_confirm(self, field):
                    """
                    自定义pwd_confirm字段规则,例:与pwd字段是否一致
                    :param field:
                    :return:
                    """
                    # 最开始初始化时,self.data中已经有所有的值
                    print(field.data)
                    if field.data !="sb":
                        #raise validators.ValidationError("sb")  # 继续后续验证
                        raise validators.StopValidation("SB")  # 不再继续后续验证

                    # if field.data != self.data['pwd']:
                    #     raise validators.ValidationError("密码不一致") # 继续后续验证
                        #raise validators.StopValidation("密码不一致")  # 不再继续后续验证

                name = simple.StringField(
                    label='用户名',
                    validators=[
                        validators.DataRequired()
                    ],
                    widget=widgets.TextInput(),
                    render_kw={'class': 'form-control'},
                    default='tank'
                )

                pwd = simple.PasswordField(
                    label='密码',
                    validators=[
                        validators.DataRequired(message='密码不能为空.')
                    ],
                    widget=widgets.PasswordInput(),
                    render_kw={'class': 'form-control'}
                )

                pwd_confirm = simple.PasswordField(
                    label='重复密码',
                    validators=[
                        validate_pwd_confirm,
                        validators.DataRequired(message='重复密码不能为空.'),
                        #validators.EqualTo('pwd', message="两次密码输入不一致")
                    ],
                    widget=widgets.PasswordInput(),
                    render_kw={'class': 'form-control'}
                )

                email = html5.EmailField(
                    label='邮箱',
                    validators=[
                        validators.DataRequired(message='邮箱不能为空.'),
                        validators.Email(message='邮箱格式错误')
                    ],
                    widget=widgets.TextInput(input_type='email'),
                    render_kw={'class': 'form-control'}
                )

                gender = core.RadioField(
                    label='性别',
                    choices=(
                        (1, ''),
                        (2, ''),
                    ),
                    coerce=int # “1” “2”
                 )
                city = core.SelectField(
                    label='城市',
                    choices=(
                        ('bj', '北京'),
                        ('sh', '上海'),
                    )
                )

                hobby = core.SelectMultipleField(
                    label='爱好',
                    choices=(
                        (1, '篮球'),
                        (2, '足球'),
                    ),
                    coerce=int
                )

                favor = core.SelectMultipleField(
                    label='喜好',
                    choices=(
                        (1, '篮球'),
                        (2, '足球'),
                    ),
                    widget=widgets.ListWidget(prefix_label=False),
                    option_widget=widgets.CheckboxInput(),
                    coerce=int,
                    default=[1, 2]
                )

                def __init__(self, *args, **kwargs):
                    super(RegisterForm, self).__init__(*args, **kwargs)
                    self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))
                    self.favor.data=[1,]





            @app.route('/register', methods=['GET', 'POST'])
            def register():
                if request.method == 'GET':
                    form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial
                    return render_template('register.html', form=form)
                else:
                    form = RegisterForm(formdata=request.form)
                    if form.validate():
                        print('用户提交数据通过格式验证,提交的值为:', form.data)
                    else:
                        print(form.errors)
                    return render_template('register.html', form=form)



            if __name__ == '__main__':
                app.run()
                    
        html:
            <!DOCTYPE html>
            <html lang="en">
            <head>
                <meta charset="UTF-8">
                <title>Title</title>
            </head>
            <body>
            <h1>用户注册</h1>
            <form method="post" novalidate style="padding:0  50px">
                {% for field in form %}
                <p>{{field.label}}: {{field}} {{field.errors[0] }}</p>
                {% endfor %}
                <input type="submit" value="提交">
            </form>
            </body>
wtforms
Flask-SQLAlchemy:
    定义:flask和SQLAchemy的管理者,通过他把他们做连接
    下载: pip install flask-sqlalchemy
    下载: pip install flask-migrate
    
    案例:
        acccount.py
            #!/usr/bin/env python
            # -*- coding:utf-8 -*-

            from flask import Blueprint
            from .. import db
            from .. import models

            account = Blueprint('account', __name__)


            @account.route('/login')
            def login():
                # db.session.add(models.Users(username='lqz', email='123'))
                # db.session.query(models.Users).all()
                # db.session.commit()
                # 添加示例
                """
                db.session.add(models.Users(username='lqz', pwd='123', gender=1))
                db.session.commit()

                obj = db.session.query(models.Users).filter(models.Users.id == 1).first()
                print(obj)

                PS: db.session和db.create_session
                """
                # db.session.add(models.Users(username='wupeiqi1', email='wupeiqi1@xx.com'))
                # db.session.commit()
                # db.session.close()
                #
                # db.session.add(models.Users(username='wupeiqi2', email='wupeiqi2@xx.com'))
                # db.session.commit()
                # db.session.close()
                # db.session.add(models.Users(username='alex1',email='alex1@live.com'))
                # db.session.commit()
                # db.session.close()



                user_list = db.session.query(models.Users).all()
                db.session.close()
                for item in user_list:
                    print(item.username)


                return 'login'                
                
        __init__.py    
            #!/usr/bin/env python
            # -*- coding:utf-8 -*-
            from flask import Flask
            from flask_sqlalchemy import SQLAlchemy
            db = SQLAlchemy()

            from .models import *
            from .views import account

            def create_app():
                app = Flask(__name__)
                app.config.from_object('settings.DevelopmentConfig')
                # 将db注册到app中
                db.init_app(app)
                # 注册蓝图
                app.register_blueprint(account.account)

                return app


        models.py

            from . import db

            class Users(db.Model):
                """
                用户表
                """
                __tablename__ = 'users'
                id = db.Column(db.Integer, primary_key=True)
                username = db.Column(db.String(80), unique=True, nullable=False)
                email = db.Column(db.String(120), unique=True, nullable=False)
                # ids = db.Column(db.Integer)

                def __repr__(self):
                    return '<User %r>' % self.username


        manage.py
            
            from sansa import create_app
            from flask_script import Manager
            from flask_migrate import Migrate,MigrateCommand
            from sansa import db
            app = create_app()
            manager=Manager(app)
            #为了实现迁移
            Migrate(app,db)
            #现在把命令注册进来
            manager.add_command('db1', MigrateCommand)

            if __name__ == '__main__':
                # app.run()
                manager.run()


        settings.py
            
            class BaseConfig(object):
                # SESSION_TYPE = 'redis'  # session类型为redis
                # SESSION_KEY_PREFIX = 'session:'  # 保存到session中的值的前缀
                # SESSION_PERMANENT = True  # 如果设置为False,则关闭浏览器session就失效。
                # SESSION_USE_SIGNER = False  # 是否对发送到浏览器上 session:cookie值进行加密

                SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:123@127.0.0.1:3306/flask01?charset=utf8"
                SQLALCHEMY_POOL_SIZE = 5
                SQLALCHEMY_POOL_TIMEOUT = 30
                SQLALCHEMY_POOL_RECYCLE = -1

                # 追踪对象的修改并且发送信号
                SQLALCHEMY_TRACK_MODIFICATIONS = False


            class ProductionConfig(BaseConfig):
                pass


            class DevelopmentConfig(BaseConfig):
                pass


            class TestingConfig(BaseConfig):
                pass
flask-sqlalchemy
 
原文地址:https://www.cnblogs.com/wyf20190411-/p/13772188.html