Django REST framework认证权限和限制和频率

认证、权限和限制

身份验证是将传入请求与一组标识凭据(例如请求来自的用户或其签名的令牌)相关联的机制。然后 权限 和 限制 组件决定是否拒绝这个请求。

简单来说就是:

认证确定了你是谁

权限确定你能不能访问某个接口

限制确定你访问某个接口的频率

认证

REST framework 提供了一些开箱即用的身份验证方案,并且还允许你实现自定义方案。

个人 敲码log:

1.

# Create your models here.
class UserInfo1(models.Model):
    id = models.AutoField(primary_key=True)  # 创建一个自增的主键字段
    # 创建一个varchar(64)的唯一的不为空的字段
    name = models.CharField(null=False, max_length=20)
    pwd = models.CharField(max_length=32,default=123)

class Token(models.Model):
    user = models.OneToOneField("UserInfo1",on_delete=models.CASCADE)
    token = models.CharField(max_length=128)

2.

# 我需要一个随机字符串
def get_random_str(user):
    import hashlib, time
    ctime = str(time.time())
    # 封装bytes类型一个MD5的加密对象
    md5 = hashlib.md5(bytes(user, encoding="utf8"))
    # md5.update 是拼接的效果,随机生成md5值
    md5.update(bytes(ctime, encoding="utf8"))
    return md5.hexdigest()


# 认证、权限和限制
class LoginModelView(APIView):
    def post(self, request):
        name = request.data.get("name")
        pwd = request.data.get("pwd")

        user = models.UserInfo1.objects.filter(name=name, pwd=pwd).first()
        res = {"state_code": 1000, "msg": None}
        if user:
            # 返回了一个usermd5 加密的字符串
            random_str = get_random_str(user.name)
            """
            当存在token时,则更新,不存在则创建,defaults: 是由 (field, value) 对组成的字典,用于更新对象。 
            返回一个由 (object, created)组成的元组,
            object: 是一个创建的或者是被更新的对象,
            created: 是一个标示是否创建了新的对象的布尔值。                      
            """
            token = models.Token.objects.update_or_create(user=user, defaults={"token": random_str})
            res["token"] = random_str
        else:
            res["state_code"] = 1001  # 错误状态码
            res["msg"] = "用户名或者密码错误"

        import json
        # 这是因为json.dumps 序列化时对中文默认使用的ascii编码.想输出真正的中文需要指定ensure_ascii=False:
        # 如果你这样的话就能把中文转成json字符串,而不是 u4e2du56fd
        return Response(json.dumps(res,ensure_ascii=False))

Django update_or_create 分析

update_or_create(defaults=None, **kwargs)
defaults 的值不同则创建,相同则更新

Member.objects.update_or_create(defaults={'user':1}, others={'field1':1,'field2':1})
当存在user=1时,则更新,不存在则创建

update_or_create 用法:

update_or_create(defaults=None, **kwargs)

kwargs: 来更新对象或创建一个新的对象。

defaults: 是由 (field, value) 对组成的字典,用于更新对象。

 

返回一个由 (object, created)组成的元组,

object: 是一个创建的或者是被更新的对象,

created: 是一个标示是否创建了新的对象的布尔值。

 

update_or_create: 方法通过给出的kwarg

 

定义一个认证类(详情请看 源码分析)

好处:

1.他能判断你是否是当前登录用户,当你没有带认证码 向我后端发请求的时候我是不会给你数据的。

2.他每一次登录的认证都会改变。

局部视图认证

示例1:

class TokenAuth(object):
    def authenticate(self, request):
        # 取到 request里面的 token值
        totken = request.GET.get("token")
        token_obj = models.Token.objects.filter(token=totken).first()
        if not token_obj:
            # 抛认证字段的异常
            raise exceptions.AuthenticationFailed("验证失败")
        else:
            return token_obj.user.name,token_obj.token

    def authenticate_header(self,request):
        pass
# 多条数据
class BookView(APIView):
    # 定义一个认证类
    authentication_classes = [TokenAuth]

而没有定义 token类的依然可以访问。

 

全局级别认证

# 定义全局认证,所有视图都需要认证
REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES" : ["app01.utils.TokenAuth"]

}

from app01 import models
from rest_framework import exceptions


class TokenAuth(object):
    def authenticate(self, request):
        # 取到 request里面的 token值
        totken = request.GET.get("token")
        token_obj = models.Token.objects.filter(token=totken).first()
        if not token_obj:
            # 抛认证字段的异常
            raise exceptions.AuthenticationFailed("验证失败")
        else:
            return token_obj.user.name, token_obj.token

    def authenticate_header(self, request):
        pass

权限

只有VIP用户才能看的内容。

 

from rest_framework.permissions import BasePermission
class SVIPPermission(BasePermission):
    message="只有超级用户才能访问"
    def has_permission(self,request,view):
        username=request.user
        user_type=User.objects.filter(name=username).first().user_type

        if user_type==3:

            return True # 通过权限认证
        else:
            return False
# 封装了3层
class AuthorDetaiView(viewsets.ModelViewSet):
    permission_classes = [SVIPpermission,]
    throttle_classes = [] # 限制某个ip 每分钟访问次数不能超过20次
    # authentication_classes = [TokenAuth]
    # queryset serializer 这两个方法一定要定义成这个不然取不到值
    queryset = models.Author.objects.all()
    serializer_class = AuthorModelSerializers

限制(频率组件)

# 自定义局部限制

 

import time

# 自定义限制
VISIT_RECORD = {}
class VisitRateThrottle(object):
    def __init__(self):
        self.history = None

    def allow_request(self, request, view):

        """
        自定义频率限制60秒内只能访问三次
        """
        # 获取用户IP
        ip = request.META.get("REMOTE_ADDR")
        # 获取当前时间戳
        timestamp = time.time()

        # 如果当前访问ip没有在列表中 我就新建一个IP访问记录
        if ip not in VISIT_RECORD:
            VISIT_RECORD[ip] = [timestamp, ]
            # 可以通过验证
            return True

        # 如果列表中有值,我就取当当前ip地址 赋值给变量
        history = VISIT_RECORD[ip]
        self.history = history
        # 在列表头部 插入一个时间戳
        history.insert(0, timestamp)
        # 如果列表有值,最先插入的值小于 当前值-60 ,tiemstamp是当前时间是在增加的
        while history and history[-1] < timestamp - 60:
            # pop 取出 最后一个条件成立的值
            history.pop()
        # 列表中的时间戳 大于3 就返回falsse,否则通过
        if len(history) > 3:
            return False
        else:
            return True

    def wait(self):
        # 返回给前端还剩多少时间可以访问
        timestamp = time.time()
        # 求出还剩多少时间可以访问
        return 60 - (timestamp - self.history[-1])
# 视图使用
from
rest_framework.response import Response class AuthorModelView(viewsets.ModelViewSet): authentication_classes = [TokenAuth, ] permission_classes = [SVIPPermission, ] throttle_classes = [VisitRateThrottle, ] # 限制某个IP每分钟访问次数不能超过20次 queryset = Author.objects.all() serializer_class = AuthorModelSerializers # 分页 # pagination_class = MyPageNumberPagination # renderer_classes = []

# 全局使用

# 在settings.py中设置rest framework相关配置项
REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.TokenAuth", ],
    "DEFAULT_PERMISSION_CLASSES": ["app01.utils.SVIPPermission", ],
    "DEFAULT_THROTTLE_CLASSES": ["app01.utils.MyThrottle", ],
}

二:内置频率类的使用

定义频率类,必须继承SimpleRateThrottle类:

复制代码
from rest_framework.throttling import SimpleRateThrottle
class VisitThrottle(SimpleRateThrottle):
# 必须配置scope参数 通过scope参数去settings中找频率限定的规则 scope = 'throttle'
# 必须定义 get_cache_key函数 返回用户标识的key 这里借用源码中BaseThrottle类(SimpleRateThrottle的父类)中的get_ident函数返回用户ip地址 def get_cache_key(self, request, view): return self.get_ident(request)
复制代码

局部使用:

视图函数中加上
throttle_classes = [VisitThrottle,]

全局使用:settings中配置

REST_FRAMEWORK={
    'DEFAULT_THROTTLE_CLASSES':['utils.common.VisitThrottle'],
# 局部使用也要在settings中配置上 DEFAULT_THROTTLE_RATES 通过self.scope取频率规则 (一分钟访问3次) 'DEFAULT_THROTTLE_RATES':{'throttle':'3/m',} }

设置错误信息为中文:

复制代码
class Course(APIView):
    authentication_classes = [TokenAuth, ]
    permission_classes = [UserPermission, ]
    throttle_classes = [MyThrottles,]

    def get(self, request):
        return HttpResponse('get')

    def post(self, request):
        return HttpResponse('post')
# 函数名为throttled 重写Throttled类中默认的错误信息 def throttled(self, request, wait): from rest_framework.exceptions import Throttled class MyThrottled(Throttled): default_detail = '访问频繁' extra_detail_singular = '等待 {wait} second.' extra_detail_plural = '等待 {wait} seconds.' raise MyThrottled(wait)



原文地址:https://www.cnblogs.com/Rivend/p/11844648.html