[Flask] Celery 异步任务队列的使用

Celery异步任务队列

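目录结构树(原文此处的图片缺失,以下结构根据代码中的导入路径推断):

project/
├── config.py          # 项目配置(提供 config 字典与 Config 类)
└── celery_tasks/
    ├── config.py      # Celery 配置(broker_url)
    ├── main.py
    └── email/
        └── tasks.py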

配置文件config.py:

# Broker (message transport) URL: Redis on 127.0.0.1:6379, database 1.
broker_url = 'redis://127.0.0.1:6379/1'

  

主main.py:

import sys
import os

from celery import Celery

from flask import Flask
from flask_mail import Mail

# Put the parent of the current working directory at the front of the import
# path so that `import config` below can be resolved.
# NOTE(review): this depends on the directory the process is started from,
# not on where this file lives -- confirm the expected launch directory.
CELERY_DIR = os.path.dirname(os.getcwd())
sys.path.insert(0, CELERY_DIR)
import config

mail = Mail()

app = Flask(__name__)
# Look up the settings object in `config.config` under the name held in the
# `env` environment variable and load it into the Flask configuration.
# NOTE(review): if `env` is unset or unknown, `.get()` yields None and no
# settings are loaded -- presumably the caller always exports `env`.
app.config.from_object(config.config.get(os.environ.get('env')))
mail.init_app(app)


def make_celery(app):
    """Build a Celery instance whose tasks run inside ``app``'s app context.

    Args:
        app: The Flask application. Its import name is used as the Celery
            main name, and its application context wraps every task call.

    Returns:
        The configured Celery instance.
    """
    celery_app = Celery(app.import_name)
    # Settings are read from the ``celery_tasks.config`` module.
    # (Alternative left disabled in the original: celery.conf.update(app.config).)
    celery_app.config_from_object('celery_tasks.config')
    base_task = celery_app.Task

    # Task base class that enters the Flask application context before
    # delegating to the regular Celery task call.
    class ContextTask(base_task):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return base_task.__call__(self, *args, **kwargs)

    celery_app.Task = ContextTask
    # Have task modules under this package discovered automatically.
    celery_app.autodiscover_tasks(['celery_tasks.email'])
    return celery_app

# Module-level Celery instance built from the Flask app defined above.
celery = make_celery(app)

  

任务函数tasks.py:

from flask_mail import Message

import config
from celery_tasks.main import celery, mail


# Register send_email as a Celery task under the explicit name 'send_email'.
@celery.task(name='send_email')
def send_email(to, subject, html_message):
    """Send an HTML e-mail to a single recipient through Flask-Mail.

    Args:
        to: Recipient address.
        subject: Subject line of the message.
        html_message: HTML body of the message.
    """
    # The configured mail account is used as the sender address.
    sender_address = config.Config.MAIL_USERNAME
    message = Message(
        subject,
        recipients=[to],
        html=html_message,
        sender=sender_address,
    )
    mail.send(message)


# When this module is run directly, enqueue one sample e-mail task.
if __name__ == '__main__':
    send_email.delay('xx@xx.com', 'xx', 'xx')

  

启动命令:

# Start a worker for the Celery instance `celery` defined in main.py.
# -A is a global option and must precede the sub-command on Celery 5+;
# this ordering is also accepted by older Celery releases.
celery -A main.celery worker -l info

  

发出任务(调用任务函数的 delay 方法):

# Queue send_email for asynchronous execution; arguments are (to, subject, html_message).
send_email.delay('xx@xx.com', 'xx', 'xx')
原文地址:https://www.cnblogs.com/ttkl/p/10819181.html