Cleary基础

一、Celery介绍

1、Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时,也支持任务调度

注意:1、Celery目前不支持widows,用在widows可能或报错

            2、celery服务可以不依赖任何服务器,通过自身命令启动服务

            3、celery服务为为其他项目服务提供异步解决任务需求的

    4、会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

 二、Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker) 和任务执行结果存储(task result store) 组成

 1、消息中间件:

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

2、任务执行单元:

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

3、任务结果存储:

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

4、应用场景:

异步执行:解决耗时任务,比如发送短信、邮件、音频处理等

延迟执行:解决延迟任务

定时执行:解决周期任务,比如每天数据统计

三、Celery的使用

1、pip install celery

2、消息中间件:RabbitMQ/Redis

broker='redis://127.0.0.1:6379/3' #任务中间件
#有密码
backend='redis://:12345@127.0.0.1:6379/4' #任务结果仓库
#无密码
# backend='redis://127.0.0.1:6379/1'

3、创建app:  app=Celery('任务名',broker=broker,backend=backend)

4、启动celery服务:

# 非windows
 命令:celery worker -A 执行文件名 -l info
# windows:
 pip3 install eventlet
 celery worker -A 执行文件名 -l info -P eventlet

注意:celery5.0后,windows上该命令会报错,执行的时候,需要切换到selery项目的目录下才可执行

四、Celery执行异步、延迟、定时任务

基本结构

手动提交任务,手动获取结果

worker对应的文件 celery_task.py

from celery import Celery

broker='redis://:12345@127.0.0.1:6379/3'
backend='redis://:12345@127.0.0.1:6379/4'

app=Celery(__name__,broker=broker,backend=backend)

@app.task
def add(x,y):
    return x+y

添加任务的py文件

from celery_task import add

add.delay(3,4)

执行命令celery worker -A celery_task -l info -P eventlet

多任务结构(包架构封装)

使用方法:先在project目录下,执行该命令:

celery worker -A celery_task(包名) -l info -P eventlet  开启celery服务

再右键执行add_task.py(添加任务的py文件)

project
    ├── celery_task      # celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须叫celery.py
    │   └── tasks.py    # 所有任务函数
    ├── add_task.py      # 添加任务
    └── get_result.py   # 获取结果

clery.py文件:

# -*-coding:utf-8 -*-

from celery import Celery

broker='redis://:12345@127.0.0.1:6379/3'
backend='redis://:12345@127.0.0.1:6379/4'

app=Celery(__name__,broker=broker,backend=backend,include=['celery_task.tasks',])

tasks.py文件:

from .celery import app

@app.task
def add(n,m):   
    return n+m

@app.task
def mulp(n,m):
    return n*m

add_task.py文件:

# -*-coding:utf-8 -*-
from celery_task.tasks import add,mulp
from datetime import datetime,timedelta
print('执行任务了')
print('add',add,'mulp',mulp)
#添加异步任务
ret1=add.delay(10,20)
ret2=mulp.delay(2,5)
'''
添加延迟任务
#需要utc时间
utc时间:datetime.utcnow()
延迟10秒:seconds=10
时间类型:timedelta()
eta:是datetime类型
''' eta=datetime.utcnow()+timedelta(seconds=10) # 延迟任务使用的是 apply_async ret3=add.apply_async(args=(29,11),eta=eta)

get_result.py文件:(换上想要获取结果的任务id号,右键运行该文件就可以)

# -*-coding:utf-8 -*-
from celery_task.celery import app

from celery.result import AsyncResult
#任务id号
id = 'e3e462a3-08bb-4e1b-9b3d-51f4554de118'
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

定时任务

跟异步任务不同之处:

1、celery.py文件中定时任务的配置,

2、不用手动提交任务,而是借助beat来定时提交任务。

# -*-coding:utf-8 -*-
from datetime import timedelta
from celery.schedules import crontab
from celery import Celery

broker='redis://:12345@127.0.0.1:6379/3'
backend='redis://:12345@127.0.0.1:6379/4'

app=Celery(__name__,broker=broker,backend=backend,include=['celery_task.tasks',])

# 定时执行任务的配置
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用utc (跟时区配置时配套使用的)
app.conf.enable_utf=False

# 任务的定时配置
app.conf.beat_schedule={
    #自定义方法名:add-task
    'add-task':{
        'task':'celery_task.tasks.add',
        #定时时间
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (100,22),
    }
}

执行过程:先开启celery服务,再开启beat

beat服务启动命令:celery beat -A celery_task -l info

五、Django中使用celery

'''
celery框架django项目工作流程
1)加载django配置环境
2)创建Celery框架对象app,配置broker和backend,得到的app就是worker
3)给worker对应的app添加可处理的任务函数,用include配置给worker的app
4)完成提供的任务的定时配置app.conf.beat_schedule
5)启动celery服务,运行worker,执行任务
6)启动beat服务,运行beat,添加任务

重点:由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下
'''
 
原文地址:https://www.cnblogs.com/nq31/p/14095899.html