Flask进阶编程

flask restful

1.flask与django对比

  • flask与django开发效率对比

    意义不大,当把常用插件都安装到flask那么flask与django基本差不多,只是flask没有配置插件,用户可以自定义选择相应插件,而这些插件为了帮助我们减少重复代码,方便开发。
    
  • django与flask谁更优秀

    gitHub Starts 数量几乎相同
    主流框架就是这两个
    优秀程度来说没有意义
    
  • 上手速度

    django什么都集成好了
    flask根据自己所需进行配置
    各有优劣。
    
  • 哪个框架更适合做大型项目

    大型项目都是跨语言,跨框架。这也没有意义
    
  • flask轻量级,django重量级,就像flask像小姐姐,django像御姐

  • 如果真的要比较和选择

    接受不了框架的数据,那么选flask
    
    想要完成功能,那么用django
    
  • 自己遇到问题debug调试

2.必备环境与软件

1.Python3.6 (虚拟环境pipenv)
2.Pycharm(开发工具)
3.mysql(数据库)
4.Navicat(数据库可视化工具)
5.PostMan(API测试工具)

3.flask版本

  • 使用flask 1.0版本
1.0 版本相比以往0.x版本,主要体现在代码重构,机制重写,废弃2.6与3.3

4.项目目录搭建

  • 项目基本目录

  • 代码:

    • ginger.py
    from app.app import create_app
    # 调用app create_app方法,获得app然后启动:
    app = create_app()
    
    if __name__ == '__main__':
        app.run(debug=True)
    
    • libs/redprint.py
    class Redprint:
        """
        复写蓝图方法,用于注册函数
        """
        def __init__(self,name):
            self.name = name
            self.mound = []
        def route(self, rule, **options):
        	# 定义路由
            def decorator(f):
                # f要执行的函数,rule为路由。
                self.mound.append((f, rule, options))
                return f
            return decorator
        def register(self,bp,url_prefix=None):
            if url_prefix is None:
                # 判断如果没有url_prefix,指定它的name为路由前缀
                url_prefix = '/' + self.name
            # 实现视图函数向蓝图注册
            for f, rule, options in self.mound:
                # 视图中有endpoint取endpoint,没有就取视图名字作为endpoint
                endpoint = options.pop("endpoint", f.__name__)
                # 添加路由
                bp.add_url_rule(url_prefix + rule, endpoint, f, **options)
    
    • app/api/v1/__init__.py
    from flask import Blueprint
    from app.api.v1 import user, book
    def create_blueprint_v1():
        bp_v1 = Blueprint('v1',__name__)
        #将红图注册到蓝图中
        # user.api.register(bp_v1,url_prefix='/user')
        # book.api.register(bp_v1,url_prefix='/book')
        user.api.register(bp_v1)
        book.api.register(bp_v1)
        return bp_v1
    
    • 业务代码:book.py
    from app.libs.redprint import Redprint
    # 用Redprint 注册函数
    api = Redprint('book')
    @api.route('',methods=['GET'])
    def get_book():
        return 'I am book'
    
    @api.route('',methods=['POST'])
    def create_book():
        return 'create book'
    
  • 项目结构思路

首先自己注册Redprint,然后再每个分文件中都实例化一个Redprint对象,然后把Redprint注册到蓝图上,再把蓝图注册到flask核心对象中
  • 整个反向实现过程思路
首先在业务层面代码:实例化一个Redprint,用Redprint注册函数。

然后再 业务层面__init__里创建一个蓝图,把Redprint注册到Blueprint中,并且通过url_prefix增加一个url前缀

在app.py中,再一次把蓝图注册flask核心对象中,并且通过url_prefix增加一个url前缀

5.restful

  • 又称表述性状态转移,一种架构设计风格。将资源(如上面book,user),使用url定位资源,使用HTTP操作资源(GET,POST,PUT,DELETE)。

6.客户端:

  • 比如客户注册,除了传统意义上的用户,你应该考虑到第三方要调你的接口,APP,小程序等都可以要调用你的接口,所以你的设计不仅仅是用户。当然注册形式也是多样化的短信,邮件,QQ,微信等等。所以你要设计好的代码结构,这样让你的代码不会特别杂乱。

7.用户注册

  • 功能实现演示:

  • 提交数据首先进行表单验证,这里使用WTForm,因表单提交一般用于网页中,json数据提交一般用于移动端。那么我们提交表单,如何json传入什么参数,如何通过表单进行校验。

    from app.libs.enums import ClientTypeEnum
    from app.libs.error_code import Success
    from app.libs.redprint import Redprint
    from app.validators.forms import ClientForm, UserEmailForm
    from models.user import User
    
    api = Redprint('client')
    @api.route('/register', methods=['POST'])
    def create_create():
        # validate_for_api这里封装验证表单是否正确,不正确会自动抛出异常
        # 第一次验证,验证账号密码合法性
        form = ClientForm().validate_for_api()
        # if form.validate():
        # 如果新增其他注册类型如微信,公众号,QQ等,只需在promise定义枚举。
        # 再定义相应的函数像:__register_user_by_email即可
        # promise 为一个字典,key=枚举中键,value=相应验证的函数。
        # ClientTypeEnum 标识不同类型注册的枚举
        # 你也可以在promise定义其他类型注册,下面再定义相应方法。这里只用email进行演示
        promise = {
            ClientTypeEnum.USER_EMAIL: __register_user_by_email,
        }
        # 从form中取到用户是哪个类型注册,这里以email注册为例
        promise[form.type.data]()
        # # 引发一个异常,咱么可以自定义一个异常,此引发错误时html错误。
        # # 那么错误异常信息如何返回json格式,自定义错误异常json信息
        """
            有些异常我们时可以预知:已知异常 通过APIException
            有些异常我们时无法预知:未知异常
            # 通过AOP处理未知异常。通过装饰器捕捉未知异常
        """
        # 最终:我们可以接收定义时的复杂,但不能接受调用时候的复杂
        # 因为定义时候只定义一次。
        return Success()
    
    def __register_user_by_email():
        # 必须从form中拿到参数,因为form中数据是经过校验器校验
        # UserEmailForm,这里如果用户用email注册需要对email进行校验。
        # 第二次验证,验证email注册用户的email
        form = UserEmailForm().validate_for_api()
        User.reqister_by_email(form.nickname.data,
                               form.account.data,
                               form.secret.data)
    
    # 注意: 此时validate_for_api 为自定义的BaseForm中方法用于验证表单是否正确。不正确会抛出异常
    

8.form表单验证

  • 在validators中定义一个j基类BaseForm,默认继承Form
from flask import request
from wtforms import Form
# ParameterException 为自定义返回错误信息,后面会详细叙述
from app.libs.error_code import ParameterException

class BaseForm(Form):
    """
    重写form,让验证form验证继承它
    """
    def __init__(self):
        # 在基类中实例化时,获取到请求的数据,并转换json格式
        data = request.json
        # 继承父类的实例化方法,将请求数据传入form表单中
        super(BaseForm, self).__init__(data=data)
    # 这里定义的是一个全局的form表单验证,用于验证是否满足表单验证,不满足抛出异常,此时self为form对象
    def validate_for_api(self):
        valid = super(BaseForm, self).validate()
        print(valid)
        if not valid:
            raise ParameterException(msg=self.errors)
        # 将form返回
        return self
  • 其他form表单验证。其他表单的验证,都要直接间接继承父类
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
# Author Xu Junkai
# coding=utf-8
# @Time    : 2020/2/25 16:20
# @Site    :
# @File    : forms.py
# @Software: PyCharm
"""
from wtforms import StringField, IntegerField
from wtforms.validators import DataRequired, length, Email, Regexp, ValidationError

from app.libs.enums import ClientTypeEnum
from models.user import User
# 此处继承BaseForm
from app.validators.baseform import BaseForm as Form

class ClientForm(Form):
    # 定义账号验证规则
    account = StringField(validators=[DataRequired(message='没有输入账号'), length(
        min=5, max=32,message='密码长度要求6-32'
    )])
    # 定义密码验证规则,此处没有定义
    secret = StringField()
    # 定义用户是通过哪种方式注册的验证(email,qq...)这里注意我们采用是枚举方式定义注册类型,见下面代码
    type = IntegerField(validators=[DataRequired(message='没有输入类型'),])
    def validate_type(self,value):
        # 定义传入注册类型,必须是我们指定的类型,不是的话抛出异常
        try:
            client = ClientTypeEnum(value.data)
        except ValueError as e:
            raise e
        # 将枚举类型赋值给type
        self.type.data = client

class UserEmailForm(ClientForm):
    # 默认继承ClientForm,通过email注册的用户需要对email进行验证
    # 通过email注册方式,进行校验。这样减少重复代码编写
    account = StringField(
        validators=[Email(message='invalidate email')]
    )
    secret = StringField(validators=[
        DataRequired(),
        Regexp(r'^[A-Za-z0-9_*&$#@]{6,22}$')
    ])
    nickname = StringField(validators=[DataRequired(),
                                       length(min=2, max=22)])
    def validate_account(self,value):
        # 重复验证,从数据库查询有没有注册,有注册,引发一个wtf错误。
        if User.query.filter_by(email=value.data).first():
            raise ValidationError()

9.枚举的应用

  • 以往验证表单我们都是通过choice验证,通过枚举方式进行验证:libs/enums.py
from enum import Enum
class ClientTypeEnum(Enum):
    """
    继承Enum,定义一个枚举,用于不同形式客户端注册,(微信,EMAIL,QQ...)
    """
    # email注册
    USER_EMAIL = 100
    # 手机注册
    USER_MOBILE = 1001
    # 微信小程序
    USER_MINA = 200
    # 微信公众号
    USER_WX = 201

10.异常捕捉--->自定义

  • libs/error.py定义一个基类,用于异常返回相应格式的异常信息,类似如下:
{
    "msg": "sorry, we make a mistake (~~ _ ~~)",
    "error_code": 999,
    "request": "POST /v1/client/register"
}
  • 自定义,基类异常处理,都是默认配置,让其返回数据结构为json
import json
from flask import request
from werkzeug.exceptions import HTTPException
class APIException(HTTPException):
    """
    本身继承wekzeug的HTTPException
    要求返回错误信息格式:
    {
        "msg": "sorry, we make a mistake (~~ _ ~~)",
        "error_code": 999,
        "request": "POST /v1/client/register"
    }
    """
    # 默认code,msg,error_code  code为返回服务端状态码
    code = 500
    msg = 'sorry, we make a mistake (~~ _ ~~)'
    error_code = 999
    def __init__(self, msg=None, code=None, error_code=None, headers=None):
        if code:
            self.code = code
        if error_code:
            self.error_code = error_code
        if msg:
            self.msg = msg
        super(APIException,self).__init__(msg, None)

    def get_body(self, environ=None):
        """
        封装json格式,将数据转为我们所需要格式
        :param environ:
        :return:
        """
        body = dict(
            msg = self.msg,
            error_code = self.error_code,
            request = request.method + " " +self.get_url_no_param()
        )
        text = json.dumps(body)
        return text
    # 定义返回的Content-Type类型
    def get_headers(self, environ=None):
        """
        定义我们要返回数据类型为json
        :param environ: 
        :return: 
        """
        return [("Content-Type", "application/json")]
    @staticmethod
    def get_url_no_param():
        # 返回发生异常的url进行处理,去除 ? 后面的内容
        full_path = str(request.full_path)
        main_path = full_path.split("?")
        return main_path[0]

  • libs/error_code.py 自定义返回数据:错误码,信息,或是成功状态码...
from app.libs.error import APIException
# 400 请求参数错误
# 401 未授权
# 403 禁止访问
# 404 没有找到页面
# 500 服务器产生未知错误
# 200 查询成功
# 201 创建或更新成功
# 204 删除成功
# 301/302 重定向
class Success(APIException):
    """
    返回成功数据
    """
    code = 201
    msg = "ok"
    error_code = 0

class ServerError(APIException):
    """定义python中最原始的错误"""
    code = 500
    msg = "sorry, we made a mistake.(~__~!)"
    error_code = 999


class ClientTypeError(APIException):
    code = 400
    msg = 'client is invalid'
    error_code = 1006

class ParameterException(APIException):
    # 定义通用异常提示,APIException定义返回异常的格式
    code = 400
    msg = 'invalid parameter'
    error_code = 1000

class NotFound(APIException):
    code = 404
    msg = "the resource ar not found!"
    error_code = 1001

class AuthFailed(APIException):
    code = 401
    error_code = 1005
    msg = "authorization failed"

  • 未知错误异常捕捉,前面考虑到有些异常是我们可以捕捉的,但是有些未知异常我们是无法预测的。那么怎么办?
  • 这里再主程序入口文件,定义全局捕捉异常,通过装饰器方式
# 捕捉未知异常 只有在 flask 1.0版本及以上,可以捕捉通用异常
@app.errorhandler(Exception)
def framework_error(e):
    # 有可能是APIExcepiton
    # 有可能是HTTPException
    # 有可能是Excepiton  python中最原始的错误
    if isinstance(e,APIException):
        return e
    if isinstance(e,HTTPException):
        code = e.code
        msg = e.description
        error_code = 1007
        return APIException(msg, code, error_code)
    else:
        #这里进行判断,如果是调试模式,返回详细信息(因为开发节点需要看到详细信息)
        if not app.config['DEBUG']:
            # 当然此处可以进行日志记录
            # python中最原始的错误 就返回默认未知错误
            return ServerError
        else:
            raise e

11.数据库连接

  • 这里我们重写Query替换BaseQuery:,并自定义一个基本model.
from datetime import datetime
from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy, BaseQuery
from sqlalchemy import Column, Integer, SmallInteger
from contextlib import contextmanager

from app.libs.error_code import NotFound


class SQLAlchemy(_SQLAlchemy):
    # 创建一个上下文管理器,可以用with语法
    @contextmanager
    def auto_commit(self):
        # 异常就回滚数据,并抛出异常
        try:
            yield
            self.session.commit()
        except Exception as e:
            db.session.rollback()
            raise e
class Query(BaseQuery):
    """
    覆盖查询类 重写filter_by方法
    """
    def filter_by(self,**kwargs):
        # 为每个model中加入status,只有等于1才会被找到
        if "status" not in kwargs.keys():
            kwargs['status'] = 1
        return super(Query,self).filter_by(**kwargs)
    def get_or_404(self, ident, description=None):
        # 重写get_or_404方法,没有找到触发自己定义异常。
        # 因原本get_or_404异常返回时一个html页面,但是我们想返回是一个json
        rv = self.get(ident)
        if not rv:
            raise NotFound()
        return rv
    def first_or_404(self, description=None):
        # 同理
        rv = self.first()
        if not rv:
            raise NotFound()
        return rv
# query_class指定自定义的Query,实例化一个db对象
db = SQLAlchemy(query_class=Query)


class Base(db.Model):
    """
    模型的基类,为所有模型添加create_time,status属性
    为方便好用:添加了模型公共方法,比如删除一个模型
    """
    __abstract__ = True
    create_time = Column(Integer)
    status = Column(SmallInteger,default=1)
    def __init__(self):
        # 实例化定义创建时间
        self.create_time = int(datetime.now().timestamp())
    @property
    def create_datetime(self):
        if self.create_time:
            return datetime.fromtimestamp(self.create_time)
        else:
            return None
    def set_attrs(self, attrs_dict):
        for key,value in attrs_dict.items():
            if hasattr(self, key) and key != 'id':
                setattr(self, key, value)
    def delete(self):
        self.status = 0

12.model定义User表

from sqlalchemy import Column, Integer, String, SmallInteger
from werkzeug.security import generate_password_hash, check_password_hash

from app.libs.error_code import NotFound, AuthFailed
from models.base import Base, db


class User(Base):
    # 定义字段名
    id = Column(Integer, primary_key=True)
    email = Column(String(24), unique=True,nullable=False)
    nickname = Column(String(24), unique=True)
    auth = Column(SmallInteger, default=1)
    _password = Column('password', String(100))
    @property
    def password(self):
        return self._password
    @password.setter
    def password(self,raw):
        # 将原始密码加密
        self._password = generate_password_hash(raw)

    # 注册代码,在一个对象下面创建对象本身,这样不是合理的,但是如果定义静态方法,是可行的
    @staticmethod
    def reqister_by_email(nickname, account, secret):
        # email注册数据提交数据库
        # 因之前创建db指定定义auto_commit对加上上下文管理器装饰器,所以这里可以用with语法
        with db.auto_commit():
            user = User()
            user.nickname = nickname
            user.email = account
            user.password = secret
            # 插入数据库中
            db.session.add(user)
    @staticmethod
    def verify(email, password):
        """
        主要做登陆的验证
        """
        # 验证用户是否存在
        user = User.query.filter_by(email=email).first_or_404()
        # 通过方法check_password,验证密码是否正确
        if not user.check_password(password):
            raise AuthFailed
        return {"uid": user.id}
    def check_password(self, raw):
        # 验证密码是否正确
        if not self._password:
            return False
        return check_password_hash(self._password, raw)

13.Token获取:

  • 通过用户输入账号,密码,登陆类型,获取token

  • 效果如下:

  • 视图定义:获取token

    from flask import current_app, jsonify
    
    from app.libs.enums import ClientTypeEnum
    from app.libs.redprint import Redprint
    from app.validators.forms import ClientForm
    from models.user import User
    #生成Token令牌
    from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
    api = Redprint("token")
    
    # 发起请求类型:
    # {"account":"xujunkaipy@163.com","secret":"123456","type":100} post
    # url = http://127.0.0.1:5000/v1/token
    @api.route('', methods=['POST'])
    def get_token():
        # 表单验证是否合法
        form = ClientForm().validate_for_api()
        # 定义验证登陆用户的方法,verify写在User类下用于验证账号密码是否正确
        promise = {
            ClientTypeEnum.USER_EMAIL: User.verify,
        }
        # 传入账号密码,进行验证。验证用户账号是否存在,密码是否正确
        identity = promise[ClientTypeEnum(form.type.data)](
            form.account.data,
            form.secret.data
        )
        # Token 生成 TOKEN_EXPIRATION 为配置文件设置过期时间
        expiration = current_app.config['TOKEN_EXPIRATION']
        # generate_auth_token生成令牌
        token = generate_auth_token(identity['uid'],
                                    form.type.data,
                                    None,
                                    expiration)
        response_token = {
            'token': token.decode('ascii')
        }
        # 返回token,返回http状态码 201
        return jsonify(response_token), 201
    
    
    def generate_auth_token(uid,ac_type,scope=None,expiration=7200):
        """
            生成令牌
            uid:用户id
            ac_type: 用户类型
            scope:权限作用域
            expiration: 令牌有效期
        """
        s = Serializer(current_app.config['SECRET_KEY'],
                       expires_in=expiration)
        return s.dumps(
            {
                "uid": uid,
                "type": ac_type.value
            }
        )
    
    

14.限制没有token进行api访问

  • 往往有些数据必须登陆才能看得到,怎样限制那些没有登陆用户访问。我们通过用户访问接口是否携带token,通过装饰器方式对进行token校验来确定用户是否登陆。这里我们使用HTTPBasicAuth一个第三方组件完成功能。

  • 显示效果:

    libs/token_auth.py

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    """
    # Author Xu Junkai
    # coding=utf-8
    # @Time    : 2020/2/26 15:10
    # @Site    :
    # @File    : token_auth.py
    # @Software: PyCharm
    """
    from collections import namedtuple
    
    from flask_httpauth import HTTPBasicAuth
    from itsdangerous import TimedJSONWebSignatureSerializer 
        as Serializer,BadSignature, SignatureExpired
    from flask import current_app, g
    
    from app.libs.error_code import AuthFailed
    # 实例化HTTPBasicAuth对象。
    auth = HTTPBasicAuth()
    # 定义一个namedtuple 用于解密token返回
    User = namedtuple("User",['uid', 'ac_type', 'scpoe'])
    @auth.verify_password
    def verify_password(token,password):
        # token验证
        # HTTPBasicAuth 规范要传递账号,密码的方式
        # key= Authorization
        # value = basic base64(账号:密码)
        """
        HTTPBasicAuth 规范可以将账号密码放到Authorization传入。
        传入格式是: basic + ' ' +base64(账号:密码)
        相当于:key=Authorization,value=basic + ' ' +base64(账号:密码)
        这里我们变通:只传入token, 这样函数第二个参数我们不用,
        通过获取到了token,执行verify_auth_token方法进行解密。解密失败抛出一系列异常
    
        """
        user_info = verify_auth_token(token)
        if not user_info:
            return False
        else:
            # 如果校验成功,把用户信息保存在g变量中,后续用
            g.user = user_info
            return True
    
    
    # 解密token
    def verify_auth_token(token):
        # 根据配置密钥解密
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except BadSignature:
            # 令牌不合法,抛出异常
            raise AuthFailed(msg='token is invalid', error_code=1002)
        except SignatureExpired:
            # 验证token是否过期
            raise AuthFailed(msg='token is expired', error_code=1003)
        uid = data['uid']
        ac_type = data['type']
        # 返回
        return User(uid, ac_type, '')
    
    
    • 在需要验证token的视图上面添加装饰器
    from app.libs.redprint import Redprint
    from app.libs.token_auth import auth
    from models.user import User
    
    api = Redprint('user')
    @api.route('/<int:uid>',methods=["GET"])
    @auth.login_required
    def get_user(uid):
        user = User.query.get_or_404(uid)
        return 'I am junKai'
    
  • 知识点:

    class A:
        name = "xjk"
        age = 18
        def __init__(self):
            self.gender = "male"
    print(A.__dict__)  # {"gender":"male"}
    #由上面代码可以看出来,__dict__方法是不可以把类变量转化成字典的,那么如何拿到类变量中键和值呢
    
    class A:
        name = "xjk"
        age = 18
        def __init__(self):
            self.gender = "male"
        def keys(self):
            # 首先会调用keys方法,获取keys
            return ("name", "age", "gender")
        def __getitem__(self, item):
            # 通过__getitem__ 让对象可以用[] 访问。
            # 通过getattr 获取对象值
            return getattr(self, item)
    o = A()
    # 通过dict 调用keys方法
    print(dict(o))
    # {'name': 'xjk', 'age': 18, 'gender': 'male'}
    

15.权限控制

  • 这里设计权限思路而不是把权限放在库里,因为当用户多了,频繁操作你的数据库,会给你数据库带来很多压力,这里就是把权限存在文件中,然后后端逻辑判断

  • 首先下面有一个视图

@api.route('/<int:uid>',methods=["GET"])
@auth.login_required
def super_get_user(uid):
    """
    超管用户获取用户信息,获取哪个都可以
    """
    user = User.query.filter_by(id = uid).first_or_404()
    return jsonify(dict(user))
  • 我们首先好奇的是,user为对象,我们如何通过jsonify将user转换成dict的。这里我们在组件的入口文件:也就是app.py中重写JSONEncoder方法实现的
from flask import Flask as _Flask
from flask.json import JSONEncoder as _JSONEncoder

from app.libs.error_code import ServerError
from datetime import date

class JSONEncoder(_JSONEncoder):
    def default(self, o):
        # default会被递归调用。因为如果对象的元素无法序列化,它会再调用default进行深层次的序列化
        # 判断有没有keys和getitem方法
        if hasattr(o, 'keys') and hasattr(o, "__getitem__"):
            return dict(o)
        # 没有的话,报一个服务错误。
        # 这里判断如果是时间,再对时间对象进行处理,当然你还可以定义其他对象处理方式比如uuid
        if isinstance(o, date):
            return o.strftime('%Y-%m-%d')
        raise ServerError()

class Flask(_Flask):
    # 将json_encoder替换掉自己写的JSONEncoder
    # 这样就能跑到自己定义的JSONEncoder中
    json_encoder = JSONEncoder
  • 首先改写model中User类的verify方法
class User(Base):
	...
	@staticmethod
    def verify(email, password):
        """
        主要做登陆的验证
        """
        # 验证用户是否存在
        user = User.query.filter_by(email=email).first_or_404()
        # 通过方法check_password,验证密码是否正确
        if not user.check_password(password):
            raise AuthFailed
        # 判断是不是管理员
        scope = "AdminScope" if user.auth == 2 else "UserScope"
        # 将结果返回
        return {"uid": user.id,"scope":scope}
  • 在token中通过generate_auth_token函数生成token时候,将当前用户的类型也加在token中
def generate_auth_token(uid,ac_type,scope=None,expiration=7200):
    """
        生成令牌
        uid:用户id
        ac_type: 用户类型
        scope:权限作用域
        expiration: 令牌有效期
    """
    s = Serializer(current_app.config['SECRET_KEY'],
                   expires_in=expiration)
    return s.dumps(
        {
            "uid": uid,
            "type": ac_type.value,
            "scope": scope
        }
    )

  • 在解密token时候:执行verify_auth_token函数进行解密,可以获取到当前用户是普通用户还是超级用户
# 解密token
def verify_auth_token(token):
    # 根据配置密钥解密
    s = Serializer(current_app.config['SECRET_KEY'])
    try:
        data = s.loads(token)
    except BadSignature:
        # 令牌不合法,抛出异常
        raise AuthFailed(msg='token is invalid', error_code=1002)
    except SignatureExpired:
        # 验证token是否过期
        raise AuthFailed(msg='token is expired', error_code=1003)
    uid = data['uid']
    ac_type = data['type']
    scope = data["scope"]
    # 可知道scope和request 所访问视图函数
    # 判断用户是否有权限
    allow = is_in_scope(scope,request.endpoint)
    print("allow",allow)
    if not allow:
        raise Forbidden()
    return User(uid, ac_type, scope)

  • 关键点在于 is_in_scope方法。它是用来判断用户权限的。Scope中定义2个列表allow_api用户存放视图函数的api,allow_module则用户存放一个py文件所有的允许api,这样,通过这两个权限粒度不同,能更好进行权限判断。
class Scope:
    allow_api = []
    # 一个py文件所有视图函数
    allow_module = []
    # 不允许访问的权限 为什么要设置forbidden,
    # 比如说,如果user.py视图函数有100个,只有2个不允许访问,这样需要在UserScope内的allow_api配置98个项
    # 显然这是费工夫的,如果我通过排除法做就会简单很多
    forbidden = []
    def __add__(self, other):
        # 运算符重造
        self.allow_api = self.allow_api + other.allow_api
        # 权限去重:
        self.allow_api = list(set(self.allow_api))
        # 最后一定要return 当前对象否则无法用链式方法书写

        # 红图级别也要支持相加,也就是user.py文件所有视图
        self.allow_module = self.allow_module + other.allow_module
        self.allow_module = list(set(self.allow_module))

        # 禁止访问
        self.forbidden = self.forbidden + other.forbidden
        self.forbidden = list(set(self.forbidden))
        return self


class AdminScope(Scope):
    # 超级用户
    allow_api = ['v1.user+super_get_user','v1.user+super_delete_user']
    allow_module = ["v1.user"]
    def __init__(self):
        # 链式方法添加UserScope和AdminScope权限
        self + UserScope()

class UserScope(Scope):
    # 普通用户
    allow_api = ["v1.user+get_user","v1.user+delete_user"]
    forbidden = ['v1.user+super_get_user','v1.user+super_delete_user']
    def __init__(self):
        self + AdminScope()
# class SuperScope(Scope):
#     allow_api = ["v1.C", "v1.D"]
#     allow_module = ["v1.user",]
#     def __init__(self):
#         # 链式方法添加UserScope和AdminScope权限
#         self + UserScope() + AdminScope()

def is_in_scope(scope, endpoint):
    # 首先需要让endpoint包含视图所对应模块,
    # 有些用户可以访问当前py文件所有的视图函数。通过moudle_name实现权限粒度更粗分配
    # 构建格式: 如果是视图函数 v1.view_func
    # 如果是模块的话: v1.module_name+view_func,其实module_name就是红图名字
    # 我们可以再红图做拼接
    """
    判断是否允许访问
    :param scope:
    :param endpoint:
    :return:
    """
    # scope 为 UserScope 或 AdminScope字符串
    #globals可以把当前变量,类,函数封装成一个字典
    # {"UserScope":UserScope,"AdminScope":AdminScope ...}
    # 通过 globals[]
    # 实例化相应类的对象
    scope = globals()[scope]()
    # scope.allow_api获取允许访问权限的接口
    # endpoint = v1.red_name+view_func
    splits = endpoint.split("+")
    # red_name = v1.red_name
    red_name = splits[0]
    if endpoint in scope.forbidden:
        # 首先检测是否是禁止访问的
        return False
    if endpoint in scope.allow_api:
        # 再判断视图函数是否再允许权限中
        return True
    if red_name in scope.allow_module:
        # 最后判断py文件内所有视图函数是否允许
        return True
    else:
        return False
  • 解密成功,并将用户信息保存在g变量中。
@auth.verify_password
def verify_password(token,password):
    # token验证
    # HTTPBasicAuth 规范要传递账号,密码的方式
    # key= Authorization
    # value = basic base64(账号:密码)
    """
    HTTPBasicAuth 规范可以将账号密码放到Authorization传入。
    传入格式是: basic + ' ' +base64(账号:密码)
    相当于:key=Authorization,value=basic + ' ' +base64(账号:密码)
    这里我们变通:只传入token, 这样函数第二个参数我们不用,
    通过获取到了token,执行verify_auth_token方法进行解密。解密失败抛出一系列异常

    """
    user_info = verify_auth_token(token)
    if not user_info:
        return False
    else:
        # 如果校验成功,把用户信息保存在g变量中,后续用
        g.user = user_info
        return True
  • 如果is_in_scope返回False则会引发一个错误,表示该用户没有权限
allow = is_in_scope(scope,request.endpoint)
if not allow:
	raise Forbidden()
原文地址:https://www.cnblogs.com/xujunkai/p/12384561.html