Django Rest Framwork的认证组件 权限组件以及频率组件

提示:

再看这篇博客之前 最好先看看关于CBV FBV的那篇博客  看完之后再来理解源码以及流程就相对方便多了 https://www.cnblogs.com/wakee/p/12553402.html

本片博客部分参考:https://www.cnblogs.com/wupeiqi/articles/7805382.html

1 DRF简介

'''
基于cbv完成满足restful规范的接口
'''

2 安装drf

pip install djangorestframework

# 安装好之后再settings.py注册app: INSTALLED_APPS = [..., 'rest_framework']

  

3 认证组件

注意

组件中有俩种获取数据的方式
1 request.data.get()
2 request._request.GET.get()

区别是request.data是前端放在body中的数据
request._request.GET.get()是get请求带的参数

  

# 对用户进行登录认证
class AuthView(APIView):

    def post(self,request,*args,**kwargs):
        ret = {
            'code':1000,
            "msg":None
        }
        # 从post 里面取数据
        print(request.data)
		# 获取数据
        username = request.data.get("username")
        password = request.data.get("password")
        
        
        
        
class Authtication(BasicAuthentication):
    def authenticate(self,request):

        # 获取数据
        print(request.data)  # body中的数据
        token = request._request.GET.get("token")
        print(token)

  

 

3.1 authentication

"""
系统:session认证
rest_framework.authentication.SessionAuthentication
ajax请求通过认证:
cookie中要携带 sessionid、csrftoken,请求头中要携带 x-csrftoken

第三方:jwt认证 
rest_framework_jwt.authentication.JSONWebTokenAuthentication
ajax请求通过认证:
请求头中要携带 authorization,值为 jwt空格token

自定义:基于jwt、其它
1)自定义认证类,继承BaseAuthentication(或其子类),重写authenticate
2)authenticate中完成
    拿到认证标识 auth
    反解析出用户 user
    前两步操作失败 返回None => 游客
    前两步操作成功 返回user,auth => 登录用户
    注:如果在某个分支抛出异常,直接定义失败 => 非法用户
"""

  

3.2源码流程

dispatch
	--封装request
    	--获取定义的认证类(全局/局部),通过列表生成式创建对象
    --initial
    	--perform_auhentication
        	--request.user(内部循环列表生成式创建的对象)

 具体源码

# 第一步
    def dispatch(self, request, *args, **kwargs):
        """
        `.dispatch()` is pretty much the same as Django's regular dispatch,
        but with extra hooks for startup, finalize, and exception handling.
        """
        self.args = args
        self.kwargs = kwargs
        # 对原生的request进行加工,看第二步 
        request = self.initialize_request(request, *args, **kwargs)  # 点击initialize_request进入第二步看request
        self.request = request
        self.headers = self.default_response_headers  # deprecate?

        try:
            self.initial(request, *args, **kwargs)

            # Get the appropriate handler method
            if request.method.lower() in self.http_method_names:
                handler = getattr(self, request.method.lower(),
                                  self.http_method_not_allowed)
            else:
                handler = self.http_method_not_allowed

            response = handler(request, *args, **kwargs)

        except Exception as exc:
            response = self.handle_exception(exc)

        self.response = self.finalize_response(request, response, *args, **kwargs)
        return self.response
    
    
    
    
  # 第二步  
    
    def initialize_request(self, request, *args, **kwargs):
        """
        Returns the initial request object.
        """
        parser_context = self.get_parser_context(request)

        # 对request进行了加工 不仅有原生request 还有其他的类对象比如authentication_classes
        return Request(
            request,
            parsers=self.get_parsers(),
            authenticators=self.get_authenticators(), # 点击get_authenticators()进入第三步
            negotiator=self.get_content_negotiator(),
            parser_context=parser_context
        )
    
    
    
   # 第三步
    
     def get_authenticators(self):
        """
        Instantiates and returns the list of authenticators that this view can use.
        """
        # 谁调用他就拿到那个类的对象
        return [auth() for auth in self.authentication_classes]  # 点击authentication_classes 第四步
    
    
    
    # 第四步:
    class APIView(View):

        # The following policies may be set at either globally, or per-view.
        renderer_classes = api_settings.DEFAULT_RENDERER_CLASSES
        parser_classes = api_settings.DEFAULT_PARSER_CLASSES
        authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES # 点击后看的部分
        throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES
        permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES
        content_negotiation_class = api_settings.DEFAULT_CONTENT_NEGOTIATION_CLASS
        metadata_class = api_settings.DEFAULT_METADATA_CLASS
        versioning_class = api_settings.DEFAULT_VERSIONING_CLASS
    
    
  
    
    
    # 第五步
    def dispatch(self, request, *args, **kwargs):
        """
        `.dispatch()` is pretty much the same as Django's regular dispatch,
        but with extra hooks for startup, finalize, and exception handling.
        """
        self.args = args
        self.kwargs = kwargs
        # 对原生的request进行加工 ,其实就是
        request = self.initialize_request(request, *args, **kwargs)  # 点击initialize_request进入第二步看request
        self.request = request
        self.headers = self.default_response_headers  # deprecate?

        try:
            self.initial(request, *args, **kwargs)  # 点击进入第六步

            # Get the appropriate handler method
            if request.method.lower() in self.http_method_names:
                handler = getattr(self, request.method.lower(),
                                  self.http_method_not_allowed)
            else:
                handler = self.http_method_not_allowed

            response = handler(request, *args, **kwargs)

        except Exception as exc:
            response = self.handle_exception(exc)

        self.response = self.finalize_response(request, response, *args, **kwargs)
        return self.response
    
    
    
    
    # 第六步
    def initial(self, request, *args, **kwargs):
        """
        Runs anything that needs to occur prior to calling the method handler.
        """
        self.format_kwarg = self.get_format_suffix(**kwargs)

        # Perform content negotiation and store the accepted info on the request
        neg = self.perform_content_negotiation(request)
        request.accepted_renderer, request.accepted_media_type = neg

        # Determine the API version, if versioning is in use.
        version, scheme = self.determine_version(request, *args, **kwargs)
        request.version, request.versioning_scheme = version, scheme

        # Ensure that the incoming request is permitted
        self.perform_authentication(request)  # 点击进入下一步
        self.check_permissions(request)
        self.check_throttles(request)
        
     
    
     # 第七步   
    def perform_authentication(self, request):
        """
        Perform authentication on the incoming request.

        Note that if you override this and simply 'pass', then authentication
        will instead be performed lazily, the first time either
        `request.user` or `request.auth` is accessed.
        """
        request.user # 点击user进入下一步

    
    # 第八步
    @property
    def user(self):
        """
        Returns the user associated with the current request, as authenticated
        by the authentication classes provided to the request.
        """
        if not hasattr(self, '_user'):
            with wrap_attributeerrors():
                self._authenticate() # 点击进入下一步
        return self._user
    
    
    # 第九步
def _authenticate(self):
        # 遍历拿到一个个认证器,进行认证
        # self.authenticators配置的一堆认证类产生的认证类对象组成的 list
        for authenticator in self.authenticators:
            try:
                # 认证器(对象)调用认证方法authenticate(认证类对象self, request请求对象)
                # 返回值:登陆的用户与认证的信息组成的 tuple(request.user,request.auth)
                # 该方法被try包裹,代表该方法会抛异常,抛异常就代表认证失败
                user_auth_tuple = authenticator.authenticate(self)
            except exceptions.APIException:
                self._not_authenticated()
                raise

            # 返回值的处理
            if user_auth_tuple is not None:
                self._authenticator = authenticator
                # 如何有返回值,就将 登陆用户 与 登陆认证 分别保存到 request.user、request.auth
                self.user, self.auth = user_auth_tuple
                return
        # 如果返回值user_auth_tuple为空,代表认证通过,但是没有 登陆用户 与 登陆认证信息,代表游客
        self._not_authenticated()

  

3.3  自定登录认证

models.py

from django.db import models

class Userinfo(models.Model):
    user_type_choice ={
        (1,'普通用户'),
        (2,'VIP用户'),
        (3,'SVIP用户')
    }
    user_type = models.IntegerField(choices=user_type_choice)
    username = models.CharField(max_length=32)
    password = models.CharField(max_length=64)


# 用户token表
class UserToken(models.Model):
    user = models.OneToOneField(to="Userinfo",on_delete=True)
    token = models.CharField(max_length=64)

  

views.py

from django.shortcuts import render
from rest_framework.views import APIView
from api import models
from django.http import JsonResponse,HttpResponse


# 生成随机字符创(token)  并且进行更新
def md5(username):
    import hashlib
    import time
    ctime = str(time.time())
    m = hashlib.md5(bytes(username,encoding="utf-8") )
    m.update(bytes(ctime,encoding="utf-8"))
    return m.hexdigest()


# 对用户进行登录认证
class AuthView(APIView):

    def get(self,request):
        return HttpResponse("GET请求")

    def post(self,request,*args,**kwargs):
        ret = {
            'code':1000,
            "msg":None
        }

        # 从post 里面取数据
        # 注意 获取值是request.data.get()来获取
        print(request.data)  # 这是前端带来的数据
        username = request.data.get("username")
        password = request.data.get("password")

        print(username,password)
        obj = models.Userinfo.objects.filter(username=username,password=password).first()
        if not obj:
            ret["code"] = 1001
            ret["msg"] = "用户名或密码错误"
            return JsonResponse(ret)
        if obj:

            # 为用户创建token
            token = md5(username=username)
            # 存在就更新 不存在就创建
            models.UserToken.objects.update_or_create(user=obj,defaults={"token":token})
            ret["token"] = token
            return JsonResponse(ret)

        else:
            ret["code"] =1002
            ret["msg"] = "请求异常"
            return JsonResponse(ret)

  

3.4 自定义认证类

自定义类模板

1 创建类 继承BasicAuthentication 自己实现authenticate方法(就是自定义实现认证的逻辑)
2 返回值:
    --None,表示不管当前认证,让下一位类来进行认证
    --raise exceptions.AuthenticationFailed("用户认证失败")  #  from rest_framework import exceptions
    --(元素1,元素2)  # 元素1赋值给request.user  元素二赋值给request.auth

3 局部使用或全局使用

 自定义认证案例 

from rest_framework import exceptions
from rest_framework.authentication import BasicAuthentication

# 第一步要写一个认证类 继承BasicAuthentication class Authtication(BasicAuthentication):

  # 第二步重写authenticate方法 def authenticate(self,request): token = request._request.GET.get("token") token_obj = models.UserToken.objects.filter(token=token).first() if not token_obj: raise exceptions.AuthenticationFailed("用户认证失败") # 在restframwork内部会将两个字段赋值给request,以供后续操作 return (token_obj.user,token_obj) def authenticate_header(self, request): # 此方法带着 不必写代码逻辑 pass

  

使用自定义类

局部使用  views.py

from rest_framework import exceptions
from rest_framework.authentication import BasicAuthentication
# 第一步要写一个认证类
class Authtication(BasicAuthentication):
    
    # 第二步重写authenticate方法
    def authenticate(self,request):
        # token = request._request.get("token")
        token_obj = models.UserToken.objects.filter(token=token).first()
        if not token_obj:
            raise exceptions.AuthenticationFailed("用户认证失败")

        # 在restframwork内部会将两个字段赋值给request,以供后续操作
        return (token_obj.user,token_obj)

    def authenticate_header(self, request):
        pass


    
    
class OrderVIew(APIView):
    '''
    登陆之后可以查看订单信息
    '''  
    
    # 第三步使用自定义类
    authentication_classes = [Authtication,]
    
    
    # 获取订单信息
    # self.dispatch
    def get(self,request,*args,**kwargs):
        # 要首先判断是否是登录  登录的才能看订单信息
        # request.user 对应class Authtication(object):中的return (token_obj.user,token_obj)第一个参数
        # request.auth 对应class Authtication(object):中的return (token_obj.user,token_obj)第二个参数
        ret = {"code":1000,"msg":None,"data":None}

        try:
            ret["data"] = order_dict
        except Exception as e:
            pass
        return JsonResponse(ret)

  

全局使用

在settings.py中

REST_FRAMEWORK = {
    # 'DEFAULT_AUTHENTICATION_CLASSES': ['apps.api.utils.auth.Authtication',],
    'DEFAULT_AUTHENTICATION_CLASSES': ['apps.api.utils.auth.Authtication'],  # 是自定义认证类的路径
    'UNAUTHENTICATED_USER': None,
    'UNAUTHENTICATED_TOKEN': None,
}

或者 

REST_FRAMEWORK = {
	# python中认证的配置
    'DEFAULT_AUTHENTICATION_CLASSES': (
        'rest_framework.authentication.BasicAuthentication',   # 基本认证
        'rest_framework.authentication.SessionAuthentication',  # session认证
    ),
    
    # python中权限的配置,如果没有指明,系统默认的权限是允许所有人访问的
    'DEFAULT_PERMISSION_CLASSES': (
        'rest_framework.permissions.AllowAny',
        
        # 全局配置:一站式网站(所有操作都需要登录后才能访问)
        # 'rest_framework.permissions.IsAuthenticated',
    )
}

  

 

4 权限组件

功能:为不同的视图赋予不同的权限    用法和认证组件类似  源码流程和认证类似

源码

# 和认证组件类似 也是从dispatch开始往后看
self.check_permissions(request)
    认证细则:
    def check_permissions(self, request):
        # 遍历权限对象列表得到一个个权限对象(权限器),进行权限认证
        for permission in self.get_permissions():
            # 权限类一定有一个has_permission权限方法,用来做权限认证的
            # 参数:权限对象self、请求对象request、视图类对象
            # 返回值:有权限返回True,无权限返回False
            if not permission.has_permission(request, self):
                self.permission_denied(
                    request, message=getattr(permission, 'message', None)
                )

  

#权限组件自带的四个权限,默认全局配置的是AllowAny,
    - 可以在settings.py中更换全局配置
    - 可以在视图类中针对性的局部配置
    - 可以自定义

class AllowAny(BasePermission):
    # 游客与登陆用户都有所有权限
    def has_permission(self, request, view):
        return True
​
class IsAuthenticated(BasePermission):
    # 游客没有任何权限,登陆用户才有权限
    def has_permission(self, request, view):
        return bool(request.user and request.user.is_authenticated)
​
class IsAdminUser(BasePermission):
    # 游客没有任何权限,登陆用户才有权限
    def has_permission(self, request, view):
        return bool(request.user and request.user.is_staff)
​
class IsAuthenticatedOrReadOnly(BasePermission):
    # 认证规则必须是只读请求或是合法用户: 游客只读,合法用户无限制
    def has_permission(self, request, view):
        return bool(
            request.method in SAFE_METHODS or 
            request.user and
            request.user.is_authenticated
        )

  

自定义权限类

'''
1) 创建继承BasePermission的权限类
2) 实现has_permission方法
3) 实现体根据权限规则 确定有无权限
4) 进行全局或局部配置
​
认证规则
i.满足设置的用户条件,代表有权限,返回True
ii.不满足设置的用户条件,代表有权限,返回False
'''

使用

局部使用

# 第一步 自定义权限类  继承BasePermission
from rest_framework.permissions import BasePermission
class MyPermission(BasePermission):
    def has_permission(self,request,view):
        if request.user.user_type !=3:
            return False  # 无权访问
        return True  # 有权访问

    
    
class OrderVIew(APIView):

    # 添加一个需求 用户等级是3就可以访问
    permission_classes = [MyPermission,]  # 局部权限配置

    def get(self,request,*args,**kwargs):
        ret = {"code":1000,"msg":None,"data":None}

        try:
            ret["data"] = order_dict
        except Exception as e:
            pass
        return JsonResponse(ret)

  

全局使用

settings.py进行配置  如有视图中还有    permission_classes = [],优先 使用视图中 没有的再去配置文件中找

REST_FRAMEWORK = {
	# DRF中认证的配置
    'DEFAULT_AUTHENTICATION_CLASSES': (
        'rest_framework.authentication.BasicAuthentication',   # 基本认证
        'rest_framework.authentication.SessionAuthentication',  # session认证
    )
    
     
    # DRF中权限的配置
    # python中权限的配置,如果没有指明,系统默认的权限是允许所有人访问的
    'DEFAULT_PERMISSION_CLASSES': (
        'rest_framework.permissions.AllowAny',  # 所有用户都应权限
        
        # 一站式网站(所有操作都需要登录后才能访问)
        # 'rest_framework.permissions.IsAuthenticated',
        
        # 自定义配置的类
        #‘DEFAULT_PERMISSION_CLASSES‘:[‘app01.utils.permission.SVIPPermission‘,]#可以自定义多个权限  可以写地址
        
        # 自定义配置的类
        # 'rest_framework.permissions.MyPermission',

    )
}

  

 

5 频率组件

源码流程

1. 和认证的流程一样,进入initial(request)
2. 其中check_throttles(request)是节流的函数
3. allow_request()就是节流函数(要复写)(get_throttles循环所有节流类)
4. VisitThrottle自定义权限类
   allow_request()返回值:

    - True, 允许访问
    - False, 访问太频繁
    
   wait()返回值:返回一个整数,表示下次还有多久可以访问

  

两种使用方式

1 继承BaseThrottle,自己实现allow_request(返回True或者False表示可以访问或者访问频率太高)
	和wait方法 (表示还需等待到少秒)
    
2 继承SimpleRateThrottle,自己实现 get_cache_key和设置一个scope(配置文件字段) 
    from rest_framework.throttling import  SimpleRateThrottle
    class VisitThrottle(SimpleRateThrottle):
        scope = "Vistor"
		# 获取用户的IP
        def get_cache_key(self, request, view):
            return self.get_ident(request)

    class UserThrottle(SimpleRateThrottle):
        scope = "User"

        def get_cache_key(self, request, view):
            return self.user.username
  
局部配置  全局配置
class AuthView(APIView):
 	# 局部配置
    throttle_classes = [VisitThrottle,]

全局配置
1 : 针对第一种继承类方法
'DEFAULT_THROTTLE_CLASSES': ['apps.api.utils.throttle.UserThrottle'], # 是自定义类的路径

 
2 : 针对第二种继承类方法
	'DEFAULT_THROTTLE_RATES': {
      'Vistor': '3/m',
      'User': '10/m'
     },

  

第一种方式(建议使用)

最好写在专门的组件文件夹中 和views视图分开

from rest_framework.throttling import  SimpleRateThrottle
class VisitThrottle(SimpleRateThrottle):
    scope = "Vistor"

    def get_cache_key(self, request, view):
        return self.get_ident(request)

class UserThrottle(SimpleRateThrottle):
    scope = "User"

    def get_cache_key(self, request, view):
        return self.user.username

  

第二种方式

最好写在专门的组件文件夹中 和views视图分开

# 节流控制
from rest_framework.throttling import BaseThrottle
import time

# 设置一个空字典 放每次匿名登录的时间
# 有个缺点 将用户ip放到全局变量中 每次启动时都会清空
VISIT_REVORD={}

class VisitThrottle(BaseThrottle):
    '''
    根据IP来限制用户的访问次数
    '''
    def __init__(self):
        self.history = None

    def allow_request(self, request, view):
        # 获取用户的ip
        remote_addr = request.META.get('REMOTE_ADDR')

        ctime = time.time()

        if remote_addr not in VISIT_REVORD:
            # 讲用户IP添加到字典中
            VISIT_REVORD[remote_addr] = [ctime,]
            return True


        # 获取到用户ip
        history = VISIT_REVORD.get(remote_addr)
        self.history  = history

        # 判断是否有ip  且判断字典最后一位是否小于当前时间减去30秒时
        while history and history[-1] < ctime-30:
            # 如果是减去最后一位 pop默认减去最后一位
            history.pop()


        if len(history) <3:
            # 当次数小于三时 将当前操作时间放进去
            history.insert(0,ctime)
            return True


    def wait(self):
        '''
        告知用户还需要等多少时间
        '''
        ctime = time.time()
        # history[-1]使用户最用一次访问时间
        wait_time = 30 -(ctime-self.history[-1]) # 提示用户还有多少秒才能访问
        return wait_time

  

 
 
 
 

 

原文地址:https://www.cnblogs.com/wakee/p/12556212.html