celery配置

1.1 安装celery

pip install Django==2.2
pip install celery==4.4.7
pip install redis==3.5.3

2 新建 配置celery

3 配置celery六步 (重点)

# -*- coding: utf-8 -*-
# celery.py
# -*- coding: utf-8 -*-
from datetime import timedelta

from celery import Celery
import os,sys
import django

# 1.添加django项目根路径
CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))

# 2.添加django环境
os.environ.setdefault("DJANGO_SETTINGS_MODULE","opwf.settings")
django.setup() # 读取配置


# 3.celery基本配置
app = Celery('proj',
             broker='redis://localhost:6379/14',
             backend='redis://localhost:6379/15',
             include=['celery_task.tasks',
                      'celery_task.tasks2',
                      ])

# 4.实例化时可以添加下面这个属性
app.conf.update(
   result_expires=3600,        #执行结果放到redis里,一个小时没人取就丢弃
)

# 5.配置定时任务:每5秒钟执行 调用一次celery_pro下tasks.py文件中的add函数
app.conf.beat_schedule = {

    'add-every-5-seconds': {
        'task': 'celery_task.tasks.send_overtime',
        'schedule': 10000.0,
        'args': (16, 16)
    },
}
# 6.添加时区配置
app.conf.timezone = 'UTC'

if __name__ == '__main__':
   app.start()

4.task基础配置

from .celery import app  #从当前目录导入app
import os,sys
from .celery import CELERY_BASE_DIR
from django.core.mail import send_mail
from django.conf import settings
#1.test_task_crontab 测试定时任务
from celery_task.celery import app
import time, random

from workflow.models import *
from django.db.models import Q
@app.task
def test_task_crontab(start,end):
    time.sleep(5)
    # Create your tests here.
    print("定时任务即将耗时--------------------111111111111111111111")
    return random.randint(start,end)


@app.task(bind=True)
def send_sms_code(self, username,email,text):
    # time.sleep(60)
    sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))
    # 在方法中导包
    from utils.task import send_mail_task
    time.sleep(5)
    try:
        # 用 res 接收发送结果, 成功是:0, 失败是:-1
        res = send_mail_task(username, email,text)
    except Exception as e:
        res = '-1'
    if res == '-1':
        # 如果发送结果是 -1 就重试.
        self.retry(countdown=5, max_retries=3, exc=Exception('短信发送失败'))

@app.task
def send_overtime(start,end):
    from django.utils import timezone

    now = timezone.now()
    # start_time = now - timezone.timedelta(days=7)
    start_time = now - timezone.timedelta(hours=12)  # 查询10天前的数据
    end_time = now
    qs = SubOrder.objects.filter(Q(approve_ts__range=(start_time, end_time)) & Q(suborder_status='1'))
    send_mail(subject='工单超时',
                message='jinwu提交的工单即将超时,请尽快审批',
                from_email=settings.EMAIL_FROM,  # 发送者邮箱
                recipient_list=['wangsai_python@163.com',],  # 接收者邮箱可以写多个
                fail_silently=False)
    return random.randint(start,end)

5.测试celery

5.1 启动celery

'''1.启动celery'''
#1.1 单进程启动
celery celery -A main worker -l INFO
#1.2 celery管理 
celery multi start celery_test -A celery_test -l debug --autoscale=50,5
#celery并发数:最多50个,最少5个 
ps auxww|grep "celery worker"|grep -v grep|awk '{print $2}'|xargs kill -9 # 关闭所有celery进程

原文地址:https://www.cnblogs.com/wangxiaosai/p/13928956.html