celery-1

1、什么是celery

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统

专注于实时处理的异步任务队列

同时也支持任务调度


celery架构:

image


Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。


消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等


任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。


任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等


版本支持情况

Celery version 4.0 runs on        
    Python ❨2.7, 3.4, 3.5❩        
    PyPy ❨5.4, 5.5❩    
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.​    

If you’re running an older version of Python, you need to be running an older version of Celery:​                
    Python 2.6: Celery series 3.1 or earlier.        
    Python 2.5: Celery series 3.0 or earlier.        
    Python 2.4 was Celery series 2.2 or earlier.​    

Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

对windows系统的支持不太友好;


2、使用场景

异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

定时任务:定时执行某件事情,比如每天数据统计


3、celery的安装配置

pip3 install celery

消息中间件:RabbitMQ/Redis

app=Celery('任务名',backend='xxx',broker='xxx')


4、celery的基本使用

创建celery_task_s1.py

from celery import Celery
import time

# redis不加密码
broker='redis://127.0.0.1:6379/0'
backend='redis://127.0.0.1:6379/1'

# redis加密码
# broker='redis://:123456@127.0.0.1:6379/0'
# backend='redis://:123456@127.0.0.1:6379/1'

# 一定要指定一个名字,这里的名字是test
app=Celery('test',backend=backend,broker=broker)

# 任务其实就是一个函数
# 需要用一个装饰器,表示该任务是被celery管理的,并可以被其执行
@app.task    # @app名字.task
def add(x,y):
    time.sleep(2)
    return x+y


创建用于提交任务的py文件add_task.py:

# 用于提交任务的py文件
import celery_task_s1

# 正常情况下的同步执行任务
# ret = celery_task_s1.add(3, 4)
# print(ret)

# 提交任务到消息队列中
# 只是把任务提交到消息队列中(redis),并没有执行
ret = celery_task_s1.add.delay(3, 4)
print(ret)   # 0c5b8d12-dbf6-46ab-a425-875a98d2b5a5 任务号id


任务提交完成后,需要启动worker,可以用命令启动:celery worker -A celery_task_s1 -l info

但是windows中需要用的命令是:celery worker -A celery_task_s1 -l info -P eventlet

eventlet是一个模块,需要安装: pip3 install eventlet

#参数解释
-A celery_task_s1   #要启动任务的文件名
-l info             #日志级别,可以指定


启动:

可以在pycharm的命令行中启动,也可以在cmd中启动;

image

任务已经执行了,并且结果已经保存进了redis

image


可以创建py文件:celery_result.py,运行这个py文件,可以查看任务执行结果(这里的结果是7):

from celery.result import AsyncResult
from celery_task_s1 import app

async = AsyncResult(id="0c5b8d12-dbf6-46ab-a425-875a98d2b5a5", app=app)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 将结果删除
elif async.failed():
    print('执行失败')
elif async.status == 'PENDING':
    print('任务等待中被执行')
elif async.status == 'RETRY':
    print('任务异常后正在重试')
elif async.status == 'STARTED':
    print('任务已经开始被执行')


5、celery多任务结构

结构:

pro_cel
    ├── celery_task   # celery相关文件夹,名字可以变
    │   ├── celery.py   # celery连接和配置相关文件,必须叫这个名字
    │   └── user_task.py    #所有任务函数
    │    └── order_tasks.py    #所有任务函数
    ├── celery_result.py  # 检查结果
    └── add_task.py    # 提交任务


案例:

image


celery.py

from celery import Celery

backend='redis://127.0.0.1:6379/0'
broker='redis://127.0.0.1:6379/1'

app=Celery('test',broker=broker, backend=backend,
           # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
           include=['celery_task.order_task',
                    'celery_task.user_task'
           ])

# 时区
# app.conf.timezone = 'Asia/Shanghai'

# 是否使用UTC
# app.conf.enable_utc = False


user_task.py

import time
from celery_task.celery import app

@app.task
def user_add(x, y):
    time.sleep(1)
    return x + y


order_task.py

import time
from celery_task.celery import app

@app.task
def order_add(x, y):
    time.sleep(1)
    return x + y


add_task.py

from celery_task.order_task import order_add
from celery_task.user_task import user_add

order_add.delay(5, 6)
user_add.delay(10, 6)


celery_result.py

from celery.result import AsyncResult
from celery_task.celery import app

# id需要变换
async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=app)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 将结果删除
elif async.failed():
    print('执行失败')
elif async.status == 'PENDING':
    print('任务等待中被执行')
elif async.status == 'RETRY':
    print('任务异常后正在重试')
elif async.status == 'STARTED':
    print('任务已经开始被执行')


启动:

可以先启动work,当没有任务时,它就会等着,有任务时就会执行;

在多任务结构中启动work时,直接指定包名即可:

celery worker -A celery_task -l info -P eventlet


启动完work,可以再去启动broker(add_task.py)

此时work发现有任务提交时,直接就会执行,并得到结果;

原文地址:https://www.cnblogs.com/weiyiming007/p/12543556.html