Python Async Task Module: Celery

Getting started with Celery

  1. pip install celery
  2. Make sure you have a working Redis or RabbitMQ instance available as the broker

task.py

from celery import Celery

app = Celery("task",  # 与文件名无关
             broker="redis://127.0.0.1:6379",  # 消息中间件
             backend="redis://127.0.0.1:6379"  # 结果存放位置
             )


@app.task  # decorating a function makes it runnable as a Celery task
def add(x, y):
    print("running.....")
    return x + y

Start the Celery worker

command: celery -A task worker -l info

Open a Python interpreter in the directory containing task.py

>>> import task
>>> task.add(1,2)
running.....
3
>>> aa = task.add.delay(1,2)
>>> aa
<AsyncResult: 80691275-5575-45e3-b525-1a0994a1e7df>
>>> aa.get()
3
>>>

How the Celery worker handles tasks:

For a task that takes a long time to run:

import time


@app.task
def add(x, y):
    print("running.....")
    time.sleep(30)  # simulate a long-running task
    return x + y
  1. If the consumer calls get() before the worker has finished the task, it will hang (block). Use the ready() method to check whether the task has finished: aa.ready() returns a bool.
  2. You can also pass a timeout argument to get(); when it expires, a celery.exceptions.TimeoutError is raised.
  3. If the task itself raises an exception (fails), get() will re-raise that exception too.
  4. To capture the worker's exception as a return value instead, call aa.get(propagate=False); the detailed traceback is kept in aa.traceback, so the consumer fetching the result doesn't blow up. See the sketch below.
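
A minimal sketch of these four result-handling patterns, assuming the task.py above and a running worker:

import task
from celery.exceptions import TimeoutError

aa = task.add.delay(1, 2)

# 1. Non-blocking check instead of a hanging get()
print(aa.ready())  # False while the task is still running

# 2. get() with a deadline
try:
    print(aa.get(timeout=5))
except TimeoutError:
    print("task did not finish within 5 seconds")

# 3./4. Don't let a failed task crash the consumer: propagate=False
# returns the exception object instead of re-raising it
result = aa.get(propagate=False)
if aa.failed():
    print(aa.traceback)  # detailed traceback, kept for debugging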

Using Celery in a project

Project directory structure:

CeleryPro
├── CeleryPro
│   ├── __init__.py
│   ├── __pycache__
│   ├── celery.py
│   ├── settings.py
│   ├── tasks.py
│   ├── tasks_2.py
│   ├── urls.py
│   └── wsgi.py
├── manage.py
└── templates

celery.py

# Python imports from the current directory by default; use absolute_import so
# `from celery import Celery` resolves to the installed package, not this local celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery("Celery",  ## 你的celery项目名称
             broker='redis://127.0.0.1:6379/',
             backend='redis://127.0.0.1:6379/',
             include=["CeleryPro.tasks", "CeleryPro.tasks_2", ""]
             )

app.conf.update(
    result_expires=3600  # how long (in seconds) task results are kept in the result backend
)

if __name__ == '__main__':
    app.start()

tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)
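
Once a worker is running (start commands below), any code that can import the package can dispatch these tasks; a minimal sketch run from the outer CeleryPro directory:

from CeleryPro import tasks

# delay() is the shortcut; apply_async() exposes extra options
res = tasks.add.delay(4, 4)
print(res.get(timeout=10))  # -> 8

# e.g. schedule mul(3, 5) to run 10 seconds from now
tasks.mul.apply_async((3, 5), countdown=10)

print(tasks.xsum.delay([1, 2, 3]).get(timeout=10))  # -> 6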

How to start

# Start in the foreground:
command: celery -A CeleryPro worker -l info
# Start named workers in the background with celery multi:
command: celery multi start worker1 -A CeleryPro -l info
# A daemonized worker writes <worker name>.log and <worker name>.pid files
# Multiple workers can run on multiple machines, as long as they all point at the same broker

Other commands

command: celery multi restart worker1 worker2 -A CeleryPro
command: celery multi stop worker1 worker2 
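
For daemonized workers it helps to pin down where the pid and log files go; a sketch using the documented celery multi options, where %n expands to the node name (the paths here are just examples):

command: celery multi start worker1 worker2 -A CeleryPro -l info --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n%I.log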

Periodic tasks

Directory structure

├── CeleryPro
│   ├── __init__.py
│   ├── celery.py
│   ├── periodic_tasks.py  # periodic task file
│   ├── settings.py
│   ├── tasks.py
│   ├── tasks_2.py
│   ├── urls.py
│   └── wsgi.py
├── manage.py
└── templates

periodic_tasks.py

from __future__ import absolute_import, unicode_literals
from celery.schedules import crontab
from .celery import app


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # run test("Hello") every 10 seconds and test("World") every 30 seconds
    sender.add_periodic_task(10.0, test.s("Hello"), name="10s_tasks")
    sender.add_periodic_task(30.0, test.s("World"), name="30s_tasks")
    # run every Monday at 21:58 (in celery's crontab, Monday is day_of_week=1)
    sender.add_periodic_task(
        crontab(hour=21, minute=58, day_of_week=1),
        test.s("Happy monday")
    )


@app.task
def test(arg):
    print(arg)
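
The test.s("Hello") calls above build signatures: a task packaged together with its arguments, which beat can invoke later. A small sketch of signatures on their own, assuming the tasks.py shown earlier:

from CeleryPro.tasks import add

sig = add.s(2, 2)  # wrap the task with its arguments; nothing runs yet
res = sig.delay()  # send it to the broker, exactly like add.delay(2, 2)
print(res.get(timeout=10))  # -> 4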

Starting

# Before starting the worker, register the new periodic-task module in the app's include list:
---------------------------------------
app = Celery("Celery",  ## 你的celery项目名称
             broker='redis://127.0.0.1:6379/',
             backend='redis://127.0.0.1:6379/',
             include=["CeleryPro.tasks", "CeleryPro.tasks_2", "CeleryPro.periodic_tasks"]
             )
---------------------------------------
command: celery -A CeleryPro worker -l info

# 启动 beat
command: celery -A CeleryPro.periodic_tasks beat -l info  # beat must be pointed at the specific task module within the project

Scheduling test results (screenshot omitted)

Periodic tasks can also be added through configuration; this way, a new scheduled task is passed in as plain parameters:

celery.py

app.conf.beat_schedule = {
    "add_every_5_seconds": {
        'task': 'CeleryPro.tasks.add',  # dotted path to the task
        'schedule': 5.5,                # run every 5.5 seconds
        'args': (1, 2)                  # positional arguments for add()
    }
}
app.conf.timezone = 'UTC'
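
A crontab schedule drops into the same structure; a sketch following the standard beat_schedule format:

from celery.schedules import crontab

app.conf.beat_schedule = {
    "mul_every_monday_morning": {
        'task': 'CeleryPro.tasks.mul',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),  # Mondays at 7:30
        'args': (3, 5)
    }
}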

For more details, see the official Celery documentation.

Using Celery in a Django project:

Django directory structure

CeleryPro  # the project
├── CeleryPro  
│   ├── __init__.py
│   ├── __pycache__
│   ├── celery.py  
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── app01   # the Django app
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tasks.py   # must be named tasks.py so autodiscovery finds it
│   ├── tests.py
│   └── views.py
├── manage.py
└── templates

celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
# i.e. any standalone script that needs access to the Django project's models must set this first
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CeleryPro.settings')


app = Celery('CeleryPro')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
# Once Celery is hooked into Django, the broker Celery uses is configured in the Django project settings (see settings.py below)
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
# Auto-discover the tasks.py module in each registered app
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
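
Because debug_task is declared with bind=True, the task receives itself as the first argument, so self.request carries the task id, args, and other metadata. A quick check from a Django shell (worker running):

>>> from CeleryPro.celery import debug_task
>>> debug_task.delay()  # the worker's log prints the Request: ... line
<AsyncResult: ...>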

__init__.py

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

app01/tasks.py

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "Leon"
# Date: 2019/5/26
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task


@shared_task  # a shared task can be called from any app in the project
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)

settings.py

# Read by app.config_from_object('django.conf:settings', namespace='CELERY') in celery.py
CELERY_BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379'
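
Thanks to namespace='CELERY', any Celery option can be set here by upper-casing it and adding the CELERY_ prefix; two optional examples:

CELERY_TIMEZONE = 'UTC'        # maps to app.conf.timezone
CELERY_RESULT_EXPIRES = 3600   # maps to app.conf.result_expires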

urls.py

from django.conf.urls import url
from django.contrib import admin
from app01 import views

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^AsyncCeleryCall', views.AsyncCeleryCall),
    url(r'^AsyncCeleryGet', views.AsyncCeleryGet),
]

app01/views.py

from django.shortcuts import render, HttpResponse

# Create your views here.
from app01 import tasks
import random


def AsyncCeleryCall(request):
    """
    
    :param request:
    :return:
    """
    if request.method == "GET":
        a = random.randint(1, 1000)
        t = tasks.add.delay(a, 6)
        print(a)
        return HttpResponse(t.id)

    elif request.method == "POST":
        pass

    else:
        pass


def AsyncCeleryGet(request):
    """

    :param request:
    :return:
    """
    from celery.result import AsyncResult
    if request.method == "GET":
        task_id = request.GET.get('id')
        res = AsyncResult(id=task_id)
        if res.ready():
            return HttpResponse(res.get())
        else:
            return HttpResponse(res.ready())

    elif request.method == "POST":
        pass

    else:
        pass

Testing

Trigger a task

Fetch the result
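
Both endpoints can be exercised from the command line; a sketch assuming the Django dev server is on 127.0.0.1:8000 (the task id below is just an example):

command: curl http://127.0.0.1:8000/AsyncCeleryCall
# returns a task id, e.g. 80691275-5575-45e3-b525-1a0994a1e7df
command: curl "http://127.0.0.1:8000/AsyncCeleryGet?id=80691275-5575-45e3-b525-1a0994a1e7df"
# returns the task result once it is ready, otherwise False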

Django periodic tasks

Install the new module: pip install django-celery-beat

Add the app in settings

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'django_celery_beat',  # here
]

Create the module's database tables

command: python manage.py migrate

Operations to perform:
  Apply all migrations: admin, auth, contenttypes, django_celery_beat, sessions
Running migrations:
  Applying django_celery_beat.0001_initial... OK
  Applying django_celery_beat.0002_auto_20161118_0346... OK
  Applying django_celery_beat.0003_auto_20161209_0049... OK
  Applying django_celery_beat.0004_auto_20170221_0000... OK
  Applying django_celery_beat.0005_add_solarschedule_events_choices... OK
  Applying django_celery_beat.0006_auto_20180322_0932... OK
  Applying django_celery_beat.0007_auto_20180521_0826... OK
  Applying django_celery_beat.0008_auto_20180914_1922... OK
  Applying django_celery_beat.0006_auto_20180210_1226... OK
  Applying django_celery_beat.0006_periodictask_priority... OK
  Applying django_celery_beat.0009_periodictask_headers... OK
  Applying django_celery_beat.0010_auto_20190429_0326... OK
  Applying django_celery_beat.0011_auto_20190508_0153... OK

Create a new admin superuser

command: python manage.py createsuperuser

Open the django-admin backend

Create a new schedule rule (screenshot omitted)

Associate the task with the schedule rule, and set its arguments and other options (screenshot omitted)

Start the worker

command: celery -A CeleryPro worker -l info

Start beat; whenever a new task is added, beat must be restarted

command: celery -A CeleryPro beat -l info -S django
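
-S django selects the django-celery-beat database scheduler, which reads the schedule from the tables migrated above; the equivalent long form is:

command: celery -A CeleryPro beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler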

Result (screenshot omitted)

Code for this post

Download

Original post: https://www.cnblogs.com/forsaken627/p/10925714.html