celery 定时任务

1 基本概念

使用 Celery 实现定时任务的步骤:

(1) 创建一个 Celery 实例

(2) 配置文件中配置任务 ,发布任务 celery A xxx beat

(3) 启动 Celery Worker

(4) 存储结果

使用 Celery 实现异步任务的步骤:

(1) 创建一个 Celery 实例

(2) 启动 Celery Worker ,通过delay() 或 apply_async()(delay 方法封装了 apply_async, apply_async支持更多的参数 ) 将任务发布到broker

(3) 应用程序调用异步任务

(4)存储结果 (发布的任务需要return才会有结果,否则为空)

2 celery定时任务简单使用

目录结构如下

celeryconfig.py

from __future__ import absolute_import # 拒绝隐式引入,因为celery.py的名字和celery的包名冲突,需要使用这条语句让程序正确地运行
from celery.schedules import crontab
from datetime import timedelta


broker_url = "redis://127.0.0.1:6379/2"   # 使用redis存储任务队列
result_backend = "redis://127.0.0.1:6379/6"  # 使用redis存储结果

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = "Asia/Shanghai"  # 时区设置
worker_hijack_root_logger = False  # celery默认开启自己的日志,可关闭自定义日志,不关闭自定义日志输出为空
result_expires = 60 * 60 * 24  # 存储结果过期时间(默认1天)

# 导入任务所在文件
imports = [
    "celery_task.epp_scripts.test1",  # 导入py文件
    "celery_task.epp_scripts.test2",
]


# 需要执行任务的配置
beat_schedule = {
    "test1": {
        "task": "celery_task.epp_scripts.test1.celery_run",  #执行的函数
        "schedule": crontab(minute="*/1"),   # every minute 每分钟执行
        "args": ()  # # 任务函数参数
    },

    "test3": {
        "task": "celery_task.epp_scripts.test1.celery_run",  # 执行的函数
        "schedule": timedelta(seconds=10),  # 每10s执行一次
        "args": ()  # # 任务函数参数
    },

    "test2": {
        "task": "celery_task.epp_scripts.test2.celery_run",
        "schedule": crontab(minute=0, hour="*/1"),   # every minute 每小时执行
        "args": ()
    },

}

# "schedule": crontab()与crontab的语法基本一致
# "schedule": crontab(minute="*/10",  # 每十分钟执行
# "schedule": crontab(minute="*/1"),   # 每分钟执行
# "schedule": crontab(minute=0, hour="*/1"),    # 每小时执行

celery.py

# coding:utf-8
from __future__ import absolute_import # 拒绝隐式引入,因为celery.py的名字和celery的包名冲突,需要使用这条语句让程序正确地运行
from celery import Celery

# 创建celery应用对象
app = Celery("celery_demo")

# 导入celery的配置信息
app.config_from_object("celery_task.celeryconfig")

tes1.py

# coding:utf-8
from __future__ import absolute_import # 拒绝隐式引入,因为celery.py的名字和celery的包名冲突,需要使用这条语句让程序正确地运行
from celery import Celery

# 创建celery应用对象
app = Celery("celery_demo")

# 导入celery的配置信息
app.config_from_object("celery_task.celeryconfig")

test2.py

from celery_task.celery import app

def test33():
    print("test33----------------")
    # print("------"*50)

def test44():
    print("test44--------------")
    # print("------" * 50)
    test33()

@app.task
def celery_run():
    test33()
    test44()

3 启动

先在一个终端: celery -A celery_task beat   发布任务,在celery_task 同级目录下执行

再在另一个终端:celery -A pinduoduo worker -l info -P eventlet  开启worker 在celery_task 同级目录下执行 

需要注意,上面的参数 -P eventlet 主要是为了解决在win10 下报  Celery ValueError: not enough values to unpack (expected 3, got 0) 的错误,需要安装eventlet先

pip install eventlet -i https://pypi.douban.com/simple/

参考:https://blog.csdn.net/Shyllin/article/details/80940643

参考:https://blog.csdn.net/qq_30242609/article/details/79047660

原文地址:https://www.cnblogs.com/jec1999/p/10663068.html