celery+django实践

实验目标:

访问http://127.0.0.1:8000/task_id/    返回任务ID:068cd3cc-3572-46c0-9a33-f028772030b1

访问http://127.0.0.1:8000/task_result/?id=068cd3cc-3572-46c0-9a33-f028772030b1    将上面的任务ID做查询参数,执行查询返回任务执行结果:252

实验过程:

要在django项目中使用celery,需要将celery.py加到项目目录下,将tasks.py加到app目录下,修改__init__.py配置和settings配置

目录结构如下:项目名proj, 应用名app

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# 写上这个celery就能在单个.py文件中用Django的配置
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('projj')  #这个可以随便取,一般为项目名

# celery的配置在django的settings.py文件中
# namespace='CELERY'表示所有与celery有关的配置都是`CELERY_`的格式
app.config_from_object('django.conf:settings', namespace='CELERY')

'''
自动发现每个app下的tasks.py文件,例如:
- app1/
    - tasks.py
    - models.py
- app2/
    - tasks.py
    - models.py
'''
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
celery.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time


@shared_task
def add(x, y):
    time.sleep(10)
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)
tasks.py
from __future__ import absolute_import, unicode_literals

# 保证celery_app一直处于加载状态
# @shared_task会使用它
from .celery import app as celery_app

__all__ = ['celery_app']
项目下的__init__.py
# 在settings.py最后加上:
CELERY_BROKER_URL = 'redis://localhost'
CELERY_RESULT_BACKEND = 'redis://localhost'
settings.py增加celery配置

 需要注意的是如何时间方面有涉及到中国地区的话,需在配置中加入时区信息CELERY_TIMEZONE = 'Asia/Shanghai',默认为以utc为标准。

from django.contrib import admin
from django.urls import path
from app import views

urlpatterns = [
    path('admin/', admin.site.urls),
    path('task_id/', views.get_result_id),
    path('task_result/', views.get_result),
]
urls.py
from django.shortcuts import HttpResponse
from . import tasks
from celery.result import AsyncResult


def get_result_id(request):
    t = tasks.add.delay(228, 24)
    return HttpResponse(t.id)


def get_result(request):
    task_id = request.GET.get('id', '')
    res = AsyncResult(id=task_id)
    if res.ready():
        return HttpResponse(res.get())
    else:
        return HttpResponse('正在执行,稍后才有结果...')
views.py

启动方式:

1.先启动redis服务

`redis-server`

2.启动celery的worker

[root@localhost ~]# cd proj/       #一定要去项目目录下,否则会提示找不到proj
[root@localhost proj]# celery -A proj worker -l debug   

# 也可以启动多个worker, 下面是启动两个worker:w1和w2

[root@localhost proj]# celery multi start w1 -A proj -l debug
  celery multi v4.4.5 (cliffs)
  > Starting nodes...
  > w1@localhost.localdomain: OK
[root@localhost proj]# celery multi start w2 -A proj -l debug
  celery multi v4.4.5 (cliffs)
  > Starting nodes...
  > w2@localhost.localdomain: OK

3. 启动django项目

[root@localhost ~]# cd proj/
[root@localhost proj]# python3 manage.py runserver 0.0.0.0:8000

OK!访问就能成功了

django执行celery定时任务

1. 安装beat:

pip3 install django-celery-beat

2. 将beat加进app

INSTALLED_APPS = [
...
'django_celery_beat',
]

3. 在tasks.py中定义定时任务

@shared_task
def sayhi(name):
print("Hi," + name)

4.在表中定义定时任务

先生成表:

`python3 manage.py makemigrations`

`python3 manage.py  migrate`

启动流程:

1. 开启redis服务:`redis-server`

2.开启celery的worker:`celery -A proj worker -l debug`   //在项目目录下执行

3.启动django项目:`python3 manage.py  runserver 0.0.0.0:8000`

4. 启动beat,作用是定时丢任务到redis中:  `celery -A proj beat -l info -S django`

注意,每添加或修改一个任务,celery beat都需要重启一次,要不然新的配置不会被celery beat进程读到

原文地址:https://www.cnblogs.com/staff/p/13130500.html