Flask+Celery 异步任务

Flask+Celery 异步任务

一、 安装

  1. pip install redis
  2. pip install celery
    之所以要安装redis,是因为需要redis作为celery的消息中间件

二、配置

  1. 新建一个celery_fun.py作为celery的配置文件,并把所有相关方法放进此py。
# ============== create celery ==============
from celery import Celery

# 配置
redis_url = f'redis://:{REDIS_INFO.get("password")}@{REDIS_INFO.get("host")}:{REDIS_INFO.get("port")}/{REDIS_INFO.get("celery_db")}'

# 初始化celery
celery_app = Celery("auto_app",
                    broker=redis_url,
                    backend=redis_url
                    )

三、调用

  1. 先在celery.py定义需要使用celery的函数。
@celery_app.task()
def add_active(active_info):
    """
    异步添加信息
    :param active_info:
    :return:
    """
    # PyMongo 不是进程安全,所以子进程需要创建自己的连接
    mongo = MongoClient()
    mongo.add_data(MONGO_ACTIVE_TASK, active_info)
    mongo.close()
  1. 在Flask里接口进行调用
@assist.route('add_active_info', methods=['POST'])
def add_active_info():
    """添加信息"""
	return_result={'ret_data':{}}
	
	# 调用celery
    celery_result = add_active.delay(active_info)
    
    return_result['ret_data']['celery_id'] = celery_result.id
    return jsonify({'messsage':return_result}),200

  1. 可以提供一个接口查询任务的状态
@auto_app.route('/get_celery_state', methods=['POST'])
def get_celery_state():
    info = request.get_json()
    celery_id = info.get('celery_id')
    result = AsyncResult(celery_id, app=celery_app)
    summary = {
        "state": result.state,
        "id": result.id,
    }
    return jsonify(summary), 200

四、扩展

  1. delay () 方法是 applyasync () 方法的快捷方式,applyasync () 参数更多,可以更加细致的控制耗时任务,比如想要 task在一分钟后再执行
add_active.apply_async(active_info, countdown=60)
  1. delay () 与 apply_async () 会返回一个任务对象,该对象可以获取任务的状态与各种相关信息。
  2. bind为True,会传入self给被装饰的方法,这个参数会让 Celery 将 Celery 本身传入
@celery.task(bind=True)

五、启动

  1. redis需要开启服务
  2. celery启动命令celery -A celery_fun.celery_app worker --loglevel=info
  3. 启动flask
原文地址:https://www.cnblogs.com/zy-mousai/p/14830412.html