四、Celery 路由学习篇

0、Celery路由方式

1、自动路由:定义好规则,调用任务的时候,无需指定路由,调用任务的函数delay【celery启动的时候,需要指定队列名字,才可以进行正常的运行】
2、手动路由:调用方法的时候,指定路由和队列或路由的key,调用任务函数apply_async 【celery启动的时候,无需指定队列名字,随worker一起启动】

演示代码下载地址

https://github.com/ygbh/celery_django_project

1、自动路由

1.1、自动路由的逻辑概念

自动路由,即需要提前定义好路由规则,直接使用delay或apply_async方法来调用任务

1.2、项目演示代码准备

创建2个app,不同app路由到不同队列和交换接口进行任务的处理
这里演示的app为:
app01:主要做运算处理,例如求和
app_log:主要做log的处理

1.3、app01的应用views.py 代码:

def async_route_add_task(request):
    """
      使用调用apply_async,相乘的任务,主要演示路由分发
    :param request:
    :return:
    """
    arg1 = 1
    arg2 = 2
    result = add.apply_async(args=(arg1, arg2,),
                             queue='app01',
                             priority=0,
                             exchange='app01')
    task_status = AsyncResult(result.task_id, app=result.app)
    return JsonResponse({'input_args': [arg1, arg2], 'task_id': result.task_id, 'result': task_status.get()})


def sync_route_add_task(request):
    """
        使用delay,调用相乘的任务,主要演示路由分发
    :param request:
    :return:
    """
    arg1 = 1
    arg2 = 2
    result = add.delay(arg1, arg2)
    task_status = AsyncResult(result.task_id, app=result.app)
    return JsonResponse({'input_args': [arg1, arg2], 'task_id': result.task_id, 'result': task_status.get()})

1.4、app_log应用 views.py 代码 

def async_route_handler_log(request):
    log_content = 'async_route_handler_log'
    result = handler_log.apply_async(args=(log_content,),
                                     queue='app_log',
                                     priority=0,
                                     exchange='app_log')
    task_status = AsyncResult(result.task_id, app=result.app)
    return JsonResponse({'input_args': [log_content], 'task_id': result.task_id, 'result': task_status.get()})


def sync_route_handler_log(request):
    log_content = 'async_route_handler_log'
    result = handler_log.delay(log_content)
    task_status = AsyncResult(result.task_id, app=result.app)
    return JsonResponse({'input_args': [log_content], 'task_id': result.task_id, 'result': task_status.get()})

1.5、配置Django url路由

urlpatterns = [
path('async_add/', views.async_add_task), # 主演示指定exchange+queue+routing key,来任务调用,函数传普通参数
path('sync_add/', views.sync_add_task),# 主演示,直接任务调用,函数传普通参数
path('async_xsum/', views.async_xsum_task),
path('sync_xsum/', views.sync_xsum_task),
path('app01/async_add/', views.async_route_add_task), # app01,主要演示手动路由
path('app01/sync_add/', views.sync_route_add_task), # app01,主要演示自动路由
path('app_log/async_log/', log_views.async_route_handler_log), # app_log,主要演示手动路由
path('app_log/sync_log/', log_views.sync_route_handler_log), # app_log,主要演示自动路由
]

1.6、在celery_settings.py 配置路由规则

# 定义自动路由规则,主要是给delay或apply_async函数,调用任务的时候使用,【元组方式定义】,顺序匹配执行
# 参考官方文档:https://docs.celeryproject.org/en/stable/userguide/routing.html#exchange-types
# CELERY_TASK_ROUTES = ([
#                           ('app01.tasks.*', {"queue": "app01"}),
#                           ('app_log.tasks.*', {"queue": "app_log"}),
#                       ],)

# 定义自动路由规则,主要是给delay或apply_async函数,调用任务的时候使用,【字典方式定义】 
CELERY_TASK_ROUTES = { 
'app01.tasks.*': {"queue": "app01"},
'app_log.tasks.*': {"queue": "app_log"}
}

1.7、指定队列启动celery worker服务

# 启动只接收队列app01的数据处理任务服务
celery -A django_celery_project worker -Q app01 -p eventlet -n app01 -l info

# 启动只接收队列app_log的数据处理任务服务
celery -A django_celery_project worker -P eventlet -l info -n app_log -Q app_log

# 启动只接收队列celery【即是默认队列】的数据处理任务服务
celery -A django_celery_project worker -P eventlet -l info -n celery

1.8、测试效果

1.8.1、app01,接收任务和处理任务正常

 

1.8.2、app_log,接收任务和处理任务正常

 

1.8.3、默认的celery,接收任务和处理任务正常

 

2、手动路由

2.1、自动路由的逻辑概念

主要是调度路由的时候,要指定队列,交换接口,路由的键[key],由rabbitMQ自动路由到指定的队列,
该监听函数的队列,监听到有数据的时候,即接收任务处理。

2.2、项目演示代码准备

创建2个应用,不同应用根据指定队列,交换接口,路由的键[key],选择路由任务的调度处理

这里演示的app为:
app01:主要做运算处理,例如:求和 
app_log:主要做log的处理

2.3、app01的应用views.py 代码:

def async_route_add_task(request):
    """
      使用调用apply_async,相乘的任务,主要演示路由分发
    :param request:
    :return:
    """
    arg1 = 1
    arg2 = 2
    result = add.apply_async(args=(arg1, arg2,),
                             queue='feed_tasks',
                             routing_key='task.add',
                             priority=0,
                             exchange='default', )
    task_status = AsyncResult(result.task_id, app=result.app)
    return JsonResponse({'input_args': [arg1, arg2], 'task_id': result.task_id, 'result': task_status.get()})

2.4、app_log应用 views.py 代码 

def async_route_handler_log(request):
    log_content = 'async_route_handler_log'
    result = handler_log.apply_async(args=(log_content,))
    task_status = AsyncResult(result.task_id, app=result.app)
    return JsonResponse({'input_args': [log_content], 'task_id': result.task_id, 'result': task_status.get()})

2.5、配置Django url路由

urlpatterns = [
    path('async_add/', views.async_add_task),  # 主演示指定exchange+queue+routing key,来任务调用,函数传普通参数
    path('sync_add/', views.sync_add_task),  # 主演示,直接任务调用,函数传普通参数
    path('sync_add/', views.sync_add_task),  # 主演示,直接任务调用,函数传普通参数
    path('async_xsum/', views.async_xsum_task),  # 主演示,直接任务调用,函数传列表参数
    path('sync_xsum/', views.sync_xsum_task),  # 主演示,直接任务调用,函数传列表参数
    path('app01/async_add/', views.async_route_add_task),  # app01,主要演示手动路由
    path('app01/sync_add/', views.sync_route_add_task),  # app01,主要演示自动路由
    path('app_log/async_log/', log_views.async_route_handler_log),  # app_log,主要演示手动路由
    path('app_log/sync_log/', log_views.sync_route_handler_log),  # app_log,主要演示自动路由
]

 2.6、在celery_settings.py 配置路由规则[注意:这里需要把自动路由的信息注释或删除]

# 定义队列规则,主要是给apply_async函数,调用任务的时候使用
CELERY_TASK_QUEUES = {
    Queue("celery", Exchange("celery"), routing_key="celery.default"),  # 默认队列
    Queue("feed_tasks", Exchange(name="default", type='topic'), routing_key="task.#"),
    # 定义队列feed_tasks,从交换接口:default接收,并且过滤路由的key,主要演示手动路由的机制
    Queue("add_queue", Exchange("compute_node"), routing_key="add_task"),  # 定义队列:add_queue,绑定交换机:compute_node
    Queue("mul_queue", Exchange("compute_node"), routing_key="mul_task"),  # 定义队列:mul_queue,绑定交换机:compute_node
    Queue("xsum_queue", Exchange("compute_node"), routing_key="xsum_task")  # 定义队列:xsum_queue,绑定交换机:compute_node
}

 2.7、指定队列启动celery worker服务

# 启动worker 服务
celery -A django_celery_project worker -P eventlet -l info -n celery

 2.8、测试效果

 2.9、手动路由配置完成

3、路由的优先级选项【这里不在演示介绍,只是设置一下参数而且】

3.1、规则

优先级限制:0-9
数字越来,越优先消费
默认值是:0

3.2、配置的方法

3.2.1、在celery配置文件里面设置

可以通过设置x-max-priority参数将队列配置为支持优先级 

from kombu import Exchange, Queue

app.conf.task_queues = [
    Queue('tasks', Exchange('tasks'), routing_key='tasks',
          queue_arguments={'x-max-priority': 10}),
]

3.2.2、在调用任务函数的时候,进行设置优先级

task.apply_async(priority=0)

注意:
    调用时设置,会覆盖默认的配置

4、Celery广播路由【因为比较少用,这里直接复制官方的设置方法,不做演示】

4.1、定义广播路由的配置

from kombu.common import Broadcast

app.conf.task_queues = (Broadcast('broadcast_tasks'),) # 定义队列的名字
app.conf.task_routes = {
    'tasks.reload_cache': {
        'queue': 'broadcast_tasks', # 路由的队列名字
        'exchange': 'broadcast_tasks' #交换的接口
    }
}

4.2、设置一个周期运行任务【周期任务一般在关闭运行结果,因为输出结果对业务没有什么作用:@shared_task(ignore_result=False)】

from kombu.common import Broadcast
from celery.schedules import crontab

app.conf.task_queues = (Broadcast('broadcast_tasks'),)

app.conf.beat_schedule = {
    'test-task': {
        'task': 'tasks.reload_cache',
        'schedule': crontab(minute=0, hour='*/3'),
        'options': {'exchange': 'broadcast_tasks'}
    },
}

5、Celery的优化

默认:Celery对数据都是进行持久化【即:delivery_mode=0】,就算重启MQ,数据也不会丢失。
如果数据允许丢失的话,可以关闭持久化,从而达到提升性能【即:delivery_mode=1】

5.1、优化的方法

设置方法1、
from
kombu import Exchange, Queue task_queues = ( Queue('celery', routing_key='celery'), Queue('transient', Exchange('transient', delivery_mode=1), routing_key='transient', durable=False), ) 设置方法2: task_routes = { 'proj.tasks.add': {'queue': 'celery', 'delivery_mode': 'transient'} } 设置方法3: ask.apply_async(args, queue='transient')
原文地址:https://www.cnblogs.com/ygbh/p/13651887.html