DRF之认证组件

DRFday04

1 路由

1.1 路由配置的几种方式

(1)直接写在urls.py的urlpatterns中配置
path('books/<int:pk>/',views.Book1View.as_view()),
(2)如果视图类继承自ViewSetMixin,路由中列表需要加 action
path('books4/', views.Book4View.as_view(actions={'get': 'list', 'post': 'create'})),
path('books4/<int:pk>/',
     views.Book4View.as_view(actions={'get': 'retrieve', 'put': 'update', 'delete': 'destroy'})),
(3)如果视图类继承自ModelViewSet,它会自动生成路由,其写法如下:

step 1(导入routers模块)

from rest_framework import routers

step 2 (实例化路由类得到路由对象)

有两个路由类DefaultRouter和SimpleRounter(DefaultRouter生成的路由相较于SimpleRounter更多)
router = routers.SimpleRouter()

step3 (注册路由)

格式:router.register('前缀','继承自ModelViewSet的视图类','别名')
router.register('books2','book2View','lxx')

step4 (将自动生成的路由加入到原来的路由列表中)

urlpatterns += router1.urls

views.py

from rest_framework.viewsets import ModelViewSet
from app01.models import BookModel
from app01.serializer import BookSerializer

class book1View(ModelViewSet):
    queryset = BookModel.objects.all()
    serializer_class = BookSerializer

1.2 在视图集中附加 action声明

在视图集中,如果想要让Router自动帮助我们为自定义的动作生成路由信息,需要使用 rest_framework.decorators.action装饰器

action装饰器可以接收两个参数

methods:声明该action对应的请求方式,列表传递
detail:声明该action的路径是否与单一资源对应,是否携带pk

给继承自ModelViewSet视图类中的自定义函数也添加路由

from rest_framework.response import Response
from rest_framework.decorators import action
from rest_framework.viewsets import ModelViewSet
from app01.models import BookModel
from app01.serializer import BookSerializer


class Book1View(ModelViewSet):
    queryset = BookModel.objects.all()
    serializer_class = BookSerializer

    @action(methods=['GET', 'POST'], detail=False)
    def get_or_create(self, request):
        # request._request.method
        if request.method == 'GET':
            book = BookModel.objects.all()
            book_ser = BookSerializer(book, many=True)
            return Response(book_ser.data)
        else:
            book_ser = BookSerializer(data=request.data)
            if book_ser.is_valid():
                book_ser.save()
                return Response(book_ser.data)
            else:
                return Response({'status': 101, 'msg': '数据校验未通过'})

    @action(methods=['GET', 'PUT', 'DELETE'], detail=True)
    def get_update_or_delete(self, request, pk):
        print('test')
        instance = BookModel.objects.filter(pk=pk).first()
        if not instance:
            return Response({'status': 101, 'msg': '无效的pk'})
        if request.method == 'GET':
            book_ser = BookSerializer(instance)
            return Response(book_ser.data)
        elif request.method == 'PUT':
            book_ser = BookSerializer(instance=instance, data=request.data)
            if book_ser.is_valid():
                book_ser.save()
                return Response(book_ser.data)
            else:
                return Response({'status': 101, 'msg': '数据校验未通过'})
        elif request.method == 'DELETE':
            instance.delete()
            return Response({'status': 101, 'msg': '成功删除'})

使用方式

GET|POST
http://127.0.0.1:8007/books1/get_or_create/
GET|PUT|DELETE
http://127.0.0.1:8007/books1/6/get_update_or_delete/

2 认证

2.1 认证的实现

自定义认证的实现流程如下

step1 写一个类继承于BaseAuthentication,在新建的类中重写authenticate方法;
step2 在authenticate方法中书写认证逻辑:
	(1)认证成功,则返回两个值Request.user和Request.auth;
    (2)认证失败的话,则抛异常APIException或者AuthenticationFailed。
step3 在全局配置,还是局部配置:
    (1)局部配置,在视图类中写上如authentication_classes=[LoginAuthentication,];
    (2)全局配置,在settings.py中REST_FRAMEWORK中配置;
    (3)局部配置的优先级高于全局配置。
    
# 全局配置:
REST_FRAMEWORK=[
    "DEFAULT_AUTHENTICATION_CLASSES":["app01.app_auth.LoginAuthentication",]
]
# 认证配置的查找顺序
先在自己视图类中找,然后到配置文件中找全局配置
# 局部禁用的方案
authentication_classes=[]

2.2 认证组件的源码分析

以APIview为例子。我们还记得rest_framework框架在APIview中重写了 Django 的as_view方法,在它 的as_view方法中,调用了父类的as_view方法,在执行到self.dispatch()方法处,因为self是APIView实例化得到的对象,因此执行了APIView 的dispatch方法,在该方法中除了实现了对原生request对象的二次分装,还实现了(认证,权限,频率限制三大组件),最后去除掉了CSRF校验。

故而,我们先看下APIView的dispatch方法

def dispatch(self, request, *args, **kwargs):
    request = self.initialize_request(request, *args, **kwargs) # 完成对原生request对象的二次封装

    try:
        self.initial(request, *args, **kwargs) # 三大组件

其次,看下APIView 的initialize_request方法

def initialize_request(self, request, *args, **kwargs):
    # 该方法中完成对request对象的二次封装
    return Request(
        request,
        parsers=self.get_parsers(),
        authenticators=self.get_authenticators(), # 获取视图类中的认证配置
        negotiator=self.get_content_negotiator(),
        parser_context=parser_context
    )

然后,看下APIView的get_authenticators 方法

def get_authenticators(self):
    # 从视图类的认证配置中,实例化认证类
    return [auth() for auth in self.authentication_classes]

接着,看下APIView的initial 方法

def initial(self, request, *args, **kwargs):
    ...
    self.perform_authentication(request) # 认证组件
    self.check_permissions(request)   # 权限组件
    self.check_throttles(request)  # 频率组件

再看下APIView的perform_authentication方法

    def perform_authentication(self, request):
        # 这里的request是经过二次封装的request
        request.user  # 需要去DRF的request对象中找User属性(方法)

转到Request类中

@property
def user(self):
    if not hasattr(self, '_user'): # 没有'_user'属性
        with wrap_attributeerrors(): # 借助上下文管理器
            self._authenticate()  # 核心代码
    return self._user

最终,我们看下Request的_authenticate方法

def _authenticate(self):
    # 这里的self指的是Request对象实例化后的对象
    for authenticator in self.authenticators: # self.authenticators是APIView视图中配置的认证类实例化的对象 [LoginAuthentication(),]
        try:
            # 认证器(对象)调用认证方法authenticate(认证类对象self, request请求对象)
            # 返回值:登陆的用户与认证的信息组成的 tuple
            # 该方法被try包裹,代表该方法会抛异常,抛异常就代表认证失败
            user_auth_tuple = authenticator.authenticate(self) # self是Request对象实例化后的对象  ==>authenticate(self,request)第一个self是实例化的认证类对象,request是二次封装后的认证类
        except exceptions.APIException: # 捕获异常
            self._not_authenticated()
            raise

        # 返回值的处理
        if user_auth_tuple is not None:
            self._authenticator = authenticator
             # 如何有返回值,就将 登陆用户 与 登陆认证 分别保存到 request.user、request.auth
            self.user, self.auth = user_auth_tuple
            return
         # 如果返回值user_auth_tuple为空,代表认证通过,但是没有 登陆用户 与 登陆认证信息,代表游客
    self._not_authenticated()

2.3 认证组件的使用

class LogAuthentication(BaseAuthentication):
    """
    ".authenticate() must be overridden."
    """

    def authenticate(self, request):
        """
        认证逻辑的书写:
        (1)如果认证成功,则返回request.user,request.auth
        (2)如果认证失败,则抛异常AuthenticationFailed
        """
        token = request.GET.get('token')
        if token:
            token_obj = Token.objects.filter(token=token).first()
            if token_obj:
                return token_obj.user, token
            else:
                raise AuthenticationFailed('认证失败')
        else:
            raise AuthenticationFailed('缺少认证参数token')

可以有多个认证,从左往右依次执行

这里的话顺带看下BaseAuthentication的authenticate的源码:

class BaseAuthentication:
    """
    All authentication classes should extend BaseAuthentication.
    """

    def authenticate(self, request):
        """
        Authenticate the request and return a two-tuple of (user, token).
        """
        raise NotImplementedError(".authenticate() must be overridden.")
        
# 子类中如果没有重写authenticate方法会抛异常

案例:

auth_check.py

from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from app01.models import Token


class LogAuthentication(BaseAuthentication):
    """
    ".authenticate() must be overridden."
    """

    def authenticate(self, request):
        """
        认证逻辑的书写:
        (1)如果认证成功,则返回request.user,request.auth
        (2)如果认证失败,则抛异常AuthenticationFailed
        """
        token = request.GET.get('token')
        if token:
            token_obj = Token.objects.filter(token=token).first()
            if token_obj:
                return token_obj.user, token
            else:
                raise AuthenticationFailed('认证失败')
        else:
            raise AuthenticationFailed('缺少认证参数token')

views.py

# LoginView
class LoginView(APIView):
    authentication_classes = []
    def post(self, request):
        username = request.data.get('username')
        password = request.data.get('password')
        user = User.objects.filter(username=username, password=password).first()
        if user:
            # 每次登陆成功都会生成一个随机的字符串
            token = uuid4()
            # update_or_create有就更新,没有就新增
            Token.objects.update_or_create(defaults={'token': token}, user=user)
            return Response({'status': 100, 'msg': '登陆成功'})
        else:
            return Response({'status': 101, 'msg': '用户名或者作者错误'})

测试效果

原文地址:https://www.cnblogs.com/surpass123/p/13275937.html