Django项目中接受前端页面点击事件异步执行之celery+redis

问题场景:

当在Python Django项目中,创建多个APP应用,并且Django实现后端业务逻辑处理过程,属于(后端代码),既然后有后端代码,那基本会与前端代码(前端页面)进行人机交互操作。在此情况下,基本操作前端输入对应的参数和点击对应的按钮事件,将数据实时传输给后端代码进行业务处理,然后并在最短时间内反馈到前端,实现一次完整的输入输出过程(该过程实时并高效快速),但往往该情况属于实时处理,既然有实时,也存在异步操作,此处就以Python调用ansible执行自动化部署操作为例,在该例子中,前端提交部署请求,Python调用ansible进行部署处理,在处理过程中,一般简单操作会快速执行完成并返回结果数据,但当部署复杂的ansible操作时其中等待的时间会更长,给前端会处于等待状态,会因此带来拥塞情况和用户体验较差。针对此情况,Django项目有他处理的成熟方案。

解决方案:

根据以上提出的问题,进行分析,既要让用户不处于一直等待状态,又要让该任务后端异步执行,同时用户需要在任务执行完成后被动知道最终结果。综上所述得出方案:Django项目中采用celery异步任务执行+redis任务队列数据存储+执行任务结束回调过程。

具体实现:mysql存储执行结果日志,redis缓存,Django1.11.7,Python3.6.3(asstes项目,asstes_cd APP应用)

1、Django项目基础依赖包:ansible,celery,Django,Django,django-celery-beat(django项目中会生成定时任务表),redis

(assets) root@python-dev:/application/assets# pip list
Package             Version    
------------------- -----------    
ansible             2.3.2.0   
celery              4.2.0     
Django              1.11.7     
django-celery-beat  1.1.1      
django-filter       1.1.0      
django-mysql        2.2.2      
django-rest-swagger 2.1.2      
djangorestframework 3.7.3    
pip                 10.0.1    
PyMySQL             0.8.0   
redis               2.10.6     
requests            2.18.4     
requests-ntlm       1.1.0      
setuptools          28.8.0          
urllib3             1.22     
virtualenv          16.0.0     

2、Django项目asstes主项目配置:redis数据库用于celery任务存放

CELERY_BROKER_URL = 'redis://localhost:6379/0'
#CELERY_RESULT_BACKEND = 'redis://'
#CELERY_RESULT_PERSISTENT = False
#CELERY_TASK_SERIALIZER = 'json'
#CELERY_RESULT_SERIALIZER = 'json'
#CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = TIME_ZONE
# CELERY_ENABLE_UTC = True

INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django_celery_beat',
]

 

3、Django项目asstes主目录下__init__.py配置启用celery在项目中使用

import pymysql
pymysql.install_as_MySQLdb()
from .celerytask import app as celery_app

# include all celery task

__all__ = ['celery_app']

4、Django项目asstes主目录下建立celerytask.py实现Django与celery任务对接

import os
from celery import Celery
from celery.schedules import crontab

# By default, celery searches for a submodule named celery.py
# http://docs.celeryproject.org/en/latest/getting-started/
# next-steps.html#about-the-app-argument
#
# -- Refer
# http://docs.celeryproject.org/en/latest/
# https://github.com/celery/celery/tree/master/examples/django/
# http://docs.celeryproject.org/en/latest/userguide/application.html
# http://docs.celeryproject.org/en/latest/userguide/configuration.html


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'assets.settings')


#app = Celery('oms', backend='rpc://', broker='pyamqp://guest@localhost//')
app = Celery('assets')


# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')


# Load task modules from all registered Django app configs.
#from django.conf import settings
#app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.autodiscover_tasks()

5、APP asstes_cd应用中建立view.py业务类(与前端交互API),tasks.py(celery任务处理类)

view类:由于内容过长,直接上重点

from assets_cd import tasks #导入task celery任务处理
.............省略.............
oper_log_id_list = [] ip_list = [] ssh_list = [] path_list = [] with transaction.atomic(): for deploy in deploy_log_list: deploy.save() oper_log_id_list.append(deploy.id) ip_list.append(deploy.host_ip) ssh_list.append(deploy.ssh_user) path_list.append(deploy.service_path) callback_id = ','.join([str(x) for x in oper_log_id_list]) ip_list = ','.join([str(ip) for ip in ip_list]) ssh_list = ','.join([str(ssh) for ssh in ssh_list]) path_list = ','.join([str(path) for path in path_list]) logger.info('callback_id, {}'.format(callback_id)) logger.info('add tasks_deploy, tasks_deploy.run_playbook_task_deploy.delay') for playbook in play_book_list: tasks.run_playbook_task.apply_async((playbook,), queue='priority.high') #此处任务丢入执行 logger.info('add tasks_deploy completed, congrats')
          #将以下日志反馈到前端 deployment_result_list.append({
'id': '{}'.format(callback_id), 'ip_list': ip_list, 'ssh_user': ssh_list, 'path': path_list, 'status':'任务提交celery成功,请在到日志列表中查询相关记录.' })
 .............省略.............

tasks.py类

# -*- coding: utf-8 -*-
import logging
import os
import time
try: import simplejson as json
except ImportError: import json
from util import ansibleutil
import logging
from util import deploymentutil
from celery import shared_task #导入celery任务执行装饰器
import traceback
import datetime
import requests
import time
from django.db import transaction
try:
    import simplejson as json
except ImportError:
    import json
from assets_cd.models import DeployLog

logger = logging.getLogger(__name__)

@shared_task  #装饰器引用,执行celery异步开始
def run_playbook_task(playbook):
    begin_time = datetime.datetime.now()
    end_time = datetime.datetime.now()
    diff_time = end_time - begin_time
    elapsed_time = '{}.{}'.format(diff_time.seconds, diff_time.microseconds)
    serial_number = playbook['serial_number']
    callback_url = playbook['callback_url']

    try:
        ssh_user = playbook['ssh_user']
        sudo_user = playbook['sudo_user']
        playbook_path = playbook['playbook_path']
        playbook_extra_vars = playbook['playbook_extra_vars']
        os_type = playbook['os_type']
        ip = playbook['ip']
        result = 0
        DeployLog.objects.filter(
            serial_number=serial_number).update(
            begin_time=begin_time,
            status=-3,
            result=-3,
        )

#ansible任务执行过程,耗时就在此处,将整个任务丢入celery异步执行。不影响前端快速响应 ansible_result
= ansibleutil.run_playbook( serial_number=serial_number, ssh_user=ssh_user, sudo_user=sudo_user, playbook_path=playbook_path, playbook_extra_vars=playbook_extra_vars, os_type=os_type, ) deploymentutil.del_maint_ip_list(ip) logger.info('ansible_result {}'.format(ansible_result)) #执行ansible操作完成后结果状态反馈。 result = ansible_result log_file = DeployLog.objects.get(serial_number=serial_number) if log_file: log_content = log_file.result_log # with open(log_file,'r') as f: # log_content = f.read() log_content = str(log_content).replace(' ', '<br>').replace(' ', '&nbsp;' * 8) logger.info(log_content) else: logger.info("reslut:[]") except: msg = traceback.format_exc() logger.error(msg) # log_dir = '/var/log/ansible' log_file = DeployLog.objects.get(serial_number=serial_number) if log_file: data = log_file.result_log +' '+ msg with transaction.atomic(): log_file.result_log = data log_file.save() else: logger.error(msg) result = -1 try: logger.info("callback_url:{}".format(callback_url)) logger.info("result:{}".format(result)) if result != 0: DeployLog.objects.filter( serial_number=serial_number).update( begin_time=begin_time, end_time=end_time, elapsed_time=elapsed_time, status=result, result=result, )
#执行结束前进行接口回调操作
if callback_url != "" or callback_url is not None: log_file = DeployLog.objects.get(serial_number=serial_number) log = -1 data_log = { "id": "{}".format(log_file.id), "result": int(log), } logger.info(json.dumps(data_log)) try: req = requests.post(callback_url,json.dumps(data_log),timeout=120) logger.info(req.status_code) logger.info(req.content) if req.status_code != 200: for i in range(3): time.sleep(5) req = requests.post(callback_url, json.dumps(data_log), timeout=120) if req.status_code == 200: break elif req.status_code == 200: logger.info("callback log url seccess.") except Exception as e: logger.info(e.args[0]) for i in range(3): time.sleep(5) try: req = requests.post(callback_url, json.dumps(data_log), timeout=120) if req.status_code == 200: break except Exception as e: logger.info(e.args[0]) pass else: DeployLog.objects.filter( serial_number=serial_number).update( end_time=end_time, elapsed_time=elapsed_time, status=result, result=result, ) if callback_url != "" or callback_url is not None: log_file = DeployLog.objects.get(serial_number=serial_number) log = log_file.result data_log = { "id": "{}".format(log_file.id), "result": int(log), } logger.info(json.dumps(data_log)) try: req = requests.post(callback_url,json.dumps(data_log),timeout=120) if req.status_code != 200: for i in range(3): time.sleep(5) req = requests.post(callback_url, json.dumps(data_log), timeout=120) if req.status_code == 200: break elif req.status_code == 200: logger.info("callback log url seccess.") except Exception as e: logger.info(e.args[0]) for i in range(3): time.sleep(5) try: req = requests.post(callback_url, json.dumps(data_log), timeout=120) if req.status_code == 200: break except Exception as e: logger.info(e.args[0]) pass
except: logger.error(traceback.format_exc()) logger.info('run_ansible_playbook, result {}'.format(result))

6、执行流程:

前端页面事务提交--->后端view接收任务--->任务数据存入redis--->celery触发任务执行(任务来源redis数据库)--->结果反馈前端--->形成闭环

原文地址:https://www.cnblogs.com/zksfyz/p/9365308.html