来个芹菜,Celery的使用

1、写在前面

最近的flask项目遇到一些需要并发的情况,开始图省事,直接搞上了多线程,但是多线程的并发量很小,不利于扩展。

所以学习一下Celery的使用

2、基础概念

Celery是一个强大的分布式任务队列,他可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。通常用来实现异步任务和定时任务。异步任务比如发送邮件,文件上传图像处理等;定时任务就是需要在特定时间执行的任务。


 
 
  • 任务队列
    任务队列是一种跨线程,跨机器工作的一种机制,任务队列中包含称作任务的工作单元。有专门的工作进程持续不断的监视任务队列,并从中获取新的任务并处理。
  • 任务模块
    包含异步任务和定时任务。异步任务通常在业务逻辑中被触发并发往任务队列;定时任务由Celery Beat进程周期性地将任务发往任务队列。
  • 消息中间件Broker
    Broker,就是任务调度队列,接收任务生产者发来的消息(任务),将任务存入到队列。Celery本身不提供队列服务,官方推荐使用RabbitMQ和Redis等。
  • 任务执行单元Worker
    Worker是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。
  • 任务结果存储Backend
  • Backend用于存储任务的执行结果,以供查询。同消息中间件一样,也可使用RabbitMQ和Redis,MySql等。

3、demo代码

3.1 异步任务

 1 # -*- coding: utf-8 -*-
 2 
 3 """
 4 Celery主类
 5 启动文件名必须为celery.py!!!
 6 """
 7 
 8 from __future__ import absolute_import  # 为兼容Python版本,绝对引入
 9 from celery import Celery, platforms
10 
11 app = Celery(
12     main='celery_task',  # celery启动包名称
13     broker='redis://127.0.0.1:6379/1',
14     backend='redis://127.0.0.1:6379/2',
15     include=['celery_task.tasks', ]  # celery所有任务
16 )
17 
18 if __name__ == '__main__':
19     app.start()
celery.py
 1 # -*- coding: utf-8 -*-
 2 
 3 """
 4 定义任务
 5 """
 6 
 7 from __future__ import absolute_import
 8 from celery import Celery, group
 9 from .celery import app
10 from time import sleep
11 
12 
13 @app.task
14 def add(x, y):
15     sleep(5)
16     return x + y
17 
18 
19 @app.task
20 def substract(x, y):
21     sleep(5)
22     return x - y
tasks.py
1 # -*- coding: utf-8 -*-
2 
3 from celery_task.tasks import add,substract
4 
5 # 立即告知celery去执行test_celery任务,并传入一个参数
6 result1 = add.delay(5,5)
7 print(result1.id)
8 result2 = substract.delay(5,5)
9 print(result2.id)
send_tasks.py
 1 # -*- coding: utf-8 -*-
 2 
 3 from celery.result import AsyncResult
 4 from celery_task.celery import app
 5 
 6 '''
 7 获取任务结果,但要想办法获取到task_id
 8 '''
 9 
10 async = AsyncResult(id='8d04fc47-0f72-49a1-a58a-b74e28eb9a41', app=app)
11 
12 if async.successful():
13     result = async.get()
14     print(result)
15     # result.forget() # 将结果删除,执行完成,结果不会自动删除
16     # async.revoke(terminate=True)  # 无论现在是什么时候,都要终止
17     # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
18 elif async.failed():
19     print('执行失败')
20 elif async.status == 'PENDING':
21     print('任务等待中被执行')
22 elif async.status == 'RETRY':
23     print('任务异常后正在重试')
24 elif async.status == 'STARTED':
25     print('任务已经开始被执行')
get_result.py

执行步骤

  • 在celery_test目录下cmd执行celery worker -A celery_task  -l info -P eventlet,开启worker,异步等待任务到来
  • 执行send_tasks.py,将发布任务(任务写入borker)
  • 等待一段时间后,可以在celery命令行看到执行结果
  • 执行get_result.py获取执行结果

3.2 定时任务

 

1 # -*- coding: utf-8 -*-
2 
3 # 拒绝隐式引入,如果celery.py和celery模块名字一样,避免冲突,需要加上这条语句
4 # 该代码中,名字是不一样的,最好也要不一样
5 from __future__ import absolute_import
6 from celery import Celery
7 
8 app = Celery('celery_tasks')
9 app.config_from_object('celery_test2.config')
celery_test2.__init__.py
 1 # -*- coding: utf-8 -*-
 2 
 3 from __future__ import absolute_import
 4 from celery.schedules import crontab
 5 from datetime import timedelta
 6 
 7 # 使用redis存储任务队列
 8 broker_url = 'redis://127.0.0.1:6379/7'
 9 # 使用redis存储结果
10 result_backend = 'redis://127.0.0.1:6379/8'
11 
12 task_serializer = 'json'
13 result_serializer = 'json'
14 accept_content = ['json']
15 # 时区设置
16 timezone = 'Asia/Shanghai'
17 # celery默认开启自己的日志
18 # False表示不关闭
19 worker_hijack_root_logger = False
20 # 存储结果过期时间,过期后自动删除
21 # 单位为秒
22 result_expires = 60 * 60 * 24
23 
24 # 导入任务所在文件
25 imports = [
26     'celery_test2.celery_task2.task1',
27     'celery_test2.celery_task2.task2'
28 ]
29 
30 # 需要执行任务的配置
31 beat_schedule = {
32     'task1': {
33         # 具体需要执行的函数
34         # 该函数必须要使用@app.task装饰
35         'task': 'celery_test2.celery_task2.task1.add',
36         # 定时时间
37         # 每分钟执行一次,不能为小数
38         'schedule': crontab(minute='*/1'),
39         # 或者这么写,每小时执行一次
40         # "schedule": crontab(minute=0, hour="*/1")
41         # 执行的函数需要的参数
42         'args': (5,5)
43     },
44     'task2': {
45         'task': 'celery_test2.celery_task2.task2.substract',
46         # 设置定时的时间,10秒一次
47         'schedule': timedelta(seconds=10),
48         'args': (5,5)
49     }
50 }
config.py
1 # -*- coding: utf-8 -*-
2 from .. import app
3 from time import sleep
4 
5 @app.task
6 def add(x, y):
7     sleep(5)
8     print('add')
9     return x + y
task1
1 # -*- coding: utf-8 -*-
2 from .. import app
3 from time import sleep
4 
5 @app.task
6 def substract(x, y):
7     sleep(5)
8     print(substract)
9     return x - y
task2

执行步骤

  • 在celery_test2的同级目录下cmd执行celery -A celery_test2 worker -l info -P eventlet,开启worker,异步等待任务到来
  • 在celery_test2的同级目录下cmd执行celery -A celery_test2 beat,发送定时任务,定时像borker推送任务
  • 可在backend中查看执行结果,task_id

4、应用代码

4.1 Flask结合Celery,实现进度下载进度条

 

 1 #coding=utf-8
 2 from flask import Flask,url_for,jsonify,render_template
 3 from celery import Celery
 4 import random,time
 5 
 6 app = Flask(__name__)
 7 
 8 app.config['CELERY_BROKER_URL'] = 'redis://127.0.0.1:6379/1'
 9 app.config['CELERY_RESULT_BACKEND'] = 'redis://127.0.0.1:6379/2'
10 
11 #celery初始化
12 celery_obj = Celery(app.name,
13                     broker=app.config['CELERY_BROKER_URL'],
14                     backend=app.config['CELERY_RESULT_BACKEND'] )
15 
16 @app.route('/')
17 def hello_world():
18     return 'Hello World!'
19 
20 @app.route('/nanobar')
21 def nanobar():
22     return render_template('nanobar.html')
23 
24 @app.route('/longtask', methods=['POST'])
25 def longtask():
26     #发布任务,类似于long_task.delay()
27     task1 = long_task.apply_async()
28     # 返回 202,与Location头,构建/status/task_id路由
29     return jsonify({}), 202, {'Location': url_for('taskstatus',task_id=task1.id)}
30 
31 #通过task_id获取任务结果
32 @app.route('/status/<task_id>')
33 def taskstatus(task_id):
34     task = long_task.AsyncResult(task_id)
35     if task.state == 'PENDING':
36         response = {
37             'state': task.state,
38             'current': 0,
39             'total': 1,
40             'status': 'Pending...'
41         }
42     elif task.state != 'FAILURE':
43         response = {
44             'state': task.state,
45             'current': task.info.get('current', 0),
46             'total': task.info.get('total', 1),
47             'status': task.info.get('status', '')
48         }
49         if 'result' in task.info:
50             response['result'] = task.info['result']
51     else:
52         # something went wrong in the background job
53         response = {
54             'state': task.state,
55             'current': 1,
56             'total': 1,
57             'status': str(task.info),  # this is the exception raised
58         }
59     return jsonify(response)
60 
61 
62 # 通过 celery_obj.task 装饰器装饰耗时任务对应的函数
63 # bind为True,会传入self给被装饰的方法
64 #Celery有很多内建状态比如 STARTED ,  SUCCESS 等等,当然Celery也允许程序员自定义状态。
65 # 本例子中使用的是自定义状态, PROGRESS 。与 PROGRESS 一起的还有 metadata 。
66 # metadata 是一个字典,包含当前进度,任务大小,以及消息。
67 #当循环跳出时,返回字典,字典中包含任务的执行结果。
68 @celery_obj.task(bind=True)
69 def long_task(self):
70     """随机列表"""
71     list1 = ['列表1-1', '列表1-2', '列表1-3', '列表1-4', '列表1-5']
72     list2 = ['列表2-1', '列表2-2', '列表2-3', '列表2-4', '列表2-5']
73     list3 = ['列表3-1', '列表3-2', '列表3-3', '列表3-4', '列表3-5']
74     message = ''
75     #模拟一个随机耗时的任务
76     total = random.randint(10, 50)
77     for i in range(total):
78         if not message or random.random() < 0.25:
79             message = '{0} {1} {2}...'.format(random.choice(list1),
80                                               random.choice(list2),
81                                               random.choice(list3))
82         #每次将状态更新
83         self.update_state(state='PROGRESS',
84                           meta={'current': i, 'total': total,'status': message})
85         time.sleep(1)
86     #跳出循环,代表任务结束
87     return {'current': 100, 'total': 100, 'status': '任务完成!','result': 666}
88 
89 
90 
91 if __name__ == '__main__':
92     app.run(debug=True)
app.py
 1 <!DOCTYPE html>
 2 <html lang="en">
 3 <head>
 4     <meta charset="UTF-8">
 5     <title>Title</title>
 6     <script src="//cdnjs.cloudflare.com/ajax/libs/nanobar/0.2.1/nanobar.min.js"></script>
 7 <script src="//cdnjs.cloudflare.com/ajax/libs/jquery/2.1.3/jquery.min.js"></script>
 8 </head>
 9 <body>
10     <h2>进度条测试</h2>
11     <button id="start-bg-job" onclick="start_long_task()">点击开始</button><br><br>
12     <div id="progress"></div>
13 </body>
14 
15 <script>
16 //点击后添加进度条,并开始异步任务
17 function start_long_task() {
18         // 添加进度条
19         div = $('<div class="progress"><div></div><div>0%</div><div>...</div><div>&nbsp;</div></div><hr>');
20         $('#progress').append(div);
21 
22         // 进度条
23         var nanobar = new Nanobar({
24             bg: '#44f',
25             target: div[0].childNodes[0]
26         });
27 
28         // 向后端/longtask发送请求,发布任务,获得task_id,开始更新进度条
29         $.ajax({
30             type: 'POST',
31             url: '/longtask',
32             success: function(data, status, request) {
33                 status_url = request.getResponseHeader('Location');
34                 update_progress(status_url, nanobar, div[0]);
35             },
36             error: function() {
37                 alert('Unexpected error');
38             }
39         });
40     };
41 
42     function update_progress(status_url, nanobar, status_div) {
43         // 请求后端
44         $.getJSON(status_url, function(data) {
45             // 更新进度条
46             percent = parseInt(data['current'] * 100 / data['total']);
47             nanobar.go(percent);
48             $(status_div.childNodes[1]).text(percent + '%');
49             $(status_div.childNodes[2]).text(data['status']);
50             //任务完成,添加结果
51             if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
52                 if ('result' in data) {
53                     // show result
54                     $(status_div.childNodes[3]).text('Result: ' + data['result']);
55                 }
56                 else {
57                     // 没有result代表任务异常
58                     $(status_div.childNodes[3]).text('Result: ' + data['state']);
59                 }
60             }
61             else {
62                 // 任务未完成,每1秒递归执行函数
63                 setTimeout(function() {
64                     update_progress(status_url, nanobar, status_div);
65                 }, 1000);
66             }
67         });
68     };
69 </script>
70 </html>
nanobar.html

执行步骤

  • 在flask_tesk的同级目录下cmd执行celery -A app.celery_obj worker -l info -P eventlet,开启worker,异步等待任务到来
  • python app.py run开启web应用
  • 前端点击按钮,访问/longtask,发布任务,获取task_id,执行update_progress函数
  • update_progress访问/status/task_id,获取任务结果,没有结束则每一秒递归运行
  • 任务结束后,展示任务结果
原文地址:https://www.cnblogs.com/cx59244405/p/12233496.html