django-celery提供给顾客使用实例

导入数据库模型

from djcelery import models as celery_models

 
# Create a periodic-task row; `...` is a placeholder for the model's field values.
celery_models.PeriodicTask.objects.create(...)
# Look up an existing periodic task by name (the manager attribute is `objects`).
celery_models.PeriodicTask.objects.get(name='add')
 
查询任务信息
def read(self, request, *args, **kwargs):
        """Report the schedule state of the periodic task named ``self.TASK_NAME``.

        ``request`` and the extra arguments are accepted but not used.

        Returns:
            ``{'enabled': False}`` when the task row does not exist or is
            disabled. Otherwise a dict with ``'enabled': True``, the crontab's
            ``day_of_month`` converted to ``int``, and ``last_run_at`` (or the
            string ``'0'`` when that value is falsy).
        """
        disabled_state = {'enabled': False}

        try:
            periodic = celery_models.PeriodicTask.objects.get(name=self.TASK_NAME)
        except celery_models.PeriodicTask.DoesNotExist:
            # A task that was never created is reported the same as a disabled one.
            return disabled_state

        if not periodic.enabled:
            return disabled_state

        return {
            'enabled': True,
            'day_of_month': int(periodic.crontab.day_of_month),
            'last_run_at': periodic.last_run_at or '0',
        }

更新日期

def create(self, request, *args, **kwargs):
        """Enable or disable the periodic task according to the POST parameters.

        Reads ``enabled`` from ``request.POST``; it must equal either
        ``self.ENABLED_POST_VALUE`` or ``self.DISABLED_POST_VALUE``. When
        enabling, ``day_of_month`` must parse as an integer in the range 1-28.

        Returns:
            The result of ``self.operate_fail(message)`` for invalid input, or
            of ``self.operate_success()`` once the task has been disabled or
            (re)scheduled.
        """
        enabled = request.POST.get('enabled', None)
        if enabled not in (self.ENABLED_POST_VALUE, self.DISABLED_POST_VALUE):
            return self.operate_fail('无效参数')

        if enabled == self.DISABLED_POST_VALUE:
            self.disable_task(self.TASK_NAME)
            return self.operate_success()

        # Only the integer conversion is guarded, so a ValueError raised by the
        # ORM further down is not misreported as a missing day.
        try:
            day_of_month = int(request.POST.get('day_of_month', ''))
        except ValueError:
            return self.operate_fail('抄表日不能为空')
        if not 1 <= day_of_month <= 28:
            return self.operate_fail('日期必须在1-28日之间')

        # Look the row up by the same name that read() and disable_task() use.
        # The task path goes in ``defaults`` so that it is applied on creation
        # only and is not part of the lookup.
        # NOTE(review): this path differs from the name 'monthly_reading' that
        # the task is registered under elsewhere in this file - confirm which
        # one the worker resolves.
        task, _created = celery_models.PeriodicTask.objects.get_or_create(
            name=self.TASK_NAME,
            defaults={'task': "mrs_app.my_celery.tasks.monthly_reading_task"},
        )

        if task.crontab is None:
            # Newly created rows, and existing rows that never received a
            # crontab, get a fresh schedule at 00:00 on the requested day.
            task.crontab = celery_models.CrontabSchedule.objects.create(
                day_of_month=day_of_month,
                hour=0,
                minute=0,
            )
        else:
            # NOTE(review): this edits the existing schedule row in place; if
            # other tasks reference the same row they change too - confirm.
            task.crontab.day_of_month = day_of_month
            task.crontab.save()

        task.enabled = True
        task.save()
        return self.operate_success()

关闭定时

def disable_task(self, name):
        """Mark the periodic task called ``name`` as disabled.

        A task that does not exist is treated as already disabled.

        Returns:
            Always ``True``.
        """
        try:
            periodic = celery_models.PeriodicTask.objects.get(name=name)
        except celery_models.PeriodicTask.DoesNotExist:
            # Nothing to switch off.
            pass
        else:
            periodic.enabled = False
            periodic.save()
        return True

 定义任务的两种格式

  • 类定义:一个继承了 celery.app.task.Task 的类,并实现了 run 方法

  • 函数定义:@task装饰的函数   

from celery import task
 
# Option 1: function-based task
@task(name='monthly_reading')
def monthly_reading_task():
    """Run one monthly reading pass; registered under the name 'monthly_reading'."""
    task_obj = MonthlyReading(debug=False)
    task_obj.start()
     
# Option 2: class-based task
# NOTE(review): `Task` is not imported in the visible snippet (only `task` is);
# presumably `from celery import Task` is required - confirm.
class MonthlyReadingTask(Task):
    """Class-based variant of the monthly reading task."""

    # Name under which the task is registered.
    name = 'monthly_reading'

    def run(self, *args, **kwargs):
        """Run one monthly reading pass; positional and keyword arguments are ignored."""
        task_obj = MonthlyReading(debug=False)
        task_obj.start()
原文地址:https://www.cnblogs.com/tuifeideyouran/p/4192054.html