python浅学【网络服务中间件】之Celery

一、关于Celery:

什么是任务队列:

  任务队列一般用于线程或计算机之间分配工作的一种机制。

  任务队列的输入是一个称为任务的工作单元,有专门的工作进行不断的监视任务队列,进行执行新的任务工作。

 

什么的Celery:

  Celery 通过消息机制进行通信,通常使用中间人(Broker)作为客户端和职程(Worker)调节。启动一个任务,客户端向消息队列发送一条消息,然后中间人(Broker)将消息传递给一个职程(Worker),最后由职程(Worker)进行执行中间人(Broker)分配的任务。

  Celery 可以有多个职程(Worker)和中间人(Broker),用来提高Celery的高可用性以及横向扩展能力。

  Celery 是用 Python 编写的,但协议可以用任何语言实现。除了 Python 语言实现之外,还有Node.js的node-celery和php的celery-php

二、python对Celery的简单使用:

编写task.py

import time
from celery import Celery


app = Celery('task', broker='amqp://', backend='redis://localhost')
app.config_from_object('config')

@app.task
def worker(name):
    print(f'{name}工作正在运行')
    time.sleep(2)
    return f'{name}-ok'

执行命令:celery worker -A task --loglevel=info

task是任务文件名,worker任务角色,--loglevel=info 任务日志级别

结果:

(base) [root@localhost mywork]# celery worker -A task --loglevel=info
/root/miniconda3/lib/python3.7/site-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,
 
 -------------- celery@localhost.localdomain v4.4.2 (cliffs)
--- ***** ----- 
-- ******* ---- Linux-3.10.0-1062.18.1.el7.x86_64-x86_64-with-centos-7.7.1908-Core 2020-03-23 22:25:02
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         task_log:0x7fef6d700150
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . task_log.worker

[2020-03-23 22:25:03,019: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2020-03-23 22:25:03,030: INFO/MainProcess] mingle: searching for neighbors
[2020-03-23 22:25:04,054: INFO/MainProcess] mingle: all alone
[2020-03-23 22:25:04,087: INFO/MainProcess] celery@localhost.localdomain ready.

编写run.py:

from task import worker

def run(name):
    w = worker.delay(name)
    while not w.ready():
        pass
    result = w.get()
    print(result)
    return result

run('log')
run('Riy')
run('test')

执行run.py 结果如下:

[tasks]
  . task.worker
[2020-03-23 22:54:32,337: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2020-03-23 22:54:32,348: INFO/MainProcess] mingle: searching for neighbors
[2020-03-23 22:54:33,377: INFO/MainProcess] mingle: all alone
[2020-03-23 22:54:33,397: INFO/MainProcess] celery@localhost.localdomain ready.
[2020-03-23 22:54:37,556: INFO/MainProcess] Received task: task.worker[2731ffec-d29e-4271-b41e-ca1c58b666c6]  
[2020-03-23 22:54:37,557: WARNING/ForkPoolWorker-1] log工作正在运行
[2020-03-23 22:54:39,567: INFO/MainProcess] Received task: task.worker[d4630eb6-15a6-4535-a007-6753d5173d7e]  
[2020-03-23 22:54:39,568: INFO/ForkPoolWorker-1] Task task.worker[2731ffec-d29e-4271-b41e-ca1c58b666c6] succeeded in 2.011716676002834s: 'log-ok'
[2020-03-23 22:54:39,570: WARNING/ForkPoolWorker-1] Riy工作正在运行
[2020-03-23 22:54:41,573: INFO/ForkPoolWorker-1] Task task.worker[d4630eb6-15a6-4535-a007-6753d5173d7e] succeeded in 2.0033114409889095s: 'Riy-ok'
[2020-03-23 22:54:41,576: INFO/MainProcess] Received task: task.worker[135cc550-0141-44c1-9719-1bed5d85c0ca]  
[2020-03-23 22:54:41,577: WARNING/ForkPoolWorker-1] test工作正在运行
[2020-03-23 22:54:43,580: INFO/ForkPoolWorker-1] Task task.worker[135cc550-0141-44c1-9719-1bed5d85c0ca] succeeded in 2.0028840559971286s: 'test-ok'

  

如果您想要更好地控制任务执行的时间,例如,特定时间或一周中的某天,您可以使用crontab计划类型:

创建config.py文件如下:

from celery.schedules import crontab

CELERY_TIMEZONE = 'Asia/Shanghai'

CELERYBEAT_SCHEDULE = {
# Executes every Monday morning at 7:30 a.m
    'add_time':{
        'task':'task_log.worker',
        'schedule':crontab(hour=7, minute=30, day_of_week=1),
        'args':(16, 16)
    }
}
原文地址:https://www.cnblogs.com/riyir/p/12556001.html