Flask-爱家租房项目ihome-06-celery发送短信

Celery简介

celery是一个专注于实时处理和任务调度的分布式任务队列。主要用来异步处理一些发送邮件或者短信之类的耗时操作.

工作流程为任务的生产者(producer)产生任务, 把任务放入中间人(broker)的队列中, 然后任务消费者(worker)broker队列中获取任务, 并执行该任务, 若该任务有返回值, 则可以把返回值放入存储(backend)中, 后续生产者再去backend提取结果.

image-20200826105229560

在Flask中使用Celery

创建celery对象

from flask import Flask
from celery import Celery
# 创建flask的app对象
app = Flask(__name__)
# 可以将celery相关配置项配置到app的配置中
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/1'
# 创建celery对象, 必须指定broker, 不然worker启动时会报错
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
# 让celery对象使用app中的配置
celery.conf.update(app.config)

定义任务

# 定义的任务需要使用celery.task装饰器装饰
@celery.task
def my_task(arg1, arg2):
    return arg1+arg2

调用任务

# 在flask后台逻辑中调用定义的任务, 注意要使用delay方法才能进行异步处理, 否则只会同步处理
task = my_task.delay(10, 20)

启动worker

# tasks为定义任务的文件路径
celery -A tasks worker -l=info

在本项目中使用celery发送短信

在flask配置文件中添加celery配置

# config.py
class BasicConfig:
    ......
    # 远程服务器
    REMOTE_SERVER = 'alex-gcx.com'
    # redis中定义celery使用的2号库
    REDIS_CELERY_DB = 2  # celery的broker/backend
	# celery配置
    CELERY_BROKER_URL = f'redis://{REMOTE_SERVER}:6379/{REDIS_CELERY_DB}'
    CELERY_RESULT_BACKEND = CELERY_BROKER_URL  # backend和broker使用同一个redis库

创建celery对象

一开始我是在flask app的应用工厂程序create_app中创建的celery, 就像之前的redis_connect一样

# ihome/api_1_0/__init__.py
celery = None  # 初始化为None
# 创建应用工厂
def create_app():
    ......
    app = Flask(__name__)  # 创建flask应用
    ......
    # 重新设置celery
    global celery
    celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])

定义任务

ihome/api_1_0下创建任务文件tasks.py, 添加发送短信的任务代码

from ihome import celery
from ronglian_sms_sdk import SmsSDK
from ihome.utils import constants
from ihome.utils.response_codes import RET
from flask import current_app
import json

@celery.task
def send_sms_code(sms_code, phone):
    """发送短信任务"""
    result = {'errno': RET.OK}
    try:
        sdk = SmsSDK(constants.ACCID, constants.ACCTOKEN, constants.APPID)
        tid = '1'
        mobile = phone
        # 短信模板为:....您的验证码是{1},请于{2}分钟内正确输入
        # data参数为元组,第一个值为模板中的{1},第二个值为模板中的{2}
        data = (sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60)
        # 发送短信,接受返回值
        sms_resp_json = sdk.sendMessage(tid, mobile, data)
    except Exception as e:
        current_app.logger.error(e)
        result['errno'] = RET.THIRDERR
        result['errmsg'] = '发送短信异常'
    else:
        # 处理返回值
        sms_resp_dict = json.loads(sms_resp_json)
        sms_status = sms_resp_dict.get('statusCode')
        if sms_status not in ('000000', '112310'):
            # 发送失败
            result['errno'] = RET.THIRDERR
            result['errmsg'] = sms_resp_dict.get('statusMsg')
    return result

调用任务

ihome/api_1_0/verify_codes.py的短信接口中调用发送短信的任务

from . import tasks

@api.route("/sms_codes/<re(r'1[^120]d{9}'):phone>")
def send_sms_code(phone):
	.......
    tasks.send_sms_code.delay(sms_code, phone)
    ......

启动worker

在终端中进入ihome项目的根目录, 执行启动命令

(flask) alex@alex:~/python/FlaskIhome$ celery -A ihome.api_1_0.tasks worker -l=info
Error: 
Unable to load celery application.
'nonetype' object has no attribute 'task'

解决celery创建对象的错误

  • 报错说nonetype, 原来是启动worker时, 确实导入了celery对象, 但是这个对象只做了初始化为None的这一步, 并没有走应用工厂create_app中的重设celery的方法.

  • 因为worker的启动命令是独立于flask应用启动的, 尽管那边已经把整个flask项目启动起来了, 但是worker的启动和flask的启动是没有关系的. 所以根据worker的启动命令, 在tasks.py文件中从ihome包导入了celeryfrom ihome import celery, 因此程序会执行ihome__init__.py文件, 因此会执行celery = None, 但是create_app方法内部的代码并不会执行, 只是定义了一个create_app方法而已, 所以最终celery的结果就为None

  • 但是因为这里需要让celery使用app对象的配置, 所以得在app的创建后才能创建celery对象, 于是把创建celery对象的语句移到了manager.py中创建app的语句app = create_app('dev')后面

    # manager.py
    app = create_app('dev')
    # 创建celery对象
    celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    

    定义任务的tasks.py文件中导入celery对象语句也需要改成from manager import celery

  • 再次运行worker启动命令, 发现可以正常启动

(flask) alex@alex:~/python/FlaskIhome$ celery -A ihome.api_1_0.tasks worker -l=info
 
 -------------- celery@alex v4.4.7 (cliffs)
--- ***** ----- 
-- ******* ---- Linux-5.3.0-3-amd64-x86_64-with-Deepin-20-apricot 2020-08-26 12:28:44
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         ihome:0x7f281f2045f8
- ** ---------- .> transport:   redis://alex-gcx.com:6379/2
- ** ---------- .> results:     redis://alex-gcx.com:6379/2
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
[tasks]
  . ihome.api_1_0.tasks.send_sms_code

[2020-08-26 12:28:44,525: INFO/MainProcess] Connected to redis://alex-gcx.com:6379/2                                                                                   
[2020-08-26 12:28:44,617: INFO/MainProcess] mingle: searching for neighbors
[2020-08-26 12:28:45,788: INFO/MainProcess] mingle: all alone
[2020-08-26 12:28:46,034: INFO/MainProcess] celery@alex ready.

解决包循环导入的错误

但是在刷新flask应用的网页时, 又报错了:

ImportError: cannot import name 'celery' from 'manager' (/home/alex/python/FlaskIhome/manager.py)

一般报错说不能导入某个模块cannot import name xxxx而不是找不到某个模块No module named xxxx, 那就不是模块没有安装的问题, 而是循环导入的问题, 即导包时发生了死锁. 理一下导包的过程就能发现问题了.

  • worker的启动文件tasks.py头部导入了manager模块: from manager import celery
  • manager模块导入并执行了ihome模块的create_app方法
  • create_app方法中导入了蓝图api_1_0: from ihome.api_1_0 import api
  • 在蓝图api_1_0中导入了发送短信的视图文件from . import verify_codes
  • 在视图文件verify_codes头部又导入了celery任务文件tasks: from . import tasks

image-20200826130459411

所以解决办法是延迟导入: 把verify_codes.py中导入任务from . import tasks从头部移到send_sms_code方法中调用任务tasks.send_sms_code.delay(sms_code, phone)的上一行, 这样导入时就不会执行导入tasks这句话了, 只有在运行调用任务的代码时才会进行导入tasks, 即什么时候用再什么时候导入

@api.route("/sms_codes/<re(r'1[^120]d{9}'):phone>")
def send_sms_code(phone):
    ......
    # 调用异步发送短信的任务
    from . import tasks
    result = tasks.send_sms_code.delay(sms_code, phone)
    ......

再次启动worker和flask应用, 点击发送短信验证码后, 就可以正常发送短信了

(flask) alex@alex:~/python/FlaskIhome$ celery -A ihome.api_1_0.tasks worker -l=info                                               
 -------------- celery@alex v4.4.7 (cliffs)
--- ***** ----- 
-- ******* ---- Linux-5.3.0-3-amd64-x86_64-with-Deepin-20-apricot 2020-08-26 13:12:13
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         ihome:0x7fa4b1b1c320
- ** ---------- .> transport:   redis://alex-gcx.com:6379/2
- ** ---------- .> results:     redis://alex-gcx.com:6379/2
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . ihome.api_1_0.tasks.send_sms_code

[2020-08-26 13:12:13,689: INFO/MainProcess] Connected to redis://alex-gcx.com:6379/2
[2020-08-26 13:12:13,732: INFO/MainProcess] mingle: searching for neighbors
[2020-08-26 13:12:14,876: INFO/MainProcess] mingle: all alone
[2020-08-26 13:12:15,149: INFO/MainProcess] celery@alex ready.
[2020-08-26 13:12:33,465: INFO/MainProcess] Received task: ihome.api_1_0.tasks.send_sms_code[4ce67842-48d7-4dd9-8af8-1704182f685c]                                     
.........

优化celery的目录结构-模块化

上面的celery已经可以正常工作了, 但是我们将celery对象定义在了manager.py文件中, 而当初我们规定manager.py只是作为启动文件用的, 不应该定义项目业务相关的细节, 所以定义在这里不太合适.

并且我们只定义了一个tasks.py文件, 如果项目比较大定义的任务比较多的话, 那都挤在一个任务文件中显然也是不合理的. 因此我们可以将celery单独定义成一个模块化的功能.

创建celery_tasks模块

ihome下创建一个新的python包, 名为celery_tasks, 用来标识celery的任务模块, 在该目录下新建一个main.py文件, 用来创建celery对象和管理任务以及worker的启动

# ihome/celery_tasks/main.py
from celery import Celery
from manager import app as flask_app

# 创建celery对象
celery = Celery(flask_app.name, broker=flask_app.config['CELERY_BROKER_URL'])
# 添加celery配置
celery.conf.update(flask_app.config)

注:

  1. 还是celery的配置项还是关联了flask的app对象, 当然也可以不必关联app对象, 直接另外定义一个celery的配置文件也是可以的

  2. 这里导入flask的app时, 不能使用flask的current_app, 因为前面说过了worker启动和flask启动没有关系, 因此启动worker时读取不到current_app

  3. 使用的是从manager中导入的app对象, 但是注意导入app后, 需要将app重命名一下, 如重命名为flask_app, 因为celery的对象默认的名字也叫app, 这样celery读取的时候会报错:'flask' object has no attribute 'user_options'

    (flask) alex@alex:~/python/FlaskIhome$ celery -A ihome.celery_tasks.main worker -l=info
    Error: 
    Unable to load celery application.
    'flask' object has no attribute 'user_options'
    

    至于之前在manager.py中创建celery对象时也是使用的app并没有报错, 我猜想可能是因为manager.py中的app是新创建的变量, 而不是从某个地方import导入的, 而这里会报错是因为app是从外面导入过来的, 所以报错了吧.

创建发送短信任务模块

ihome/celery_tasks目录下再创建一个python包, 命名为send_sms_code, 用来标识发送短信的任务, 再在其目录下创建tasks.py文件, 里面编写具体的任务函数

from ihome.celery_tasks.main import celery
from ronglian_sms_sdk import SmsSDK
from ihome.utils import constants
from ihome.utils.response_codes import RET
from flask import current_app
import json

@celery.task
def send_sms_code(sms_code, phone):
    """发送短信任务"""
    result = {'errno': RET.OK}
    try:
        sdk = SmsSDK(constants.ACCID, constants.ACCTOKEN, constants.APPID)
        tid = '1'
        mobile = phone
        # 短信模板为:....您的验证码是{1},请于{2}分钟内正确输入
        # data参数为元组,第一个值为模板中的{1},第二个值为模板中的{2}
        data = (sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60)
        # 发送短信,接受返回值
        sms_resp_json = sdk.sendMessage(tid, mobile, data)
    except Exception as e:
        current_app.logger.error(e)
        result['errno'] = RET.THIRDERR
        result['errmsg'] = '发送短信异常'
    else:
        # 处理返回值
        sms_resp_dict = json.loads(sms_resp_json)
        sms_status = sms_resp_dict.get('statusCode')
        if sms_status not in ('000000', '112310'):
            # 发送失败
            result['errno'] = RET.THIRDERR
            result['errmsg'] = sms_resp_dict.get('statusMsg')
    return result

注:

  1. 把之前创建的ihome/tasks.py移过来就好了, 只是导入celery的路径改成从main中导入.

  2. 如果还有其他任务, 比如发送邮件等, 就可以同样在ihome/celery_tasks目录下创建send_mailpython包, 并创建它下面的tasks.py任务文件即可.

  3. 注意把之前调用任务的业务代码verify_codes.py的导入任务语句路径改成: from ihome.celery_tasks.send_sms_code import tasks

将短信任务添加到main中管理

回到ihome/celery_tasks/main.py中, 追加一句话, 可以将celery_tasks下定义的具体任务模块添加到main中

# 自动搜索任务
celery.autodiscover_tasks(['ihome.celery_tasks.send_sms_code'])

注:

  1. 添加的是一个列表, 也就是说可以把多个任务都添加到列表中
  2. 列表中的名字是项目根目录的绝对路径, 只需要写到具体模块名(send_sms_code)就可以了, 前提是模块名下的任务文件必须名为tasks.py, 否则需要写到具体的任务文件, 如'ihome.celery_tasks.send_sms_code.my_tasks'

最终celery文件结构

ihome/celery_tasks/
├── __init__.py
├── main.py
└── send_sms_code
    ├── __init__.py
    └── tasks.py

再次启动flask应用和worker测试是否可以正常使用.注意启动脚本到是main.py

(flask) alex@alex:~/python/FlaskIhome$ celery -A ihome.celery_tasks.main worker -l=info
原文地址:https://www.cnblogs.com/gcxblogs/p/13565993.html