Celery--基本使用

使用celery包含三步:
1. 定义任务函数。
2. 运行celery服务。
3. 客户应用程序的调用。
 
一、直接方式
1.创建一个文件 tasks.py:
from celery import Celery
broker
= 'redis://127.0.0.1:6379/5' backend = 'redis://127.0.0.1:6379/6' app = Celery('tasks', broker=broker, backend=backend)
@app.task
def add(x, y):   return x + y
Celery的第一个参数是当前模块名称,这个参数是必须的,第二个参数是中间人关键字参数,传入了broker和backend。然后创建了一个任务函数add。
 
2.运行 Celery 职程服务器
celery -A tasks worker --loglevel=INFO (这条命令必须在tasks.py 所在的目录执行)
参数-A指定了Celery实例的位置,这个实例是在tasks.py文件中,Celery会自动在该文件中查找Celery对象实例。
--loglevel指定日志的级别,默认是warning。
在生产环境中你会想要让职程作为守护程序在后台运行。你需要用你所在平台提供的工具来实现,或是像 supervisord 这样的东西(更多信息见 Running the worker as a daemon)。
 
3.调用任务
可以用 delay() 方法来调用任务,这是 apply_async() 方法的快捷方式,只是 apply_async 可以进行更多的任务属性设置,比如 callbacks/errbacks 正常回调与错误回调、执行超时、重试、重试时间等等,具体参数可以参考这里
新建一个 main.py 文件 代码如下:
from tasks import add 

r = add.delay(2, 2)
r = add.delay(3, 3)
print(r.ready()) 
print(r.result)
print (r.get())
在celery命令行可以看见celery执行的日志。打开 backend的redis,也可以看见celery执行的信息。
 
二、使用配置文件方式
这一次创建 app,并没有直接指定 broker 和 backend,而是在配置文件。
Celery 的配置比较多,可以在 官方配置文档:http://docs.celeryproject.org/en/latest/userguide/configuration.html  查询每个配置项的含义。
首先创建一个python包,姑且命名为proj。目录文件如下:
proj            # 项目根目录
├── celery_app         # 存放 celery 相关文件
│ ├── __init__.py     # 创建 celery 实例
│ ├── config.py        # 配置文件
│ └── tasks.py         # 任务函数
└── client.py            # 应用程序
 
__init__.py:
from celery import Celery

app = Celery('demo')                                # 创建 Celery 实例
app.config_from_object('celery_app.config')   # 通过 Celery 实例加载配置模块
celeryconfig.py:
#简单方式
#BROKER_URL = 'redis://127.0.0.1:6379/5'
#CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/6'

#其它方式
CELERY_IMPORTS = ( 'celery_app.tasks', )   # 指定导入的任务模块
CELERY_IGNORE_RESULT = False
BROKER_HOST = '127.0.0.1'
BROKER_PORT = 5672
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp'
CELERY_TIMEZONE='Asia/Shanghai'       # 指定时区,默认是 UTC
可以使用 python -m celeryconfig 来验证配置是否正确。
tasks.py:
from celery_app import app

@app.task(name='tasks.add')
def add(x, y)
  return x + y
client.py:
#方式一
#from celery_app import tasks
#tasks.add.apply_async(args=[2, 8])        # 也可用 tasks.add.delay(2, 8)

#方式二 from celery_app import app
r
=app.send_task("tasks.add",[1,3]) print(r.ready()) print(r.status)
使用方法也很简单,在 proj 的同一级目录执行 celery:
celery -A celery_app worker -l INFO --beat
启动celery后台服务,这里是测试与学习celery的教程。在实际生产环境中,后台进程通常是需要作为守护进程运行在后台的,可以使用supervisor作为进程管理工具。
 -l info     与--loglevel=info的作用是一样的。
 --beat    周期性的运行。即设置 心跳。
 
指定路由
tasks.py:
from celery_app import app
import time
# 视频压缩
@app.task
def video_compress(video_name):
    time.sleep(10)
    print 'Compressing the:', video_name
    return 'success'

#上传视频 @app.task def video_upload(video_name): time.sleep(5) print u'正在上传视频' return 'success'

# 其他任务 @app.task def other(str): time.sleep(10) print 'Do other things' return 'success
celeryconfig.py:
from kombu import Exchange,Queue
BROKER_URL
= "redis://10.32.105.227:6379/5" CELERY_RESULT_BACKEND = "redis://10.32.105.227:6379/0" CELERY_QUEUES = (    Queue("default",Exchange("default"),routing_key="default"),    Queue("for_video_compress",Exchange("for_video_compress"),routing_key="video_compress"),    Queue("for_video_upload",Exchange("for_video_upload"),routing_key="video_upload")  ) CELERY_ROUTES = { 'tasks.video_compress':{"queue":for_video_compress","routing_key":"video_compress"}, 'tasks.video_upload':{"queue":"for_video_upload","routing_key:"video_upload"} }
在 celeryconfig.py 文件中,首先设置了broker以及result_backend,接下来定义了三个Message Queue,并且指明了Queue对应的Exchange(当使用Redis作为broker时,Exchange的名字必须和Queue的名字一样)以及routing_key的值。
 
现在在一台主机上面启动一个worker,这个worker只执行for_video_compress队列中的消息,这是通过在启动worker是使用-Q Queue_Name参数指定的。
celery -A tasks worker -l info -n worker.%h -Q for_video_compress
然后到另一台主机上面执行下面代码启动video_compress: 
from tasks import *
video_compress_re = video_compress.delay(video_name)
执行完上面的代码之后,video_compress消息会被立即发送到for_video_compress队列中去。此时已经启动的worker.atsgxxx 会立即执行video_compress任务。
 
重复上面的过程,在另外一台机器上启动一个worker专门执行for_video_upload中的任务。
from tasks import *
video_upload_re = video_upload.delay(video_name)
在上面的 tasks.py 文件中还定义了other任务,但是在celeryconfig.py文件中没有指定这个任务route到那个Queue中去执行,此时执行other任务的时候,other会route到Celery默认的名字叫做celery的队列中去。
因为这个消息没有在celeryconfig.py文件中指定应该route到哪一个Queue中,所以会被发送到默认的名字为celery的Queue中,但是我们还没有启动worker执行celery中的任务。接下来我们在启动一个worker执行celery队列中的任务。
celery -A tasks worker -l info -n worker.%h -Q celery
 

原文地址:https://www.cnblogs.com/absoluteli/p/14016705.html