celery-3

结合django的定时任务  

应用场景:

  1、异步执行,如耗时任务;

  2、定时任务

broker:redis

执行任务:worker

celery_pro目录:

  celeryf.py  __init__.py  config.py  tasks1.py  tasks2.py

celeryf.py

from __future__ import absolute_import,unicode_literals
from celery import Celery

app = Celery(
    main='mycelery',    #主模块名
    broker='redis://:lyb@localhost:6379/0',     #消息代理
    backend='redis://:lyb@localhost:6379/0',    #结果存储
    include=['celery_pro.tasks1','celery_pro.tasks2'],     #定义worker的文件,不含.py
)

app.config_from_object('celery_pro.config')    #读取配置文件,可以是导入的模块,也可以是点分隔的文件字符串表示

if __name__ == '__main__':
    app.start()

__init__.py  #空

config.py  #http://docs.celeryproject.org/en/latest/userguide/configuration.html

import os
from datetime import timedelta
from celery.schedules import crontab

BASE_DIR = os.path.abspath('')
# PATH_TEST = 'path_test'

CELERY_ENABLE_UTC = False  # 不是用UTC
CELERY_TIMEZONE = 'Asia/Shanghai'

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # 任务结果的时效时间
# CELERYD_LOG_FILE = BASE_DIR + "/log/celery/celery.log"  # log路径
# CELERYBEAT_LOG_FILE = BASE_DIR + "/log/celery/beat.log"  # beat log路径
# CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']  # 允许接受的格式 一般采用msgpack(快)和json(跨平台)
# CELERY_RESULT_SERIALIZER = 'json'  # 读取任务结果格式
# CELERY_TASK_SERIALIZER = 'msgpack'  # 任务序列化反序列化采用msgpack

CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'celery_pro.tasks2.add_2',
        'schedule': 30.0,
        'args': (16, 16)
    },
    'add-every-10-seconds': {
        'task': 'celery_pro.tasks1.test',
        'schedule': 10.0,
        'args': ['nihao']
    },
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'celery_pro.tasks2.add_2',
        'schedule': crontab(day_of_week='monday'),
        'args': (16, 16),
    },
}

tasks1.py

from __future__ import absolute_import,unicode_literals
from celery.schedules import crontab
from .celeryf import app

#1、第一种形式
# @app.on_after_configure.connect
# def setup_periodic_tasks(sender, **kwargs):
#     # Calls test('hello') every 10 seconds.
#     sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
#
#     # Calls test('world') every 30 seconds
#     sender.add_periodic_task(30.0, test.s('world'), expires=10)
#
#     # Executes every Monday morning at 7:30 a.m.
#     sender.add_periodic_task(
#         crontab(hour=7, minute=30, day_of_week=1),
#         test.s('Happy Mondays!'),
#     )

#2、第二种形式
# app.conf.beat_schedule = {
    # 'add-every-30-seconds': {
    #     'task': 'celery_pro.tasks2.add_2',
    #     'schedule': 30.0,
    #     'args': (16, 16)
    # },
    # 'add-every-10-seconds': {
    #     'task': 'celery_pro.tasks1.test',
    #     'schedule': 10.0,
    #     'args': ['nihao']
    # },
    # # Executes every Monday morning at 7:30 a.m.
    # 'add-every-monday-morning': {
    #     'task': 'celery_pro.tasks2.add_2',
    #     'schedule': crontab(),
    #     'args': (16, 16),
    # },
# }


@app.task
def test(arg):
    print(arg)

tasks2.py

from __future__ import unicode_literals,absolute_import
from .celery import app
import time

@app.task
def add_2(x,y):
    print('x + y :')
    return x+y

@app.task
def mul(x,y):
    time.sleep(20)
    return time.time()

启动worker:

(venv) l@l:~/myfile/pycharm/py3$ python celery_pro/celeryf.py worker -l debug

启动celery beat即调度器:

celery -A celery_pro.tasks1 beat -l debug
渐变 --> 突变
原文地址:https://www.cnblogs.com/lybpy/p/8698904.html