celery异步任务体系笔记

1.异步框架示意图

2.celery 官方文档

http://docs.celeryproject.org/en/latest/index.html

3.启动celery的命令

启动 sender

自动检测
celery -A app beat -l info

启动worker

自动检测
celery -A app worker  -l info
4.broker和backend

broker(任务存储)和backend(结果存储)都是用的是rmq,backend中的数据会有自动过期机制,1小时后结果将自动过期(可以任意修改)。

5.架构设计

1.发送任务

所有的复杂逻辑都在发送端进行处理,将需要执行的任务序列确定好后,放入rmq中,不考虑在rmq上有什么复杂操作。 暂时只用一个docker,将所有的异步任务都放到rmq中。

2.执行任务

执行任务只是简单的运行具体的函数比如发送邮件之类,从rmq 中获取到该worker相对应的任务直接执行,逻辑较为简单。可以根据任务的数量动态的增减docker的数目。一类任务是一个task.py文件,相当于任务之间相互隔离,每个worker只执行特定的一种任务。

3.获取任务结果

提交任务后会返回任务的唯一id,需要主动去根据任务id获取执行结果。要把任务id和一些必要信息存在mysql中。检查逻辑应该跟具体任务来设计,会关联到不同的事件。

发送邮件任务

1.celery的配置

1.1beat端

启动命令

Python
# 启动timed 命令
celery -A mtk_celery.cel worker -l info -c 1 -Q timed
# 启动job 命令
celery -A mtk_celery.cel worker -l info -Q job
# 启动beat命令
celery -A mtk_celery.cel beat -l info
# celery 路由设置
# -c 指定 worker 数目
# -Q 指定 队列类型
# 需要启动两种不同的执行者 timed 类型 只能启动一个worker , job 类型没有限制
 
celery路由相当于给不同任务分组,给不同类型的任务指定不同的queue,启动时带上参数q,该worker,会只完成该queue中的异步任务
task_routes = {'mtk_celery.task': {'queue': 'job'},
'mtk_celery.edm_timed': {'queue': 'timed'},
'mtk_celery.edm_jobs': {'queue': 'job'}
}

celery定时任务 配置

Python
# 定义定时任务
beat_schedule = {
'edm_timed': {
# 具体需要执行的函数
# 该函数必须要使用@app.task装饰
'task': 'mtk_celery.edm_timed',
# 定时时间
# 每分钟执行一次,不能为小数
'schedule': crontab(minute='*/10'),
# 或者这么写,每小时执行一次
# "schedule": crontab(minute=0, hour="*/1")
# 执行的函数需要的参数
'args': ()
}
}
# 将定时任务加入beat中
cel.conf.update(
result_expires=3600 * 2,
beat_schedule=beat_schedule,
task_routes=task_routes
)
 

2.worker端

2.1 timed端

将需要执行的任务指定在配置中,启动beat 即可,timed 会捕获到该任务进行执行,由于会有并发冲突,可能会读取到同一个edm,所以单独拆分出来,该worker只拥有一个,保证不会出现线程不安全问题。

2.2 send端

不存在并发线程不安全问题,直接启动多个处理端发送即可。

 

原文地址:https://www.cnblogs.com/xiao-xue-di/p/13824907.html