Flask ==》 信号
Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为。
pip3 install blinker
引子:
1.预留了几个位置以列表储存,默认为空。 2.某一天,在某个位置写了个函数, 3,重新走一遍流程,发现列表里面有东西,就会进去循环执行一遍。
flask的源码: 简单来说就是 设定了固定的流程,按流程走。但是,某些位置,它预留了点东西。
例如:赛车的路上,是固定的赛程,途中有的休息,换胎等等,这些操作,可以理解为是 “信号”。
现在,我们一起来找 flask 源码 的哪个地方使用了 信号?
flask 里面的信号总共有多少个呢 ? 总共: 10 个
内置信号如下:
request_started = _signals.signal('request-started') # 请求到来前执行 request_finished = _signals.signal('request-finished') # 请求结束后执行 before_render_template = _signals.signal('before-render-template') # 模板渲染前执行 template_rendered = _signals.signal('template-rendered') # 模板渲染后执行 got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行 request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否) appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 请求上下文执行完毕后自动执行(无论成功与否) appcontext_pushed = _signals.signal('appcontext-pushed') # 请求上下文push时执行 appcontext_popped = _signals.signal('appcontext-popped') # 请求上下文pop时执行 message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发 独立的 flash闪现
前两个示例:(request-started,request-finished)
from flask import Flask from flask.signals import request_started from flask.signals import request_finished app=Flask(__name__) def aaa(*args,**kwargs): print('A加油站',args,kwargs) def bbb(*args,**kwargs): print('B加油站',args,kwargs) request_started.connect(aaa) request_finished.connect(bbb) @app.route('/index') def index(): print('index ') return 'xxx' if __name__ == '__main__': app.run()
打印如下:
request-started.send() # 执行注册到列表中的所有信号。
一般用到最多的是 请求开始和请求结束。
1. 特殊的装饰器和信号有什么区别?
回答: 信号被触发是通过 “send” 来触发的, 它没有没拿返回值。
特殊的装饰器写不写返回值就有要求了,如 before_request 有返回值的话,后面就不执行了。
以此看来,特殊的装饰器功能更强大。
2.通过信号可以做权限吗?
回答: 不能,信号本身自己做不了。特殊装饰器直接拿返回值。
3.信号用于做什么?
回答: 只限于做操作。如:通过自定义的信号,可以降低代码之间的耦合。
请求前,请求后,处理上下文后,定制写没有返回值的操作。
自定义信号:
from flask import Flask from flask.signals import _signals app=Flask(__name__) aray = _signals.signal('aray') def aaa(*args,**kwargs): print('A加油站',args,kwargs) def bbb(*args,**kwargs): print('B加油站',args,kwargs) aray.connect(aaa) aray.connect(bbb) @app.route('/index') def index(): #触发这个信号:执行注册到列表中的所有函数 #意义: 如 发送短信,邮件,微信 可以在上面写成对应的函数处理,注册一个发一个, # 这样,自己就不用写那几个类,工厂模式不用谢,代码简单,降低耦合。 aray.send(sender='xxx',a1=123,a2=234) return 'xxx' if __name__ == '__main__': app.__call__() app.run()
Flask ==》 flask-session
pip3 install flask_session
pip3 install redis
目的:open_session, save_session 放到redis里面使用。
要重写:open_session,save_session 里面的方法,该怎么办?
app.session_interface = Foo() # 重写 open_session, save_session
存储方式:redis
#!/usr/bin/env python # -*- coding:utf-8 - import redis from flask import Flask, session from flask_session import Session app = Flask(__name__) app.debug = True app.secret_key = 'xxxx' app.config['SESSION_TYPE'] = 'redis' # session类型为redis app.config['SESSION_PERMANENT'] = False # 如果设置为True,则关闭浏览器session就失效。 app.config['SESSION_USE_SIGNER'] = False # 是否对发送到浏览器上session的cookie值进行加密 app.config['SESSION_KEY_PREFIX'] = 'session:' # 保存到session中的值的前缀 app.config['SESSION_REDIS'] = redis.Redis(host='127.0.0.1', port='6379', password='123123') # 用于连接redis的配置 Session(app) #实例化 @app.route('/index') def index(): session['k1'] = 'v1' #按这样整个写下来,这个值就保存到redis了 return 'xx' if __name__ == '__main__': app.run()
请求先进来:
里面的open_session方法。
第二次进来: 运行 save_session:
def save_session(self, app, session, response): domain = self.get_cookie_domain(app) #拿到域名 path = self.get_cookie_path(app) #拿到路径 if not session: if session.modified: self.redis.delete(self.key_prefix + session.sid) response.delete_cookie(app.session_cookie_name, domain=domain, path=path) return # Modification case. There are upsides and downsides to # emitting a set-cookie header each request. The behavior # is controlled by the :meth:`should_set_cookie` method # which performs a quick check to figure out if the cookie # should be set or not. This is controlled by the # SESSION_REFRESH_EACH_REQUEST config flag as well as # the permanent flag on the session itself. # if not self.should_set_cookie(app, session): # return httponly = self.get_cookie_httponly(app) #获取日期 secure = self.get_cookie_secure(app) #获取日期 expires = self.get_expiration_time(app, session) #获取日期 val = self.serializer.dumps(dict(session)) #里面的session是个字典,进行序列化 #self.redis是 redis连接 它有字典,列表 #conn.set('xxx':'{k1:v1}',ex = '超时时间') #conn.setex('xxx':'{k1:v1}') value值:是传过来的 key: session+随机字符串 self.redis.setex(name=self.key_prefix + session.sid, value=val, time=total_seconds(app.permanent_session_lifetime)) if self.use_signer: #是否加密 session_id = self._get_signer(app).sign(want_bytes(session.sid)) #self._get_signer 跳入(加密) else: #不加密 session_id = session.sid response.set_cookie(app.session_cookie_name, session_id, expires=expires, httponly=httponly, domain=domain, path=path, secure=secure) #写到用户浏览器的cookie: key=session value=sid
def open_session(self, app, request): sid = request.cookies.get(app.session_cookie_name) #sid: cookie里是否含有session if not sid: #一开始没有 sid = self._generate_sid() #生成一个sid return self.session_class(sid=sid, permanent=self.permanent) #session_class 继承了字典的方法 if self.use_signer: #解密 signer = self._get_signer(app) if signer is None: return None try: sid_as_bytes = signer.unsign(sid) sid = sid_as_bytes.decode() except BadSignature: sid = self._generate_sid() return self.session_class(sid=sid, permanent=self.permanent) if not PY2 and not isinstance(sid, text_type): sid = sid.decode('utf-8', 'strict') val = self.redis.get(self.key_prefix + sid) if val is not None: try: data = self.serializer.loads(val) return self.session_class(data, sid=sid) except: return self.session_class(sid=sid, permanent=self.permanent) return self.session_class(sid=sid, permanent=self.permanent)
memcached:
redis把数据放到内存里,什么时候引入redis? redis实在引入自动分配的时候用到。
分配订单时一次一次去数据取,为了防止多次数据操作,故而写在了内存里, 但是,重启,多线程又出现问题了。
那么怎么办? 加了redis之后,即没有数据库取的慢,又不会出现重启之后,多线程下重新分配了。
小结: redis就是在另外一台机器的内存放到数据。
这里: mamcached就和 redis 类似。
他们两的区别:
redis: 可含有5大数据类型。
memcached: 只有字符串一个。
#!/usr/bin/env python # -*- coding:utf-8 - import redis from flask import Flask, session from flask_session import Session import memcache app = Flask(__name__) app.debug = True app.secret_key = 'xxxx' app.config['SESSION_TYPE'] = 'memcached' # session类型为redis app.config['SESSION_PERMANENT'] = True # 如果设置为True,则关闭浏览器session就失效。 app.config['SESSION_USE_SIGNER'] = False # 是否对发送到浏览器上session的cookie值进行加密 app.config['SESSION_KEY_PREFIX'] = 'session:' # 保存到session中的值的前缀 app.config['SESSION_MEMCACHED'] = memcache.Client(['10.211.55.4:12000']) Session(app) @app.route('/index') def index(): session['k1'] = 'v1' return 'xx' if __name__ == '__main__': app.run()
filesystem:
#!/usr/bin/env python # -*- coding:utf-8 - import redis from flask import Flask, session from flask_session import Session app = Flask(__name__) app.debug = True app.secret_key = 'xxxx' app.config['SESSION_TYPE'] = 'filesystem' # session类型为redis app.config[ 'SESSION_FILE_DIR'] = '/Users/wupeiqi/PycharmProjects/grocery/96.Flask新课程/组件/2.flask-session' # session类型为redis app.config['SESSION_FILE_THRESHOLD'] = 500 # 存储session的个数如果大于这个值时,就要开始进行删除了 app.config['SESSION_FILE_MODE'] = 384 # 文件权限类型 app.config['SESSION_PERMANENT'] = True # 如果设置为True,则关闭浏览器session就失效。 app.config['SESSION_USE_SIGNER'] = False # 是否对发送到浏览器上session的cookie值进行加密 app.config['SESSION_KEY_PREFIX'] = 'session:' # 保存到session中的值的前缀 Session(app) @app.route('/index') def index(): session['k1'] = 'v1' session['k2'] = 'v1' return 'xx' if __name__ == '__main__': app.run()
mongodb: 关系型数据库
集合 == 表
例如: {'k1':'v1'}
{'k2':'v2','k3':'v3'}
#!/usr/bin/env python # -*- coding:utf-8 - import redis from flask import Flask, session from flask_session import Session import pymongo app = Flask(__name__) app.debug = True app.secret_key = 'xxxx' app.config['SESSION_TYPE'] = 'mongodb' # session类型为redis app.config['SESSION_MONGODB'] = pymongo.MongoClient() app.config['SESSION_MONGODB_DB'] = 'mongo的db名称(数据库名称)' app.config['SESSION_MONGODB_COLLECT'] = 'mongo的collect名称(表名称)' app.config['SESSION_PERMANENT'] = True # 如果设置为True,则关闭浏览器session就失效。 app.config['SESSION_USE_SIGNER'] = False # 是否对发送到浏览器上session的cookie值进行加密 app.config['SESSION_KEY_PREFIX'] = 'session:' # 保存到session中的值的前缀 Session(app) @app.route('/index') def index(): session['k1'] = 'v1' session['k2'] = 'v1' return 'xx' if __name__ == '__main__': app.run()
mongodb操作简单示例:
#!/usr/bin/env python # -*- coding:utf-8 -*- from pymongo import MongoClient # 创建链接 conn = MongoClient('47.93.4.198', 27017) # 选择数据库 db = conn['db1'] # 选择表 posts = db['posts'] post_data = { 'name': 'alex', 'age': 18 } # 表中插入数据 # result = posts.insert_one(post_data) # 获取一条数据 # row = posts.find_one() # print(row) # # 获取多条数据 # rows = posts.find() # for row in rows: # print(row) # 删除多条数据 # rows = posts.delete_many(filter={}) # print(rows) # 更新多条数据 # posts.update({}, {'name': 'wupeiqi'})
sqlalchemy: ORM 相当于把数据库存在表里
#!/usr/bin/env python # -*- coding:utf-8 - import redis from flask import Flask, session from flask_session import Session as FSession from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) app.debug = True app.secret_key = 'xxxx' # 设置数据库链接 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:123@127.0.0.1:3306/fssa?charset=utf8' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True # 实例化SQLAlchemy db = SQLAlchemy(app) app.config['SESSION_TYPE'] = 'sqlalchemy' # session类型为sqlalchemy app.config['SESSION_SQLALCHEMY'] = db # SQLAlchemy对象 app.config['SESSION_SQLALCHEMY_TABLE'] = 'session' # session要保存的表名称 app.config['SESSION_PERMANENT'] = True # 如果设置为True,则关闭浏览器session就失效。 app.config['SESSION_USE_SIGNER'] = False # 是否对发送到浏览器上session的cookie值进行加密 app.config['SESSION_KEY_PREFIX'] = 'session:' # 保存到session中的值的前缀 FSession(app) @app.route('/index') def index(): session['k1'] = 'v1' session['k2'] = 'v1' return 'xx' if __name__ == '__main__': app.run()
PS: 在写好代码后,不要着急运行,需要先执行进入终端执行一条创建数据库表的命令:
bogon:pro-flask wupeiqi$ python3 Python 3.5.1 (v3.5.1:37a07cee5969, Dec 5 2015, 21:12:44) [GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin Type "help", "copyright", "credits" or "license" for more information. >>> from app import db >>> db.create_all() >>>
信号示例: