Python 带参装饰器 + Redis,处理高频操作

因业务需要,写了一个 Python 带参装饰器,配合 Redis 锁一起使用,用于在高并发场景下防止对同一条记录的重复操作。

装饰器部分代码

import hashlib
from functools import wraps
from api.api_state import APIState
from api.base_api import APIResponse
from utils.redis import acquire_lock_with_timeout, release_lock


def lock_operation(option_name: str, lock_key: str, option_type="GET", paramer_type="request"):
    """
    Decorator factory that serialises calls to a view method behind a redis lock.

    The lock name is the MD5 hex digest of ``option_name`` concatenated with
    the value looked up under ``lock_key``. If the lock cannot be acquired the
    decorated method is not called and an ``APIResponse`` carrying
    ``APIState.PARAMTER_ERROR`` is returned instead. The lock is always
    released once the decorated method returns or raises.

    @param option_name: operation name, used as a prefix of the lock name
    @param lock_key: name of the parameter whose value identifies the record
    @param option_type: request type; "GET" reads ``query_params``, any other
        value reads ``data`` (only consulted when paramer_type is "request")
    @param paramer_type: where to read ``lock_key`` from; "request" reads it
        from the view's request, any other value reads it from the keyword
        arguments passed to the decorated method
    @return: a decorator for view methods
    """

    def wrapper(func):
        # Keep the decorated method's __name__/__doc__ so logs, tracebacks and
        # introspection still show the real handler rather than "redislock".
        @wraps(func)
        def redislock(view, *args, **kwargs):
            # The first positional argument is the view instance (``self`` of
            # the decorated method); the request object hangs off of it.
            if paramer_type == "request":
                if option_type == "GET":
                    key_paramer = view.request.query_params.get(lock_key, None)
                else:
                    key_paramer = view.request.data.get(lock_key, None)
            else:
                key_paramer = kwargs.get(lock_key, None)

            # NOTE: when the key is missing, key_paramer is None, so all such
            # calls of one operation share a single lock.
            lock_name = hashlib.md5(f'{option_name}{key_paramer}'.encode()).hexdigest()
            identifier = acquire_lock_with_timeout(lock_name)
            if not identifier:
                return APIResponse(status=APIState.PARAMTER_ERROR.value, msg='操作太快了,请稍后再试')
            try:
                return func(view, *args, **kwargs)
            finally:
                release_lock(lock_name, identifier)

        return redislock

    return wrapper

redis 处理部分

import json
import logging
import math
import time
import uuid
from decimal import Decimal

import redis
from django.utils.timezone import now
from django_redis import get_redis_connection

from users.models import TCustomerinfo

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Connection for the "default" alias, obtained once at import time and shared
# by every helper in this module.
conn = get_redis_connection("default")

class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that emits ``Decimal`` values as floats."""

    def default(self, obj):
        """Convert ``Decimal`` to ``float``; defer everything else to the base encoder."""
        if not isinstance(obj, Decimal):
            # The base implementation raises TypeError for unsupported types.
            return super().default(obj)
        return float(obj)


def get_redis_value_by_key(key):
    """Return whatever the shared connection's ``get`` yields for ``key``."""
    stored = conn.get(key)
    return stored


def set_redis_value(key, value, expires):
    """Store ``value`` under ``key`` on the shared connection, passing ``expires`` as the expiry."""
    # ``ex`` is the third positional parameter of redis-py's ``set``.
    conn.set(key, value, ex=expires)


def set_cache_data(key, data, TOKEN_EXPIRED_TIME):
    """
    Serialise ``data`` to JSON and cache it under ``key``.

    ``Decimal`` values are encoded through ``DecimalEncoder``. The JSON text is
    stored via ``set_redis_value`` with ``TOKEN_EXPIRED_TIME`` as the expiry
    and is also returned to the caller.
    """
    payload = json.dumps(data, cls=DecimalEncoder)
    set_redis_value(key=key, value=payload, expires=TOKEN_EXPIRED_TIME)
    return payload


def acquire_lock_with_timeout(lockname, acquire_timeout=3, lock_timeout=5):
    """
    Try to take the lock ``lock:<lockname>``, retrying until a deadline.

    @param lockname: lock name without the ``lock:`` prefix
    @param acquire_timeout: seconds to keep retrying before giving up
    @param lock_timeout: seconds after which the lock key expires; rounded up
        to a whole number of seconds
    @return: the random identifier stored as the lock value on success (pass
        it to ``release_lock``), or ``False`` if the lock was not obtained
        within ``acquire_timeout``
    """
    logger.info(f'acquire_lock: {lockname}')
    # Random 128-bit identifier so only the owner can release the lock.
    identifier = str(uuid.uuid4())
    lockname = 'lock:' + lockname
    # The expiry handed to redis must be an integer number of seconds.
    lock_timeout = int(math.ceil(lock_timeout))

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + acquire_timeout
    while time.monotonic() < deadline:
        # SET with NX and EX creates the key and its expiry in one atomic
        # command; a separate SETNX + EXPIRE could leave a lock without any
        # expiry if the process died between the two calls.
        if conn.set(lockname, identifier, nx=True, ex=lock_timeout):
            return identifier

        # Safety net for a lock key that exists without an expiry: -1 is the
        # "no expiry" reply; None is accepted for clients that report it so.
        remaining = conn.ttl(lockname)
        if remaining is None or remaining == -1:
            conn.expire(lockname, lock_timeout)

        time.sleep(.001)

    logger.info(f'can not lock: {lockname}')
    return False


def release_lock(lockname, identifier):
    """
    Release the lock ``lock:<lockname>`` if it is still held by ``identifier``.

    @param lockname: lock name without the ``lock:`` prefix
    @param identifier: value returned by ``acquire_lock_with_timeout``
    @return: ``True`` if the lock key was deleted, ``False`` if the key no
        longer holds ``identifier`` (the lock expired or is owned by someone
        else)
    """
    logger.info(f'release_lock: {lockname}')
    lockname = 'lock:' + lockname

    # The context manager resets the pipeline on every exit path, so the
    # watched connection is handed back even when we bail out early or an
    # unexpected error is raised.
    with conn.pipeline(True) as pipe:
        while True:
            try:
                # Watch the key so the delete below aborts if it changes.
                pipe.watch(lockname)
                current = pipe.get(lockname)
                # The value is bytes unless the client decodes responses;
                # only decode when there is something to decode.
                if isinstance(current, bytes):
                    current = current.decode()

                if current != identifier:
                    # The lock is no longer ours; leave the key untouched.
                    pipe.unwatch()
                    return False

                # Still the owner: delete the key inside a transaction.
                pipe.multi()
                pipe.delete(lockname)
                pipe.execute()
                logger.info("释放结束")
                return True

            # Another client modified the key in between; retry.
            except redis.exceptions.WatchError:
                continue

使用方式:

class OrderLogisticsDetailsView(APIView):
    """
    Logistics details lookup.

    get
        express_id: value matched against ``ls_id`` of ``TLogisticsinfo``
        express_no: value passed to the logistics query; also used as the lock key
    """

    @lock_operation(option_name="WL", paramer_type="kwargs", option_type="GET", lock_key="express_no")
    def get(self, request, *args, **kwargs):
        """Return the logistics query result, or a parameter-error response."""
        express_no = kwargs.get("express_no", None)
        express_id = kwargs.get("express_id", None)

        # Both identifiers are required; otherwise answer with the error response.
        if express_id is None or express_no is None:
            return APIResponse(status=APIState.PARAMTER_ERROR.value)

        logistics_record = TLogisticsinfo.objects.filter(ls_id=express_id).first()
        result = query_logistics_information(logistics_record, express_no)

        # A falsy query result falls back to the same error response.
        if not result:
            return APIResponse(status=APIState.PARAMTER_ERROR.value)
        return result

参考链接:
https://mp.weixin.qq.com/s/nPNowhyNPQah-eQBbsj29Q

原文地址:https://www.cnblogs.com/YQYC/p/14250184.html