Celery分布式任务队列

Celery任务处理:
task 任务 -- 一个Python函数, 内书写异步代码, 使之可在项目中随用随调
broker 中间人 -- 项目中调用异步代码后, 作为任务发布给中间人, 中间人进行分配调度,将任务分配入所属队列
queue 队列 -- 任务放进队列中,先进先出, 交给处理者执行.
worker 处理者 -- 每个处理者为一个独立进程, 处理者和队列关系可以根据项目情况为一对一,一对多,多对一关系.

使用流程:


0.选择并配置broker环境.


1.创建异步列表
创建用以执行异步代码的py文件

project/celery_task/tasks.py


导入celery后实例化对象并写入调用路径 以及 broker参数

from celery import Celery

# 实例化对象并传参. 此处使用redis作为broker..应当注意broker参数的书写规范
app = Celery('celery_task.tasks', broker='redis://127.0.0.1:6379/8')



2.更改代码

分离出项目中需要异步执行的代码
在tasks中def每段需要异步执行的代码

# 使用实例化对象的task方法装饰需要异步执行的代码
@app.task
def PrintSum(a, b):
    time.sleep(5)
    a += b
    print('a和b的和为:' + str(a))
    time.sleep(5)

...


改写原代码
导入tasks里对应的模块,调用delay方法将任务并传参, 使任务加入queue队列

from celery_tasks.tasks import PrintSum
...
a = 1
b = 2
PrintSum.delay(a, b)



3.环境配置
安装配置worker的必要运行环境,Django,celery,以及所分配任务 必要的环境依赖.

复制代码到worker环境内(与该任务相关文件)
worker环境内的tasks内需增加Django初始化配置

import os
import django

# 系统环境变量-设置默认(Django的设置模块, 以及自定设置)  原wsgi接口协议文件中书写. !参数应用双引号.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dailyfresh.settings")

# 手动进行django初始化
django.setup()


4.运行
启动worker

# 使用celery模块调用APP, 路径为...作为处理者

celery -A celery_tasks.tasks worker


# 使用celery模块调用APP, 路径为...作为处理者(设置消息级别为:通知级)

celery -A celery_tasks.tasks worker -l info

##

后期对任务内容修改, 也同样需要修改worker端代码

原文地址:https://www.cnblogs.com/jrri/p/11531172.html