Flask 流式响应

背景

在 web 场景下,经常会碰到下载文件的需求,通常小文件我们会采用 Flask send_file 或者 send_from_directory的方式,下载,但是当下载的文件是一个大压缩文件(>1GiB)时,这种方式就显得不友好了,我们需要采用流式下载的方式返回给客户端。

流式下载

简单实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from flask import Response


def (file_path):
def generate():
if not os.path.exists(file_path):
raise "File not found."
with open(file_path, "rb") as f:
while True:
chunk = f.read(chunk_size=10 * 1024 * 1024)
if not chunk:
break
yield chunk

return Response(generate(), content_type="application/octet-stream")
```

运行 Flask app,可以正确下载文件,但是下载只有实时速度,没有文件总大小,导致无法知道下载进度,也没有文件类型,这些我们都可以通过增加 header 字段实现:
```python
response = Response(generate(), mimetype='application/gzip')
response.headers['Content-Disposition'] = 'attachment; filename={}.tar.gz'.format("download_file")
response.headers['content-length'] = os.stat(str(file_path)).st_size
return response

这样,我们下载文件就可以看到文件类型、文件总大小及已下载大小了,其中 mimetype 根据实际压缩文件类型修改匹配即可。


转发流式下载

当我们下载本地节点文件,可以通过上述方法实现,但是如果我们的产品是集群形式的,要求在集群中的任一节点均可下载集群中所有节点的指定文件,我们就需要支持将流式下载转发并实时下载,避免访问节点占用太多内存。

如果是单节点转发流式请求,我们可以通过 flask 的 stream_with_context 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from flask import (
Flask,
Response,
stream_with_context
)
import requests

app = Flask(__name__)

@app.route("/download/<file_path>", method=["GET"])
def (file_path):
url_prefix = "http://1.1.1.1/"
remote_url = url_prefix + file_path
req = requests.get(remote_url, stream = True)
return Response(stream_with_context(req.iter_content()),
content_type = req.headers['content-type'])

if __name__ == "__main__":
app.run(host="0.0.0.0", debug=True)

在我们访问 http://localhost:5000/download/file_name 时,通过 requests 访问远端节点 1.1.1.1 的地址,并将请求通过流式的方式转发至客户端,实现下载。

如果是转发多节点流式请求,我们该如何保证多个请求最终 merge 后是一个正确的文件呢?
通过查询资料,排除了标准库中的 tarfile 和 zipfile 打包压缩方式,最终采用 zipstream(https://github.com/allanlei/python-zipstream) 第三方库实现。


zipstream 支持通过迭代器的方式写入文件,并可实时压缩读取,官方示例如下:

1
2
3
4
5
6
7
8
9
10
def iterable():
for _ in xrange(10):
yield b'this is a byte stringx01n'

z = zipstream.ZipFile()
z.write_iter('my_archive_iter', iterable())

with open('zipfile.zip', 'wb') as f:
for data in z:
f.write(data)

根据上述特性,我们结合转发单节点请求,实现同时请求多节点并实时压缩下载:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@app.route("/cluster_download/<file_path>", method=["GET"])
def cluster_download(reqs):
def generate(req):
z = zipstream.ZipFile(mode="w", compression=zipstream.ZIP_DEFLATED)
for req in reqs:
host = req.raw._fp.fp._sock.getpeername()[0]
z.write_iter("%s.tar.gz" % host, req.iter_content(chunk_size=10 * 1024 * 1024)
for chunk in z:
yield chunk

def get_file_size(reqs):
size = 0
for req in reqs:
size += int(req.headers.get("content-length"))
return size

remote_hosts = ["1.1.1.1", "2.2.2.2"]
reqs = []
for host in remote_hosts:
req = requests.get("http://%s/%s" % (host, file_path), timeout=5, stream=True)
if req.status_code == 200:
reqs.append(req)
response = Response(generate(reqs))
response.headers['mimetype'] = 'application/zip'
response.headers['Content-Disposition'] = 'attachment; filename=cluster_logs.zip)
response.hreads['content-length'] = get_file_size(reqs)

当我们访问 http://localhost/cluster_download/file_name 时,会先去 remote_hosts 中各个节点下载该文件,并通过 write_iter 的方式写入到 zip 文件中,Flask Response 返回的是 zip 文件中的数据块。


如果我们要在 zip 文件中增加某些运行过程中产生的数据,我们可以通过再定义一个生成器的方式:

1
2
3
4
def generate_file(content):
yield content

z.write_iter("running_status", generate_file)

这样我们就可以在最终的 zip 文件中,包含一个名为 running_status 的文件,文件内容为 content 的内容。

总结

这个需求在日常使用中是很常见的,跟下载类似,上传文件的话我们也可以采用类似的方式实现。

Streaming Contents

Sometimes you want to send an enormous amount of data to the client, much more than you want to keep in memory. When you are generating the data on the fly though, how do you send that back to the client without the roundtrip to the filesystem?

The answer is by using generators and direct responses.

Basic Usage

This is a basic view function that generates a lot of CSV data on the fly. The trick is to have an inner function that uses a generator to generate data and to then invoke that function and pass it to a response object:

from flask import Response

@app.route('/large.csv')
def generate_large_csv():
    def generate():
        for row in iter_all_rows():
            yield ','.join(row) + '
'
    return Response(generate(), mimetype='text/csv')

Each yield expression is directly sent to the browser. Note though that some WSGI middlewares might break streaming, so be careful there in debug environments with profilers and other things you might have enabled.

Streaming from Templates

The Jinja2 template engine also supports rendering templates piece by piece. This functionality is not directly exposed by Flask because it is quite uncommon, but you can easily do it yourself:

from flask import Response

def stream_template(template_name, **context):
    app.update_template_context(context)
    t = app.jinja_env.get_template(template_name)
    rv = t.stream(context)
    rv.enable_buffering(5)
    return rv

@app.route('/my-large-page.html')
def render_large_template():
    rows = iter_all_rows()
    return Response(stream_template('the_template.html', rows=rows))

The trick here is to get the template object from the Jinja2 environment on the application and to call stream() instead of render() which returns a stream object instead of a string. Since we’re bypassing the Flask template render functions and using the template object itself we have to make sure to update the render context ourselves by calling update_template_context(). The template is then evaluated as the stream is iterated over. Since each time you do a yield the server will flush the content to the client you might want to buffer up a few items in the template which you can do with rv.enable_buffering(size)5 is a sane default.

Streaming with Context

Changelog

Note that when you stream data, the request context is already gone the moment the function executes. Flask 0.9 provides you with a helper that can keep the request context around during the execution of the generator:

from flask import stream_with_context, request, Response

@app.route('/stream')
def streamed_response():
    def generate():
        yield 'Hello '
        yield request.args['name']
        yield '!'
    return Response(stream_with_context(generate()))

Without the stream_with_context() function you would get a RuntimeError at that point.

 
 
 
@app.route('/')
def aws_api_route_puppet_apply(ip=None):
    output = somemethod(var1,var2,var3)
    return Response(json.dumps(output), mimetype='application/json')

有没有办法使用flask和HTML将某些方法流式传输到浏览器或者我是否需要使用javascript?

 
就像文档所说的那样,只需创建一个生成器并生成要返回给客户端的每一行.

如果输出为10行,则以下内容将打印到客户端的十行(因为它们可用)中的每一行:

@app.route('/')
def aws_api_route_puppet_apply(ip=None):
    def generate():
        for row in somemethod(var1,var2,var3):
            yield row + '
'
    return Response(generate(),  mimetype='application/json')

原文地址:https://www.cnblogs.com/ExMan/p/14298905.html