Django: Setting an Expiration Time for django-celery Async Tasks

Scenario

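While building a Django project, a few scenarios called for asynchronous operations, such as sending SMS messages and emails. I set up django-celery with Redis as the message broker. One day Redis died unexpectedly, and it was a long time before anyone reported that login SMS messages were not arriving. The logs revealed the problem, and after I restarted Redis, phones were bombarded with a flood of text messages.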

Analysis

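django-celery follows a producer-consumer model, and the queue of pending tasks has no expiration time by default. So once Redis came back, every task that had not yet been executed was run.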

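To confirm this, I tested locally: with the celery worker stopped and Redis running, triggering an async operation creates a list named celery in the Redis database you configured (visible with keys *). Inspecting it with lrange celery 0 -1 shows entries structured like this: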

b'{"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "permission_changed", "id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c", "shadow": null, "eta": null, "expires": "2021-05-19T17:12:04.087801+00:00", "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c", "parent_id": null, "argsrepr": "(5, 1)", "kwargsrepr": "{}", "origin": "gen66816@wjh-MacBook-Pro.local"}, "properties": {"correlation_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c", "reply_to": "35dd26fb-b289-353a-bdec-2c95149f4ac8", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "1ae73174-b8db-4dd8-9a1f-18361672c77b"}}',
 b'{"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "permission_changed", "id": "075c774b-de6e-40d8-9cdc-05b97f88f0a3", "shadow": null, "eta": null, "expires": "2021-05-19T17:11:55.480755+00:00", "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "075c774b-de6e-40d8-9cdc-05b97f88f0a3", "parent_id": null, "argsrepr": "(5, 1)", "kwargsrepr": "{}", "origin": "gen66816@wjh-MacBook-Pro.local"}, "properties": {"correlation_id": "075c774b-de6e-40d8-9cdc-05b97f88f0a3", "reply_to": "35dd26fb-b289-353a-bdec-2c95149f4ac8", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "34dc18c5-b960-4b29-80d6-126e76e3d161"}}'

Looking at a single entry (lindex celery 0):

{
	"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d",
	"content-encoding": "utf-8",
	"content-type": "application/json",
	"headers": {
		"lang": "py",
		"task": "permission_changed",
		"id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c",
		"shadow": null,
		"eta": null,
		"expires": null,
		"group": null,
		"group_index": null,
		"retries": 0,
		"timelimit": [null, null],
		"root_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c",
		"parent_id": null,
		"argsrepr": "(5, 1)",
		"kwargsrepr": "{}",
		"origin": "gen66816@wjh-MacBook-Pro.local"
	},
	"properties": {
		"correlation_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c",
		"reply_to": "35dd26fb-b289-353a-bdec-2c95149f4ac8",
		"delivery_mode": 2,
		"delivery_info": {
			"exchange": "",
			"routing_key": "celery"
		},
		"priority": 0,
		"body_encoding": "base64",
		"delivery_tag": "1ae73174-b8db-4dd8-9a1f-18361672c77b"
	}
}
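
The body field is base64-encoded (see body_encoding), so the task arguments are not readable at a glance. As a small illustration, the sketch below decodes the body of the entry above using only the Python standard library. The variable names are my own, and the three-part layout (positional args, keyword args, embedded options) is inferred from this one message rather than taken from a specification.

```python
import base64
import json

# "body" value copied from the queue entry above; "body_encoding" says base64.
body = "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d"

# Decode base64 to bytes, then parse the JSON text inside.
decoded = json.loads(base64.b64decode(body).decode("utf-8"))

# This message holds three parts: positional args, keyword args, embedded options.
args, kwargs, embed = decoded
print(args)    # matches "argsrepr": "(5, 1)" in the headers
print(kwargs)
print(embed)
```

The decoded positional arguments line up with the argsrepr header, which is a quick way to check that a queued entry really is the task call you expect.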

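I noticed an attribute named expires in there. Isn't that exactly what sets an expiration time?
So I started reading the celery source to find where this expires attribute can be set, starting from the .delay() method: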

    def delay(self, *args, **kwargs):
        return self.apply_async(args, kwargs)

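Next comes the apply_async method. Its signature has no explicit expires keyword argument, so I guessed the value travels inside options and looked at how options is built.
I found this snippet: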

preopts = self._get_exec_options()
options = dict(preopts, **options) if options else preopts

Next, the _get_exec_options method:

    def _get_exec_options(self):
        if self._exec_options is None:
            self._exec_options = extract_exec_options(self)
        return self._exec_options

Sure enough, extract_exec_options is where the keywords of the attributes stored in Redis come from:

extract_exec_options = mattrgetter(
    'queue', 'routing_key', 'exchange', 'priority', 'expires',
    'serializer', 'delivery_mode', 'compression', 'time_limit',
    'soft_time_limit', 'immediate', 'mandatory',  # imm+man is deprecated
)
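
To show how these pieces fit together, here is a minimal stand-alone sketch. mattrgetter_sketch is my own simplified stand-in for celery's helper (it reads the named attributes off an object into a dict), NewMemberTask is a hypothetical class, and merge_options reuses the dict(preopts, **options) expression quoted earlier. None of this is celery's actual code.

```python
def mattrgetter_sketch(*attrs):
    """Return a function that copies the named attributes of an object
    into a dict, using None for attributes the object does not have."""
    return lambda obj: {attr: getattr(obj, attr, None) for attr in attrs}


# Only three of the option names are used here, to keep the output short.
extract_options = mattrgetter_sketch("queue", "priority", "expires")


class NewMemberTask:
    """Hypothetical stand-in for a task declared with expires=60*5."""
    expires = 60 * 5


def merge_options(preopts, options):
    # Same expression as in apply_async: call-time options override task defaults.
    return dict(preopts, **options) if options else preopts


preopts = extract_options(NewMemberTask())
default_opts = merge_options(preopts, {})
override_opts = merge_options(preopts, {"expires": 30})
print(default_opts)
print(override_opts)
```

The design point is that an attribute on the task acts as a default, while an option passed for a single call takes precedence over it.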

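The self in preopts = self._get_exec_options() is an instance of class Task.
So all you need to do is pass expires to the task decorator of the task you define.
As for what value to pass, I found this in the source (it was not easy to find: amqp.py, under the task_protocols attribute set in the AMQP constructor):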

        if isinstance(expires, numbers.Real):
            self._verify_seconds(expires, 'expires')
            now = now or self.app.now()
            timezone = timezone or self.app.timezone
            expires = maybe_make_aware(
                now + timedelta(seconds=expires), tz=timezone,
            )
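
The conversion in this snippet can be tried in isolation. The sketch below is a simplified illustration using only the standard library: resolve_expires is a hypothetical helper of my own, and the fixed "now" is chosen for repeatability. It leaves out the validation and time zone handling that the real code performs.

```python
import numbers
from datetime import datetime, timedelta, timezone


def resolve_expires(expires, now):
    """Turn a relative number of seconds into an absolute datetime.
    Values that are not numbers (for example a datetime) pass through."""
    if isinstance(expires, numbers.Real):
        return now + timedelta(seconds=expires)
    return expires


# A fixed, time zone aware "now" so the result is repeatable.
now_utc = datetime(2021, 5, 19, 17, 7, 4, tzinfo=timezone.utc)

absolute = resolve_expires(60 * 5, now_utc)
print(absolute.isoformat())
```

A relative value such as 60*5 therefore becomes a concrete point in time at the moment the task is published, not when it is consumed.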

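Clearly, seconds means the value is in seconds, so what ends up stored in Redis under expires is an absolute timestamp. There is one detail here:
storing a time involves a time zone, and celery takes the current time from self.app.now(), which looks like this: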

    def now(self):
        """Return the current time and date as a datetime."""
        now_in_utc = to_utc(datetime.utcnow())
        return now_in_utc.astimezone(self.timezone)

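WTF? It uses UTC by default. So I first added expires with a value of 5 minutes to see whether it would take effect and whether the stored time would be off by 8 hours.
Sure enough, it took effect, and sure enough, the stored time was 8 hours off from my local time (UTC+8).
What to do? I chose to override celery's now method.
Is it really enough to change only now and leave timezone alone? Yes. Take a look at the maybe_make_aware method: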

def maybe_make_aware(dt, tz=None):
    """Convert dt to aware datetime, do nothing if dt is already aware."""
    if is_naive(dt):
        dt = to_utc(dt)
        return localize(
            dt, timezone.utc if tz is None else timezone.tz_or_local(tz),
        )
    return dt

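The key is the is_naive(dt) check: the localize(...) call, with its argument timezone.utc if tz is None else timezone.tz_or_local(tz), only runs for naive datetimes. If the datetime already carries a time zone, it is returned as-is and keeps its own zone.
That leads to the implementation below.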
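
To make the pass-through behaviour concrete, here is a simplified stand-in written with the standard library only. maybe_make_aware_sketch is my own approximation of the function quoted above, not celery's code, and utc8 is a fixed-offset zone used in place of a real local time zone.

```python
from datetime import datetime, timedelta, timezone


def maybe_make_aware_sketch(dt, tz=None):
    """Simplified stand-in: only naive datetimes get a zone attached."""
    if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
        # Treat the naive value as UTC, then express it in the target zone.
        return dt.replace(tzinfo=timezone.utc).astimezone(tz or timezone.utc)
    return dt  # already aware: returned unchanged, own zone kept


utc8 = timezone(timedelta(hours=8))  # assumption: fixed offset for the local zone
now_utc = datetime(2021, 5, 20, 2, 21, 41, tzinfo=timezone.utc)
now_local = now_utc.astimezone(utc8)

# Same tz argument both times; only the zone of "now" differs.
expires_utc = maybe_make_aware_sketch(now_utc + timedelta(seconds=300), tz=timezone.utc)
expires_local = maybe_make_aware_sketch(now_local + timedelta(seconds=300), tz=timezone.utc)

print(expires_utc.isoformat())
print(expires_local.isoformat())
print(expires_utc == expires_local)
```

In this sketch both results denote the same instant and differ only in the offset they are written with, which suggests that overriding now changes how the stored expires value reads rather than when the task actually expires.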

Implementation

The django-celery project layout is covered in plenty of places online, so I will not repeat it here. First, the main file:

import os
from celery import Celery

from django.utils import timezone

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'SHBMCRM.settings')


class MyCelery(Celery):

    def now(self):
        return timezone.localtime()  # Django's helper returns the current time in the TIME_ZONE configured in settings


celery_app = MyCelery('SHBMCRM')
celery_app.config_from_object('celery_tasks.config')
celery_app.autodiscover_tasks(['celery_tasks.sms', 'celery_tasks.message'])


# Set the expiration time: 60 s * 5 = 5 minutes
@celery_app.task(name='new_member', expires=60*5)
def new_member(mobile, user):
    # do something
    pass

Result

{
	"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d",
	"content-encoding": "utf-8",
	"content-type": "application/json",
	"headers": {
		"eta": null,
		"expires": "2021-05-20T10:26:41.945686+08:00",   // the expiration time is now set
		"group": null,
		"kwargsrepr": "{}",
		"origin": "gen66816@wjh-MacBook-Pro.local"
	},
}
The first log line shows the task being received along with its expiration time:
[2021-05-20 10:21:42,017: INFO/MainProcess] Received task: permission_changed[25ac8f5d-ed60-41ef-929f-9b6d85448094]   expires:[2021-05-20 10:26:41.945686+08:00]
The second line shows that it executed successfully:
[2021-05-20 10:21:42,036: INFO/ForkPoolWorker-1] Task permission_changed[25ac8f5d-ed60-41ef-929f-9b6d85448094] succeeded in 0.01773979200000042s: None
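
To illustrate why the expires header prevents the SMS flood described at the start, the sketch below mimics the kind of check a consumer can make before running a task. is_expired is a hypothetical helper of my own, not the worker's actual implementation, and it assumes the header holds an ISO 8601 string like the one shown above.

```python
from datetime import datetime, timedelta, timezone


def is_expired(headers, now):
    """Return True if the message carries an expires timestamp that has passed.
    `now` must be a time zone aware datetime."""
    expires = headers.get("expires")
    if expires is None:
        return False  # no expiration set: the task stays valid indefinitely
    return now > datetime.fromisoformat(expires)


utc8 = timezone(timedelta(hours=8))  # assumption: fixed offset for the local zone
headers = {"expires": "2021-05-20T10:26:41.945686+08:00"}

in_time = is_expired(headers, datetime(2021, 5, 20, 10, 22, 0, tzinfo=utc8))
too_late = is_expired(headers, datetime(2021, 5, 20, 10, 30, 0, tzinfo=utc8))
no_expiry = is_expired({"expires": None}, datetime(2021, 5, 20, 10, 30, 0, tzinfo=utc8))
print(in_time, too_late, no_expiry)
```

With a check like this, a message picked up long after an outage would be skipped instead of sending a stale SMS.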

Writing takes effort. If you repost this article, please credit the source and include the link.

Original article: https://www.cnblogs.com/pywjh/p/14793140.html