SpringCloud+Tornado基于jwt实现请求安全校验

项目背景

在实际项目中,Tornado项目作为一个微服务纳入SpringCloud体系,该过程中涉及到TornadoSpring体系的安全验证,也就是权限调用校验,在该项目中Tornado是通过SpringCloud中的Feign调用的,经过一系列实验,最后选用jwt来实现这个权限效验的过程。

实现思路

用户进行登陆认证(后台微服务),认证成功后调用Tornado项目的认证接口生成token,该值返回到后台微服务保存在会话中,下一次请求时带上该token值让服务器进行校验,校验成功则返回正常的响应,否则返回错误信息。

项目结构

在这里插入图片描述

common - authServer.py是认证接口
common - basicServer.py是示例接口
handlers - baseHandler.py中放了两种校验方式,会在basicServer.py的调用示例中贴出
utils - jwtUtils.py是生成与校验token的
utils - responseUtils.py是返回结果工具类

具体实现

  • jwtUtils.py
# -*- coding:utf-8 -*-
import jwt
import datetime
from jwt import exceptions
from utils.responseUtils import JsonUtils

JWT_SALT = '1qazxdr5'


def create_token(payload, timeout=12):
    """
    创建token
    :param payload:  例如:{'user_id':1,'username':'xxx@xxx.xx'}用户信息
    :param timeout: token的过期时间,默认20分钟
    :return:
    """
    headers = {
        'typ': 'jwt',
        'alg': 'HS256'
    }
    payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(minutes=timeout)
    result = jwt.encode(payload=payload, key=JWT_SALT, algorithm="HS256", headers=headers).decode('utf-8')
    return result


def parse_payload(token):
    """
    对token进行校验并获取payload
    :param token:
    :return:
    """
    try:
        verified_payload = jwt.decode(token, JWT_SALT, True)
        print(verified_payload)
        return JsonUtils.success('认证通过')
    except exceptions.ExpiredSignatureError:
        return JsonUtils.noAuth('token已失效')
    except jwt.DecodeError:
        return JsonUtils.noAuth('token认证失败')
    except jwt.InvalidTokenError:
        return JsonUtils.noAuth('非法的token')

  • baseHandler.py
# -*- coding:utf-8 -*-
import functools
import json
import tornado.web
from utils import jwtUtils
from utils.responseUtils import JsonUtils


# 方式一:authenticated 装饰器
def authenticated(method):
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        """
        这里调用的是 current_user 的 get 方法(property装饰),
        """
        # 通过token请求头传递token
        head = self.request.headers
        token = head.get("token", "")
        if not token:
            self.write(JsonUtils.noAuth("未获取到token请求头"))
            self.set_header('Content-Type', 'application/json')
            return

        result = json.loads(jwtUtils.parse_payload(token))  # 将json解码
        print(result)
        token_msg = json.dumps(result)

        if result['sta'] != '00':
            self.write(token_msg)
            self.set_header('Content-Type', 'application/json')
            return

        return method(self, *args, **kwargs)
    return wrapper


# 方式二:进行预设 继承tornado的RequestHandler
class BaseHandler(tornado.web.RequestHandler):
    def prepare(self):
        super(BaseHandler, self).prepare()

    def set_default_headers(self):
        super().set_default_headers()


# 进行token校验,继承上面的BaseHandler
class TokenHandler(BaseHandler):
    def prepare(self):
        # 通过token请求头传递token
        head = self.request.headers
        token = head.get("token","")
        if not token:
            self.authMsg = json.dumps(JsonUtils.noAuth("未获取到token请求头"))

        result = json.loads(jwtUtils.parse_payload(token))  # 将json解码
        print(result)

        if result['sta'] != '00':
            self.isAuth = False
        else:
            self.isAuth = True

        self.authMsg = json.dumps(result)

  • authServer.py
import tornado.web
from utils import jwtUtils
from utils.responseUtils import JsonUtils


class authHandler(tornado.web.RequestHandler):
    def post(self, *args, **kwargs):
        """
        安全认证接口
        :param args:
        :param kwargs:
        :return:
        """
        username = self.get_argument("username")
        print("authHandler:" + username)

        if not username:
            self.write(JsonUtils.error("参数异常"))
        else:
            token = jwtUtils.create_token({"username": username})
            print("token:" + token)
            self.write(JsonUtils.success(token))

        self.set_header('Content-Type', 'application/json')


  • basicServer.py
import tornado.web
import json
from pandas.core.frame import DataFrame
from handlers import baseHandler
from utils.responseUtils import JsonUtils
from handlers.baseHandler import authenticated


class StringHandler(baseHandler.TokenHandler, tornado.web.RequestHandler):
    """
    *** TokenHandler验证,对应baseHandler.py中的方式二 ***
    """
    def get(self):
        username = self.get_argument('username', 'Hello')

        # 权限认证通过
        if self.isAuth:
            self.write(JsonUtils.success(username))
        else:
            self.write(self.authMsg)

        self.set_header('Content-Type', 'application/json')


class TestHandler(tornado.web.RequestHandler):
    """
    *** authenticated验证,对应baseHandler.py中的方式一 ***
    """
    @authenticated
    def post(self):
        username = self.get_argument('username', 'Hello')
        self.write(JsonUtils.success(username))
        self.set_header('Content-Type', 'application/json')
        
  • responseUtils.py
from tornado.escape import json_encode, utf8


class JsonUtils(object):
    @staticmethod
    def success(response):
        """
        正确返回
        :param response: 返回结果
        :return: string, {"message": "ok", "sta": "00", "data": }
        """
        return json_encode({"message": "ok", "sta": "00", "data": response})

    @staticmethod
    def info(message):
        """
        提示返回
        :param message: 提示信息
        :return: string,
        """
        return json_encode({"message": str(message), "sta": "99001", "data": None})

    @staticmethod
    def error(message):
        """
        错误返回
        :param message: 错误信息
        :return: string,
        """
        return json_encode({"message": str(message), "sta": "9999", "data": None})

    @staticmethod
    def noAuth(message):
        """
        无权限返回
        :param message: 错误信息
        :return: string,
        """
        return json_encode({"message": str(message), "sta": "403", "data": None})

  • 下面是一些调用的结果图示:
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

.end

原文地址:https://www.cnblogs.com/maggieq8324/p/14159709.html