Celery--分布式任务队列:

任务队列用作一种在线程或计算机之间分配工作的机制。

Celery用Python编写,但是该协议可以用任何语言实现。

只要涉及到第三方服务(发短信,发邮件)的时候,建议用异步队列来实现。

例如有一个四段服务是串行的,第一、二段和第四段是服务器本身的,而第三段是第三方任务,在程序运行时,第三方任务的耗时非常慢,串行的代码实现第三段服务了消耗的时间很长,造成用户体验差。这时候我们就可以使用异步来处理代码。异步就是把一段程序中的某个模块放到另一个进程执行的,在这里我们可以让第三段任务异步处理,让整个程序并行执行(单独把第三段拉出去执行),这样一来服务器本身只执行第一、二、四段。就算第三方任务就算执行时间很长,也不会影响整个代码流程了。

Celery及异步任务的处理:

Celery有两个功能,一个是处理异步任务,一个是处理定时任务。

Celery处理任务的流程有三部分,分别是

  • 消息中间件(Broker) -- 消息中间件相当于一个队列,用来存放任务
  • 任务执行单元(Celery Worker) -- 负责监听消息中间件
  • 结果存储(Backend) -- 存储任务处理结果

任务模块(Task):包含异步任务和定时任务,其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由Celery Beat 进程周期性地将任务发往任务队列。

消息中间件(Broker):负责任务队列调度,接收任务生产者发来的任务,将任务存入队列,Celery本身不提供队列服务,官方推荐使用Redis和RabbitMQ等。

任务执行单元(Worker): Worker是执行任务的处理单元,它实施监控消息队列,获取队列中调度的任务,并执行它。

任务结果存储(Backend): Backend用于存储任务的执行结果,以供查询,同消息中间件一样,存储也可以使用Redis、RabbitMQ和MongoDB等。

Django中使用Celery的工作流程:

  1. Django把耗时任务封装成一个方法
  2. 然后把耗时任务丢到队列(消息中间件)里面,
  3. Celery Worker监听到队列中有任务,就创建Worker,有多少个耗时任务就创建多少个Worker(可以是进程或者线程)
  4. 工人操作完任务以后就把结果放到存储中。

Celery在Django中使用官方文档连接:https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html#using-celery-with-django

配置celery:

  • Django == 1.11.11
  • celery == 4.4.0

新建一个worker文件,创建__init__文件

__init__.py > 除了目录名,其他都是固定写法

import os

from celery import Celery

from worker import config

# 加载django的环境
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "主目录名.settings")

# 实例化celery
celery_app = Celery('swiper(主目录名)')

# 加载配置文件
celery_app.config_from_object(config)

# 自动注册任务
celery_app.autodiscover_tasks()

worker > 新建config.py , 用来指定消息中间件使用什么队列,这里使用Redis。

broker_url = 'redis://127.0.0.1:6379/0'

使用celery:封装短信验证码到func.py文件中

在公共目录common下,新建func.py文件 >

import random

from libs.yuntongxun.sms import CCP
from worker import celery_app
from django_redis import get_redis_connection


# 把django中第三方或者耗时的方法变成celery中的任务
@celery_app.task
def send_sms(phone):
    ccp = CCP()

    sms_code = create_sms_code(4)

    # 把验证码存储redis的缓存中
    redis_cli = get_redis_connection()
    redis_cli.set(f'smscode-{phone}', sms_code, 18000)

    ccp.send_template_sms(phone, [sms_code, 3], 1)

def create_sms_code(num):

    start = 10 ** (num - 1)
    end = 10 ** num - 1

    return random.randint(start, end)

把celery任务放到队列中

# 把执行发短信的任务放入队列
# delay接收的参数就是被celery封装的参数,
# data.get("phone")获取用户输入的手机号
send_sms.delay(data.get("phone"))

启动celery消息队列:(Windows对多进程不是很友好,所以在Windows中使用单进程)

参数:

  • -A 指定文件名 -l 自制输出格式 --pool=solo(单继承执行,Linux系统不需要)
celery worker -A worker -l info --pool=solo

异步任务已开启!!!worker开始监听

原文地址:https://www.cnblogs.com/lance-lzj/p/13961682.html