Celery使用

Celery实现了分布式任务队列的功能,提供异步执行,定时任务两个特性。应用首先将任务封装后发送到Broker,Celery启动多个Worker从Broker中获取任务并执行,通过Broker这一层实现异步特性;Celery提供Beat调度器进行定时任务的调度执行,从而实现定时任务功能。

基本架构图如下:

基本使用

  • 安装

    pipenv install celery
    
  • 单文件使用

    # vim tasks.py
    
    import time
    from celery import Celery
    
    broker = 'redis://192.168.2.128:6379/1'
    backend = 'redis://192.168.2.128:6379/2'
    app = Celery('my_task', broker=broker, backend=backend)
    
    
    @app.task
    def add(x, y):
        print('enter call func ...')
        time.sleep(4)
        return x + y
    

    add任务在注册时的名字为tasks.add,调用时需要from tasks import add然后才能执行成功

    # vim app.py
    
    from tasks import add
    print('start task ...')
    result = add.delay(2, 8)
    print('end task ...')
    

    如果在tasks.py下执行print(add)得到结果<@task: my_task.add of my_task at 0x101fe61d0>

    如果在app.py下导入add后执行print(add)得到结果<@task: tasks.add of my_task at 0x103135c18>

    所以如果直接在tasks.py下执行add.delay(2, 8),虽然任务发送成功,但是

    worker会报错 Received unregistered task of type 'my_task.add'

    这里留一个疑问为什么两个任务名字会有不同?

  • 模块化使用

    # ls celery_app
    
    __init__.py  			# 创建app对象并加载配置文件
    celeryconfig.py 		# 定义app配置并进行任务注册
    task1.py  task2.py		# 任务模块
    
    # vim __init__.py
    
    from celery import Celery
    
    app = Celery('demo')
    # 通过celery 实例加载配置
    app.config_from_object('celery_app.celeryconfig')
    
    # vim celeryconfig.py
    
    from datetime import timedelta
    from celery.schedules import crontab
    
    # APP配置
    BROKER_URL = 'redis://192.168.2.128:6379/1'
    CELERY_RESULT_BACKEND = 'redis://192.168.2.128:6379/2'
    CELERY_TIMEZONE = 'Asia/Shanghai'
    
    # 任务注册
    CELERY_IMPORTS = (
        'celery_app.task1',
        'celery_app.task2',
    )
    
    # 定时任务
    CELERYBEAT_SCHEDULE = {
        'task1': {
            'task': 'celery_app.task1.add',
            'schedule': timedelta(seconds=10),
            'args': (2, 8)
        },
        'task2': {
            'task': 'celery_app.task2.multiply',
            'schedule': crontab(hour=20, minute=46),
            'args': (4, 5)
        }
    }
    
    # vim task1.py
    
    import time
    from celery_app import app
    
    @app.task
    def add(x, y):
        time.sleep(3)
        return x + y
    
    # vim task2.py
    
    import time
    from celery_app import app
    
    @app.task
    def multiply(x, y):
        time.sleep(4)
        return x * y
    
  • 启动命令

    celery worker -A celery_app.tasks -l INFO		# -A 指定app所在模块 -l 指定日志级别
    celery beat -A celery_app.tasks -l INFO
    
原文地址:https://www.cnblogs.com/Peter2014/p/11628588.html