Celery的简单使用介绍笔记。

Celery的相关模块

  • 任务模块 Task

    包含异步任务和定时任务. 其中, 异步任务通常在业务逻辑中被触发并发往任务队列, 而定时任务由 Celery Beat 进程周期性地将任务发往任务队列.

  • 消息中间件 Broker(中间人)

    Broker, 即为任务调度队列, 接收任务生产者发来的消息(即任务), 将任务存入队列. Celery 本身不提供队列服务, 官方推荐使用 RabbitMQ 和 Redis 等.

  • 任务执行单元 Worker

    Worker 是执行任务的处理单元, 它实时监控消息队列, 获取队列中调度的任务, 并执行它.

  • 任务结果存储 Backend

    Backend 用于存储任务的执行结果, 以供查询. 同消息中间件一样, 存储也可使用 RabbitMQ, Redis 和 MongoDB 等

我在使用中,使用在Django的上传文件到oss服务器中,由于改操作不影响用户后续操作,如果让用户等带来不好的用户体验,可以用Celery进行异步操作。

Celery其实是一个独立的进程,开辟了一条独立的进程用于接受任务模块提交过来的任务。

上一个项目示意图

首相需要安装运行环境,

pip install 'celery[redis]'  #此安装模式告知,redis依赖安装在celery下

开始必须运行本地redis

先上一个最简单的Celery代码。

import time
from celery import Celery

broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/0'
app = Celery('my_task', broker=broker, backend=backend)

@app.task
def add(x, y):
    time.sleep(5)     # 模拟耗时操作
    return x + y

将文件保存为task.py

在Python环境下运行命令

celery worker -A task --loglevel=info  执行Celery

task是文件的文件名,--loglevel=info为显示的日志信息。

在环境中运行add

In [24]: add                                                                    
Out[24]: <@task: task.add of my_task at 0x1083e7250>

 add已经被装饰过,实际已经成为了一个task任务

如果直接执行add,其实跟运行普通函数没什么区别。

应该执行add.delay(2, 3),这样的话,才回执行Celery任务。

可以用变量接收ret = add.delay(2.4)

等待ret运行的任务执行出结果以后,ret.result可以返回运行结果。

Djando Celery实战,

首先新建一个独立的文件夹worker

里面新建__init__.py与config.py

在__init__文件里面实例化Celery,在config.py里面填写相关设置信息。

import os

from celery import Celery
from worker import config
# 加载Django 的 setting
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "swiper.settings")

celery_app = Celery('worker')            # 实例话一个celery
celery_app.config_from_object(config)      # 加载相关配置 
celery_app.autodiscover_tasks()          #自动监听任务

config.py

broker_url = 'redis://127.0.0.1:6379/0'
broker_pool_limit = 10  # Borker 连接池, 默认是10

timezone = 'Asia/Shanghai'
accept_content = ['pickle', 'json']

task_serializer = 'pickle'
result_expires = 3600  # 任务过期时间

result_backend = 'redis://127.0.0.1:6379/0'
result_serializer = 'pickle'
result_cache_max = 10000  # 任务结果最大缓存数量

worker_redirect_stdouts_level = 'INFO' #日志级别

在需要异步操作的函数上添加@celery_app.task

 启动Celery

celery worker -A worker --loglevel=info

执行具体的接口函数:xxx.delay()

后续有待结果的疑惑,如何进行多worker设置,还有就是result的取出问题,如果异步操作直接取出将是空,很多时候该操作还没返回值,worker还在运行中。

有空研究一下,redis里面的result_backend里面的相关内容信息。

原文地址:https://www.cnblogs.com/sidianok/p/11604069.html