3.Periodic Tasks

 celery beat是一个调度器,它可以周期内指定某个worker来执行某个任务。如果我们想周期执行某个任务需要增加beat_schedule配置信息.  

broker_url='redis://:@127.0.0.1:6379/1'
result_backend='redis://:@127.0.0.1:6379/2'

# 指定任务发到那个队列中
task_routes=({
    'proj.tasks.my_task5': {'queue': 'queue1'},
    'proj.tasks.my_task6': {'queue': 'queue1'},
    'proj.tasks.my_task7': {'queue': 'queue2'},
    },
)


# 配置周期性任务, 或者定时任务
beat_schedule = {
    'every-5-seconds':
        {
            'task': 'proj.tasks.my_task8',
            'schedule': 5.0,
            # 'args': (16, 16),
        }
}

  tasks.py模块内容如下:

from proj.celery import app as celery_app


@celery_app.task
def my_task1(a, b):
    print("my_task1任务正在执行....")
    return a + b


@celery_app.task
def my_task2(a, b):
    print("my_task2任务正在执行....")
    return a + b


@celery_app.task
def my_task3(a, b):
    print("my_task3任务正在执行....")
    return a + b


@celery_app.task
def my_task4(a, b):
    print("my_task3任务正在执行....")
    return a + b


@celery_app.task
def my_task5():
    print("my_task5任务正在执行....")




@celery_app.task
def my_task6():
    print("my_task6任务正在执行....")



@celery_app.task
def my_task7():
    print("my_task7任务正在执行....")


# 周期执行任务
@celery_app.task
def my_task8():
    print("my_task8任务正在执行....")

  启动woker处理周期性任务:

celery -A proj worker --loglevel=info --beat

  如果我们想指定在某天某时某分某秒执行某个任务,可以执行cron任务, 增加配置信息如下:

beat_schedule = {
    'every-5-minute':
        {
            'task': 'proj.tasks.period_task',
            'schedule': 5.0,
            'args': (16, 16),
        },
    'add-every-monday-morning': {
        'task': 'proj.tasks.period_task',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },

}

crontab例子: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html

  开启一个celery beat服务:

celery -A proj beat

  celery需要保存上次任务运行的时间在数据文件中,文件在当前目录下名字叫celerybeat-schedule. beat需要访问此文件:

celery -A proj beat -s /home/celery/var/run/celerybeat-schedule



原文地址:https://www.cnblogs.com/alexzhang92/p/9553801.html