celery 异步发送短信验证码、延迟任务

短信

## celery.py
import os, django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")
django.setup()


from celery import Celery

broker = 'redis://127.0.0.1:6379/15'
backend = 'redis://127.0.0.1:6379/15'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])


# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 自动任务的定时配置
from celery.schedules import crontab
from datetime import timedelta
app.conf.beat_schedule = {
    # 定时任务:任务名自定义
    # 'update_banner_cache': {
    #     'task': 'celery_task.tasks.update_banner_cache',  # 任务源
    #     'args': (),  # 任务参数
    #     'schedule': timedelta(seconds=8), # 定时添加任务的时间
    #     # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
    # },
    'send_msg': {
        'task': 'celery_task.tasks.send_msg',
        'args': (),
        'schedule': timedelta(seconds=10),
    }
}

tasks.py

from .celery import app

from home.models import Banner
from django.conf import settings
from django.core.cache import cache
from home.serializers import BannerModelSerializer

@app.task

def update_banner_cache():
    banner_query = Banner.objects.filter(is_delete = False, is_show=True).order_by('-order')[:settings.BANNER_COUNT]
    banner_list = BannerModelSerializer(banner_query, many=True).data
    for banner_dic in banner_list:
        banner_dic['image'] = settings.BASE_URL + banner_dic['image']
    cache.set('banner_list', banner_list)


#异步定时发送手机验证码
from libs.tx_sms import sms
@app.task
def send_msg():
    code = sms.get_code()
    mobile = '13322332233'
    result = sms.send_msg(mobile, code)
    print(result)

然后Terminal开两个窗口,回到根目录,执行, celery worker -A celery_task -l info -P eventlet 和 celery beat -A celery_task -l info 则10s会发一次验证码

延迟任务

## celery.py
from celery import Celery

# broker:任务仓库
broker = 'redis://127.0.0.1:6379/15'
# backend:任务结果仓库
backend = 'redis://127.0.0.1:6379/15'
# include:任务(函数)所在文件
app = Celery(broker=broker, backend=backend, include=['celery_package.tasks'])


task.py

from .celery import app

@app.task
def jump(n1, n2):
    res = n1 * n2
    print('n1 * n2 = %s' % res)
    return res

add_task.py


from celery_package.tasks import jump

# 直接执行函数
# jump(10, 20)

# 添加celery立即任务
# jump.delay(10, 20)

# 添加延迟任务
# args是jump任务需要的参数,没有就设置为空()
# eta是该任务执行的UTC格式的时间
from datetime import datetime, timedelta
def eta_second(second):
    ctime = datetime.now()
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=second)
    return utc_ctime + time_delay
def eta_days(days):
    ctime = datetime.now()
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(days=days)
    return utc_ctime + time_delay

# apply_async就是添加延迟任务
jump.apply_async(args=(200, 50), eta=eta_second(10))

**右键运行add_tasks. 然后再Terminal窗口执行celery worker -A celery_package -l info -P eventlet 则过10s后,才会产生运算结果 **

原文地址:https://www.cnblogs.com/michealjy/p/11979233.html