celery & flower

Celery 的使用场景

  异步任务:将耗时操作任务提交给Celery去异步执行。
  定时任务:比如每日数据统计

Celery 的基本结构

task producer:任务生产者,产生任务并交给任务队列

celery beat:任务调度器,周期性地将配置中需要执行的任务发送给任务队列(定时任务)

borker:任务队列,介于生产者和消费者的中间人,就是生产者存放和消费者拿取任务的地方

celery 本身不提供队列服务,一般用 redis 来充当 broker 的角色

worker:任务消费者,它会实时监控队列,如果有任务就拿出来执行

result:储存任务结果的地方

import time
from celery import Celery
from celery import group

# 配置celery backend and broker
broker = "redis://localhost:6379/14"
backend = 'redis://localhost:6379/15'
# 创建Celery实例
app = Celery('my_task', broker=broker, backend=backend)


@app.task
def add(x, y):
    print('enter func...')
    time.sleep(5)  # 模拟耗时操作
    return x + y


@app.task(trail=True)
def A(how_many):
    return group(B.s(i) for i in range(how_many))()


@app.task(trail=True)
def B(i):
    return pow2.delay(i)


@app.task(trail=True)
def pow2(i):
    return i ** 2
import time
from celery.result import ResultBase

import task_1


def trigger():
    t1 = time.time()
    print('start task...')
    print(task_1.add.name)
    data = task_1.add.delay(4, 11)  # 这里需要用 celery 提供的接口 delay 进行调用
    print('end task...')
    t2 = time.time()
    print('共耗时:%s' % str(t2 - t1))



def trigger1(num: int):
    print("tringger1 ...")
    result_1 = task_1.A.delay(num)
    for v in result_1.collect():
        print(v)



if __name__ == '__main__':
    # celery worker -A task_1 -l INFO -E &
    # celery flower --port=555 --broker=redis://127.0.0.1:6379/14
    trigger()
    trigger1(11)
原文地址:https://www.cnblogs.com/Mint-diary/p/14511194.html