celery_django 组合

celery + django:

异步任务:

#task.py:
from celery import Celery

# broker
broker = 'redis://127.0.0.1:6379/0'
# backend
backend = 'redis://127.0.0.1:6379/1'
# worker
app = Celery(broker=broker, backend=backend)

# tasks
# 任务就是一个功能函数,执行任务就是执行函数,任务的结果就是函数的返回值
@app.task
def add(a, b):
    res = a + b
    print('a + b = %s' % res)
    return res

# 手动添加脚本文件:

from tasks import add
result = add.delay(x,y)    # 立即执行的异步任务
print(reult.id)


# get_result.py:
    
from tasks import app
from celery.result import AsyncResult
id = 'ad4ca85d-998a-4cc9-9d24-655986a169b3'
# async = AsyncResult(id=id, app=app)
# res = async.get()
# print(res)

if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

延时任务:

# add_task.py
from celery_task.tasks import add

# 添加延迟任务
from datetime import datetime, timedelta
result = add.apply_async(args=(10, 20), eta=datetime.utcnow() + timedelta(seconds=10))
print(result)

#get_reule.py
from celery_task.celery import app
from celery.result import AsyncResult
id = 'f9d97749-7e48-4b56-a31c-69fe73f9c1a7' #redis库的id

if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

django-celery:

基本使用

django中使用celery有两种方式:
    上面建立特定目录结构的是一种,
	另一种django-celery模块,对于django版本有严格的要求,要是项目换了环境,就无法使用

  安装需要的版本:
        celery==3.1.25
        django-celery==3.1.20

celery_config.py:

import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
    'app01.tasks',
)
#有些情况可以防止死锁
CELERYD_FORCE_EXECV=True
# 设置并发worker数量
CELERYD_CONCURRENCY=4
#允许重试
CELERY_ACKS_LATE=True
# 每个worker最多执行100个任务被销毁,可以防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超时时间
CELERYD_TASK_TIME_LIMIT=12*30

task.py:

from celery import task
@task
def add(a,b):
    with open('a.text', 'a', encoding='utf-8') as f:
        f.write('a')
    print(a+b)

views.py:

from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):
    # result=add.delay(2,3)
    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    return HttpResponse('ok')

settings.py:

INSTALLED_APPS = [
    ...
    'djcelery',
    'app01'
]

...

from djagocele import celeryconfig
BROKER_BACKEND='redis'
BOOKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'
原文地址:https://www.cnblogs.com/shaozheng/p/12177897.html