django 中如何使用celery 和redis

1.先说才历史问题celery版本支持问题对于3.7以及以上暂时不支持,建议版本3.6,kombu 库里面使用了3.6async 方法名在3.7是内部方法

她。django-redis 要求redis<3.0 ,而celery支持redis版本2.x,所以会冲突,只能卸载django-redis 

我的python3.6.6+django2.2.16+redis2.10.6+django-celery3.3.1+Celery3.1.26.post2,我的redis server 用的3.0.0 

pip install -r requirements.txt  -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com

amqp 1.4.9
anyjson 0.3.3
appdirs 1.4.4
billiard 3.3.0.23
celery 3.1.26.post2
celery-with-redis 3.0
certifi 2020.6.20
chardet 3.0.4
click 7.1.2
click-didyoumean 0.0.3
click-repl 0.1.6
coreapi 2.3.3
coreschema 0.0.4
distlib 0.3.1
Django 2.2.16
django-celery 3.3.1
django-filter 2.4.0
djangorestframework 3.12.1
drf-extensions 0.6.0
filelock 3.0.12
flower 0.9.5
humanize 3.0.1
idna 2.10
importlib-metadata 2.0.0
importlib-resources 3.0.0
install 1.3.4
itypes 1.2.0
Jinja2 2.11.2
kombu 3.0.37
MarkupSafe 1.1.1
pip 20.2.4
prometheus-client 0.8.0
prompt-toolkit 3.0.8
PyMySQL 0.10.1
pytz 2020.1
redis 2.10.6
requests 2.24.0
setuptools 39.0.1
six 1.15.0
sqlparse 0.4.1
tornado 6.0.4
uritemplate 3.0.1
urllib3 1.25.10
vine 5.0.0
virtualenv 20.0.35
wcwidth 0.2.5
zipp 3.3.1

2.配置setting.py

注册“djcelery" 应用

INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
# 'codedog',
'rest_framework',
"app01",
"celeryapp",
"djcelery"
]

引入celery broker backend ,指定时区
from celeryapp.celeryconfig import *
BROKER_URL = "redis://localhost:6379/1"
BROKER_BACKEND="redis"
CELERY_RESULT_BACKEND = "redis://localhost:6379/2"
CELERY_SEND_TASK_SENT_EVENT = True
CELERY_TIMEZONE  = "Asia/Shanghai"
3.在app下新建celeryconfig.py 引入如下任务配置

import djcelery
djcelery.setup_loader()

CELERY_IMPORTS = (
'celeryapp.tasks'
)
4.运行celery worker
python manage.py celery worker -l INFO
启动后如下:
leak, never use this setting in production environments!
  warn('Using settings.DEBUG leads to a memory leak, never '
[2020-10-18 16:51:45,876: WARNING/MainProcess] celery@DESKTOP-PBNSFDJ ready.
[2020-10-18 16:51:55,941: INFO/MainProcess] Received task: test-task[42fc0b2b-be80-4d61-a038-5ef13b80da92]
[2020-10-18 16:51:55,944: WARNING/Worker-1] start task  of celery ...........
[2020-10-18 16:51:55,944: WARNING/Worker-1] enter test demo args is (), {}................
[2020-10-18 16:51:55,945: WARNING/Worker-1] end task  of celery ...........
[2020-10-18 16:51:58,953: INFO/MainProcess] Task test-task[42fc0b2b-be80-4d61-a038-5ef13b80da92] succeeded in 3.0s: ()
[2020-10-18 17:06:08,823: INFO/MainProcess] Events of group {task} enabled by remote.
[2020-10-18 17:06:21,157: INFO/MainProcess] Received task: test-task[814aeb82-1675-42f2-a6a7-e1b31f09012d]
[2020-10-18 17:06:21,213: WARNING/Worker-1] start task  of celery ...........
[2020-10-18 17:06:21,220: WARNING/Worker-1] enter test demo args is (), {}................

  




5.安装flower ,监控 pip install flower==0.9.5
启动flower
方式一:
# celery flower --address=0.0.0.0 --port=5555 --broker=amqp://guest:password@10.1.10.180:5762
如果是redis:  --broker=redis://localhost:6379/1
方式二:通过django manage 自动获取settings.py broker 配置

python manage.py celery flower
启动后console日志如下
[I 201018 17:06:03 command:140] Visit me at http://localhost:5555
[I 201018 17:06:03 command:145] Broker: redis://localhost:6379/1
[I 201018 17:06:03 command:148] Registered tasks:
    ['celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap',
     'test-task']

  

6.编写celery 任务tasks.py:

from celery.task import Task
import time

class TestTask(Task):
    name="test-task"
    def run(self,*args,**kwargs):
        print("start task  of celery ...........")
        print("enter test demo args is {}, {}................".format(args,kwargs))
        print("end task  of celery ...........")
        time.sleep(3)
        return   args

 7.注册路由和视图编写

from django.http.response import JsonResponse
from rest_framework.views import APIView
from celeryapp.tasks import TestTask

class DoCeleryView(APIView):

    def get(self,*args,**kwargs):
        res= TestTask.delay()
        return JsonResponse(data={"msg":"ok............"})

  


from django.conf.urls import url
from celeryapp import views

urlpatterns = [
url(r'^do$', views.DoCeleryView.as_view()),
]
8.注意以上操作需要先启动django
python manage.py runserver localhost:8000

 

 9.flower 任务可视化:

 appconfig.py 详细教程:https://www.imooc.com/learn/1051

原文地址:https://www.cnblogs.com/SunshineKimi/p/13838027.html