分布式框架Celery(转)

一.简介

    Celery是一个异步任务的调度工具。

    Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农。

    在 Python 中定义 Celery 的时候,我们要引入 Broker,中文翻译过来就是“中间人”的意思,在这里 Broker 起到一个中间人的角色。在工头提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农等着取出一个个任务准备着手做。

    这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 有点像我们的 Broker,也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。
我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。 Celery(芹菜)是一个异步任务队列
/基于分布式消息传递的作业队列。它侧重于实时操作,但对调度支持也很好。Celery用于生产系统每天处理数以百万计的任务。Celery是用Python编写的,但该协议可以在任何语言实现。
它也可以与其他语言通过webhooks实现。Celery建议的消息队列是RabbitMQ,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, 和数据库(使用SQLAlchemy的或Django的 ORM) 。Celery是易于集成Django, Pylons and Flask,
使用 django-celery, celery-pylons and Flask-Celery 附加包即可。

二.Celery 的架构

  

  Celery 是一个强大的 分布式任务队列 的 异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。我们需要一个消息队列来下发我们的任务。
首先要有一个消息中间件,此处选择rabbitmq (也可选择 redis 或 Amazon Simple Queue Service(SQS)消息队列服务)。推荐 选择 rabbitmq 。使用RabbitMQ是官方特别推荐的方式。它的架构组成如下图:

Celery_framework

可以看到,Celery 主要包含以下几个模块:

  • 任务模块 Task

    包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。

  • 消息中间件 Broker

    Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。

  • 任务执行单元 Worker

    Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。

  • 任务结果存储 Backend

    Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, redis 和 MongoDB 等。

各个中间件的比较:

  

三.安装及初步使用

pip install celery #安装celery 

  小试牛刀:1. 定义任务函数。2. 运行celery服务。3. 客户应用程序的调用。

  编写一个tasks.py的文件

# encoding: utf-8
from celery import Celery
#任务调度名称,指定中间件和结果存储的位置
app = Celery('tasks',broker="redis://127.0.0.1:6379/0",backend='redis://127.0.0.1:6379/1')

@app.task
def add(x,y):
    return x + y

  上述代码导入了celery,然后创建了celery 实例 app,实例化的过程中指定了任务名tasks(和文件名一致),传入了broker和backend。然后创建了一个任务函数add

  运行:

celery -A task worker -l info -P eventlet

  这里,-A 表示我们的程序的模块名称,worker 表示启动一个执行单元,-l 是批 -level,表示打印的日志级别,-P这个参数是celery4.x之后要使用的,否则会报错。可以使用 celery –help 命令来查看celery命令的帮助文档。执行命令后,

worker界面展示信息如下:

  

  如果你不想使用 celery 命令来启动 worker,可直接使用文件来驱动,修改task.py (增加入口函数main)

if __name__ == '__main__':
    app.start()

  再执行

python task.py worker

   终端调用(需要在tasks.py同级目录下运行):

D:	omcelery
λ python
Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add
>>> r=add.delay(3,4)
>>> print(r.get())
7
>>> print(r.result)
7
>>> print(r.ready())

  到redis看一下结果:

另外一种方法:

  启用任务和的调度都使用脚本 

  功能:模拟一个耗时操作,并打印 worker 所在机器的 IP 地址,中间人和结果存储都使用 redis 数据库

#encoding=utf-8
#filename my_first_celery.py
from celery import Celery
import time
import socket

app = Celery('tasks', broker='redis://127.0.0.1:6379/0',backend ='redis://127.0.0.1:6379/0' )

def get_host_ip():
    """
    查询本机ip地址
    :return: ip
    """
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('8.8.8.8', 80))
        ip = s.getsockname()[0]
    finally:
        s.close()
    return ip


@app.task
def add(x, y):
    time.sleep(3) # 模拟耗时操作
    s = x + y
    print("主机IP {}: x + y = {}".format(get_host_ip(),s))
    return s 

  启动这个 worker:

celery -A my_first_celery worker -l info -P eventlet

  这里,-A 表示我们的程序的模块名称,worker 表示启动一个执行单元,-l 是批 -level,表示打印的日志级别。可以使用 celery –help 命令来查看celery命令的帮助文档。执行命令后,worker界面展示信息如下:

  

  调用任务:

  在 my_first_celery.py 的同级目录下编写如下脚本 start_task.py如下。

  

#encoding=utf-8
from my_first_celery import add #导入我们的任务函数add
import time
result = add.delay(12,12) #异步调用,这一步不会阻塞,程序会立即往下运行

while not result.ready():# 循环检查任务是否执行完毕
    print(time.strftime("%H:%M:%S"))
    time.sleep(1)

print(result.get()) #获取任务的返回结果
print(result.successful()) #判断任务是否成功执行

  执行

python start_task.py

  结果如下所示:

  

  发现等待了大约3秒钟后,任务返回了结果24,并且是成功完成,此时worker界面增加的信息如下:

  

  这里的信息非常详细,其中2004447e-1183-4d65-ae5d-bd8ead70e216是taskid,只要指定了 backend,根据这个 taskid 可以随时去 backend 去查找运行结果,使用方法如下:

>>> from my_first_celery import add
>>> taskid='2004447e-1183-4d65-ae5d-bd8ead70e216'
>>> add.AsyncResult(taskid).get()
24
>>> 


#或者
>>> from celery.result import AsyncResult
>>> AsyncResult(taskid).get()
24

  重要说明:如果想远程执行 worker 机器上的作业,请将 my_first_celery.py 和 start_tasks.py 复制到远程主机上(需要安装
celery),修改 my_first_celery.py 指向同一个中间人和结果存储,再执行 start_tasks.py 即可远程执行 worker 机器上的作业。my_first_celery.add函数的代码不是必须的,你也要以这样调用任务:

from my_first_celery import app
app.send_task("my_first_celery.add",args=(1,3))

四.第一个 celery 项目

  在生产环境中往往有大量的任务需要调度,单独一个文件是不方便的,celery 当然支持模块化的结构,我这里写了一个用于学习的 Celery 小型工程项目,含有队列操作,任务调度等实用操作,目录树如下所示:

 
    
  其中 init.py是空文件,目的是告诉 Python myCeleryProj 是一个可导入的包.
  app.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#author tom

from celery import Celery

app = Celery("myCeleryProj", include=["myCeleryProj.tasks"])

app.config_from_object("myCeleryProj.settings")

if __name__ == "__main__":
    app.start()

  settings.py

#!/usr/bin/env python
# -*- codin
# g: utf-8 -*-
#author tom
from kombu import Queue
import re
from datetime import timedelta
from celery.schedules import crontab


CELERY_QUEUES = (  # 定义任务队列
    Queue("default", routing_key="task.#"),  # 路由键以“task.”开头的消息都进default队列
    Queue("tasks_A", routing_key="A.#"),  # 路由键以“A.”开头的消息都进tasks_A队列
    Queue("tasks_B", routing_key="B.#"),  # 路由键以“B.”开头的消息都进tasks_B队列
)

CELERY_TASK_DEFAULT_QUEUE = "default"  # 设置默认队列名为 default
CELERY_TASK_DEFAULT_EXCHANGE = "tasks"
CELERY_TASK_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_TASK_DEFAULT_ROUTING_KEY = "task.default"

CELERY_ROUTES = (
    [
        (
            re.compile(r"myCeleryProj.tasks.(taskA|taskB)"),
            {"queue": "tasks_A", "routing_key": "A.import"},
        ),  # 将tasks模块中的taskA,taskB分配至队列 tasks_A ,支持正则表达式
        (
            "myCeleryProj.tasks.add",
            {"queue": "default", "routing_key": "task.default"},
        ),  # 将tasks模块中的add任务分配至队列 default
    ],
)


# CELERY_ROUTES = (
#    [
#        ("myCeleryProj.tasks.*", {"queue": "default"}), # 将tasks模块中的所有任务分配至队列 default
#    ],
# )

# CELERY_ROUTES = (
#    [
#        ("myCeleryProj.tasks.add", {"queue": "default"}), # 将add任务分配至队列 default
#        ("myCeleryProj.tasks.taskA", {"queue": "tasks_A"}),# 将taskA任务分配至队列 tasks_A
#        ("myCeleryProj.tasks.taskB", {"queue": "tasks_B"}),# 将taskB任务分配至队列 tasks_B
#    ],
# )

BROKER_URL = "redis://127.0.0.1:6379/0"  # 使用redis 作为消息代理

CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/0"  # 任务结果存在Redis

CELERY_RESULT_SERIALIZER = "json"  # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显


CELERYBEAT_SCHEDULE = {
    "add": {
        "task": "myCeleryProj.tasks.add",
        "schedule": timedelta(seconds=10),
        "args": (10, 16),
    },
    "taskA": {
        "task": "myCeleryProj.tasks.taskA",
        "schedule": crontab(hour=21, minute=10),
    },
    "taskB": {
        "task": "myCeleryProj.tasks.taskB",
        "schedule": crontab(hour=21, minute=12),
    },
}

  readme.txt

#启动 worker 
#分别在三个终端窗口启动三个队列的worker,执行命令如下所示:
celery -A myCeleryProj.app worker -Q default -l info
celery -A myCeleryProj.app worker -Q tasks_A -l info
celery -A myCeleryProj.app worker -Q tasks_B -l info
#当然也可以一次启动多个队列,如下则表示一次启动两个队列tasks_A,tasks_B。
celery -A myCeleryProj.app worker -Q tasks_A,tasks_B -l info
#则表示一次启动两个队列tasks_A,tasks_B。
#最后我们再开启一个窗口来调用task: 注意观察worker界面的输出
>>> from myCeleryProj.tasks import *
>>> add.delay(4,5);taskA.delay();taskB.delay() #同时发起三个任务
<AsyncResult: 21408d7b-750d-4c88-9929-fee36b2f4474>
<AsyncResult: 737b9502-77b7-47a6-8182-8e91defb46e6>
<AsyncResult: 69b07d94-be8b-453d-9200-12b37a1ca5ab>
#也可以使用下面的方法调用task
>>> from myCeleryProj.app import app
>>> app.send_task(myCeleryProj.tasks.add,args=(4,5)
>>> app.send_task(myCeleryProj.tasks.taskA)
>>> app.send_task(myCeleryProj.tasks.taskB)

转自:https://blog.csdn.net/somezz/article/details/82343346

五.管理与监控

  Celery管理和监控功能是通过flower组件实现的,flower组件不仅仅提供监控功能,还提供HTTP API可实现对woker和task的管理。

  安装使用

  

pip3 install flower

  

  启动

 flower -A project --port=5555   
# -A :项目目录
#--port 指定端口

  访问http:ip:5555

    

    api使用,例如获取woker信息  

    

curl http://127.0.0.1:5555/api/workers

    结果:

    

  更多api参考:https://flower.readthedocs.io/en/latest/api.html

原文地址:https://www.cnblogs.com/tjp40922/p/11345525.html