celery

概要

  • celery,处理任务的Python的模块。

    • 场景1:

      对【耗时的任务】,通过celery,将任务添加到broker(队列),然后立即给用户返回一个任务ID。
      当任务添加到broker之后,由worker去broker获取任务并处理任务。
      任务弯完成之后,再将结果放到backend中
      
      用户想要检查结果,提供任务ID,我们就可以去backend中去帮他查找。
      
    • 场景2:

      定时任务(定时发布/定时拍卖)
      

详细

1.celery

celery是一个基于Python开发的模块,可以帮助我们对任务进行分发和处理。

1.1 环境的搭建

pip3 install celery==4.4
安装broker: redis或rabbitMQ
pip3 install redis / pika

1.2 快速使用

  • s1.py

    from celery import Celery
    
    app = Celery('tasks', broker='redis://127.0.0.1:6379', backend='redis://127.0.0.1:6379')
    
    @app.task
    def x1(x, y):
        return x + y
    
    @app.task
    def x2(x, y):
        return x - y
    
  • s2.py

    from s1 import x1
    
    result = x1.delay(4, 4) #任务添加到broker(队列)
    print(result.id)
    
  • s3.py

    from celery.result import AsyncResult
    from s1 import app
    
    result_object = AsyncResult(id="任务ID", app=app) #提交任务ID,去backend查找
    print(result_object.status)
    

运行程序:

  1. 启动redis

  2. 启动worker

    linux:
    # 进入当前目录
    celery worker -A s1 -l info
    
    windows:
    
    pip install eventlet
    #执行:
    celery worker -A s1 -l info -P eventlet
    
  3. 创建任务

    python s2.py
    python s2.py
    
  4. 查看任务状态

    # 填写任务ID
    ptyhon s3.py 
    

1.3 django中应用celery

之后,需要按照django-celery的要求进行编写代码。

  • 第一步:【项目/项目/settings.py 】添加配置

    CELERY_BROKER_URL = 'redis://192.168.16.85:6379'
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_RESULT_BACKEND = 'redis://192.168.16.85:6379'
    CELERY_TASK_SERIALIZER = 'json'
    
  • 第二步:【项目/项目/celery.py】在项目同名目录创建 celery.py

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import os
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demos.settings')
    
    app = Celery('demos')
    
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    # 去每个已注册app中读取 tasks.py 文件
    app.autodiscover_tasks()
    
  • 第三步,【项目/app名称/tasks.py】

    from celery import shared_task
    
    @shared_task
    def add(x, y):
        return x + y
    
    @shared_task
    def mul(x, y):
        return x * y
    
  • 第四步,【项目/项目/__init__.py

    from .celery import app as celery_app
    
    __all__ = ('celery_app',)
    
  • 启动worker

    进入项目目录
    
    celery worker -A demos -l info -P eventlet
    
  • 编写视图函数,调用celery去创建任务。

    • url

      url(r'^create/task/$', task.create_task),
      url(r'^get/result/$', task.get_result),
      
    • 视图函数

      from django.shortcuts import HttpResponse
      from api.tasks import x1
      
      def create_task(request):
          print('请求来了')
          result = x1.delay(2,2)
          print('执行完毕')
          return HttpResponse(result.id)
      
      
      def get_result(request):
          nid = request.GET.get('nid')
          from celery.result import AsyncResult
          # from demos.celery import app
          from demos import celery_app
          result_object = AsyncResult(id=nid, app=celery_app)
          # print(result_object.status)
          data = result_object.get()
          return HttpResponse(data)
      
  • 启动django程序

    python manage.py ....
    

1.4 celery定时执行

def create_task(request):
    # # 1 耗时任务(立即执行)
    # result = tasks.add.delay(2, 8)
    # print(result.id)

    # 2 定时任务(需要转化为utc时间)
    ctime = datetime.datetime.now() # --datetime.datetime(2021, 3, 12, 3, 2, 12, 999416)
    utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp()) # utc时间
    s10 = datetime.timedelta(seconds=10)
    ctime_x = utc_ctime + s10
    # 使用apply_async并设定时间
    result =tasks.add.apply_async(args=[1, 3], eta=ctime_x)

    print(result.id)
    return HttpResponse('交给broker队列中,让celery去执行')



def get_result(request):
    task_id = request.GET.get('nid')
    print(task_id)
    result_object = AsyncResult(id=task_id, app=celery_app)
    # print(result_object.status)  # 获取状态
    # print(result_object.get())  # 获取数据
    # result_object.forget() # 将数据在backend中移除
    # result_object.revoke() # 取消任务
    # 使用模板
    if result_object.successful():
        data = result_object.get()
        result_object.forget()
        print('成功', data)
    elif result_object.failed():
        pass
    else:
        pass

    return HttpResponse('celery。。。')

1.5 周期性定时任务

  • celery
  • django中也可以结合使用

celery_django_demo点击下载

作者:华王 博客:https://www.cnblogs.com/huahuawang/
原文地址:https://www.cnblogs.com/huahuawang/p/14813546.html