JWT认证

JWT认证

什么是JWT?

JWT全称(json web token),主要由三段信息构成,将这三段文本用‘.’链接在一起就构成了JWT字符串,

第一部分信息称为头部(header),第二部分称为载荷(payload),第三部分称为签证(signature)

header

header包括声明类型声明加密算法

{
    'type':'JWT',
    'alg':'HS256'
}

payload

载荷是存放有效信息的地方,包括:

(1)标准中注册的声明
(2)公共的声明
(3)私有的声明

exp:
    {
        "sub":"1234567890",
        "name":"surpass",
        "admin":True
    }

JWT的前两部分都可以使用base64进行加密

signature

签证信息由三个部分组成

header(base64 加密后的)
payload(base64 加密后的)
secret

JWT算法的本质原理:签发与校验

"""
1)jwt分三段式:头.体.签名 (head.payload.sgin)
2)头和体是可逆加密,让服务器可以反解出user对象;签名是不可逆加密,保证整个token的安全性的
3)头体签名三部分,都是采用json格式的字符串,进行加密,可逆加密一般采用base64算法,不可逆加密一般采用hash(md5)算法
4)头中的内容是基本信息:公司信息、项目组信息、token采用的加密方式信息
{
	"company": "公司信息",
	...
}
5)体中的内容是关键信息:用户主键、用户名、签发时客户端信息(设备号、地址)、过期时间
{
	"user_id": 1,
	...
}
6)签名中的内容时安全信息:头的加密结果 + 体的加密结果 + 服务器不对外公开的安全码 进行md5加密
{
	"head": "头的加密字符串",
	"payload": "体的加密字符串",
	"secret_key": "安全码"
}
"""

签发:

# 根据登录请求提交来的 账号 + 密码 + 设备信息 签发 token
"""
1)用基本信息存储json字典,采用base64算法加密得到 头字符串
2)用关键信息存储json字典,采用base64算法加密得到 体字符串
3)用头、体加密字符串再加安全码信息存储json字典,采用hash md5算法加密得到 签名字符串

账号密码就能根据User表得到user对象,形成的三段字符串用 . 拼接成token返回给前台
"""

校验:

"""
1)将token按 . 拆分为三段字符串,第一段 头加密字符串 一般不需要做任何处理
2)第二段 体加密字符串,要反解出用户主键,通过主键从User表中就能得到登录用户,过期时间和设备信息都是安全信息,确保token没过期,且时同一设备来的
3)再用 第一段 + 第二段 + 服务器安全码 不可逆md5加密,与第三段 签名字符串 进行碰撞校验,通过后才能代表第二段校验得到的user对象就是合法的登录用户
"""

总结

# DRF项目的jwt认证开发流程(重点)
"""
1)用账号密码访问登录接口,登录接口逻辑中调用 签发token 算法,得到token,返回给客户端,客户端自己存到cookies中

2)校验token的算法应该写在认证类中(在认证类中调用),全局配置给认证组件,所有视图类请求,都会进行认证校验,所以请求带了token,就会反解出user对象,在视图类中用request.user就能访问登录的用户

注:登录接口需要做 认证 + 权限 两个局部禁用
"""

(1)安装第三方的JWT模块

pip install djangorestframework-jwt

(2)新建一个user表继承于AbstractUser

from django.contrib.auth.models import AbstractUser
class User(AbstractUser):
    mobile = models.CharField(max_length=16, unique=True, verbose_name='手机号')
    gender = models.IntegerField(choices=GENDER_CHOICES, default=0, verbose_name='性别')
    icon = models.ImageField(upload_to=path_and_rename, verbose_name='个人头像')
    is_delete = models.BooleanField(default=False)

(3)创建超级用户

python manage.py createsuperuser

简单的使用

from rest_framework_jwt.views import obtain_jwt_token,ObtainJSONWebToken,VerifyJSONWebToken,RefreshJSONWebToken
urlpatterns = [
    # 登录接口认证
    path('login/', obtain_jwt_token),
]

自定义auth类的使用

from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication
from rest_framework.exceptions import AuthenticationFailed
from rest_framework_jwt.authentication import jwt_decode_handler


class LoginToken(BaseJSONWebTokenAuthentication):

    def authenticate(self, request):
        jwt_value = str(request.META.get('HTTP_AUTHENTICATION'))  # 获取token
        try:
            payload = jwt_decode_handler(jwt_value)  # 获取载荷
        except Exception:
            raise AuthenticationFailed("认证失败")
        user = self.authenticate_credentials(payload) # 获取user信息
        return user, jwt_value

局部使用

LoginView

class LoginView(GenericViewSet):
    queryset = User.objects.all()
    serializer_class = LoginSerializer

    def login(self, request, *args, **kwargs):
        login_ser = self.get_serializer(data=request.data, context={})
        login_ser.is_valid(raise_exception=True)
        token = login_ser.context.get('token')
        user = login_ser.context.get('user')
        return APIResponse(code=100, msg='登录成功', data={'token': token, 'username': user.username})

LoginSerializer

class LoginSerializer(serializers.ModelSerializer):
    username = serializers.CharField(max_length=32)  # 这里需要重新覆盖username字段,因为是post请求,Loginserializer会先去models中校验,因为username在数据库中国是unique的

    class Meta:
        model = models.User
        fields = ['username', 'password']

    def validate(self, attrs):
        username = attrs.get('username')
        password = attrs.get('password')
        if not models.User.objects.filter(username=username):
            raise ValidationError('用户不存在')
        else:
            user = models.User.objects.filter(username=username).first()
            if user.check_password(password):
                payload = jwt_payload_handler(user)  # 把user传入,得到payload
                token = jwt_encode_handler(payload)  # 把payload传入得到token
                self.context['token'] = token  # 利用context传递参数
                self.context['user'] = user
                return attrs
            else:
                raise ValidationError('密码错误')

auth

from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication
from rest_framework.exceptions import AuthenticationFailed
from rest_framework_jwt.authentication import jwt_decode_handler


class LoginToken(BaseJSONWebTokenAuthentication):

    def authenticate(self, request):
        jwt_value = str(request.META.get('HTTP_AUTHORIZATION'))  # 获取token
        print(jwt_value)
        try:
            payload = jwt_decode_handler(jwt_value)  # 获取载荷
            print(payload)
        except Exception as e:
            print(str(e))
            raise AuthenticationFailed("认证失败")
        user = self.authenticate_credentials(payload)  # 获取user信息
        return user, jwt_value

queryview

class QueryUserView(GenericViewSet, RetrieveModelMixin):
    """
    查询接口
    """
    queryset = User.objects.all()
    serializer_class = UserSerializer
    pk = None
    # throttle_classes = [IPThrottle, ]
    authentication_classes = [LoginToken, ]  # 先进行登录校验
    permission_classes = [IsAuthenticated, ] # 在判断用户是否是匿名用户

    def get_object(self):
        queryset = self.filter_queryset(self.get_queryset())
        filter_kwargs = {self.lookup_field: self.pk}
        return queryset.filter(**filter_kwargs)

    def get_queryset(self):
        queryset = self.queryset
        if isinstance(queryset, QuerySet):
            queryset = queryset.all()
        return queryset.filter(is_delete=False)

    def retrieve(self, request, *args, **kwargs):
        self.pk = kwargs.get('pk')
        if not self.pk:
            return APIResponse(code='102', msg='查询失败', data={'result': '缺少主键值'})
        if not self.get_object():
            return APIResponse(code='102', msg='查询失败', data={'result': '无效的主键值'})
        instance = self.get_object().first()
        serializer = self.get_serializer(instance=instance)
        return APIResponse(code='102', msg='查询成功', data=serializer.data)

    def query(self, request, *args, **kwargs):
        return self.retrieve(request, *args, **kwargs)

全局使用

REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES":["utils.auth.LoginToken",],
    "DEFAULT_PERMISSION_CLASSES":["rest_framework.permissions.IsAuthenticated"]
}

# 在login和register等视图中需要局部禁用
class LoginView(GenericViewSet, RetrieveModelMixin):
    queryset = User.objects.all()
    serializer_class = UserSerializer
    pk = None

    authentication_classes = []  # 局部禁用
    permission_classes = [] # 局部禁用
原文地址:https://www.cnblogs.com/surpass123/p/13307370.html