drf认证、权限、频率、过滤、排序、异常处理

认证组件

使用方法:

1、新建一个认证类文件,继承BaseAuthentication

from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.response import Response
from app03 import models
class Bookauth(BaseAuthentication):

    def authenticate(self, request):
        token=request.META.get('HTTP_TOKEN')
        if token:
            token_obj=models.UserToken.objects.filter(token=token).first()
            if token_obj:
                return token_obj.user,token
            else:
                raise AuthenticationFailed('验证失败')
        else:
            raise AuthenticationFailed('请求头没有token信息')

2、认证类局部配置

'''
局部使用,在列表内发那个值认证类
局部禁用,设置列表为空
'''
class Login(APIView):
    #局部使用
    authentication_classes = [Bookauth,]
    #局部禁用
    # authentication_classes = []
    def post(self, request):
        user = models.User.objects.filter(username=request.data.get('username'), password=request.data.get('password')).first()
        if user:
            token = uuid.uuid4()

            models.UserToken.objects.update_or_create(user=user, defaults={'token': token})
            return Response("{'msg':'登陆成功'}")
        else:
            return Response("{'msg':'用户不存在'}")

 3、全局配置,在settings文件中配置:

REST_FRAMEWORK={
    # "DEFAULT_AUTHENTICATION_CLASSES":["app04.MyAuth.Login_auth",]
    "DEFAULT_AUTHENTICATION_CLASSES":["app03.BookAuth.Bookauth",]
}

认证源码分析:

#1 APIVIew----》dispatch方法---》self.initial(request, *args, **kwargs)---->有认证,权限,频率
#2 只读认证源码: self.perform_authentication(request)
#3 self.perform_authentication(request)就一句话:request.user,需要去drf的Request对象中找user属性(方法) 
#4 Request类中的user方法,刚开始来,没有_user,走 self._authenticate()

#5 核心,就是Request类的 _authenticate(self):
    def _authenticate(self):
        # 遍历拿到一个个认证器,进行认证
        # self.authenticators配置的一堆认证类产生的认证类对象组成的 list
        #self.authenticators 你在视图类中配置的一个个的认证类:authentication_classes=[认证类1,认证类2],对象的列表
        for authenticator in self.authenticators:
            try:
                # 认证器(对象)调用认证方法authenticate(认证类对象self, request请求对象)
                # 返回值:登陆的用户与认证的信息组成的 tuple
                # 该方法被try包裹,代表该方法会抛异常,抛异常就代表认证失败
                user_auth_tuple = authenticator.authenticate(self) #注意这self是request对象
            except exceptions.APIException:
                self._not_authenticated()
                raise

            # 返回值的处理
            if user_auth_tuple is not None:
                self._authenticator = authenticator
                # 如何有返回值,就将 登陆用户 与 登陆认证 分别保存到 request.user、request.auth
                self.user, self.auth = user_auth_tuple
                return
        # 如果返回值user_auth_tuple为空,代表认证通过,但是没有 登陆用户 与 登陆认证信息,代表游客
        self._not_authenticated()

 权限

# APIView---->dispatch---->initial--->self.check_permissions(request)(APIView的对象方法)
    def check_permissions(self, request):
        # 遍历权限对象列表得到一个个权限对象(权限器),进行权限认证
        for permission in self.get_permissions():
            # 权限类一定有一个has_permission权限方法,用来做权限认证的
            # 参数:权限对象self、请求对象request、视图类对象
            # 返回值:有权限返回True,无权限返回False
            if not permission.has_permission(request, self):
                self.permission_denied(
                    request, message=getattr(permission, 'message', None)
                )
权限源码分析

使用方法:

(1)自定义一个权限类,继承BasePermission类,代码如下:

from rest_framework.permissions import BasePermission

class User_per(BasePermission):
    def has_permission(self, request, view):
        user=request.user
        is_staff=user.is_staff
        if is_staff==1: #1:管理员 2:普通用户 3:匿名用户
            return True
        else:
            return False

(2)全局配置跟局部配置:

REST_FRAMEWORK = {
    # 权限全局配置
    "DEFAULT_PERMISSION_CLASSES": ['app01.app_per.User_per', ]
}

#局部配置
class TestView1(APIView):
    #局部禁用
    permission_classes = []
    #局部配置
    permission_classes = [User_per]
    def get(self,request):
        return Response('我是匿名用户')

内置权限:

from rest_framework.permissions import IsAdminUser
from rest_framework.authentication import SessionAuthentication
class Supper(APIView):
    permission_classes = [IsAdminUser]
    authentication_classes = [SessionAuthentication]
    def get(self,request):
       
        return Response('这是超级管理员的测试')

频率

全局配置:settings文件下

REST_FRAMEWORK = {
    #限制
    'DEFAULT_THROTTLE_CLASSES': (
        'rest_framework.throttling.AnonRateThrottle',#未登录用户
        'rest_framework.throttling.UserRateThrottle',#登陆用户
    ),
    'DEFAULT_THROTTLE_RATES': {
        'anon': '3/m', #1分钟访问3次,m后面无论带什么都没影响,因为它只截取m
        'user': '5/m'
    },
}

局部配置:

from rest_framework.throttling import UserRateThrottle
class TestView1(APIView):
    #频率局部配置
    throttle_classes = [UserRateThrottle]
    def get(self,request):
        return Response('我是匿名用户')

过滤

django自带的过滤SearchFilter:

from rest_framework.filters import SearchFilter
# 导入过滤类SearchFilter.
class Course_View(GenericViewSet,ListModelMixin):
    queryset = models.Course.objects.filter(is_show=True,is_delete=False).all()
    serializer_class = Course_ser.Course_serializers
    # 过滤
    filter_backends = [SearchFilter]
    # 过滤字段
    search_fields=['price']  #django自带的过滤

django-filter

1、安装:pip install django-filter

2、在settings中进行app注册:

INSTALLED_APPS = [
    'django.contrib.admin',
    ....'django_filters',
]

3、全局配置:

REST_FRAMEWORK = {
   #过滤全局配置
    'DEFAULT_FILTER_BACKENDS': ('django_filters.rest_framework.DjangoFilterBackend',),
}

4、局部配置以及在视图类的使用:

django-filter以某个字段过滤的两种方式:

1、配置过滤字段

from django_filters.rest_framework import DjangoFilterBackend
class BookView(ModelViewSet):
    queryset = models.Book.objects.all()
    serializer_class = Book_ser
    #局部配置
    filter_backends = [DjangoFilterBackend]
    #配置要过滤的字段
    filter_fields = ('name','price',)

2、配置过滤类

from django_filters.rest_framework import DjangoFilterBackend
class BookView(ModelViewSet):
    queryset = models.Book.objects.all()
    serializer_class = Book_ser
    #局部配置
    filter_backends = [DjangoFilterBackend]
    #配置要过滤的类(自己写的类)
    filter_class=filters.CourseFiltersset


#CourseFilterseet类代码如下:
from django_filters.filterset import FilterSet
from django_filters import filters
from course import models

class CourseFiltersset(FilterSet):class Meta:
        model=models.Course
        fields=['price']

3、区间过滤

from django_filters.rest_framework import DjangoFilterBackend
class BookView(ModelViewSet):
    queryset = models.Book.objects.all()
    serializer_class = Book_ser
    #局部配置
    filter_backends = [DjangoFilterBackend]
    #配置要过滤的类(自己写的类)
    filter_class=filters.CourseFiltersset


#CourseFilterseet类代码如下:
from django_filters.filterset import FilterSet
from django_filters import filters
from course import models

class CourseFiltersset(FilterSet):
    #以price字段为筛选字段
    min_price=filters.NumberFilter(field_name='price',lookup_expr='gt')
    max_price=filters.NumberFilter(field_name='price',lookup_expr='lte')
    class Meta:
        model=models.Course
        fields=['course_category']

自定义过滤类

# filters.py
# 自定义过滤规则
from rest_framework.filters import BaseFilterBackend

class MyFilter(BaseFilterBackend):
    def filter_queryset(self, request, queryset, view):
        # 真正的过滤规则
        # params=request.GET.get('teacher')
        # queryset.filter('''''')
        return queryset[:1]
 
# 使用:在视图类中配置
filter_backends=[DjangoFilterBackend,OrderingFilter,MyFilter]

排序

1、导入OrderingFilter模块

from rest_framework.filters import OrderingFilter

2、在视图类上的使用:

class BookView(ModelViewSet):
    authentication_classes = []
    permission_classes = []
    throttle_classes = []

    queryset = models.Book.objects.all()
    serializer_class = Book_ser
    #局部配置
    # filter_backends = [DjangoFilterBackend]
    # #配置要过滤的字段
    # filter_fields = ('name','price',)
    filter_backends = [OrderingFilter]
    ordering_fields = ('id', 'price')

    '''
    使用
          -号:表示降序
    '''
    http://127.0.0.1:8000/books2/?ordering=-price
    http://127.0.0.1:8000/books2/?ordering=price

异常处理

1、自定义一个异常处理方法,用来重写rest_framework.views中exception_handler方法:

from rest_framework.views import exception_handler
from rest_framework.response import Response
from rest_framework import status
def my_exception_handler(exc, context):
    response=exception_handler(exc, context)
    # 返回两种情况,一个是None,drf没有处理
    #一个是response对象,django处理了,处理格式不是很好
    
    if not response:
        if isinstance(exc, ZeroDivisionError):
            return Response(data={'status': 9999, 'msg': "除以0的错误" + str(exc)}, status=status.HTTP_400_BAD_REQUEST)
        return Response(data={'status':1111,'msg':str(exc)},status=status.HTTP_400_BAD_REQUEST)
    else:
        return Response(data={'status':2222,'msg':response.data.get('detail')},status=status.HTTP_400_BAD_REQUEST)

全局配置:

REST_FRAMEWORK = {
  #异常
    'EXCEPTION_HANDLER': 'app01.wsigi.APIhandle_exceptn'#自己写的异常处理方法
}

排序过滤过程源码分析:

#排序字段过滤字段源码解析:
'''
流程:先ListModelMinxin中list方法=》GenericAPIView的filter_queryset方法(作用实例化OrderingFilter,并调用它的方法filter_queryset())
=》OrderingFilter(filter_queryset())=>调用get_ordering方法来获取ordering筛选字段=>再回到filter_queryset方法根据返回的字段进行排序
''' from rest_framework.filters import OrderingFilter from rest_framework.filters import SearchFilter class Course_View(GenericViewSet,ListModelMixin): queryset = models.Course.objects.filter(is_show=True,is_delete=False).all() serializer_class = Course_ser.Course_serializers # 排序和过滤 filter_backends = [OrderingFilter,DjangoFilterBackend] # 排序字段 ordering_fields=['id','price','students'] # 过滤字段 filter_fields=['course_category','price'] #search_fields=['price'] #django自带的过滤 #1、先从listMOdelMixin中可以看到list方法调用了filter_queryset class ListModelMixin: def list(self, request, *args, **kwargs): queryset = self.filter_queryset(self.get_queryset() ...... #因为CourseView本身没有filter_queryset方法,所以从GenericViewSet找,它也没有,再从它父类generics.GenericAPIView找 def filter_queryset(self, queryset): for backend in list(self.filter_backends): #将OrderingFilter类实例化并调用它自身的filter_queryset方法 queryset = backend().filter_queryset(self.request, queryset, self) return queryset #再从OrderingFilter类中找到它的filter_quesryset方法 def get_ordering(self, request, queryset, view): # self.ordering_param 是配置文件中ordering params = request.query_params.get(self.ordering_param) if params: # fields=[id,price] fields = [param.strip() for param in params.split(',')] #去除无效的过滤条件(视图类中没有配置的字段,直接去除) ordering = self.remove_invalid_fields(queryset, fields, view, request) if ordering: return ordering # self.get_default_ordering方法:是根据ordering_fields反射来获取字段 ['id','price','students'] return self.get_default_ordering(view) def filter_queryset(self, request, queryset, view): #获取筛选字段 ordering = self.get_ordering(request, queryset, view) if ordering: return queryset.order_by(*ordering) return queryset
#过滤过程源码解析;
#流程:ListModelMixin中list方法=>list方法中调用GenericAPIView的filter_queryset方法 =》来对 filter_backends = [SearchFilter]中这个类实例化并调用fiter_queryse方法
 def filter_queryset(self, request, queryset, view):
        '''
        get_search_fields方法:利用反射来获取视图类中配置的search_fields中的字段
        get_search_terms方法:是对请求连接进行处理,来获取过滤字段的参数,获取2
        比如:http://www.baidu.com/?price=2
        '''
        search_fields = self.get_search_fields(view, request)
        search_terms = self.get_search_terms(request)

        if not search_fields or not search_terms:
            return queryset
        #将过滤字段进行拼接成过滤条件,如:['id_icontains']
        orm_lookups = [
            self.construct_search(str(search_field))
            for search_field in search_fields
        ]
        base = queryset
        conditions = []
        for search_term in search_terms:
            queries = [
                models.Q(**{orm_lookup: search_term})
                for orm_lookup in orm_lookups
            ]
            conditions.append(reduce(operator.or_, queries))
        #过滤完的queryset
        queryset = queryset.filter(reduce(operator.and_, conditions))

        if self.must_call_distinct(queryset, search_fields):
            # Filtering against a many-to-many field requires us to
            # call queryset.distinct() in order to avoid duplicate items
            # in the resulting queryset.
            # We try to avoid this if possible, for performance reasons.
            queryset = distinct(queryset, base)
        return queryset
原文地址:https://www.cnblogs.com/nq31/p/13926977.html