Celery--Worker

1.启动worker
可以在同一台计算机上启动多个工作线程,但是请确保通过使用--hostname参数指定节点名称来命名每个单独的工作线程:
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h
该hostname参数可以扩展以下变量:
  • %h:主机名,包括域名。
  • %n:仅主机名。
  • %d:仅域名。
2.停止worker
由于进程无法覆盖KILL信号,因此worker将无法收割其子代;
此命令通常可以解决问题:
$ pkill -9 -f 'celery worker'
如果系统上没有pkill命令,则可以使用稍长的版本:
$ ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9
 
3.重启worker
$ celery multi start 1 -A proj -l info -c4 --pidfile=/var/run/celery/%n.pid
$ celery multi restart 1 --pidfile=/var/run/celery/%n.pid
 
4.文件路径参数
--logfile, --pidfile以及 --statedb可以包含变量
节点名称替换
  • %p:完整的节点名称。
  • %h:主机名,包括域名。
  • %n:仅主机名。
  • %d:仅域名。
  • %i:Prefork池进程索引;如果为MainProcess,则为0。
  • %I:带分隔符的Prefork池过程索引。
例如,当前主机名是george@foo.example.com则它们将扩展为:
  • --logfile=%p.log -> george@foo.example.com.log
  • --logfile=%h.log -> foo.example.com.log
  • --logfile=%n.log -> george.log
  • --logfile=%d.log -> example.com.log
 
5.并发
可以使用--concurrency参数更改工作进程/线程 数,默认为计算机上可用的CPU数。
简写是-c选项
 
6.速率限制
在运行时更改速率限制
每分钟最多执行200个该类型的任务:
app.control.rate_limit('myapp.mytask', '200/m')
上面没有指定目的地,因此更改请求将影响集群中的所有工作程序实例。如果只想影响特定的工人列表,则可以包含以下destination参数:
app.control.rate_limit('myapp.mytask', '200/m', destination=['celery@worker1.example.com'])
 
7.队列
通过为-Q选项提供逗号分隔的队列列表,可以指定启动时要消耗的队列:
$ celery -A proj worker -l info -Q foo,bar,baz
 
1.添加消费者
要告诉集群中的所有工作人员从名为“ foo”的队列开始消耗:
$ celery -A proj control add_consumer foo
 
如果要指定特定工作者,则可以使用以下 --destination参数:
$ celery -A proj control add_consumer foo -d celery@worker1.local
 
还可以使用app.control.add_consumer()方法动态地完成相同操作:
app.control.add_consumer('foo', reply=True)
#[{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}] app.control.add_consumer('foo', reply=True, destination=['worker1@example.com']) # [{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]
 
如果需要更多控制,还可以指定exchange,routing_key甚至其他选项:
app.control.add_consumer( ... queue='baz', ... exchange='ex', ... exchange_type='topic', ... routing_key='media.*', ... options={ ... 'queue_durable': False, ... 'exchange_durable': False, ... }, ... reply=True, ... destination=['w1@example.com', 'w2@example.com'])
 
2.取消消费者
强制集群中的所有工作人员从队列中取消消费,可以使用celery控制程序:
$ celery -A proj control cancel_consumer foo
 
--destination参数可用于指定要执行命令的工作程序或工作程序列表:
$ celery -A proj control cancel_consumer foo -d celery@worker1.local
 
还可以使用以下app.control.cancel_consumer()方法以编程方式取消使用者 :
>>> app.control.cancel_consumer('foo', reply=True)
[{u'worker1.local': {u'ok': u"no longer consuming from u'foo'"}}]
 
8.远程关机
此命令将优雅地远程关闭工作进程:
app.control.broadcast('shutdown') # shutdown all workers
app.control.broadcast('shutdown', destination='worker1@example.com')
 
9.ping
worker用字符串'pong'答复,仅此而已。除非您指定自定义超时,否则它将使用默认的一秒超时进行答复:
app.control.ping(timeout=0.5)
[{'worker1.example.com': 'pong'},
{'worker2.example.com': 'pong'},
{'worker3.example.com': 'pong'}
]
支持destination参数,因此可以指定要ping的worker:
ping(['worker2.example.com', 'worker3.example.com'])
[{'worker2.example.com': 'pong'}, {'worker3.example.com': 'pong'}]
 
10.自定义远程控制命令
这是一个示例控制命令,用于增加任务预取计数:
from celery.worker.control import control_command
@control_command(
    args=[('n', int)],
    signature='[N=1]',  # <- used for help on the command-line.
)
def increase_prefetch_count(state, n=1):
    state.consumer.qos.increment_eventually(n)
    return {'ok': 'prefetch count incremented'}
确保将此代码添加到worker导入的模块中:这可以与定义Celery应用程序的模块相同,也可以将模块添加到imports设置中。
重新启动工作程序,以便注册控制命令:
$ celery -A proj control increase_prefetch_count 3
 
还可以将操作添加到celery检查程序中,例如读取当前预取计数的操作:
from celery.worker.control import inspect_command
@inspect_command()
def current_prefetch_count(state):
  return {'prefetch_count': state.consumer.qos.value}
重新启动工作程序后,可以使用celery inspect程序查询此值 :
$ celery -A proj inspect current_prefetch_count
 
 

原文地址:https://www.cnblogs.com/absoluteli/p/14017080.html