flask中celery的使用

目标:让一些耗时操作实现异步执行

员工:celery, redis, celery_worker 

场景:现在从北京到上海,有一批货物要运送,一共6卡车的货物。第一种方法,就是客户(任务发布方)自己开一辆车从北京到上海,需要6天。第二种方案,把货卸载到快递网点(redis),把货物贴上序号(消息队列),快递员(celery)叫来三辆货车(celery_worker)依次拉货,需要2天。

举例:一个耗时任务,需要耗时3个小时 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。

思路:将任务编号,组成一个队列,放到redis数据库中,然后让celery的进程去队列里依次执行这些函数,执行完,将结果返回给数据库中

步骤:

  1. 谁干:实例化celery
  2. 在哪干:redis数据库地址告诉celery
  3. 授权:哪些函数对象可以异步,用装饰器@celery.task授权这些函数可以
  4. 执行:耗时函数执行异步,调用delay方法

代码:

 1 import time
 2 from flask import Flask
 3 from celery import Celery
 4 
 5 app = Flask(__name__)
 6 app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
 7 app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
 8 
 9 # 一,谁干?实例化celery对象,让celery来干
10 celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
11 # 二,哪里干?redis数据库地址告诉celery
12 celery.conf.update(app.config)
13 
14 # 三,授权,celery.task表明哪些函数可以在celery worker里运行
15 @celery.task
16 def my_background_task(arg1):
17     time.sleep(arg1)
18     return arg1
19 
20 
21 @app.route('/', methods=['GET', "POST"])
22 def hello_world():
23     # 四,函数用delay方法表名函数是在celery worker中运行
24     celery_id=my_background_task.delay(30)
25     return "<h1>{}!<h1>".format(celery_id)
26 
27 
28 if __name__ == '__main__':
29     app.run()

 celery的配置,也可以通过定义配置类来实现:

 1 import time
 2 from flask import Flask
 3 from celery import Celery
 4 
 5 
 6 class CeleryConfig():
 7     timezone = 'UTC'
 8     BROKER_URL = 'redis://localhost:6379/0'  # 消息队列存放地址
 9     CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # celery worker 执行结果返回存放地址
10     CELERY_TIMEZONE = 'Asia/Shanghai'  # 时区
11     CELERY_ACKS_LATE = True  # 只有当worker执行完任务后,才会告诉MQ,消息被消费。
12     CELERYD_FORCE_EXECV = True  # 非常重要,有些情况下可以防止死锁
13     CELERY_IGNORE_RESULT = True  # 忽略结果,不关心运行结果时可以关闭
14     CELERY_TASK_SERIALIZER = 'json'  # 任务序列化方式
15     CELERY_DISABLE_RATE_LIMITS = True  # 对任务消费的速率进行限制开关
16     CELERYD_PREFETCH_MULTIPLIER = 1  # worker预先获取任务数量
17     CELERYD_MAX_TASKS_PER_CHILD = 30  # worker最大执行任务数,超过数量销毁,防止内存泄漏等问题
18     CELERY_CREATE_MISSING_QUEUES = True  # 队列不存在即创建
19     BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 7 * 24 * 60 * 60, 'max_retries': 1}  # celery worker超时自动重启时间
20     CELERYD_CONCURRENCY = 3  # celery worker 最大并行数
21 
22 
23 app = Flask(__name__)
24 # 一,谁干?实例化celery对象,让celery来干
25 celery = Celery(app.name)
26 # 二,哪里干?redis数据库地址告诉celery
27 celery.config_from_object(CeleryConfig)
28 
29 
30 # 三,授权,celery.task表明哪些函数可以在celery worker里运行,time_limit是celery worker最大存活时间,单位是s,超时进程自杀
31 @celery.task(time_limit=3 * 60 * 60)
32 def my_background_task(arg1):
33     time.sleep(arg1)
34     return arg1
35 
36 
37 @app.route('/', methods=['GET', "POST"])
38 def hello_world():
39     # 四,函数用delay方法表名函数是在celery worker中运行
40     celery_id = my_background_task.delay(30)
41     return "<h1>{}!<h1>".format(celery_id)
42 
43 
44 if __name__ == '__main__':
45     app.run()
全世界的程序员们联合起来吧!
原文地址:https://www.cnblogs.com/chaojiyingxiong/p/15058833.html