Django中使用celery

一般使用celery来做Django的异步消息队列

先安装必要的包

pip3 install celery

我的项目目录结构:

celeryApp.py

 1 import celery
 2 import os
 3 
 4 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "videoApp.settings")
 5 
 6 import django
 7 django.setup()
 8 
 9 app = celery.Celery(main="celery_app", broker='redis://localhost:6379/2')
10 
11 app.autodiscover_tasks(['apps.video_manager'])
app.autodiscover_tasks()
接收参数为tasks.py文件所在的包

看一下 autodiscover_tasks的源码:

看注释,说的很清楚,如果你的tasks.py文件在 foo.bar.tasks.py, 那么参数传递 foo.bar,该方法接受列表,会将列表里的所有tasks.py的任务都注册

任务函数:

apps/video_manager/tasks.py

1 from videoApp.celeryApp import app
2 import time
3 
4 @app.task(name="fir_task")
5 def my_task():
6     with open('yeah.txt', 'w') as f:
7         f.write(str(time.time()))
8     return "success"

现在在项目根目录执行命令,开启celery

 celery -A videoApp.celeryApp worker -l info

控制台会显示info信息,注意看:

如果tasks里的函数出现在了这里,说明任务注册成功了,否则注册失败,如果注册失败,在进行调用的时候,会报错,任务未注册

任务里的return,return出去的数据仅仅是作为日志显示用的,目前未发现有别的用处。

 需要补充的是,任务一旦注册,代码就会写到内存里,这时候即便更新了任务代码再调用,也是执行的旧代码,需要重启celery才会再次更新新代码。

任务调用方式:

1 from apps.video_manager.tasks import my_task
2 
3 my_task.delay(*args, **kwargs)

delay的参数就是任务接受的参数,如果注册的任务接受参数,那么调用时将参数传递给delay即可。

原文地址:https://www.cnblogs.com/haiton/p/12581353.html