Celery--配置(1)

基本设置

accept_content
设置确保传给broker的数据类型
accept_content = ['json']
accept_content = ['application/json']
 
result_accept_content
设置确保传给存储结果的数据类型
result_accept_content = ['json']
result_accept_content = ['application/json']
 
timezone
设置时区
timezone='Asia/Shanghai' # 指定时区,默认是 UTC
 
 
Task设置
task_annotations
可用于重写配置中的任何任务属性
task_annotations = {'tasks.add': {'rate_limit': '10/s'}}
task_annotations = {'*': {'rate_limit': '10/s'}}
 
def my_on_failure(self, exc, task_id, args, kwargs, einfo):
print('Oh no! Task failed: {0!r}'.format(exc))
task_annotations = {'*': {'on_failure': my_on_failure}}
 
task_compression
压缩任务消息
task_compression=gzip
 
task_serializer
任务序列化
task_serializer=json
task_serializer=msgpack
 
task_publish_retry
任务发布重试
task_publish_retr=Enabled
 
task_remote_tracebacks
远程任务错误时的堆栈信息,开启需要安装$ pip install celery[tblib]
task_remote_tracebacks=Disabled
 
task_ignore_result
是否禁用存储任务返回值
task_ignore_result=Disabled
 
task_store_errors_even_if_ignored
即使禁用存储错误, 仍然存储任务返回值
task_store_errors_even_if_ignored=Disabled
 
task_track_started
是否报告任务已启动
task_track_started=Disabled
 
task_time_limit 秒
超过此值时当前worke被终止,替换为新的工作进程。
task_time_limit=30
 
task_soft_time_limit
任务软时间限制
 
task_acks_on_failure_or_timeout
在任务执行后是否确认
task_acks_on_failure_or_timeou=Enabled
 
task_reject_on_worker_lost
设置为true将允许消息重新排队,以便任务将由同一个工作线程或另一个工作进程再次执行。
task_reject_on_worker_lost=Disabled
 
task_default_rate_limit
任务的全局默认速率限制。
 
 
Task result backend设置
result_backend
设置结果存储
result_backend='redis://10.9.16.178:6379/4'
 
result_backend_always_retry
如果启用,后端将尝试在发生可恢复异常时重试
result_backend_always_retry=False
 
result_serializer
result_serializer=json
 
result_compression
result_compression=gzip
 
result_extended
如果为True,将拓展(name, args, kwargs, worker, retries, queue, delivery_info)
 
result_expires 秒
结果过期时间 ,默认1天
result_expires= 3600
 
database_engine_options
app.conf.database_engine_options = {'echo': True}
 
database_table_schemas
当SQLAlchemy配置为结果后端时,Celery会自动创建两个表来存储任务的结果元数据。此设置允许您自定义表的架构
database_table_schemas = { 'task': 'celery', 'group': 'celery', }
 
database_table_names
当SQLAlchemy配置为结果后端时,Celery会自动创建两个表来存储任务的结果元数据。此设置允许您自定义表名:
database_table_names = { 'task': 'myapp_taskmeta', 'group': 'myapp_groupmeta', }
 
 
Redis配置
当使用TLS连接(协议是redis://)时,可以传入broker“use”ssl中的所有值作为查询参数。到证书的路径必须是URL编码的,并且ssl证书请求是必需的。
result_backend = 'rediss://:password@host:port/db?
    ssl_cert_reqs=required
    &ssl_ca_certs=%2Fvar%2Fssl%2Fmyca.pem                  # /var/ssl/myca.pem
    &ssl_certfile=%2Fvar%2Fssl%2Fredis-server-cert.pem     # /var/ssl/redis-server-cert.pem
    &ssl_keyfile=%2Fvar%2Fssl%2Fprivate%2Fworker-key.pem'   # /var/ssl/private/worker-key.pem
 
redis_backend_use_ssl
redis_backend_use_ssl=Disabled
 
redis_max_connections
redis_socket_connect_timeout
redis_socket_timeout
默认120s
 
redis_retry_on_timeout
默认false
 
redis_socket_keepalive
默认false
 
 
Message Routing设置
task_queues
大多数用户不希望指定此设置,而应该使用自动路由功能。
如果确实要配置高级路由,则此设置应该是康布列队工作线程将使用的对象。
 
task_routes
task_routes = {
    'celery.ping': 'default',
    'mytasks.add': 'cpu-bound',
    'feed.tasks.*': 'feeds',                           # <-- glob pattern
    re.compile(r'(image|video).tasks..*'): 'media',  # <-- regex
    'video.encode': {
        'queue': 'video',
        'exchange': 'media',
        'routing_key': 'media.video.encode',
    },
}

task_routes = ('myapp.tasks.route_task', {'celery.ping': 'default})

那么myapp.tasks.route_task could be:

def route_task(self, name, args, kwargs, options, task=None, **kw):
        if task == 'celery.ping':
            return {'queue': 'default'}
if apply_async() has these arguments:   Task.apply_async(immediate=False, exchange='video', routing_key='video.compress')
 
task_queue_max_priority
brokers RabbitMQ
Default: None.
 
task_default_priority
brokersRabbitMQ, Redis
Default: None.
 
task_default_queue
Default: "celery".
 
task_default_exchange
Default: Uses the value set for task_default_queue.
 
task_default_routing_key
Default: Uses the value set for task_default_queue.
 
task_default_delivery_mode
Default: "persistent".
 
 

Broker设置
broker_url
broker_url='amqp://admin:admin@10.9.16.178:5672/'
 
broker_read_url / broker_write_url
broker_read_url = 'amqp://user:pass@broker.example.com:56721'
broker_write_url = 'amqp://user:pass@broker.example.com:56722'
 
broker_heartbeat
默认值:120.0秒
注意:此值仅由工作线程使用,客户端此时不使用心跳。
 
broker_heartbeat_checkrate
如果heartbeat为10.0且速率为默认值2.0,则每5秒执行一次检查(心跳发送速率的两倍)。
 
broker_use_ssl
import ssl
broker_use_ssl = {
   'keyfile': '/var/ssl/private/worker-key.pem',
  'certfile': '/var/ssl/amqp-server-cert.pem',
  'ca_certs': '/var/ssl/myca.pem',
  'cert_reqs': ssl.CERT_REQUIRED
}
 
broker_pool_limit
默认10
 
broker_connection_timeout
默认4
 
broker_connection_max_retries
默认100
 
 
Worker设置
imports
工作程序启动时要导入的模块序列。
imports=( 'celery_app.tasks', )
 
include
此模块在imports中的模块之后导入
 
worker_concurrency
默认值:CPU核心数。
执行任务的并发工作进程/线程/绿色线程的数量。
 
worker_prefetch_multiplier
默认值:4。
一次预取的消息数乘以并发进程数。默认值为4(每个进程有四条消息)。默认设置通常是一个不错的选择,但是,如果队列中等待很长时间的运行任务,并且必须启动工作线程,请注意,第一个启动的工作进程将收到最初消息数的四倍。因此,任务可能不会公平地分配给工人。
要禁用预取,请将worker_prefetch_multiplier设置为1。将该设置更改为0将允许工作进程继续使用所需的任意数量的消息。
 
worker_max_tasks_per_child
可以执行的最大任务数
 
worker_max_memory_per_child
可以执行的最大占用内存
worker_max_memory_per_child = 12000 # 12MB
 
worker_disable_rate_limits
默认Disabled
 
worker_timer_precision
设置ETA计划程序在重新检查计划之间可以休眠的最长时间(秒)。
 
worker_enable_remote_control
是否启用远程控制

原文地址:https://www.cnblogs.com/absoluteli/p/14016870.html