0220 session表单信号SQLAlchemy

flask-session

open_session在请求刚进来时执行,完成session对象的创建(就是一特殊字典),在视图函数中完成对session的赋值操作,save_session()在视图函数执行完后,生成response后执行,将session写入response的cookie中。

flask-session是flask框架的session组件,由于原来flask内置session使用签名cookie保存,该组件则将支持session保存到多个地方,如:

  • redis:保存数据的一种工具,五大类型。非关系型数据库
  • memcached
  • filesystem
  • mongodb
  • sqlalchmey:那数据存到数据库表里面

使用

安装

pip install flask-session

作用

将默认保存的cookie中的值,保存到redis

使用

from flask import Flask,session
# 导入RedisSessionInterface类
from flask_session import RedisSessionInterface
# 保存至redis中
import redis

app = Flask(__name__)
conn=redis.Redis(host='127.0.0.1',port=6379)

#use_signer是否对key签名
app.session_interface=RedisSessionInterface(conn,key_prefix='lqz')

@app.route('/')
def hello_world():
    session['name']='lqz'
    return 'Hello World!'

if __name__ == '__main__':
    app.run()

使用2

from redis import Redis
from flask import Flask,session
from flask_session import Session
# 创建flask对象app
app = Flask(__name__)

# 更改配置
app.config['SESSION_TYPE'] = "redis"
# redis配置
app.config['SESSION_REDIS'] = Redis(host="127.0.0.1",port="6379")

Session(app)

# 创建视图函数
@app.route("/")
def index():
    session['name'] = 'sb'
    return "ojbk"

@app.route("/login")
def login():
    print(session.get('name'))
    print(session['name'])
    return "login"

if __name__ == '__main__':
    app.run()

解析

Flask 中,所有和 session 有关的调用,都是转发到 self.session_interface 的方法调用上(这样用户就能用自定义的 session_interface 来控制 session 的使用)。而默认的 session_inerface 有默认值:

session_interface = SecureCookieSessionInterface()

执行SecureCookieSessionInterface.open_session()来生成默认session对象:

	def open_session(self, app, request):
        获取session签名的算法
        s = self.get_signing_serializer(app)
	如果为空 直接返回None
        if s is None:
            return None
        val = request.cookies.get(app.session_cookie_name)
	# 如果val为空,即request.cookies为空
        if not val:
            return self.session_class()
        max_age = total_seconds(app.permanent_session_lifetime)
        try:
            data = s.loads(val, max_age=max_age)
            return self.session_class(data)
        except BadSignature:
            return self.session_class()

请求第一次来时,request.cookies为空,即返回self.session_class():

session_class = SecureCookieSession

看SecureCookieSession:

class SecureCookieSession(CallbackDict, SessionMixin):
modified = False
accessed = False

def __init__(self, initial=None):
    def on_update(self):
        self.modified = True
        self.accessed = True
 
    super(SecureCookieSession, self).__init__(initial, on_update)
 
def __getitem__(self, key):
    self.accessed = True
    return super(SecureCookieSession, self).__getitem__(key)
 
def get(self, key, default=None):
    self.accessed = True
    return super(SecureCookieSession, self).get(key, default)
 
def setdefault(self, key, default=None):
    self.accessed = True
    return super(SecureCookieSession, self).setdefault(key, default)

看其继承关系,其实就是一个特殊的字典。到此我们知道了session就是一个特殊的字典,调用SecureCookieSessionInterface类的open_session()创建,并保存在ctx中,即RequestContext对象中。但最终由session = LocalProxy(..., 'session')对象代为管理,到此,在视图函数中就可以导入session并使用了。

三、当请求第二次来时,session生成的是什么?

当请求第二次到来时,与第一次的不同就在open_session()那个val判断处,此时cookies不为空, 获取cookie的有效时长,如果cookie依然有效,通过与写入时同样的签名算法将cookie中的值解密出来并写入字典并返回中,若cookie已经失效,则仍然返回'空字典'。

四、特殊的SecureCookieSession字典有那些功能?如何实现的?

默认的 session 对象是 SecureCookieSession,这个类就是一个基本的字典,外加一些特殊的属性,比如 permanent(flask 插件会用到这个变量)、modified(表明实例是否被更新过,如果更新过就要重新计算并设置 cookie,因为计算过程比较贵,所以如果对象没有被修改,就直接跳过)。

怎么知道实例的数据被更新过呢? SecureCookieSession 是基于 werkzeug/datastructures:CallbackDict 实现的,这个类可以指定一个函数作为 on_update 参数,每次有字典操作的时候(setitemdelitem、clear、popitem、update、pop、setdefault)会调用这个函数。

SecureCookieSession:

class SecureCookieSession(CallbackDict, SessionMixin):

modified = False
accessed = False
 
def __init__(self, initial=None):
    def on_update(self):  
        self.modified = True
        self.accessed = True
    #将on_update()传递给CallbackDict
    super(SecureCookieSession, self).__init__(initial, on_update)
 
def __getitem__(self, key):
    self.accessed = True
    return super(SecureCookieSession, self).__getitem__(key)
 
def get(self, key, default=None):
    self.accessed = True
    return super(SecureCookieSession, self).get(key, default)
 
def setdefault(self, key, default=None):
    self.accessed = True
    return super(SecureCookieSession, self).setdefault(key, default)

继承的 CallbackDict:

class CallbackDict(UpdateDictMixin, dict):

def __init__(self, initial=None, on_update=None):
    dict.__init__(self, initial or ())
    self.on_update = on_update
 
def __repr__(self):
    return '<%s %s>' % (
        self.__class__.__name__,
        dict.__repr__(self)
    )

CallbackDict又继承UpdateDictMixin:

class UpdateDictMixin(object):
on_update = None

def calls_update(name):
    def oncall(self, *args, **kw):
        rv = getattr(super(UpdateDictMixin, self), name)(*args, **kw)
        if self.on_update is not None:
            self.on_update(self)
        return rv
    oncall.__name__ = name
    return oncall
 
def setdefault(self, key, default=None):
    modified = key not in self
    rv = super(UpdateDictMixin, self).setdefault(key, default)
    if modified and self.on_update is not None:
        self.on_update(self)
    return rv
 
def pop(self, key, default=_missing):
    modified = key in self
    if default is _missing:
        rv = super(UpdateDictMixin, self).pop(key)
    else:
        rv = super(UpdateDictMixin, self).pop(key, default)
    if modified and self.on_update is not None:
        self.on_update(self)
    return rv
 
__setitem__ = calls_update('__setitem__')
__delitem__ = calls_update('__delitem__')
clear = calls_update('clear')
popitem = calls_update('popitem')
update = calls_update('update')
del calls_update

由UpdateDictMixin()可知,对session进行改动会调用pop, __setitem__等方法,同时就会调用on_update()方法,从而修改

modify,security的值。

五、签名算法:

都获取 cookie 数据的过程中,最核心的几句话是:

s = self.get_signing_serializer(app)
val = request.cookies.get(app.session_cookie_name)
data = s.loads(val, max_age=max_age)

return self.session_class(data)

其中两句都和 s 有关,signing_serializer 保证了 cookie 和 session 的转换过程中的安全问题。如果 flask 发现请求的 cookie 被篡改了,它会直接放弃使用。

我们继续看 get_signing_serializer 方法:

def get_signing_serializer(self, app):
    if not app.secret_key:
        return None
    signer_kwargs = dict(
        key_derivation=self.key_derivation,
        digest_method=self.digest_method
    )
    return URLSafeTimedSerializer(app.secret_key,
        salt=self.salt,
        serializer=self.serializer,
        signer_kwargs=signer_kwargs)

我们看到这里需要用到很多参数:

secret_key:密钥。这个是必须的,如果没有配置 secret_key 就直接使用 session 会报错

salt:为了增强安全性而设置一个 salt 字符串(可以自行搜索“安全加盐”了解对应的原理)

serializer:序列算法

signer_kwargs:其他参数,包括摘要/hash算法(默认是 sha1)和 签名算法(默认是 hmac)

URLSafeTimedSerializer 是 itsdangerous 库的类,主要用来进行数据验证,增加网络中数据的安全性。itsdangerours提供了多种 Serializer,可以方便地进行类似 json 处理的数据序列化和反序列的操作。至于具体的实现,因为篇幅限制,就不解释了。

六、session什么时候写入cookie中?session的生命周期?

前面的几个问题实际上都发生在wsgi_app()前两句函数中,主要就是ctx.push()函数中,下面看看wsgi_app()后面干了嘛:

def wsgi_app(self, environ, start_response):
  
    ctx = self.request_context(environ)
    error = None
    try:
        try:
	# ctx.push函数是前半部分最重要的一个函数
	# 生成request和session并将二者保存到RequestContext()对象ctxz中
	# 最后将ctx,push到LocalStack()对象_request_ctx_stack中
            ctx.push()
            # 寻找视图函数,并执行
            response = self.full_dispatch_request()
        except Exception as e:
            error = e
            response = self.handle_exception(e)
        except:
            error = sys.exc_info()[1]
            raise
        return response(environ, start_response)
    finally:
        if self.should_ignore_error(error):
            error = None
        # 最后, 将自己请求在local中的数据清除
        ctx.auto_pop(error)

看full_dispatch_request:

def full_dispatch_request(self):
    #执行before_first_request
    self.try_trigger_before_first_request_functions()
    try:
        # 触发request_started 信号
        request_started.send(self)
        # 调用before_request
        rv = self.preprocess_request()
        if rv is None:
            #执行视图函数
            rv = self.dispatch_request()
    except Exception as e:
        rv = self.handle_user_exception(e)
    return self.finalize_request(rv)

前半部分就在执行flask钩子,before_first_request, before_request以及信号,接着执行视图函数生成rv,我们主要看finalize_request(rv):

 def finalize_request(self, rv, from_error_handler=False):
        response = self.make_response(rv)
        try:
            response = self.process_response(response)
            request_finished.send(self, response=response)
        except Exception:
            if not from_error_handler:
                raise
            self.logger.exception('Request finalizing failed with an '
                                  'error while handling an error')
        return response
首先根据rv生成response。再执行process_response:

   def process_response(self, response):
        ctx = _request_ctx_stack.top
        bp = ctx.request.blueprint
        funcs = ctx._after_request_functions
        if bp is not None and bp in self.after_request_funcs:
            funcs = chain(funcs, reversed(self.after_request_funcs[bp]))
        if None in self.after_request_funcs:
            funcs = chain(funcs, reversed(self.after_request_funcs[None]))
        for handler in funcs:
            response = handler(response)
        if not self.session_interface.is_null_session(ctx.session):
            self.session_interface.save_session(self, ctx.session, response)
        return response

前半部分主要执行flask的钩子,看后面,判断,session是否为空,如果不为空,则执行save_session():

def save_session(self, app, session, response):
    domain = self.get_cookie_domain(app)
    path = self.get_cookie_path(app)
 
    # If the session is modified to be empty, remove the cookie.
    # If the session is empty, return without setting the cookie.
    if not session:
        if session.modified:
            response.delete_cookie(
                app.session_cookie_name,
                domain=domain,
                path=path
            )
 
        return
 
    # Add a "Vary: Cookie" header if the session was accessed at all.
    if session.accessed:
        response.vary.add('Cookie')
 
    if not self.should_set_cookie(app, session):
        return
 
    httponly = self.get_cookie_httponly(app)
    secure = self.get_cookie_secure(app)
    samesite = self.get_cookie_samesite(app)
    expires = self.get_expiration_time(app, session)
    val = self.get_signing_serializer(app).dumps(dict(session))
    response.set_cookie(
        app.session_cookie_name,
        val,
        expires=expires,
        httponly=httponly,
        domain=domain,
        path=path,
        secure=secure,
        samesite=samesite
    )

save_session()比较简单,且有注释,便不再讲解,主要就是将session写入response.set_cookie中。这样便完成session的写入response工作,并由response返回至客户端。

再请求结束时会执行wsgi_app()的finally:ctx.auto_pop(error)函数,将与对应请求相关的request,session清除,session生命周期便结束。

总结:

至此,flask内置session的机制便讲解完毕,session的实现是依赖与flask的上下文管理,因此先弄清楚flask上下文,再来看session就比较容易理解。其主要的就是SecureCookieSessionInterface对象的open_session()与save_session() 。open_session在请求刚进来时执行,完成session对象的创建(就是一特殊字典),在视图函数中完成对session的赋值操作,save_session()在视图函数执行完后,生成response后执行,将session写入response的cookie中。

当然,flask内置session无法满足生产需求。因为将session数据全部保存在cookie中不安全且cookie存储数据量有限,但flask-session组件帮我们实现了将数据保存在服务器''数据库''中而只将sessionID保存在cookie中.

表单验证wtforms

用法详解

WTForms是一个支持多个web框架的form组件,主要用于对用户请求数据进行验证。

安装

pip3 install wtforms

使用1

from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__, template_folder='templates')

app.debug = True


class LoginForm(Form):
    # 字段(内部包含正则表达式)
    name = simple.StringField(
        label='用户名',
        validators=[
            validators.DataRequired(message='用户名不能为空.'),
            validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
        ],
        widget=widgets.TextInput(), # 页面上显示的插件
        render_kw={'class': 'form-control'}

    )
    # 字段(内部包含正则表达式)
    pwd = simple.PasswordField(
        label='密码',
        validators=[
            validators.DataRequired(message='密码不能为空.'),
            validators.Length(min=8, message='用户名长度必须大于%(min)d'),
            validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*d)(?=.*[$@$!%*?&])[A-Za-zd$@$!%*?&]{8,}",
                              message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')

        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )



@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'GET':
        form = LoginForm()
        return render_template('login.html', form=form)
    else:
        form = LoginForm(formdata=request.form)
        if form.validate():
            print('用户提交数据通过格式验证,提交的值为:', form.data)
        else:
            print(form.errors)
        return render_template('login.html', form=form)

if __name__ == '__main__':
    app.run()

login.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>登录</h1>
<form method="post">
    <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>

    <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
    <input type="submit" value="提交">
</form>
</body>
</html>

使用2

from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__, template_folder='templates')
app.debug = True



class RegisterForm(Form):
    name = simple.StringField(
        label='用户名',
        validators=[
            validators.DataRequired()
        ],
        widget=widgets.TextInput(),
        render_kw={'class': 'form-control'},
        default='alex'
    )

    pwd = simple.PasswordField(
        label='密码',
        validators=[
            validators.DataRequired(message='密码不能为空.')
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

    pwd_confirm = simple.PasswordField(
        label='重复密码',
        validators=[
            validators.DataRequired(message='重复密码不能为空.'),
            validators.EqualTo('pwd', message="两次密码输入不一致")
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

    email = html5.EmailField(
        label='邮箱',
        validators=[
            validators.DataRequired(message='邮箱不能为空.'),
            validators.Email(message='邮箱格式错误')
        ],
        widget=widgets.TextInput(input_type='email'),
        render_kw={'class': 'form-control'}
    )

    gender = core.RadioField(
        label='性别',
        choices=(
            (1, '男'),
            (2, '女'),
        ),
        coerce=int # “1” “2”
     )
    city = core.SelectField(
        label='城市',
        choices=(
            ('bj', '北京'),
            ('sh', '上海'),
        )
    )

    hobby = core.SelectMultipleField(
        label='爱好',
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        coerce=int
    )

    favor = core.SelectMultipleField(
        label='喜好',
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        widget=widgets.ListWidget(prefix_label=False),
        option_widget=widgets.CheckboxInput(),
        coerce=int,
        default=[1, 2]
    )

    def __init__(self, *args, **kwargs):
        super(RegisterForm, self).__init__(*args, **kwargs)
        self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))

    def validate_pwd_confirm(self, field):
        """
        自定义pwd_confirm字段规则,例:与pwd字段是否一致
        :param field:
        :return:
        """
        # 最开始初始化时,self.data中已经有所有的值

        if field.data != self.data['pwd']:
            # raise validators.ValidationError("密码不一致") # 继续后续验证
            raise validators.StopValidation("密码不一致")  # 不再继续后续验证


@app.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'GET':
        form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial
        return render_template('register.html', form=form)
    else:
        form = RegisterForm(formdata=request.form)
        if form.validate():
            print('用户提交数据通过格式验证,提交的值为:', form.data)
        else:
            print(form.errors)
        return render_template('register.html', form=form)



if __name__ == '__main__':
    app.run()
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>用户注册</h1>
<form method="post" novalidate style="padding:0  50px">
    {% for field in form %}
    <p>{{field.label}}: {{field}} {{field.errors[0] }}</p>
    {% endfor %}
    <input type="submit" value="提交">
</form>
</body>
</html>

使用3

from flask import Flask,render_template,redirect,request
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__,template_folder="templates")
app.debug = True

=======================simple===========================
class RegisterForm(Form):
    name = simple.StringField(
        label="用户名",
        validators=[
            validators.DataRequired()
        ],
        widget=widgets.TextInput(),
        render_kw={"class":"form-control"},
        default="haiyan"
    )
    pwd = simple.PasswordField(
        label="密码",
        validators=[
            validators.DataRequired(message="密码不能为空")
        ]
    )
    pwd_confim = simple.PasswordField(
        label="重复密码",
        validators=[
            validators.DataRequired(message='重复密码不能为空.'),
            validators.EqualTo('pwd',message="两次密码不一致")
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

  ========================html5============================
    email = html5.EmailField(  #注意这里用的是html5.EmailField
        label='邮箱',
        validators=[
            validators.DataRequired(message='邮箱不能为空.'),
            validators.Email(message='邮箱格式错误')
        ],
        widget=widgets.TextInput(input_type='email'),
        render_kw={'class': 'form-control'}
    )

  ===================以下是用core来调用的=======================
    gender = core.RadioField(
        label="性别",
        choices=(
            (1,"男"),
            (1,"女"),
        ),
        coerce=int  #限制是int类型的
    )
    city = core.SelectField(
        label="城市",
        choices=(
            ("bj","北京"),
            ("sh","上海"),
        )
    )
    hobby = core.SelectMultipleField(
        label='爱好',
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        coerce=int
    )
    favor = core.SelectMultipleField(
        label="喜好",
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        widget = widgets.ListWidget(prefix_label=False),
        option_widget = widgets.CheckboxInput(),
        coerce = int,
        default = [1, 2]
    )

    def __init__(self,*args,**kwargs):  #这里的self是一个RegisterForm对象
        '''重写__init__方法'''
        super(RegisterForm,self).__init__(*args, **kwargs)  #继承父类的init方法
        self.favor.choices =((1, '篮球'), (2, '足球'), (3, '羽毛球'))  #吧RegisterForm这个类里面的favor重新赋值

    def validate_pwd_confim(self,field,):
        '''
        自定义pwd_config字段规则,例:与pwd字段是否一致
        :param field:
        :return:
        '''
        # 最开始初始化时,self.data中已经有所有的值
        if field.data != self.data['pwd']:
            # raise validators.ValidationError("密码不一致") # 继续后续验证
            raise validators.StopValidation("密码不一致")  # 不再继续后续验证

@app.route('/register',methods=["GET","POST"])
def register():
    if request.method=="GET":
        form = RegisterForm(data={'gender': 1})  #默认是1,
        return render_template("register.html",form=form)
    else:
        form = RegisterForm(formdata=request.form)
        if form.validate():  #判断是否验证成功
            print('用户提交数据通过格式验证,提交的值为:', form.data)  #所有的正确信息
        else:
            print(form.errors)  #所有的错误信息
        return render_template('register.html', form=form)

if __name__ == '__main__':
    app.run()
<body>
<h1>用户注册</h1>
<form method="post" novalidate style="padding:0  50px">
    {% for item in form %}
    <p>{{item.label}}: {{item}} {{item.errors[0] }}</p>
    {% endfor %}
    <input type="submit" value="提交">
</form>
</body>

meta

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask, render_template, request, redirect, session
from wtforms import Form
from wtforms.csrf.core import CSRF
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
from hashlib import md5

app = Flask(__name__, template_folder='templates')
app.debug = True


class MyCSRF(CSRF):
    """
    Generate a CSRF token based on the user's IP. I am probably not very
    secure, so don't use me.
    """

    def setup_form(self, form):
        self.csrf_context = form.meta.csrf_context()
        self.csrf_secret = form.meta.csrf_secret
        return super(MyCSRF, self).setup_form(form)

    def generate_csrf_token(self, csrf_token):
        gid = self.csrf_secret + self.csrf_context
        token = md5(gid.encode('utf-8')).hexdigest()
        return token

    def validate_csrf_token(self, form, field):
        print(field.data, field.current_token)
        if field.data != field.current_token:
            raise ValueError('Invalid CSRF')


class TestForm(Form):
    name = html5.EmailField(label='用户名')
    pwd = simple.StringField(label='密码')

    class Meta:
        # -- CSRF
        # 是否自动生成CSRF标签
        csrf = True
        # 生成CSRF标签name
        csrf_field_name = 'csrf_token'

        # 自动生成标签的值,加密用的csrf_secret
        csrf_secret = 'xxxxxx'
        # 自动生成标签的值,加密用的csrf_context
        csrf_context = lambda x: request.url
        # 生成和比较csrf标签
        csrf_class = MyCSRF

        # -- i18n
        # 是否支持本地化
        # locales = False
        locales = ('zh', 'en')
        # 是否对本地化进行缓存
        cache_translations = True
        # 保存本地化缓存信息的字段
        translations_cache = {}


@app.route('/index/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        form = TestForm()
    else:
        form = TestForm(formdata=request.form)
        if form.validate():
            print(form)
    return render_template('index.html', form=form)


if __name__ == '__main__':
    app.run()

信号signals

flask框架的信号基于blink,其主要就是让开发者可以在flask请求过程中定制一些用户行为

安装

pip install blinker
from flask import signals

内置信号

request_started = _signals.signal('request-started')                # 请求到来前执行
request_finished = _signals.signal('request-finished')              # 请求结束后执行
 
before_render_template = _signals.signal('before-render-template')  # 模板渲染前执行
template_rendered = _signals.signal('template-rendered')            # 模板渲染后执行
 
got_request_exception = _signals.signal('got-request-exception')    # 请求执行出现异常时执行
 
request_tearing_down = _signals.signal('request-tearing-down')      # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
 
appcontext_pushed = _signals.signal('appcontext-pushed')            # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped')            # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed')                # 调用flask在其中添加数据时,自动触发

使用信号

from flask import Flask,signals,render_template

app = Flask(__name__)

# 往信号中注册函数
def func(*args,**kwargs):
    print('触发型号',args,kwargs)
# 将函数绑定给信号
signals.request_started.connect(func)

# 触发信号: signals.request_started.send()
@app.before_first_request
def before_first1(*args,**kwargs):
    pass
@app.before_first_request
def before_first2(*args,**kwargs):
    pass

@app.before_request
def before_first3(*args,**kwargs):
    pass

@app.route('/',methods=['GET',"POST"])
def index():
    print('视图')
    return render_template('index.html')


if __name__ == '__main__':
    app.wsgi_app
    app.run()

信号触发点

a. before_first_request
b. 触发 request_started 信号
c. before_request
d. 模板渲染
    渲染前的信号 before_render_template.send(app, template=template, context=context)
        rv = template.render(context) # 模板渲染
    渲染后的信号 template_rendered.send(app, template=template, context=context)
e. after_request
f. session.save_session()
g. 触发 request_finished信号        
    如果上述过程出错:
        触发错误处理信号 got_request_exception.send(self, exception=e)
            
h. 触发信号 request_tearing_down

自定义信号

from flask import Flask, current_app, flash, render_template
from flask.signals import _signals
app = Flask(import_name=__name__)

# 自定义信号
# 信号名 = _signals.signal("信号名")
xxxxx = _signals.signal('xxxxx')
 
def func(sender, *args, **kwargs):
    print(sender)
# 自定义信号中注册函数
xxxxx.connect(func)
@app.route("/x")
def index():
    # 触发信号
    xxxxx.send('123123', k1='v1')
    return 'Index' 
 
if __name__ == '__main__':
    app.run()

SQLAlchemy

介绍

QLAlchemy是一个基于Python的ORM框架。该框架是建立在DB-API之上,使用关系对象映射进行数据库操作。
简而言之就是,将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

什么是DB-API?
DB-API是Python的数据库接口规范。
在没有DB-API之前,各数据库之间的应用接口非常混乱,实现各不相同,
项目需要更换数据库的时候,需要做大量的修改,非常不方便,DB-API就是为了解决这样的问题。

安装

pip3 install sqlalchemy

组成部分

Engine,框架的引擎
Connection Pooling ,数据库连接池
Dialect,选择连接数据库的DB API种类
Schema/Types,架构和类型
SQL Exprression Language,SQL表达式语言

使用

连接数据库

SQLAlchemy本身无法操作数据库,其必须依赖遵循DB-API规范的第三方模块

Dialect用于和数据API进行交互,根据配置不同调用不同数据库API,从而实现数据库的操作

数据库API

MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
    
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
    
MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
    
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
    
更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html

Django中如何反向生成models

python manage.py inspectdb > app/models.py

连接数据库

from sqlalchemy import create_engine


# create_engine就是去建立连接,相当于我们pymsql建立连接的时候 conn= pymysql.connect(...)
conn = create_engine(
    "mysql+pymysql://root:123abc@127.0.0.1:3306/数据库名?charset=utf8mb4",
    max_overflow=0,   # 超过连接池大小外最多创建的连接数
    pool_size=5,      # 连接池大小
    pool_timeout=30,  # 连接池中没有线程最多等待时间,否则报错
    pool_recycle=-1,  # 多久之后对连接池中的连接进行回收(重置)-1不回收
)

执行原生SQL

from sqlalchemy import create_engine
conn = create_engine(
    "mysql+pymysql://root:123abc@127.0.0.1:3306/test?charset=utf8mb4",
    max_overflow=0,
    pool_size=5,
)

def test():
    ret = conn.execute("select * from MyTest")
    result = ret.fetchall()
    print(result)
    ret.close()

if __name__ == '__main__':
    test()

ORM

  • 创建表
# 1. 创建单表
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy import Index, UniqueConstraint
import datetime

ENGINE = create_engine("mysql+pymysql://root:123abc@127.0.0.1:3306/test?charset=utf8mb4",)

# Base是declarative_base的实例化对象
Base = declarative_base()


# 每个类都要继承Base
class UserInfo(Base):
    # __tablename__是必须要的,它是设置实际存在数据库中的表名
    __tablename__ = "user_info"

    # Column是列的意思,固定写法 Column(字段类型, 参数)
    # primary_key主键、index索引、nullable是否可以为空
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    email = Column(String(32), unique=True)
    create_time = Column(DateTime, default=datetime.datetime.now)

    # 相当于Django的ORM的class Meta,是一些元信息
    __table_args__ = (
        UniqueConstraint("id", "name", name="uni_id_name"),
        Index("name", "email")
    )


def create_db():
    # metadata.create_all创建所有表
    Base.metadata.create_all(ENGINE)


def drop_db():
    # metadata.drop_all删除所有表
    Base.metadata.drop_all(ENGINE)


if __name__ == '__main__':
    create_db()

参考博客

原文地址:https://www.cnblogs.com/fwzzz/p/12459061.html