Flask 信号

Flask本身没有信号,需要安装第三方组件。Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为。

pip3 install blinker

一、内置信号

request_started = _signals.signal('request-started')                # 请求到来前执行
request_finished = _signals.signal('request-finished')              # 请求结束后执行
 
before_render_template = _signals.signal('before-render-template')  # 模板渲染前执行
template_rendered = _signals.signal('template-rendered')            # 模板渲染后执行
 
got_request_exception = _signals.signal('got-request-exception')    # 请求执行出现异常时执行
 
request_tearing_down = _signals.signal('request-tearing-down')      # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
 
appcontext_pushed = _signals.signal('appcontext-pushed')            # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped')            # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed')                # 调用flask在其中添加数据时,自动触发

部分源码流程:

class Flask(_PackageBoundObject):

    def full_dispatch_request(self):
        self.try_trigger_before_first_request_functions()
        try:
            ########## 触发request_started信号 ##########
            request_started.send(self)       
            rv = self.preprocess_request()
            if rv is None:
                rv = self.dispatch_request()
        except Exception as e:
            rv = self.handle_user_exception(e)
        return self.finalize_request(rv)

    def wsgi_app(self, environ, start_response):
        ctx = self.request_context(environ)
        error = None
        try:
            try:
                ctx.push()
                response = self.full_dispatch_request()
            except Exception as e:
                error = e
                response = self.handle_exception(e)
            except:
                error = sys.exc_info()[1]
                raise
            return response(environ, start_response)
        finally:
            if self.should_ignore_error(error):
                error = None
            ctx.auto_pop(error)
request_started
class Flask(_PackageBoundObject):

    def finalize_request(self, rv, from_error_handler=False):
    response = self.make_response(rv)
    try:
        response = self.process_response(response)
        ########## request_finished信号 ##########
        request_finished.send(self, response=response)
    except Exception:
        if not from_error_handler:
            raise
        self.logger.exception('Request finalizing failed with an error while handling an error')
    return response
request_finished

二、自定义信号

from flask import Flask, current_app, flash, render_template
from flask.signals import _signals

app = Flask(import_name=__name__)

# 自定义信号
xx = _signals.signal("xx")

def func(sender, *args, **kwargs):
    print(sender, args, kwargs)  # 123 () {'k1': 'v1'}

# 自定义信号中注册函数
xx.connect(func)

@app.route("/index")
def index():
    # 触发信号
    xx.send("123", k1="v1")
    return "index"

if __name__ == "__main__":
    app.run()
原文地址:https://www.cnblogs.com/believepd/p/10397069.html