DRF的认证权限频率组件

一.DRF的认证组件

token

  学习过使用cookie和session两种方式可以保存用户信息,这两种方式不同的是cookie保存在客户端浏览器中,而session保存在服务器中,他们各有优缺点,配合起来使用,可将重要的敏感的信息存储在session中,而在cookie中可以存储不太敏感的数据。

token认证的大致步骤:

  • 用户登录,服务器端获取用户名密码,查询用户表,如果存在该用户且第一次登录(或者token过期),生成token,否则返回错误信息
  • 如果不是第一次登录,且token未过期,更新token值

model.py

from django.db import models

# Create your models here.


class User(models.Model):
    username = models.CharField(max_length=32)
    password = models.CharField(max_length=32)
    user_type_entry = (
        (1, 'Delux'),
        (2, 'SVIP'),
        (3, "VVIP")
    )
    user_type = models.IntegerField(choices=user_type_entry)
    address = models.CharField(max_length=32)

    def __str__(self):
        return self.username


class UserToken(models.Model):
    user = models.OneToOneField("User", on_delete=models.CASCADE)
    token = models.CharField(max_length=128)

我们无需实现get方法,因为涉及登录认证,所有写post方法接口,登录都是post请求,视图类如下所示:

from django.http import JsonResponse

from rest_framework.views import APIView

from .models import User, Book, UserToken
from .utils import get_token


class UserView(APIView):

    def post(self, request):
        response = dict()
        try:
            username = request.data['username']
            password = request.data['password']

            user_instance = User.objects.filter(
                user_name=username,
                password=password
            ).first()

            if user_instance:
                access_token = get_token.generater_token()

                UserToken.objects.update_or_create(user=user_instance, defaults={
                    "token": access_token
                })
                response["status_code"] = 200
                response["status_message"] = "登录成功"
                response["access_token"] = access_token
                response["user_role"] = user_instance.get_user_type_display()
            else:
                response["status_code"] = 201
                response["status_message"] = "登录失败,用户名或密码错误"
        except Exception as e:
            response["status_code"] = 202
            response["status_message"] = str(e)

        return JsonResponse(response)

获取随机字符串的方法用来生成token值:

import uuid


def generater_token():
    random_str = ''.join(str(uuid.uuid4()).split('-'))
    return random_str

DRF认证组件的使用

  定义一个认证类

class UserAuth(object):

    def authenticate_header(self, request):
        pass

    def authenticate(self, request):
        user_post_token = request.query_params.get('token')

        token_object = UserToken.objects.filter(token=user_post_token).first()
        if token_object:
            return token_object.user.username, token_object.token
        else:
            raise APIException("认证失败")

  在需要认证的数据接口里面指定认证类

class BookView(ModelViewSet):

    authentication_classes = [UserAuth, UserAuth2]

    queryset = Book.objects.all()
    serializer_class =  BookSerializer

DRF认证源码解析

执行self.initial()方法
执行self.perform_authentication(request),方法,注意,新的request对象被传递进去了
该方法只有一行request.user,根据之前的经验,解析器(request.data),我们知道这个user肯定也是request对的一个属性方法
所料不错,该方法继续执行self._authenticate(),注意此时的self是request对象
该方法会循环self.authenticators,而这个变量是在重新实例化request对象时通过参数传递的
传递该参数是通过get_authenticatos()的返回值来确定的,它的返回值是
[ auth for auth in self.authentication_classes ]
也就是我们的BookView里面定义的那个类变量,也就是认证类
一切都明朗了,循环取到认证类,实例化,并且执行它的authenticate方法
这就是为什么认证类里面需要有该方法
如果没有该方法,认证的逻辑就没办法执行
至于类里面的header方法,照着写就行,有兴趣的可以研究源码,这里就不细究了
该方法如果执行成功就返回一个元组,执行完毕
如果失败,它会捕捉一个APIException
如果我们不希望认证通过,可以raise一个APIException
View Code

多个认证组件的使用

class UserAuth2(object):

    def authenticate(self, request):
        raise APIException("认证失败")


class UserAuth(object):

    def authenticate_header(self, request):
        pass

    def authenticate(self, request):
        user_post_token = request.query_params.get('token')

        token_object = UserToken.objects.filter(token=user_post_token).first()
        if token_object:
            return token_object.user.username, token_object.token
        else:
            raise APIException("认证失败")


class BookView(ModelViewSet):

    authentication_classes = [UserAuth, UserAuth2]

  注意:如果需要返回什么数据,请在最后一个认证类中返回,因为如果在前面返回,在self._authentication()方法中会对返回值进行判断,如果不为空,认证的过程就会中止

  如果不希望每次都写那个无用的authenticate_header方法,继承BaseAuthentication类即可。

from rest_framework.authentication import BaseAuthentication

class UserAuth2(BaseAuthentication):

    def authenticate(self, request):
        raise APIException("认证失败")


class UserAuth(BaseAuthentication):

    def authenticate(self, request):
        user_post_token = request.query_params.get('token')

        token_object = UserToken.objects.filter(token=user_post_token).first()
        if token_object:
            return token_object.user.user_name, token_object.token
        else:
            raise APIException("认证失败")

全局认证组件

  如果认证类自己没有authentication_classes,就会到settings中去找,通过这个机制,我们可以将认证类写入到settings文件中即可实现全局认证。

REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': (
        'authenticator.utils.authentication.UserAuth',
        'authenticator.utils.authentication.UserAuth2',
    ),
}

二.权限组件

  定义一个权限类

class UserPerms():
    message = "您没有权限访问该数据"
    def has_permission(self, request, view):
        if request.user.user_type > 2:
            return True
        else:
            return False

  在需要认证的数据接口里面指定权限类

class BookView(ModelViewSet):

    authentication_classes = [UserAuth]
    permission_classes = [UserPerms2]

    queryset = Book.objects.all()
    serializer_class =  BookSerializer

三.频率组件的使用

  定义一个频率类

import time
import math

from rest_framework import exceptions


class MyException(exceptions.Throttled):
    default_detail = '连接次数过多'
    extra_detail_plural = extra_detail_singular = '请在{wait}秒内访问'

    def __init__(self, wait=None, detail=None, code=None):
        super().__init__(wait=wait, detail=detail, code=code)


class VisitThrottle():
    user_visit_information = dict()
    visited_times = 1
    period = 60
    allow_times_per_minute = 5
    first_time_visit = True

    def allow_request(self, request, view):
        self.request_host = request_host = request.META.get("REMOTE_ADDR")
        current_user_info = self.user_visit_information.get(request_host, None)

        if not self.__class__.first_time_visit:
            self.user_visit_information[request_host][0] += 1
            current_visit_times = self.user_visit_information[request_host][0]

            if current_visit_times > self.allow_times_per_minute:
                if self._current_time - current_user_info[1] <= self.period:
                    if len(current_user_info) > 2:
                        current_user_info[2] = self._time_left
                    else:
                        current_user_info.append(self._time_left)

                    view.throttled = self.throttled
                    return None
                else:
                    self.__class__.first_time_visit = True

        if self.first_time_visit:
            self.__class__.first_time_visit = False
            self._initial_infomation()

        return True

    def wait(self):
        return self.period - self.user_visit_information[self.request_host][2]

    def throttled(self, request, wait):
        raise MyException(wait=wait)

    @property
    def _current_time(self):
        return time.time()

    @property
    def _time_left(self):
        return math.floor(self._current_time - self.user_visit_information.get(self.request_host)[1])

    def _initial_infomation(self):
        self.user_visit_information[self.request_host] = [self.visited_times, self._current_time]

  指定频率类

class BookView(ModelViewSet):
    throttle_classes = [ VisitThrottle ]
    queryset = Book.objects.all()
    serializer_class = BookSerializer

使用DRF的简单频率控制来控制用户访问频率(局部)

  局部访问频率的控制

from rest_framework.throttling import SimpleRateThrottle


class RateThrottle(SimpleRateThrottle):
      # 指定访问频率
    rate = '5/m'

     # 指定通过什么方式来区分用户
    def get_cache_key(self, request, view):
        return self.get_ident(request)

  指定频率类

from .utils.throttles import RateThrottle

# Create your views here.


class BookView(ModelViewSet):
    throttle_classes = [ RateThrottle ]
    queryset = Book.objects.all()
    serializer_class = BookSerializer

全局访问频率的控制

  指定一个类

class RateThrottle(SimpleRateThrottle):
    scope = "visit_rate"

    def get_cache_key(self, request, view):
        return self.get_ident(request)

  在settings里面指定频率类和访问频率

REST_FRAMEWORK = {
    "DEFAULT_THROTTLE_CLASSES": ('throttler.utils.throttles.RateThrottle',),
    "DEFAULT_THROTTLE_RATES": {
        "visit_rate": "5/m"
    }
}
原文地址:https://www.cnblogs.com/chenxi67/p/10102771.html