celery消费任务和任务定时的操作

  • 开发环境

    ubuntu 18.04 + python3.6.9 / windows10 + python 3.6.9
    
  • 安装依赖

    # 安装django==2.2.17
    pip install django==2.2.17
    # 安装celery == 4.4.0
    pip install celery==4.4.0
    # 安装 redis == 3.5.3
    pip install redis==3.5.3
    # 安装 flower == 0.9.7
    pip install flower==0.9.7
    
  • 创建 django 项目和应用

    #  创建项目(mysite)
    django-admin startproject mysite
    # 切换到mysite中
    cd mysite
    # 创建应用 (user)
    python manage.py startapp user
    
  • mysite 目录下 新建 celery_task

  • celery_task 中新建 celery.pyconfig.py

    # config.py
    from __future__ import absolute_import  # 拒绝隐式引入,因为celery.py的名字和celery的包名冲突,需要使用这条语句让程序正确地运行
    from celery.schedules import crontab
    from nest_e100.settings import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD
    
    broker_url = "redis://:{}@{}:{}/98".format(REDIS_PASSWORD, REDIS_HOST, REDIS_PORT)  # 使用redis存储任务队列
    result_backend = "redis://:{}@{}:{}/99".format(REDIS_PASSWORD, REDIS_HOST, REDIS_PORT)  # 使用redis存储结果
    
    task_serializer = 'json'
    result_serializer = 'json'
    accept_content = ['json']
    timezone = "Asia/Shanghai"  # 时区设置
    worker_hijack_root_logger = False  # celery默认开启自己的日志,可关闭自定义日志,不关闭自定义日志输出为空
    result_expires = 60 * 60 * 24  # 存储结果过期时间(默认1天)
    
    # 导入任务所在文件
    imports = [
        "celery_task.crontab.company_task",  # 导入py文件
    ]
    
    # 需要执行任务的配置
    beat_schedule = {
        "crontab_company_task": {
            "task": "celery_task.crontab.company_task.check_company_task",  # 执行的函数
            "schedule": crontab(minute="*/1"), # 每分钟执行一次
            "args": ()  # # 任务函数参数
        },
    }
    
    # celery.py
    from celery import Celery
    import django, os, sys
    # 获取根目录
    base_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "nest_e100")
    sys.path.append(base_dir)
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'nest_e100.settings')
    # 实例化django项目中的每一个app实例
    django.setup()
    # 创建celery应用对象
    app = Celery("celery_demo")
    
    # 导入celery的配置信息
    app.config_from_object("celery_task.config")
    
  • 新建定时任务和任务消费

    # 在 celery_task/crontab/company_task.py 做公司定时任务
    from celery_task.celery import app
    from .data_access.cache_manager import RedisUtil
    from celery_task.tasks.company_task import e100_company_task
    from celery_task.common.mytime import cal_count_down_time
    from .utils.logger import logger
    
    # 实例化redis
    redis_util_cache = RedisUtil("openapi")
    
    # 检查商户详情信息
    def check_company_info():
        data = redis_util_cache.get_key_values("company*")
        if not data:
            logger.warning(">>>>>>>>>>>>>>>>>> redis中没有公司信息,请检查...")
            return False
        for i in data:
            key = i.get("key", "")
            if key:
                # 计算该key 剩余的时间的秒数
                time_left_timestamp = redis_util_cache.ttl(key)
                # 计算时间差是否在规定范围内,如果在,返回True,则添加队列中,否则返回False,不做任何处理
                flag = cal_count_down_time(time_left_timestamp)
                # 添加队列中
                if flag: e100_company_task.delay({"com_id": key[8:]})
        print(">>>>>>>> crontab check company detail info is end ..")
    
    
    @app.task
    def check_company_task():
        check_company_info()
    
    
    if __name__ == '__main__':
        check_company_task()
    
    
    # 在 celery_task/task/company_task.py 做公司任务消费
    from celery_task.celery import app
    from .data_access.cache_manager import RedisUtil
    from .utils.account_manager import request_to_openplatform_manage
    from .utils.external_url_request import external_system_url
    from django.conf import settings
    import json
    # 实例化redis
    openapi_redis_cache = RedisUtil("openapi")
    
    # 更新商户详情信息
    @app.task
    def e100_company_task(data):
        # 请求开发平台获取数据
        response = request_to_openplatform_manage(data, external_system_url.get_openapi_company_details_info)
        # 获取的数据
        ret = response.data.get("data", "")
        # 将数据放到cache中, key:  company_公司Id
        openapi_redis_cache.set("company_%s" % (data.get("com_id")), json.dumps(ret),
                                expire_time=settings.TOKEN_CACHE_TIME)
    
    
  • 启动任务

    # 启动django项目
    python manage.py runserver
    # 启动celery 任务, 如果是windows启动,需要在后面添加 --pool=solo
    celery -A celery_task.celery worker -l info 
    # 启动celery定时
    celery -A celery_task.celery beat
    # 启动flower
    celery -A celery_task flower
    
  • 访问 http://127.0.0.1:5555

  • 最终的项目结构为:

原文地址:https://www.cnblogs.com/wuxiaoshi/p/14913612.html