Celery之操作进阶+定时任务+超时任务

1 Celery异步发送邮件(内部运行main.py)

  • 可用于测试

1.1 整体目录结构

1.2 main.py

  • 一定要有__init__.py文件,不然会报错
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os, sys
from celery import Celery
# Every import path used by this celery project is resolved relative to
# CELERY_BASE_DIR, so the celery command must also be run from that directory.
import django
CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Put the sibling '../opwf' directory first on sys.path.
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "opwf.settings")
# The tasks use Django code, so the Django settings module is configured above.

# Celery instance. Arguments: 1) instance name, 2) broker URL, 3) result backend URL.
app = Celery('celery',
             broker='redis://127.0.0.1:6379/2',   # where tasks are queued
             backend='redis://127.0.0.1:6379/3',   # where results are stored
             include=['tasks', 'tasks_beat'])
            # The path is configured above, so take care with these import names.

1.3 tasks.py

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
# @app.task 指定将这个函数的执行交给celery异步执行

# 相当于装饰器,将下面函数打包给app
from main import app

@app.task(bind=True)
def send_email(self, user_list):
    """Send the approval-notification e-mail to every user id in ``user_list``.

    A failure is either an exception raised by ``EmailInform`` or a return
    value of ``'-1'``. On failure the task is retried after 5 seconds, at most
    3 times. When ``EmailInform`` raised, that exception is attached as the
    cause of the retry exception so the root error is not lost.
    """
    # Imported inside the task body, as in the original listing.
    from utils.MySendEmail import EmailInform

    failure = None
    try:
        res = EmailInform(user_list)
    except Exception as exc:
        failure = Exception('邮箱发送失败')
        # Keep the original error visible in the worker traceback.
        failure.__cause__ = exc
    else:
        if res == '-1':
            failure = Exception('邮箱发送失败')

    if failure is not None:
        # retry() raises by itself; the explicit ``raise`` makes the control
        # flow obvious to readers and linters.
        raise self.retry(countdown=5, max_retries=3, exc=failure)

1.4 MySendEmail.py

from django.conf import settings
from django.core.mail import send_mail
from user.models import User


def EmailInform(user_list):
    """Send the "new work order to approve" e-mail to each user id in ``user_list``.

    Each user is looked up once. Ids that match no ``User`` row are skipped
    instead of crashing with ``AttributeError`` on ``None``.
    Returns ``None``.
    """
    for user_id in user_list:
        user = User.objects.filter(pk=user_id).first()
        if user is None:
            # Unknown id: there is nobody to notify.
            continue
        html_message = 'dear{},您有新的工单审批信息了!可以点击以下链接进行工单审批:<a href="http://127.0.0.1:8080/login/">审批工单</a>'.format(user.username)
        send_mail(subject='工单系统',
                  message='',
                  from_email=settings.EMAIL_FROM,
                  recipient_list=[user.email],
                  html_message=html_message)

1.5 utils.py

  • 一定要在函数内部导包,不然会出现报错!
def addWorkerOrder(request):
    """Create a work order, one sub-order per approval step, and notify the first approver(s).

    Reads ``name`` (flow configuration name) and ``form`` from ``request.data``
    and the creator id from the decoded token. The first sub-order created
    keeps the model's default ``suborder_status``; every later one is created
    with ``suborder_status='3'``. The approver(s) of the sub-order with
    ``sequence_number=1`` are then e-mailed through the ``send_email`` task.

    Returns a dict with ``msg``, ``code``, ``id`` and ``flowconf``.
    """
    user_info = decodeToken(request)
    creator_id = user_info.get('user_id')
    name = request.data.get('name')
    form = request.data.get('form')
    flowconf = FlowConf.objects.filter(name=name).first().pk
    work_obj = WorkOrderModel.objects.create(
        flowconf_id=flowconf,
        create_user_id=creator_id,
        order_status='1',
        parameter=form,
    )

    approve_steps = NewFlowUserRoleActionConf.objects.filter(flowconf_id=work_obj.flowconf_id).values()
    is_first = True
    for item in approve_steps:
        approve_type = item['approvetype']
        if approve_type == '1':
            # Step approved by a role: store the role's display name.
            approve_user_id = ''
            role_name = Role.objects.filter(pk=item['approve_type_id']).first().zh_name
        elif approve_type == '2':
            # Step approved by one specific user.
            approve_user_id = User.objects.filter(pk=item['approve_type_id']).first().id
            role_name = ''
        else:
            continue
        child_data = {
            'mainorder_id': work_obj.id,
            'approve_user_id': approve_user_id,
            'approbe_user_role': role_name,
            'approve_userrole_id': item['approve_type_id'],
            'sequence_number': item['sequence'],
            'approve_type_id': approve_type,
        }
        if not is_first:
            child_data['suborder_status'] = '3'
        if approve_type == '2':
            print('obj', child_data)
        SubOrderModel.objects.create(**child_data)
        is_first = False

    # Recipients: the designated user of step 1, or every user holding its role.
    first_suborder = SubOrderModel.objects.filter(mainorder_id=work_obj.id, sequence_number=1).first()
    if first_suborder.approve_user_id:
        user_list = [first_suborder.approve_user_id]
    else:
        role_members = UserRole.objects.filter(role_id=first_suborder.approve_userrole_id).values('user_id')
        user_list = [row.get('user_id') for row in role_members]

    # Register the celery task directory once; inserting it on every call
    # would make sys.path grow without bound.
    celery_dir = os.path.join('/home/worker/opwf_project', 'celery_task')
    if celery_dir not in sys.path:
        sys.path.insert(0, celery_dir)
    from celery_task.tasks import send_email
    send_email.delay(user_list)
    return {'msg': '添加成功', 'code': 200, 'id': work_obj.pk, 'flowconf': work_obj.flowconf_id}

def chooseSuborder(request):
    """Record an approver's decision on one sub-order and advance its work order.

    Reads ``suborder_id``, ``action_status`` and ``decision`` from
    ``request.data``. The sub-order is marked handled (``suborder_status=2``)
    with the given action status and comment. Then:

    * ``action_status == '4'``: the main order gets ``order_status=2``.
    * ``action_status == '3'``: the main order gets ``order_status=3``.
    * ``action_status == '2'``: if this was the last step the main order gets
      ``order_status=3``; otherwise the next step is opened
      (``suborder_status=1``) and its approver(s) are e-mailed.

    Other sub-orders are left untouched in the '3' and '4' cases.
    Returns ``{'msg': 'ok', 'code': 200}``.
    """
    suborder_id = request.data.get('suborder_id')
    action_status = request.data.get('action_status')
    decision = request.data.get('decision')

    suborder = SubOrderModel.objects.filter(pk=suborder_id).first()
    mainorder = suborder.mainorder_id
    sequence_number = suborder.sequence_number

    # One UPDATE instead of three separate ones on the same row.
    SubOrderModel.objects.filter(pk=suborder_id).update(
        action_status=action_status,
        suborder_status=2,
        approve_text=decision,
    )

    # Ordered explicitly so that "last element" really is the final step.
    sequence_rows = (SubOrderModel.objects.filter(mainorder_id=mainorder)
                     .order_by('sequence_number')
                     .values('sequence_number'))
    squenue_list = [row.get('sequence_number') for row in sequence_rows]
    print('list', squenue_list)
    index = squenue_list.index(sequence_number) + 1
    print('index', index)

    if action_status == '4':
        WorkOrderModel.objects.filter(pk=mainorder).update(order_status=2)
    elif action_status == '3':
        WorkOrderModel.objects.filter(pk=mainorder).update(order_status=3)
    elif action_status == '2':
        # suborder_status: 1 pending, 2 handled, 3 waiting for the previous step.
        if sequence_number == squenue_list[-1]:
            WorkOrderModel.objects.filter(pk=mainorder).update(order_status=3)
        else:
            # The element after the current one is the next step; for steps
            # numbered 1..n this equals the original ``index + 1``.
            next_sequence = squenue_list[index]
            next_steps = SubOrderModel.objects.filter(mainorder_id=mainorder, sequence_number=next_sequence)
            next_steps.update(suborder_status=1)
            next_suborder = next_steps.first()
            if next_suborder.approve_user_id:
                user_list = [next_suborder.approve_user_id]
            else:
                role_members = UserRole.objects.filter(role_id=next_suborder.approve_userrole_id).values('user_id')
                user_list = [row.get('user_id') for row in role_members]
            # Register the celery task directory once; inserting it on every
            # call would make sys.path grow without bound.
            celery_dir = os.path.join('/home/worker/opwf_project', 'celery_task')
            if celery_dir not in sys.path:
                sys.path.insert(0, celery_dir)
            from celery_task.tasks import send_email
            send_email.delay(user_list)
    return {'msg': 'ok', 'code': 200}

1.6 views.py

class WorkOrderView(APIView):
    # API endpoint for work orders: GET lists or fetches, POST creates.
    serializer_class = WorkOrderSerializer
    page_size = 4

    def get(self, request):
        # With ?id=<pk>: return the matching work order(s), serialized.
        # Without it: return the paginated list built from the caller's token.
        order_id = request.query_params.get('id')
        if order_id is not None:
            matches = WorkOrderModel.objects.filter(id=order_id)
            return Response(WorkOrderSerializer(matches, many=True).data)
        token_info = decodeToken(request)
        orders = getWorkerorder(token_info)
        page = basePaginator(orders, request, self.page_size, self.serializer_class)
        return Response(page)

    def post(self, request):
        # Delegate creation to addWorkerOrder and return its result dict.
        return Response(addWorkerOrder(request))


class SubOrderView(APIView):
    # API endpoint for sub-orders: GET lists or fetches, POST records a decision.
    serializer_class = SubOrderSerializer
    page_size = 4

    def get(self, request):
        # With ?mainorder=<id>: return that work order's sub-orders, serialized.
        # Without it: return the paginated list built from the caller's token.
        mainorder_id = request.query_params.get('mainorder')
        if mainorder_id is not None:
            matches = SubOrderModel.objects.filter(mainorder_id=mainorder_id)
            return Response(SubOrderSerializer(matches, many=True).data)
        token_info = decodeToken(request)
        suborders = getSuborder(token_info)
        page = basePaginator(suborders, request, self.page_size, self.serializer_class)
        return Response(page)

    def post(self, request):
        # Delegate the approval decision to chooseSuborder and return its result dict.
        return Response(chooseSuborder(request))

1.7 启动任务

  • 文件夹内部启用
celery -A celery_task worker -l INFO

2 Celery异步发送邮件(真实开发)+定时任务+超时任务

  • 应用于工单系统,当创建工单时,异步发邮件的方式通知工单配置流第一节点的指定审批人或者指定角色。每周一8点30通知所有未审批工单的当前节点指定审批人来进行审批。当工单超时24小时,自动通知创建人工单已经超时。

2.1 整体目录结构

2.2 celery.py

# -*- coding: utf-8 -*-
# from __future__ import absolute_import, unicode_literals
#
import os, sys
CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
print(sys.path)
from celery import Celery


# Every import path used by this celery project is resolved relative to
# CELERY_BASE_DIR, so the celery command must also be run from that directory.
import django
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "opwf.settings")

# Celery instance. Arguments: 1) instance name, 2) broker URL, 3) result backend URL.
app = Celery('celery',
             broker='redis://127.0.0.1:6379/2',   # where tasks are queued
             backend='redis://127.0.0.1:6379/3',   # where results are stored
             include=['celery_task.tasks', 'celery_task.tasks_beat', 'celery_task.tasks_timeout'])
            # The path is configured above, so take care with these import names.

app.conf.update(
   result_expires=3600,        # results left in redis are discarded after one hour
)

from celery.schedules import crontab
# Periodic (beat) tasks.
app.conf.beat_schedule = {
    'add-every-monday-seconds': {
        'task': 'celery_task.tasks_beat.say_hello',
        # Every Monday at 08:30. crontab counts Sunday as 0, so Monday is 1;
        # the previous value 2 fired on Tuesday.
        'schedule': crontab(hour=8, minute=30, day_of_week=1),
    },
    'check-every-24-hours': {
        'task': 'celery_task.tasks_timeout.timeout_workorder',
        'schedule': 180.0,
        # Runs every 180 seconds: checks for timed-out work orders and
        # notifies their creators.
    }
}


app.conf.timezone = 'Asia/Shanghai'

if __name__ == '__main__':
   app.start()

2.3 tasks.py

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
# @app.task 指定将这个函数的执行交给celery异步执行

# 相当于装饰器,将下面函数打包给app
from celery_task.celery import app

@app.task(bind=True)
def send_email(self, user_list):
    """Send the approval-notification e-mail to every user id in ``user_list``.

    A failure is either an exception raised by ``EmailInform`` or a return
    value of ``'-1'``. On failure the task is retried after 5 seconds, at most
    3 times. When ``EmailInform`` raised, that exception is attached as the
    cause of the retry exception so the root error is not lost.
    """
    # Imported inside the task body, as in the original listing.
    from utils.MySendEmail import EmailInform

    failure = None
    try:
        res = EmailInform(user_list)
    except Exception as exc:
        failure = Exception('邮箱发送失败')
        # Keep the original error visible in the worker traceback.
        failure.__cause__ = exc
    else:
        if res == '-1':
            failure = Exception('邮箱发送失败')

    if failure is not None:
        # retry() raises by itself; the explicit ``raise`` makes the control
        # flow obvious to readers and linters.
        raise self.retry(countdown=5, max_retries=3, exc=failure)

2.4 tasks_beat.py

# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
# @app.task 指定将这个函数的执行交给celery异步执行

# 相当于装饰器,将下面函数打包给app
from celery_task.celery import app

@app.task(bind=True)
def say_hello(self):
    """Periodic task: run ``sayHello`` to remind approvers of pending sub-orders.

    A failure is either an exception raised by ``sayHello`` or a return value
    of ``'-1'``. On failure the task is retried after 5 seconds, at most 3
    times. When ``sayHello`` raised, that exception is attached as the cause
    of the retry exception so the root error is not lost.
    """
    # Imported inside the task body, as in the original listing.
    from utils.MySendEmail import sayHello

    failure = None
    try:
        res = sayHello()
    except Exception as exc:
        failure = Exception('邮箱发送失败')
        # Keep the original error visible in the worker traceback.
        failure.__cause__ = exc
    else:
        if res == '-1':
            failure = Exception('邮箱发送失败')

    if failure is not None:
        # retry() raises by itself; the explicit ``raise`` makes the control
        # flow obvious to readers and linters.
        raise self.retry(countdown=5, max_retries=3, exc=failure)

2.5 tasks_timeout.py

# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
# @app.task 指定将这个函数的执行交给celery异步执行
from django.db.models import Q
from django.utils import timezone
# 相当于装饰器,将下面函数打包给app
from celery_task.celery import app
from user.models import User
from workerorder.models import WorkOrderModel

@app.task(bind=True)
def timeout_workorder(self):
    """Notify creators of open work orders created outside the last 24 hours.

    The 24-hour window is computed on every run. It used to be computed once
    at import time, so it stayed frozen at the moment the worker started.
    Each matching order (``order_status='1'``) triggers ``timeOutEmail`` for
    its creator and is then set to ``order_status=2`` so it is not reported
    again.
    """
    # Imported inside the task body, as in the original listing.
    # NOTE(review): timeOutEmail is not among the functions shown in the
    # MySendEmail.py listing of this document; confirm that it exists.
    from utils.MySendEmail import timeOutEmail

    end_time = timezone.now()
    start_time = end_time - timezone.timedelta(hours=24)
    workorder_queryset = WorkOrderModel.objects.exclude(
        Q(create_time__range=(start_time, end_time))).filter(Q(order_status='1'))
    for query in workorder_queryset:
        creator = User.objects.filter(pk=query.create_user_id).first()
        if creator is not None:
            timeOutEmail(creator.username, creator.email)
        # Updated even when the creator no longer exists; otherwise the same
        # order would fail again on every run and block the ones after it.
        WorkOrderModel.objects.filter(id=query.id).update(order_status=2)

2.6 MySendEmail.py

# -*- coding: utf-8 -*-
from django.conf import settings
from django.core.mail import send_mail
from user.models import User
from workerorder.models import SubOrderModel


def EmailInform(user_list):
    """Send the "new work order to approve" e-mail to each user id in ``user_list``.

    Each user is looked up once. Ids that match no ``User`` row are skipped
    instead of crashing with ``AttributeError`` on ``None``.
    Returns ``None``.
    """
    for user_id in user_list:
        user = User.objects.filter(pk=user_id).first()
        if user is None:
            # Unknown id: there is nobody to notify.
            continue
        html_message = 'dear{},您有新的工单审批信息了!可以点击以下链接进行工单审批:<a href="http://127.0.0.1:8080/login/">审批工单</a>'.format(user.username)
        send_mail(subject='工单系统',
                  message='',
                  from_email=settings.EMAIL_FROM,
                  recipient_list=[user.email],
                  html_message=html_message)


def sayHello():
    """E-mail a reminder for every pending sub-order assigned to a specific user.

    Considers sub-orders with ``suborder_status='1'`` and ``action_status='1'``
    and only handles those whose ``approve_type_id`` is ``'2'``. The approver
    is looked up once per sub-order; sub-orders whose approver no longer
    exists are skipped. Returns ``None``.
    """
    pending = SubOrderModel.objects.filter(suborder_status='1', action_status='1')
    for suborder in pending:
        if suborder.approve_type_id != '2':
            continue
        approver = User.objects.filter(pk=suborder.approve_user_id).first()
        if approver is None:
            continue
        # The attribute was previously misspelled ``eamil``, which raised
        # AttributeError on the first matching sub-order.
        html_message = 'Dear{}, 您有新的指定工单需要审批,十万火急!'.format(approver.username)
        send_mail(subject='工单系统',
                  message='',
                  from_email=settings.EMAIL_FROM,
                  recipient_list=[approver.email],
                  html_message=html_message)

2.7 utils.py

  • 一定要在函数内部导包!不然会出现报错
def addWorkerOrder(request):
    """Create a work order, one sub-order per approval step, and notify the first approver(s).

    Reads ``name`` (flow configuration name) and ``form`` from ``request.data``
    and the creator id from the decoded token. The first sub-order created
    keeps the model's default ``suborder_status``; every later one is created
    with ``suborder_status='3'``. The approver(s) of the sub-order with
    ``sequence_number=1`` are then e-mailed through the ``send_email`` task.

    Returns a dict with ``msg``, ``code``, ``id`` and ``flowconf``.
    """
    user_info = decodeToken(request)
    creator_id = user_info.get('user_id')
    name = request.data.get('name')
    form = request.data.get('form')
    flowconf = FlowConf.objects.filter(name=name).first().pk
    work_obj = WorkOrderModel.objects.create(
        flowconf_id=flowconf,
        create_user_id=creator_id,
        order_status='1',
        parameter=form,
    )

    approve_steps = NewFlowUserRoleActionConf.objects.filter(flowconf_id=work_obj.flowconf_id).values()
    is_first = True
    for item in approve_steps:
        approve_type = item['approvetype']
        if approve_type == '1':
            # Step approved by a role: store the role's display name.
            approve_user_id = ''
            role_name = Role.objects.filter(pk=item['approve_type_id']).first().zh_name
        elif approve_type == '2':
            # Step approved by one specific user.
            approve_user_id = User.objects.filter(pk=item['approve_type_id']).first().id
            role_name = ''
        else:
            continue
        child_data = {
            'mainorder_id': work_obj.id,
            'approve_user_id': approve_user_id,
            'approbe_user_role': role_name,
            'approve_userrole_id': item['approve_type_id'],
            'sequence_number': item['sequence'],
            'approve_type_id': approve_type,
        }
        if not is_first:
            child_data['suborder_status'] = '3'
        if approve_type == '2':
            print('obj', child_data)
        SubOrderModel.objects.create(**child_data)
        is_first = False

    # Recipients: the designated user of step 1, or every user holding its role.
    first_suborder = SubOrderModel.objects.filter(mainorder_id=work_obj.id, sequence_number=1).first()
    if first_suborder.approve_user_id:
        user_list = [first_suborder.approve_user_id]
    else:
        role_members = UserRole.objects.filter(role_id=first_suborder.approve_userrole_id).values('user_id')
        user_list = [row.get('user_id') for row in role_members]

    # Imported inside the function, as the article requires.
    from celery_task.tasks import send_email
    send_email.delay(user_list)
    return {'msg': '添加成功', 'code': 200, 'id': work_obj.pk, 'flowconf': work_obj.flowconf_id}

def chooseSuborder(request):
    """Record an approver's decision on one sub-order and advance its work order.

    Reads ``suborder_id``, ``action_status`` and ``decision`` from
    ``request.data``. The sub-order is marked handled (``suborder_status=2``)
    with the given action status and comment. Then:

    * ``action_status == '4'``: the main order gets ``order_status=2``.
    * ``action_status == '3'``: the main order gets ``order_status=3``.
    * ``action_status == '2'``: if this was the last step the main order gets
      ``order_status=3``; otherwise the next step is opened
      (``suborder_status=1``) and its approver(s) are e-mailed.

    Other sub-orders are left untouched in the '3' and '4' cases.
    Returns ``{'msg': 'ok', 'code': 200}``.
    """
    suborder_id = request.data.get('suborder_id')
    action_status = request.data.get('action_status')
    decision = request.data.get('decision')

    suborder = SubOrderModel.objects.filter(pk=suborder_id).first()
    mainorder = suborder.mainorder_id
    sequence_number = suborder.sequence_number

    # One UPDATE instead of three separate ones on the same row.
    SubOrderModel.objects.filter(pk=suborder_id).update(
        action_status=action_status,
        suborder_status=2,
        approve_text=decision,
    )

    # Ordered explicitly so that "last element" really is the final step.
    sequence_rows = (SubOrderModel.objects.filter(mainorder_id=mainorder)
                     .order_by('sequence_number')
                     .values('sequence_number'))
    squenue_list = [row.get('sequence_number') for row in sequence_rows]
    print('list', squenue_list)
    index = squenue_list.index(sequence_number) + 1
    print('index', index)

    if action_status == '4':
        WorkOrderModel.objects.filter(pk=mainorder).update(order_status=2)
    elif action_status == '3':
        WorkOrderModel.objects.filter(pk=mainorder).update(order_status=3)
    elif action_status == '2':
        # suborder_status: 1 pending, 2 handled, 3 waiting for the previous step.
        if sequence_number == squenue_list[-1]:
            WorkOrderModel.objects.filter(pk=mainorder).update(order_status=3)
        else:
            # The element after the current one is the next step; for steps
            # numbered 1..n this equals the original ``index + 1``.
            next_sequence = squenue_list[index]
            next_steps = SubOrderModel.objects.filter(mainorder_id=mainorder, sequence_number=next_sequence)
            next_steps.update(suborder_status=1)
            next_suborder = next_steps.first()
            if next_suborder.approve_user_id:
                user_list = [next_suborder.approve_user_id]
            else:
                role_members = UserRole.objects.filter(role_id=next_suborder.approve_userrole_id).values('user_id')
                user_list = [row.get('user_id') for row in role_members]
            # Imported inside the function, as the article requires.
            from celery_task.tasks import send_email
            send_email.delay(user_list)
    return {'msg': 'ok', 'code': 200}

2.8 views.py

class WorkOrderView(APIView):
    # API endpoint for work orders: GET lists or fetches, POST creates.
    serializer_class = WorkOrderSerializer
    page_size = 4

    def get(self, request):
        # With ?id=<pk>: return the matching work order(s), serialized.
        # Without it: return the paginated list built from the caller's token.
        order_id = request.query_params.get('id')
        if order_id is not None:
            matches = WorkOrderModel.objects.filter(id=order_id)
            return Response(WorkOrderSerializer(matches, many=True).data)
        token_info = decodeToken(request)
        orders = getWorkerorder(token_info)
        page = basePaginator(orders, request, self.page_size, self.serializer_class)
        return Response(page)

    def post(self, request):
        # Delegate creation to addWorkerOrder and return its result dict.
        return Response(addWorkerOrder(request))


class SubOrderView(APIView):
    # API endpoint for sub-orders: GET lists or fetches, POST records a decision.
    serializer_class = SubOrderSerializer
    page_size = 4

    def get(self, request):
        # With ?mainorder=<id>: return that work order's sub-orders, serialized.
        # Without it: return the paginated list built from the caller's token.
        mainorder_id = request.query_params.get('mainorder')
        if mainorder_id is not None:
            matches = SubOrderModel.objects.filter(mainorder_id=mainorder_id)
            return Response(SubOrderSerializer(matches, many=True).data)
        token_info = decodeToken(request)
        suborders = getSuborder(token_info)
        page = basePaginator(suborders, request, self.page_size, self.serializer_class)
        return Response(page)

    def post(self, request):
        # Delegate the approval decision to chooseSuborder and return its result dict.
        return Response(chooseSuborder(request))

2.9 启动任务

# 启动worker
celery -A celery_task worker -l INFO
# 启动beat
celery -A celery_task beat -l INFO

3 报错

  • 文件夹外部启用
  • 一定要写绝对路径!不然找不到celery_task

3.1 报错

3.2 原因

# -*- coding: utf-8 -*-
# from __future__ import absolute_import, unicode_literals
#
import os, sys
CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
print(sys.path)
from celery import Celery


# Every import path used by this celery project is resolved relative to
# CELERY_BASE_DIR, so the celery command must also be run from that directory.
import django
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "opwf.settings")

# Celery instance. Arguments: 1) instance name, 2) broker URL, 3) result backend URL.
app = Celery('celery',
             broker='redis://127.0.0.1:6379/2',   # where tasks are queued
             backend='redis://127.0.0.1:6379/3',   # where results are stored
             include=['celery_task.tasks', 'celery_task.tasks_beat'])
            # The path is configured above, so take care with these import names.

app.conf.update(
   result_expires=3600,        # results left in redis are discarded after one hour
)

from celery.schedules import crontab
# Periodic (beat) tasks.
app.conf.beat_schedule = {
    'add-every-180-seconds': {
        'task': 'celery_task.tasks_beat.say_hello',
        # 'schedule': 180.0,
        # (disabled alternative: send the task every 180 seconds)
        # NOTE(review): despite the entry name, the active schedule below is a
        # weekly crontab (00:22, day_of_week=1), not a 180-second interval.
        'schedule': crontab(hour=00, minute=22, day_of_week=1),
    },
}

app.conf.timezone = 'Asia/Shanghai'

if __name__ == '__main__':
   app.start()

4 对于celery的理解

  • 本次使用的是不同于Django-celery的独立celery,应用范围更广。
  • 由于celery对运行环境和版本的要求特别多,而django-celery与Django耦合性太高,所以不建议使用django-celery进行开发;本文使用纯celery实现异步发邮件。但纯celery对环境同样有要求:celery进程中不能直接运行django代码,必须先导入(加载)django环境。
  • 大多数开发使用的都是django-celery,但是要更深层度地理解celery,可以使用本章内容。

5 By Teacher

  • 相关包
pip install Django==2.2
pip install celery==4.4.7
pip install redis==3.5.3

5.1 目录结构

5.2 opwf_project/celery_task文件夹

5.2.1 celery.py
# celery.py
# -*- coding: utf-8 -*-
from celery import Celery
import os,sys
import django

# 1. Add the Django project root to the import path
CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))

# 2. Configure and initialise the Django environment
os.environ.setdefault("DJANGO_SETTINGS_MODULE","opwf.settings")
django.setup() # load the settings


# 3. Basic celery configuration
app = Celery('proj',
             broker='redis://localhost:6379/14',
             backend='redis://localhost:6379/15',
             include=['celery_task.tasks',
                      'celery_task.tasks2',
                      ])

# 4. Extra settings applied after instantiation
app.conf.update(
   result_expires=3600,        # results left in redis are discarded after one hour
)

# 5. Periodic task: every 5 seconds, run the task named below with args (16, 16)
app.conf.beat_schedule = {
    'add-every-5-seconds': {
        'task': 'celery_task.tasks.test_task_crontab',
        'schedule': 5.0,
        'args': (16, 16)
    },
}

# 6. Time zone configuration
app.conf.timezone = 'UTC'

if __name__ == '__main__':
   app.start()

celery.py
5.2.2 tasks.py
# -*- coding: utf-8 -*-
# NOTE(review): this listing is captioned tasks.py but repeats the celery.py
# configuration; it defines no task functions, including the
# 'celery_task.tasks.test_task_crontab' task named in beat_schedule below.
from celery import Celery
import os,sys
import django

# 1. Add the Django project root to the import path
CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))

# 2. Configure and initialise the Django environment
os.environ.setdefault("DJANGO_SETTINGS_MODULE","opwf.settings")
django.setup() # load the settings


# 3. Basic celery configuration
app = Celery('proj',
             broker='redis://localhost:6379/14',
             backend='redis://localhost:6379/15',
             include=['celery_task.tasks',
                      'celery_task.tasks2',
                      ])

# 4. Extra settings applied after instantiation
app.conf.update(
   result_expires=3600,        # results left in redis are discarded after one hour
)

# 5. Periodic task: every 5 seconds, run the task named below with args (16, 16)
app.conf.beat_schedule = {
    'add-every-5-seconds': {
        'task': 'celery_task.tasks.test_task_crontab',
        'schedule': 5.0,
        'args': (16, 16)
    },
}

# 6. Time zone configuration
app.conf.timezone = 'UTC'

if __name__ == '__main__':
   app.start()
tasks.py
5.2.3 tasks2.py
# -*- coding:utf8 -*-
from .celery import app
import time,random

@app.task
def randnum(start,end):
    """Wait three seconds, then return a random integer in [start, end]."""
    delay_seconds = 3
    time.sleep(delay_seconds)
    picked = random.randint(start, end)
    return picked
tasks2.py

5.3 opwf_project/opwf/utils

# -*- coding: utf-8 -*-
# utils/rl_sms.py
from ronglian_sms_sdk import SmsSDK
from user.models import User


# Credentials passed to SmsSDK(accId, accToken, appId) in send_message below.
# NOTE(review): these look like live account secrets committed to source;
# consider loading them from settings or the environment and rotating them.
accId = '8a216da8747ac98201749c0de38723b7'
accToken = '86072b540b4648229b27400414150ef2'
appId = '8a216da8747ac98201749c0de45123be'


def send_message(phone, datas):
    """Stubbed SMS sender: prints debug output and returns an empty string.

    The SDK client is constructed but the real send call is disabled, so
    ``phone`` and ``datas`` are currently unused.
    """
    # Debug output: username of the first User row (IndexError if there is none).
    first_user = User.objects.all()[0]
    print(first_user.username, '%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%')
    sdk = SmsSDK(accId, accToken, appId)
    # Template id '1' is the test template; its text is:
    # 【云通讯】您的验证码是{1},请于{2}分钟内正确输入。
    tid = '1'
    # Disabled real call (template arguments are passed positionally):
    # resp = sdk.sendMessage(tid, phone, datas)
    for line in ("##########################################",
                 '执行了这个方法 send_message'):
        print(line)
    return ''


def test_crontab(x,y):
    """Placeholder periodic job: prints two banner lines; ``x`` and ``y`` are unused."""
    banners = (
        '############### 执行test_crontab测试任务 #############',
        '############### 邮件审批超时提醒 #############',
    )
    for banner in banners:
        print(banner)

rl_sms.py

5.4 在django中调用

# 1. Import the task module
from celery_task import tasks
# 2. Dispatch the task asynchronously
# NOTE(review): send_sms_code is not defined in any tasks.py listing shown in
# this document; confirm that the task exists.
tasks.send_sms_code.delay(18538752511,())

6 Celery 的管理

6.1 celery管理

# Start one worker in the foreground
celery -A celery_task worker -l INFO
# Start two workers, w1 and w2, in one go
celery multi start w1 w2 -A celery_task -l INFO
# Show which workers are currently running
celery -A celery_task status
# Stop workers w1 and w2
celery multi stop w1 w2 -A celery_task

# Start the celery worker for the project
celery multi start celery_task -A celery_task -l debug --autoscale=50,10		# worker concurrency: at most 50, at least 10
# Stop the project's celery worker(s)
ps auxww|grep "celery worker"|grep -v grep|awk '{print $2}'|xargs kill -9

6.2 celery beat 管理

# 1. Start celery beat in the foreground (simple testing)
celery -A celery_task beat -l INFO
# 2. Start celery beat in the background for the project (log appended to the given file)
celery -A celery_task beat -l debug >> /aaa/Scheduler.log 2>&1 &
# 3. Stop celery beat (the redirection target was missing; output is discarded)
ps -ef | grep -E "celery -A celery_task beat" | grep -v grep|awk '{print $2}' | xargs kill -TERM &> /dev/null

6.3 脚本启动(高级)

6.3.1 启动django程序
  • services.sh
#!/usr/bin/env bash
# Start, stop or restart the Django development server on port 8000.
# Usage: services.sh {start|stop|restart}

source ../env/bin/activate

export DJANGO_SETTINGS_MODULE=celery_test.settings

base_dir=`pwd`

# Print the PID(s) of the running "manage.py ... :8000" process on one line.
mup_pid() {
echo `ps -ef | grep -E "(manage.py)(.*):8000" | grep -v grep| awk '{print $2}'`
}

# Launch runserver in the background, appending stdout and stderr to django.log.
start() {
 python "$base_dir/manage.py" runserver 0.0.0.0:8000 >> "$base_dir/django.log" 2>&1 &
 pid=$(mup_pid)
 # \e[00;31m ... \e[00m prints the message in red; the backslashes were missing,
 # so the escape codes were printed literally.
 echo -e "\e[00;31mmup is running (pid: $pid)\e[00m"
}

# Kill every matching runserver process.
stop() {
 pid=$(mup_pid)
 echo -e "\e[00;31mmup is stop (pid: $pid)\e[00m"
 ps -ef | grep -E "(manage.py)(.*):8000" | grep -v grep| awk '{print $2}' | xargs kill -9 &> /dev/null

}

restart(){
    stop
    start
}

# See how we were called.
case "$1" in
  start)
        start
        ;;
  stop)
        stop
        ;;

  restart)
        restart
        ;;

  *)
        echo $"Usage: $0 {start|stop|restart}"
        exit 2
esac

service.sh
6.3.2 启动celery的worker
  • 每台机器可以启动8个worker
  • start-celery.sh
#!/bin/bash
# Start, restart or stop the celery_test worker through "celery multi".
# Usage: start-celery.sh {start|restart|stop}
source ../env/bin/activate
export C_FORCE_ROOT="true"
base_dir=$(pwd)

# Print the PID(s) of the running celery_test worker on one line.
celery_pid() {
    echo $(ps -ef | grep -E "celery -A celery_test worker" | grep -v grep| awk '{print $2}')
}

# Start the worker with autoscaling between 5 and 50 processes.
start() {
    celery  multi start celery_test -A celery_test -l debug \
        --autoscale=50,5 \
        --logfile=$base_dir/var/celery-%I.log \
        --pidfile=celery_test.pid
}

restart() {
    celery  multi restart celery_test -A celery_test -l debug
}

stop() {
    celery  multi stop celery_test -A celery_test -l debug
}

# Disabled alternative: restart as stop followed by start.
#restart(){
#    stop
#    start
#}

# Dispatch on the first command-line argument.
case "$1" in
  start)
        start
        ;;
  restart)
        restart
        ;;
  stop)
        stop
        ;;
  *)
        echo $"Usage: $0 {start|stop|restart}"
        exit 2
esac

# Disabled alternative: run a single worker with nohup.
#nohup celery -A celery_test worker -l debug --concurrency=10 --autoreload  & >>celery.log

start-celery.sh
6.3.3 启动celery定时运行
  • celery-crond.sh
#!/bin/bash
# Start, restart or stop celery beat (the periodic-task scheduler).
# Usage: celery-crond.sh {start|restart|stop}
source ../env/bin/activate
export C_FORCE_ROOT="true"
base_dir=`pwd`


# Print the PID(s) of the running celery beat process on one line.
celery_pid() {
    echo `ps -ef | grep -E "celery -A celery_test beat" | grep -v grep| awk '{print $2}'`
}
start() {
    # Disabled alternative: use the django scheduler for periodic tasks
    #celery -A celery_test beat -l info -S django >> $base_dir/var/celery-cron.log 2>&1 &
    celery -A celery_test beat -l debug >> $base_dir/var/Scheduler.log 2>&1 &
    # Give beat a moment to start before looking up its PID.
    sleep 3
    pid=$(celery_pid)
    # \e[00;31m ... \e[00m prints the message in red; the backslashes were missing,
    # so the escape codes were printed literally.
    echo -e "\e[00;31mcelery is start (pid: $pid)\e[00m"
}
restart() {
    pid=$(celery_pid)
    echo -e "\e[00;31mcelery is restart (pid: $pid)\e[00m"
    # Sends SIGHUP to the beat process(es).
    ps auxf | grep -E "celery -A celery_test beat" | grep -v grep| awk '{print $2}' | xargs kill -HUP &> /dev/null
}
stop() {
    pid=$(celery_pid)
    echo -e "\e[00;31mcelery is stop (pid: $pid)\e[00m"
    ps -ef | grep -E "celery -A celery_test beat" | grep -v grep| awk '{print $2}' | xargs kill -TERM &> /dev/null
}


case "$1" in
  start)
        start
        ;;
  restart)
        restart
        ;;
  stop)
        stop
        ;;
  *)
        echo $"Usage: $0 {start|stop|restart}"
        exit 2
esac

celery-crond.sh
原文地址:https://www.cnblogs.com/mapel1594184/p/14106696.html