flask利用celery和SQS实现异步任务(二):多队列

用celery的时候,通常都是把任务直接丢进broker里异步处理的。但也有另外一种实现方式:即可根据任务分类的不同分别使用不同的队列queue来处理。譬如说两种不同的任务,一个量多但不关键,一个量少但较为重要,如果放在同一个queue中,那么或多或少会带来相互之间的影响。所以应该将任务一放到queue1,而将另外的一个任务放在queue2中。

既然celery是一个分布式的任务调度模块,那么celery是如何和分布式挂钩呢,celery可以支持多台不通的计算机执行不同的任务或者相同的任务。

如果要说celery的分布式应用的话,就要提到celery的消息路由机制,AMQP协议。具体的可以查看AMQP的文档。简单地说就是可以有多个消息队列(Message Queue),不同的消息可以指定发送给不同的Message Queue,而这是通过Exchange来实现的。发送消息到Message Queue中时,可以指定routiing_key,Exchange通过routing_key来把消息路由(routes)到不同的Message Queue中去。

celery 多队列示意图:

还是以上篇文章 “flask利用celery和SQS实现异步任务(一):可集群” 的代码为例。

一、 flask celery 客户端代码:

1.1  在 Flask 的配置文件中加入以下代码:

class Config:
    ...
    # CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/2"
    # CELERY_BROKER_URL = "redis://127.0.0.1:6379/1"
    # BROKER_TRANSPORT_OPTIONS = {'region': 'cn-northwest-1'}
    
    CELERY_QUEUES = (    # 设置add队列,绑定routing_key
        Queue("celery_sms", Exchange("celery_sms"), routing_key="celery_sms"),    # queue 的名称可自己定义
        Queue("celery_upload2s3_images", Exchange("celery_upload2s3_images"), routing_key="celery_upload2s3_images"),
        Queue("celery_upload2s3_videos", Exchange("celery_upload2s3_videos"), routing_key="celery_upload2s3_videos")
    )

    CELERY_ROUTES = {    # 特定的任务(key)进入指定的queue
        'tasks.asyn_tasks.async_send_single_msg': {"queue": "celery_sms", "routing_key": "celery_sms"},        # key 为任务的路径;queue 和 routing_key 要和上面的CELERY_QUEUES中配置的一致
        'tasks.asyn_tasks.add_test': {"queue": "celery_upload2s3_images", "routing_key": "celery_upload2s3_images"},
        'tasks.asyn_tasks.multi_test': {"queue": "celery_upload2s3_videos", "routing_key": "celery_upload2s3_videos"},
    }
    
    """
    CELERY_QUEUES设置一个指定routing_key的队列queue,这个名字可以任意指定;
    CELERY_ROUTES设置路由,对指定的任务名,指定对应的队列和routing_key,注意,这里的routing_key需要和上面参数的一致。
    """

1.2  tasks的 asyn_tasks.py 中要有相应的任务,如下:

from __future__ import absolute_import
import json
import requests
from tasks.celery import celery
import time


@celery.task
def async_send_single_msg(obj_dict, mobile, text):  # code :验证码; mobile:手机号

    time.sleep(60)
    params = {
        "apikey": obj_dict.get("api_key"),
        "mobile": mobile,
        "text": text
        # "text"的设置决定了发送成功与否; "text" 的内容应该和 模板中的内容一致
    }
    print("sms param--->", params)
    response = requests.post(obj_dict.get("send_single_url"), data=params)
    res_dict = json.loads(response.text)  # 对 response.text 序列化
    return res_dict


@celery.task
def add_test(x, y):
    time.sleep(40)
    return x + y


@celery.task
def multi_test(x, y):
    time.sleep(50)
    return x * y

二、celery worker 中的代码变化

2.1  在 celeryconfig.py 中加入以下代码:

CELERY_QUEUES = (
    Queue("celery_sms", Exchange("celery_sms"), routing_key="celery_sms"),
    Queue("celery_upload2s3_images", Exchange("celery_upload2s3_images"), routing_key="celery_upload2s3_images"),
    Queue("celery_upload2s3_videos", Exchange("celery_upload2s3_videos"), routing_key="celery_upload2s3_videos")
)

CELERY_ROUTES = {
    'tasks.asyn_tasks.async_send_single_msg': {"queue": "celery_sms", "routing_key": "celery_sms"},
    'tasks.asyn_tasks.add_test': {"queue": "celery_upload2s3_images", "routing_key": "celery_upload2s3_images"},
    'tasks.asyn_tasks.multi_test': {"queue": "celery_upload2s3_videos", "routing_key": "celery_upload2s3_videos"}
}

2.2 在 create_celery.py 中更新 celery 配置

from __future__ import absolute_import
from celery import Celery
from tasks import celeryconfig


def make_celery():
    celery = Celery(
        'tasks',
        backend=celeryconfig.CELERY_RESULT_BACKEND,
        broker=celeryconfig.CELERY_BROKER_URL,
        include=['tasks.asyn_tasks']
    )
    celery.conf.update(
        broker_transport_options=celeryconfig.BROKER_TRANSPORT_OPTIONS,
        task_queues=celeryconfig.CELERY_QUEUES,
        task_routes=celeryconfig.CELERY_ROUTES
    )

    return celery

2.3 在 asyn_tasks.py 中也要有相应的任务

三、 部署

步骤二中的代码可根据自己的需要部署在多台服务器上。

3.1   哪个 worker 去消费哪个queue中的任务,是由启动 celery worker 命令时的参考 -Q 决定的,如下:

启动一个 celery worker专门消费 “celery_upload2s3_videos”这个 queue 中的任务:

celery -A tasks.celery.celery worker -Q celery_upload2s3_videos --loglevel=INFO

启动一个 celery worker专门消费 “celery_upload2s3_images”这个 queue 中的任务:

celery -A tasks.celery.celery worker -Q celery_upload2s3_images  --loglevel=INFO

3.2  如果在同一台主机上启动多个worker,则需要加上 -n 参考,如下:

celery -A tasks.celery.celery worker -Q celery_upload2s3_images -n workerAdd@%h --loglevel=INFO

celery -A tasks.celery.celery worker -Q celery_upload2s3_videos -n workerMulti@%h --loglevel=INFO

-n 参考参考链接:

https://docs.celeryproject.org/en/latest/userguide/workers.html

3.3  如果想让某个 celery worker 能够消费所有queue中的任务, 则启动该 worker 时不指定 -Q 即可。如下:

celery -A tasks.celery.celery worker --loglevel=INFO

参考链接:

https://blog.csdn.net/weixin_39318540/article/details/80473021

https://blog.csdn.net/tmpbook/article/details/52245716

https://my.oschina.net/u/4313588/blog/3511336

https://niubidian.top/blog/show/40/

celery 常用命令:

https://blog.csdn.net/weixin_44649870/article/details/105844668

原文地址:https://www.cnblogs.com/neozheng/p/13756040.html