Django中使用Celery

  • 安装

    pipenv install django-celery
    
  • 主配置文件导入celery配置

    # settings.py
    
    from .celeryconfig import *
    BROKER_BECKEND = 'redis'
    BROKER_URL = 'redis://192.168.2.128:6379/1'
    CELERY_RESULT_BACKEND = 'redis://192.168.2.128:6379/2'
    
  • celery配置文件

    #celeryconfig.py
    
    from datetime import timedelta
    
    import djcelery
    # setup_loader用于扫描所有app下tasks.py文件中的task
    djcelery.setup_loader()
    
    # 设置不同的任务队列,将普通任务和定时任务分开,redis中实际key如下
    # 1) "_kombu.binding.work_queue"
    # 2) "_kombu.binding.beat_tasks"
    CELERY_QUEUES = {
        'beat_tasks': {
            'exchange': 'beat_tasks',
            'exchange_type': 'direct',
            'binding_key': 'beat_tasks'
        },
        'work_queue': {
            'exchange': 'work_queue',
            'exchange_type': 'direct',
            'binding_key': 'work_queue'
        },
    }
    
    # 设置默认队列
    CELERY_DEFAULT_QUEUE = 'work_queue'
    
    CELERY_IMPORTS = (
        'course.tasks',
    )
    
    # 有些情况下可以防止死锁
    CELERY_FORCE_EXECV = True
    
    # 设置并发的worker数量
    CELERY_CONCURRENCY = 4
    
    # 允许重试
    CELERY_ACKS_LATE = True
    
    # 每个worker最多执行100个任务被销毁,可以防止内存泄漏
    CELERY_MAX_TASKS_PER_CHILD = 100
    
    # 单个任务的最大运行时间
    CELERY_TASK_TIME_LIMIT = 12 * 30
    
    # 定时任务配置
    CELERYBEAT_SCHEDULE = {
        'task1': {
            'task': 'course-task',
            'schedule': timedelta(seconds=5),
            'options': {
                'queue': 'beat_tasks'
            }
        }
    }
    
    
  • app中创建tasks.py并创建task

    # (app)course->tasks.py
    
    import time
    from celery.task import Task
    
    
    class CourseTask(Task):
        name = 'course-task'
    
        def run(self, *args, **kwargs):
            print('start course task')
            time.sleep(4)
            print('args={}, kwargs={}'.format(args, kwargs))
            print('end course task')
    
  • app视图函数调用task

    # (app)course->views.py
    
    from django.http import JsonResponse
    from course.tasks import CourseTask
    
    
    def do(request):
        # 执行异步任务
        print('start do request')
        # CourseTask.delay()
        CourseTask.apply_async(args=('hello',), queue='work_queue')
        print('end do request')
        return JsonResponse({'result': 'ok'})
    
  • 启动命令

    python manage.py celery worker -l INFO
    python manage.py celery beat -l INFO
    
原文地址:https://www.cnblogs.com/Peter2014/p/11629080.html