Python

Celery - 概念

简单的灵活可靠的处理大量消息的分布式系统

专注于实时处理的异步任务队列, 同时也支持任务调度

结构图

使用场景

异步任务  将耗时的操作任务提交给 Celery 去异步执行 - 比如发送短信 / 邮件, 消息推送, 音视频处理等

定时任务  类似于 crontab, 比如每日的数据统计

消息中间件

可选  [ RabbitMQ / Redis ] 

Redis 的安装 跳转这里 

Celery - 使用

安装

pip install celery[redis]

基本使用 - 异步任务

基础版本

先创建一个简单的耗时阻塞的任务, 很显然会在中间卡顿 4s 等待

# -*- coding: utf-8 -*-
import time


def add(x, y):
    print "in tasks ---"
    time.sleep(4)
    return x + y


if __name__ == '__main__':
    print 'start ---'
    ret = add(2, 8)
    print "end ---"
    print ret
start ---
in tasks ---
end ---
10

加入 celery 的优化版本

 将原始的 的 add 方法重新优化一下变成 Celery 的版本

tasks.py

# -*- coding: utf-8 -*-
import time
from celery import Celery

broker = 'redis://localhost:6379/1'
backend = 'redis://localhost:6379/2'

app = Celery('my_task', broker=broker, backend=backend)


@app.task
def add(x, y):
    print "in tasks ---"
    time.sleep(4)
    return x + y

 app.py

# -*- coding: utf-8 -*-

from tasks import add

if __name__ == '__main__':
    print 'start ---'
    ret = add.delay(2, 8)
    print "end ---"
    print ret

 再运行的时候便没有卡顿, 而是以一个任务标识的形式进行返回

start ---
end ---
a3e85efa-859f-45de-af27-e582ff54ddef

此时任务将被丢入 redis 中等待调度, 需要打开 celery 的 worker 进行处理

Celery - worker 

在次目录下进行

celery worker  -A tasks -l INFO

-l 表示日志打印等级

-A 指定执行的函数文件

此时若因版本过高会触发系列报错, 详情 跳转这里 

成功后会打印配置信息, 比如此处设置的 redis 以及 被执行的 add 都可以显示出

简单测试 

打开 worker 之后就可以进行执行调用了

当前目录再次打开一个解释器进行简单操作

 在 worker 这边的日志就可以打印出来了

 

 相关的执行结果和执行状态也可以通过一些方法进行获取到

流程梳理

app.py 中进行 add 的函数调用,调用直接返回任务标识

真正的任务在 worker 中异步执行, 然后再 app.py 中的后续代码将不收到阻塞的影响

通过  .ready .get  方法可以获取执行状态以及相关的结果

Celery 目录结构

较为标准的目录格式是需要代码和配置文件分离更加直观

 

 __init__.py 

初始化文件中进行初始化的模型创建以及配置文件的导入

# -*- coding: utf-8 -*-
from celery import Celery

app = Celery('demo')

app.config_from_object('celery_app.celeryconfig')  # 通过 celery 实例加载配置模块

celeryconfig.py

配置文件中进行配置文件的相关操作, 这里主要是配置 Redis 的相关配置以及简单的时区更改展示

Redis 依旧是存放任务的, 以及存放结果需要两个配置配置

# -*- coding: utf-8 -*-

BROKER_URL = 'redis://localhost:6379/1'

CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'

CELERY_TIMEZONE = 'Asia/Shanghai'  # 默认 UTC 时间

# 导入指定的任务模块
CELERY_IMPORTS = (
    'celery_app.task1',
    'celery_app.task2',
)

task1.py / task2.py

task 则用于实际逻辑的函数编写即可

# -*- coding: utf-8 -*-
import time
from celery_app import app


@app.task
def multiply(x, y):
    time.sleep(3)
    return x * y
# -*- coding: utf-8 -*-
import time
from celery_app import app


@app.task
def add(x, y):
    time.sleep(3)
    return x + y

app.py

app 用于外部的调用

# -*- coding: utf-8 -*-
from celery_app import task1, task2

if __name__ == '__main__':
    task1.add.apply_async(5, 10)
    task2.multiply.delay(5, 10)
    print 'end  -----'

 基础使用 - 定时任务

原文地址:https://www.cnblogs.com/shijieli/p/12011211.html