celery中的生产者消费者问题

celery中的生产者消费者问题

task1.py文件中:

# demo1:task.py and celery.py in one file
# run it by
from celery import Celery
import time

# 定义worker(消费者),并指定broker和backend(共享缓冲区)
# 每启动一个woker就相当于创建一个消费者(启动woker方法:celery -A 创建app的语句所在的文件名)
# 给woker指定queue的方法:
# 1、启动woker时-Q指定创建的这个消费者要从共享缓冲区中哪个队列中取出产品并消费
# 2、如果没有指定queue,则启动的woker默认从共享缓冲区的default队列中取出产品并消费
# 注意:两个不同的woker监听共享缓冲区中的同一个队列会出错(避免出错的方法:给启动的每个woker用-Q指定要监听的队列,并用@...(queue='')的方法给要在使用该woker处理的task函数指定相同的放入的队列)

app2=Celery('task2_app2',broker='redis://127.0.0.1:6379/0',backend='redis://127.0.0.1:6379/0')

# 定义task(生产者)
# 在程序执行过程中,每调用一次add.delay就相当于生产者生产一个产品并放入共享缓冲区
@app2.task(queue='queue2')# 指定该生产者生产的产品要放入共享缓冲区中的哪个队列
# 给task指定queue的方法:
# 1、此处是直接给task指定了queue
# 2、也可以采用给task命名,然后在app.conf.update中定义name与queue的对应规则来批量给多个task指定对应的queue。
# 3、如果没有指定,则默认将task放入共享缓冲区中名为default的队列中
def add(x,y):
print('running ',x,'+',y)
print(x+y)
time.sleep(10)
return x+y

 

 

生产者:应用程序

生产动作:调用add.delay()

共享缓冲区:broker和backend

消费者:每个woker都是一个消费者

消费动作:woker启动后会自动监听并从broker中取出任务并执行(消费)

 

创建消费者:celery -A task1 worker -l info

(开启worker的实质实际上就是执行app=Celery(...)语句,可以使用 –-concurrency=个数 来限制每个消费者可以并行的线程数)

 

原文地址:https://www.cnblogs.com/zealousness/p/8757762.html