celery使用

1.celery简介

celery 是一个基于python开发的模块,可以帮助我们对任务进行分发和处理

 

 

2.环境的搭建

1. pip install celery == 4.4   // 这里使用4.4版本
2. 安装broker:redis 或 rabbitMQ
pip3 install redis 或 pika(操作rabbitMQ)

3.快速使用

样例代码

项目中创建一个celery_exmple文件夹,目录下创建以下py文件:

  • s1.py 相当于django应用中的tasks.py

    from celery import Celery
    ​
    app = Celery('tasks',broker='redis://192.168.10.22:6379',
        backend='redis://192.168.10.22:6379')
    ​
    @app.task
    def x1(x,y):
        return x + y
    ​
    @app.task
    def x2(x,y):
        return x - y
  • s2.py

    from s1 import x1
    ​
    result = x1.delay(4,4)
    print(result.id)   # 加到broker中的任务的任务id,可以根据任务id查询执行状态
  • s3.py

    from celery.result import AsyncResult
    from s1 import app
    ​
    result_object = AsyncResult(id='f0b4e6612-22cf-d5f5-d5f15684',app=app)
    print(result_object.status)  # 根据任务id查看任务的状态
    result_object.get()  # 获取任务执行返回的结果(函数的返回值)
    result_object.forget() # 在backend中移除该任务结果

运行程序:

1.启动redis

2.启动worker

# 进入当前目录celery_exmple:
celery worker -A s1 -l info  # 该命令还会报错 看下面
参数:
    worker 启动worker
    -A 指定创建celeryapp实例的文件
    -l info 打印日志到屏幕  # 线上不需要加该参数,一般写到日志文件

会报一个错误:valueError:not enough values to unpack(expected 3, get 0)

需要安装 eventlet:

pip install eventlet

再启动worker

celery worker -A s1 -l info -P eventlet

3.创建任务

程序调用,往队列添加任务

# 每次执行都会往broker添加一个任务,这里一般为web程序接口中的函数
python s2.py

4.查看任务状态

# 填写任务id
python s3.py

 

4. 在django中使用celery

1.使用django-celery模块

官方提供了更为方便使用celery的模块 django-celery

pip install django-celery

 

2.直接使用celery模块

项目目录结构:

1.在 settings文件中配置celery

CELERY_BROKER_URL = 'redis://192.168.10.22:6379'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = 'redis://192.168.10.22:6379'
CELERY_TASK_SERIALIZER = 'json'

2.在settings所在目录(跟项目名同名那个)下创建 celery.py 文件

import os
from celery import Celery
​
os.environ.setdefault('DJANGO_SETTINGS_MODULE','demos.settings')
​
app = Celery('demos')  # 放入项目名
# 指定读取celery配置的文件
app.config_from_objects('django.conf:settings',namespace="CELERY")
​
# 自动去每个已注册app中读取tasks.py文件
celery_app.autodiscover_tasks() 

3.在settings所在目录(跟项目名同名那个)下的__init__ 文件,加入以下代码:

# 在django中加载celery配置
from .celery import app as celery_app
​
__all__ = ('celery_app',)

4.在app目录下创建tasks.py 文件

from celery import shared_task
​
@shared_task
def x1(x,y):
    return x + y
​
@shared_task
def x2(x,y):
    return x - y

5.启动worker

celery worker -A s1 -l info -P eventlet

6.启动django程序

 

3.执行定时任务

  1. 之前的任务会立即执行

    result = fun.x1.delay(2,2)
  2. 定时执行

    # 获取本地时间
    ctime = datetime.datetime.now()
    # 本地时间转换成utc时间
    utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
    # 10秒后执行
    target_time = utc_ctime + datetime.timedelta(seconds=10)
    result = x1.apply_async(args=[2,2],eta=target_time)

4. 执行周期性定时任务

可以自己指定每天,每月。。到哪个时间点后自动执行

具体参考官方文档 https://docs.celeryproject.org/en/latest

 

主要API

result = x1.delay()

result.id    # 任务的id
result.status  # 根据任务id查看任务的状态
result.get()  # 获取任务执行返回的结果(函数的返回值),需先判断状态是否为success
result.forget() # 在backend中移除该任务结果
result.revoke()  # 根据任务id取消任务执行(只能取消未开始的,在执行中不能取消)
result.revoke(terminate=True)  # 强制取消;不推荐

一般使用示例:

# if result.status == 'SUCCESS':
if result.successful():
    result.get()
    result.forget()
# elif result.status == 'FAIL':
if result.failed():
    pass
else:   # 其他状态 'REVOKE'...
    pass
 

其他问题

celery实例.taskshare_task装饰任务函数的区别

  • 作用效果都一样,前者需要导入celery对象

  • shark_task:不依赖celery对象,加载内存之后自动关联celery对象

  • share_task:可与多个celery对象进行关联 (一般很少使用多个celery对象)

 

参考的相关文档:

具体参考官方文档 https://docs.celeryproject.org/en/latest 好难打开,速度好慢

别人翻译的文档:https://www.jianshu.com/p/a556cac5bf7d

https://blog.csdn.net/weixin_40475396/article/details/80439781

原文地址:https://www.cnblogs.com/Deaseyy/p/13466233.html