5.0更新
celery -A celery_task beat
celery -A celery_task worker
Celery
官方
Celery 官网:http://www.celeryproject.org/
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统
专注于实时处理的异步任务队列
同时也支持任务调度
注意:
1
|
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
|
Celery异步任务框架
1 2 3 4 5 6 7 8 9
|
""" 1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket) 2)celery服务为为其他项目服务提供异步解决任务需求的 注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
人是一个独立运行的服务 | 医院也是一个独立运行的服务 正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题 人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求 """
|
Celery架构
Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。
消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
使用场景
异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
延迟执行:解决延迟任务
定时执行:解决周期(周期)任务,比如每天数据统计
Celery的安装配置
pip install celery
消息中间件:RabbitMQ/Redis
app=Celery(‘任务名’, broker=’xxx’, backend=’xxx’)
两种celery任务结构:提倡用包管理,结构更清晰
Celery执行异步任务
基本结构
1 2 3 4 5 6 7 8 9 10
|
import celery import time
|
包架构封装(多任务结构)
1 2 3 4 5 6 7
|
project ├── celery_task │ ├── __init__.py │ ├── celery.py │ └── tasks.py ├── add_task.py └── get_result.py
|
基本使用
celery.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
|
tasks.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
from .celery import app import time @app.task def add(n, m): print(n) print(m) time.sleep(10) print('n+m的结果:%s' % (n + m)) return n + m
@app.task def low(n, m): print(n) print(m) print('n-m的结果:%s' % (n - m)) return n - m
|
add_task.py
1 2 3 4 5 6 7 8 9 10 11 12
|
from celery_task import tasks
|
get_result.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
from celery_task.celery import app
from celery.result import AsyncResult
id = '21325a40-9d32-44b5-a701-9a31cc3c74b5' if __name__ == '__main__': async = AsyncResult(id=id, app=app) if async.successful(): result = async.get() print(result) elif async.failed(): print('任务失败') elif async.status == 'PENDING': print('任务等待中被执行') elif async.status == 'RETRY': print('任务异常后正在重试') elif async.status == 'STARTED': print('任务已经开始被执行')
|
高级使用
celery.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
|
tasks.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
from .celery import app
import time @app.task def add(n, m): print(n) print(m) time.sleep(10) print('n+m的结果:%s' % (n + m)) return n + m
@app.task def low(n, m): print(n) print(m) print('n-m的结果:%s' % (n - m)) return n - m
|
get_result.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
from celery_task.celery import app
from celery.result import AsyncResult
id = '21325a40-9d32-44b5-a701-9a31cc3c74b5' if __name__ == '__main__': async = AsyncResult(id=id, app=app) if async.successful(): result = async.get() print(result) elif async.failed(): print('任务失败') elif async.status == 'PENDING': print('任务等待中被执行') elif async.status == 'RETRY': print('任务异常后正在重试') elif async.status == 'STARTED': print('任务已经开始被执行')
|
django中使用
celery.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
|
""" celery框架django项目工作流程 1)加载django配置环境 2)创建Celery框架对象app,配置broker和backend,得到的app就是worker 3)给worker对应的app添加可处理的任务函数,用include配置给worker的app 4)完成提供的任务的定时配置app.conf.beat_schedule 5)启动celery服务,运行worker,执行任务 6)启动beat服务,运行beat,添加任务
重点:由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下 """
|
tasks.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
from .celery import app
from django.core.cache import cache from home import models, serializers from django.conf import settings @app.task def update_banner_list(): queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:settings.BANNER_COUNT] banner_list = serializers.BannerSerializer(queryset, many=True).data for banner in banner_list: banner['image'] = 'http://127.0.0.1:8000%s' % banner['image']
cache.set('banner_list', banner_list, 86400) return True
|