从零开始搭建django前后端分离项目 系列三(实战之异步任务执行)

前面已经将项目环境搭建好了,下面进入实战环节。这里挑选项目中涉及到的几个重要的功能模块进行讲解。

celery执行异步任务和任务管理

Celery 是一个专注于实时处理和任务调度的分布式任务队列。由于本项目进行数据分析的耗时比较长,所以采用异步方式执行任务。本项目中Broker使用redis,Result Backend使用django的数据库,部分配置如下settings.py(具体配置见项目代码):

import djcelery
djcelery.setup_loader()
BROKER_URL = 'redis://127.0.0.1:6379/5'
BROKER_POOL_LIMIT = 0
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 定时任务
CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend'
# CELERY_RESULT_BACKEND = 'redis://10.39.211.198:6379/6'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE='Asia/Shanghai'
CELERY_ENABLE_UTC = True
CELERYD_CONCURRENCY = 10
CELERYD_MAX_TASKS_PER_CHILD = 10 #  每个worker最多执行10个任务就会被销毁,可防止内存泄露

项目中涉及到的celery任务执行成功、执行失败、执行完成、执行被终止、执行失败的事件和信号如下:

@task_prerun.connect
def pre_task_run(task_id, task, sender, *args, **kwargs):
    logger.info('task [{task_id}] 开始执行, taskname: {task.name}'.format(task_id=task_id, task=task))

@task_revoked.connect
def task_revoked(request,terminated,sender,expired,signal,signum):
    now=datetime.now()
    task_id=request.id
    logger.warn('task [{0}] 被停止。'.format(task_id))
    job = Job.objects.filter(task_id=task_id).first()
    if job:
        job.runtime = (now - job.create_date).seconds
        job.save()

class MyTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        job=Job.objects.filter(task_id=task_id).first()
        if job:
            channel = job.id
            print('channel:', channel)
            redis_helper = RedisHelper(channel)
            redis_helper.public('task [{0}] success。'.format(task_id))
        logger.info('task [{0}] 执行成功, success'.format(task_id))
        return super(MyTask, self).on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        job = Job.objects.filter(task_id=task_id).first()
        if job:
            channel = job.id
            print('channel:', channel)
            redis_helper = RedisHelper(channel)
            redis_helper.public('failed')
        logger.error('task [{0}] 执行失败, reason: {1} ,einfo: {2}'.format(task_id,exc,einfo))
        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        now = datetime.now()
        job = Job.objects.filter(task_id=task_id).first()
        if job:
            job.runtime = (now - job.create_date).seconds
            job.save()

获取任务执行结果:

from celery.result import AsyncResult
res=AsyncResult(taskid).get()

终止任务:

from celery.task.control import broadcast, revoke, rate_limit,inspect
revoke(task_id, terminate=True)

celery任务启动:

启用事件发送:
python manage.py celery -A myproject worker -l info -E --autoscale=6,3
启动快照相机:
python manage.py celerycam -F 10 -l info

在开发过程中发现,当异步任务中导入sklearn包时报错 

AttributeError: 'Worker' object has no attribute '_config'

所以在项目task.py中需要添加如下代码:

from celery.signals import worker_process_init
@worker_process_init.connect
def fix_multiprocessing(**_):
  from multiprocessing import current_process
  try:
    current_process()._config
  except AttributeError:
    current_process()._config = {'semprefix': '/mp'}

并且需要把sklearn相关包从文件开始导入移到函数内部导入,具体见项目代码。

效果图:

原文地址:https://www.cnblogs.com/dotafeiying/p/9668875.html