flask刷新token

我们在做前后端分离的项目中,最常用的都是使用token认证。

登录后将用户信息,过期时间以及私钥一起加密生成token,但是比较头疼的就是token过期刷新的问题,因为用户在登录后,如果在使用过程中,突然提示token过期了,需要重新登录,会觉得很奇怪。

我使用的方式,是在解析token的时候,如果token过期了,但是还在可刷新的时间范围内,我们应该自动刷新token,并将新的token返回给用户。

但是如果前端采用异步请求,同时发来了多个接口的话,我们不可能对每个请求的token都进行刷新。

我的解决方案是,将过期但还在刷新范围的token存入redis,同时设置token的过期时间为可刷新时间,过了可刷新时间,token就会被自动删除

当前端多个请求过来时,会对请求带来的token进行验证,分三种情况:

  1)如果token已经过了刷新时间,则抛出异常。

  2)如果token不在redis中,表示刚刚过期,还没有进行刷新token操作,需要刷新token。

  3)如果token在redis中,则权限默认通过。

下面上代码:

  1)为了给token加上可刷新时间,需要重写TimedJSONWebSignatureSerializer 的make_header和loads方法

from itsdangerous import TimedJSONWebSignatureSerializer as Serializer_
import redis
r = redis.Redis(host="127.0.0.1", port=6379,db=0)
class Serializer(Serializer_):
    def __init__(self, secret_key, expires_in=None, **kwargs):
        self.expires_in = expires_in
        super(Serializer, self).__init__(secret_key, expires_in, **kwargs)

    def make_header(self, header_fields):
        header = JSONWebSignatureSerializer.make_header(self, header_fields)
        iat = self.now()
        exp = iat + self.expires_in
        refresh_exp = iat+current_app.config["REFRESH_TIME"]
        header["iat"] = iat
        header["exp"] = exp
        header["refresh_exp"] = refresh_exp
        return header

    def loads(self, s, salt=None, return_header=False):
        payload, header = JSONWebSignatureSerializer.loads(
            self, s, salt, return_header=True
        )

        if "exp" not in header:
            raise BadSignature("Missing expiry date", payload=payload)

        int_date_error = BadHeader("Expiry date is not an IntDate", payload=payload)
        try:
            header["exp"] = int(header["exp"])
        except ValueError:
            raise int_date_error
        if header["exp"] < 0:
            raise int_date_error
        now = self.now()
        if header["exp"] < now:
            if header["refresh_exp"]<now:
                # 已经过了可刷新时间,直接抛出异常
                raise SignatureExpired(
                    "Signature expired",
                    payload=payload,
                    date_signed=self.get_issue_date(header),
                )
            else:
                # TODO 增加判断,看是否有存储在redis中,如果有存储过,表示token已经被刷新过了,直接放行即可。
                if r.get(s):
                    return payload
                pxt = header["refresh_exp"] - now
                if pxt>0:
                    r.set(s, header["exp"], px = pxt)
                # 还在可刷新时间内
                # 生成新的token返回给前端
                serializer = Serializer(current_app.config["SECRET_KEY"], expires_in=self.expires_in)
                # 调用serializer的dumps方法将uid和type写入生成token
                token = serializer.dumps(payload)
                res = make_response()
                res.headers["Authorization"] = token
                res.set_cookie("authorization",token.decode("ascii"))
                return payload, token
        if return_header:
            return payload, header
        return payload 

  2)认证权限

auth = HTTPBasicAuth()
user = namedtuple("User",["uid","type","scope"])

@auth.verify_password
def check_authorization(token, pwd):
    user_info = check_auth_token(token)
    if not user_info:
        return False
    else:
        if isinstance(user_info, tuple):
            user_info_ = user_info[0]
            token = user_info[1]
        else:
            user_info_ = user_info
        g.user = user_info_
        return True if not token else token


def check_auth_token(token):
    serialzer = Serializer(current_app.config["SECRET_KEY"])
    try:
        s = serialzer.loads(token)
    except BadSignature:
        raise AuthFailed(msg="token is invalid", error_code=1004)
    except SignatureExpired:
        raise AuthFailed(msg="token is expired", error_code=1004)
    token = ""
    if isinstance(s, tuple):
        u_info = s[0]
        token = s[1]
    else:
        u_info = s
    uid = u_info["uid"]
    type = u_info["type"]
    scope = u_info["scope"]
    return user(uid, type, scope), token

  3)生成新的token后,将新的token放入response的header中,前端人员从response header中去取authorization

@api.router("/get/<int:uid>")
@auth.login_required
def get_user(uid, token=None):
    user = User.query.get_or_404(uid)
    res_json = jsonify(user)
    res = make_response(res_json)
    res.headers["Authorization"] = token
    return res

  

  

原文地址:https://www.cnblogs.com/fiona-zhong/p/10315499.html