Celery的常用知识

什么是Clelery

  Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统。专注于实时处理的异步任务队列。同时也支持任务调度。
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

** 消息中间件 **
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

** 任务执行单元 **
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

** 任务结果存储 **
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

使用场景

异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

定时任务:定时执行某件事情,比如每天数据统计

Celery的安装配置

pip install celery

消息中间件:RabbitMQ/Redis

app=Celery('任务名',backend='xxx',broker='xxx')

实例分析

基本使用
创建项目celerytest

创建py文件:celery_app_task.py

import celery
import time
# broker='redis://127.0.0.1:6379/2' 不加密码
backend='redis://:123456@127.0.0.1:6379/1'
broker='redis://:123456@127.0.0.1:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def add(x,y):
    return x+y

创建py文件:add_task.py,添加任务

from celery_app_task import add
result = add.delay(4,5)
print(result.id)

创建py文件:run.py,执行任务,或者使用命令执行:celery worker -A celery_app_task -l info
run.py

from celery_app_task import cel
if __name__ == '__main__':
    cel.worker_main()
    # cel.worker_main(argv=['--loglevel=info')

创建py文件:result.py,查看任务执行结果

from celery.result import AsyncResult
from celery_app_task import cel

async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 将结果删除
elif async.failed():
    print('执行失败')
elif async.status == 'PENDING':
    print('任务等待中被执行')
elif async.status == 'RETRY':
    print('任务异常后正在重试')
elif async.status == 'STARTED':
    print('任务已经开始被执行')
```执行 add_task.py,添加任务,并获取任务ID

执行 run.py ,或者执行命令:celery worker -A celery_app_task -l info

执行 result.py,检查任务状态并获取结果

更进一步的请参考下面参考文档

#### 参考文档

* [参考文档](https://www.jb51.net/article/158326.htm)
* [知乎经典](https://www.jianshu.com/p/620052aadbff)
原文地址:https://www.cnblogs.com/dylancao/p/12246407.html