Django学习笔记(20)celery_tasks 异步任务初识

celery_tasks 异步任务

  当我们需要批量的去执行一些接口,如测试平台的批量运行测试用例时,如果是同步任务的话,会等待用例一个个执行完毕才有返回结果。当点击运行后,后台直接返回一条信息,由celery服务来运行用例,这就是异步

celery_tasks 工作流

  平台会通过celery提供的方法将我们的操作推到中间件如redis中,celery会启动一个服务用来监控redis,当redis中有数据时,会取数据取出来后运行完毕后,返回结果给redis,redis有结果后 就将数据传给平台

安装celery

1 pip install Celery

注意

1、在起celery服务之前,需要本地或者远程启动redis服务,可在config.py文件中配置redis地地址

2、每次修改了celery文件,都需要从命令行重新启动celery服务

celery_tasks 目录结构

              

Django工程目录

config.py     ---------------------配置中间件,以redis为例

1 # broker_url = "redis://:password@ip:port/4"
2 # result_backend = "redis://:password@ip:port/4"
3 #result_backend = "redis://ip:port/4"#没有密码
4 
5 #本地redis
6 broker_url = "redis://127.0.0.1:6379/4"  #读取数据地址
7 result_backend = "redis://127.0.0.1:6379/4"  #结果返回数据地址

main.py             ------------------启动tasks任务配置

 1 from celery import Celery
 2 app = Celery('sksystem') #创建异步对象
 3 
 4 # 获取celery的配置信息
 5 app.config_from_object('celery_tasks.config')
 6 # 提供tasks的路径,自动发现需要运行的task任务
 7 app.autodiscover_tasks(['celery_tasks.run'])
 8 
 9 # 启动celery的方法 默认以cpu的核数多进程的方式启动
10 # celery -A 应用路径 worker -l 日志级别
11 # mac同学
12 # celery -A celery_tasks.main worker -l info 普通启动
13 # celery multi start w1 -A celery_tasks.main -l info  --logfile=logs/celerylog.log --pidfile=logs/celerypid.pid 后台运行
14 # celery flower -A celery_tasks.main 打开一个web页面启动 需要提前安装下flow  安装命令:pip install flower
15 
16 # win同学
17 # pip install eventlet
18 # celery -A celery_tasks.main worker -l info -P eventlet
19 # -- * - **** ---
20 # - ** ---------- [config]
21 # - ** ---------- .> app:         sksystem:0x103d0deb8                      启动是那个app的任务
22 # - ** ---------- .> transport:   redis://10.168.100.21:6379/2              设置的broker的队列是那个
23 # - ** ---------- .> results:     redis://10.168.100.21:6379/3              设置backend的存储地址
24 # - *** --- * --- .> concurrency: 4 (prefork)                               默认启动的进程数
25 # -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
26 # --- ***** -----
27 #  -------------- [queues]
28 #                 .> celery           exchange=celery(direct) key=celery
29 
30 
31 # 在view视图中只需要导入tasks中写好的任务方法 通过任务方法调用delay()即可
32 # from celery_tasks.run.tasks import run_case
33 # 调用task任务 参数可以在delay中传递,正常调用一样
34 # run_case.delay() #此方法会返回任务task_id 同run.tasks中的 taskname.request.id

tasks.py    --------------- 定义task方法

from celery_tasks.main import app
# 在celery中使用logger
from celery.utils.log import get_task_logger
logger = get_task_logger('django_server')

#用例运行
@app.task(name='run_case')
def run_case(case_id,user_id):
    task_id = run_case.request.id  #通过方法名.request.id 获取task_id
    print("task_id",task_id)
    print(case_id)
    print(user_id)

#集合运行
@app.task(name='run_collection')
def run_collection(collecti_id,user_id):
    task_id = run_collection.request.id  #获取task_id
    print("task_id",task_id)
    print(collecti_id)
    print(user_id)

Views.py ----------------------通过delay()方法启用celery服务

 1 from celery_tasks.run.tasks import run_case
 2 
 3 #固定celery的调用方式
 4 #run_case.delay(1,2)   #delay()内部传递参数
 5 
 6 class CollectionRunView(View): #运行测试集合为例子
 7     def post(self,request):
 8         collect_id = request.POST.get('collect_id')
 9         user_id = request.POST.get('user_id')
10         for item_id in json.loads(collect_id):
11             #当页面调用异步任务时,会返回一个唯一的任务id
12             task_id = run_collection.delay(item_id,user_id) #启动异步任务,delay方法支持像函数一样传参
13             models.CaseCollection.objects.filter(id=item_id).update(report_batch=task_id,status=3)
14             print("task_id",task_id)
15         return NbResponse()
原文地址:https://www.cnblogs.com/bugoobird/p/13441380.html