Django3+celery_rabbitmq 实现异步

1.安装

	pip install django-celery
	pip install flower

2.部署RabbitMQ:

1.下载下载并安装erlang原因:RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang。
		1.下载地址:http://www.erlang.org/downloads
		2.安装路径:D:Worktoolserl
		3.环境变量:
			set ERLANG_HOME = D:Worktoolserl;
			set PATH=%Path%;%ERLANG_HOME%in;
2.安装下载并安装RabbitMQ
		1.下载地址:http://www.rabbitmq.com/download.html
		2.环境变量:
			set RABBITMQ_HOME = D:WorktoolsRabbitMQ Server;
			set PATH=%Path%;%RABBITMQ_HOME%
abbitmq_server-3.8.3sbin;
		3.RabbitMQ安装好后接下来安装RabbitMQ-Plugins。
			1.打开命令行cd,输入RabbitMQ的sbin目录;
			2.rabbitmq-plugins enable rabbitmq_management
			3.rabbitmqctl status 验证安装成功
		4.启动Rabbit-server
			rabbitmq-server

3.启动web服务,启动celery

        python manage.py runserver 127.0.0.1:6000
	python manage.py celery worker -l info   
        # 如果要启动flower
	#python manage.py celery flower
	#python manage.py celery flower --basic_auth=username:password     

4.在project的settings同级目录创建celery.py文件

# -*- coding:utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms
from django.conf import settings
import django


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'projectname.settings')

app = Celery('projectname')

app.config_from_object('django.conf:settings')

app.autodiscover_tasks(lambda:settings.INSTALLED_APPS)

platforms.C_FORCE_ROOT = True

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

5.在project的settings同级目录__init__.py文件添加:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

6.在app目录创建tasks.py(必须是这个名字)

# -*- coding:utf-8 -*-
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from project.celery import app

# 在需要异步的函数加装饰器@shared_task
@shared_task
def task_begin(cases):
    try:
        cmd = 'ipconfig'
        res = os.popen(cmd)
        print(cmd)
    except Exception as err:
        log.error("*" * 50 + " test fail " + "*" * 50)
        log.error(os.path.basename(__file__).replace(".py", "") + 'fail: %s' % str(err))

7.在views添加请求

# -*- coding:utf-8 -*-
from django.shortcuts import render
from django.http import request, response, HttpRequest, HttpResponse, JsonResponse
from django.views.decorators.csrf import csrf_protect, csrf_exempt
from django.core import serializers
import sys
import os
import json
import requests
import multiprocessing
from .tasks import *



def start(request):
    try:
        res = {"code": 200, "msg": "sucess"}
        if request.method == "POST":
           # 如果函数有参数,添加在delay里面
           task_begin.delay()
        else:
            res["code"] = 405
            res["msg"] = "method error"
        return HttpResponse(json.dumps(res), content_type="application/json")
    except Exception as err:
        raise Exception(str(err))    

8.在settings中配置celery和rabbitmq:

import os
from corsheaders import *
# from .celeryconfig import *
import djcelery

djcelery.setup_loader()
# celery配置
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERY_RESULT_BACKEND = 'amqp'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_BROKER_URL = 'amqp://rabbitmqguest:guest@localhost:5672/vhost'
CELERY_TIMEZONE = 'Asia/Shanghai'
# CELERY_REDIS_MAX_CONNECTIONS = 4
# CELERYD_CONCURRENCY = 4
# BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 5}  #
# BROKER_URL = 'redis://localhost:6379/0'  # redis的配置,如果是rabbitmq就不是这个配置
# CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
DEBUG = False

  

原文地址:https://www.cnblogs.com/breakcircle/p/12758912.html