sanic官方文档解析之Example(一)

1,示例

这部的文档是简单的示例集合,它能够帮助你快速的启动应用大部分的应用,这些应用大多事分类的,并且提供给ini工作的连接代码:

1.1,基础示例

这部分示例集成了提供简单sanic简单的代码

单一APP

一个简单的sanic应用with一个简单的异步方法通过text和json类型的响应.

from sanic import Sanic
from sanic import response as res
# 实例化Sanic对象
app = Sanic(__name__)


# 服务器访问的路由
@app.route("/")
async def test(req):
    return res.text("I am a teapot", status=418)  # 返回的是字符串格式的数据

# 开启服务
if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000, debug=True)
from sanic import Sanic
from sanic import response

# 实例化Sanic对象
app = Sanic(__name__)


# 服务器的访问路由
@app.route("/")
async def test(request):
    return response.json({"test": True})  # 返回的是json数据格式

# 启动服务
if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)

1.2,使用sanic.view来创建app应用

展示了使用sanic.viewes.httpmethodview的简单机制,以及将其扩展为为为视图提供自定义异步行为的方法

from sanic import Sanic
from sanic.views import HTTPMethodView
from sanic.response import text

# 实例化Sanic对象
app = Sanic("some_name")


class SimpleView(HTTPMethodView):

    def get(self, request):
        return text("I am get method")

    def post(self, request):
        return text("I am post method")

    def put(self, request):
        return text("I am put method")

    def patch(self, requets):
        return text("I am patch method")

    def delete(self, request):
        return text("I am delete method")


class SimpleAsyncView(HTTPMethodView):
    async def get(self, request):
        return text("I am async get method")
    
    async def post(self, request):
        return text("I am async post method")
    
    async def put(self, request):
        return text("I am async put method")
    
    
app.add_route(SimpleView.as_view(), "/")  # 注册视图类(后边加访问的路由)
app.add_route(SimpleAsyncView.as_view(), "/async")  # 注册视图类(后边加访问的路由)

if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000, debug=True)

1.3,URL跳转

from sanic import Sanic
from sanic import response

app = Sanic(__name__)


# 同步函数的跳转
@app.route("/")
def handle_response(request):
    return response.redirect("/redirect")


# 异步处理函数到的跳转
@app.route("/redirect")
async def test(request):
    return response.json({"Redirected": True})

if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)

1.4,redirection URL

Sanic提供了一种跳转页面的简单方法叫作url_for,它将会独一无二的url名字作为参数并且返回给你分配实际的路由地址.,有了url_for这个方法将帮助我们简化在不同应用跳转中请求的影响,

from sanic import Sanic
from sanic import response

app = Sanic(__name__)


@app.route("/")
async def index(request):
    # 用post_handler生成url端
    url = app.url_for("post_handler", post_id=5)
    # 提升成的url最后跳转的页面变成了"/post/5"
    return response.redirect(url)


@app.route("/posts/<post_id>")
async def post_handler(request, post_id):
    return response.text("Post- {}".format(post_id))
                         
if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000, debug=True)

1.5,蓝图

Sanic提供了一个惊人的特性,可以将您的API和路由分组到一个逻辑集合中,这个集合可以很容易地导入并插入到您的任何Sanic应用程序中,它被称为蓝图。

from sanic import Blueprint, Sanic
from sanic.response import file,json
# 实例化Sanic对象
app = Sanic(__name__)
blueprint = Blueprint("name", url_prefix="/my_blueprint")
blueprint2 = Blueprint("name2", url_prefix="/my_blueprint2")
blueprint3 = Blueprint("name3", url_prefix="/my_blueprint3")


# blueprint蓝图的路由
@blueprint.route("/foo")
async def foo(request):
    return json({"msg": "hi from blueprint"})


# blueprint2蓝图的路由
@blueprint2.route("/foo")
async def foo2(request):
    return json({"msg": "hi from blueprint2"})


# blueprint3蓝图的路由
@blueprint3.route("/foo")
async def index(request):
    return await file("websocket.html")


# websocket的路由
@app.websocket("/foo")
async def foo3(request, ws):
    while True:
        data = "hello!"
        print("Sending:" + data)
        await ws.send(data)
        data = await ws.recv()
        print("Received:" + data)
# 注册蓝图
app.blueprint(blueprint)  
app.blueprint(blueprint2)
app.blueprint(blueprint3)

app.run(host="0.0.0.0", port=8000)

1.6,日志功能增强

from sanic import Sanic
from sanic import response
import logging

logging_format = "[%(asctime)s] % (process)d-%(Levelname)s"
logging_format += "%(module)s::%(funcName)s():1%(Lineno)d:"
logging_format += "%(message)s"

logging.basicConfig(
    format=logging_format,
    level=logging.DEBUG
)
log = logging.getLogger()

# 设置日志去重写默认的配置
sanic = Sanic()


@sanic.route("/")
def test(request):
    log.info("received request; responding with 'hey'")
    return response.text("hey")


sanic.run(host="0.0.0.0", port=8000)

下面的示例代码演示了sanic.app.sanic.middleware()的用法,以便提供一种机制为每个传入请求分配一个唯一的请求ID,并通过aiotask上下文记录它们

"""
基于https://github.com/skyscanner/aiotask-context中的示例
和“examples/override logging,run.py”。
需要https://github.com/skyscanner/aiotask-context/tree/52efbc21e2def2d52abb9a8e951f3ce5e6f690或更高版本
$pip安装git+https://github.com/skyscanner/aiotask-context.git
"""
import asyncio
import uuid
import logging
from signal import signal, SIGINT
from sanic import Sanic
from sanic import response
import uvloop  # windows下不支持uvloop
import aiotask_content as context

log = logging.getLogger(__name__)


class RequestIdFilter(logging.Filter):
    def filter(self, record):
        record.request_id = context.get("X-Request-ID")
        return True


LOG_SETTINGS = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'DEBUG',
            'formatter': 'default',
            'filters': ['requestid'],
        },
    },
    'filters': {
        'requestid': {
            '()': RequestIdFilter,
        },
    },
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s:%(lineno)d %(request_id)s | %(message)s',
        },
    },
    'loggers': {
        '': {
            'level': 'DEBUG',
            'handlers': ['console'],
            'propagate': True
        },
    }
}
app = Sanic(__name__, log_config=LOG_SETTINGS)
@app.route("/")
async def test(request):
    log.debug("X-Request-ID: %s", context.get("X-Request-ID"))
    log.info("Hello from test!")
    return response.json({"test": True})

if __name__ == '__main__':
    asyncio.set_event_loop(uvloop.new_event_loop())
    server = app.create_server(host="0.0.0.0", port=8000)
    loop = asyncio.get_event_loop()
    loop.set_task_factory(context.task_factory)
    task = asyncio.ensure_future(server)
    try:
        loop.run_forever()
    except:
        loop.stop()

1.7,sanic流支持

Sanic框架内置了对大文件流的支持,下面的代码解释了使用流支持设置SANIC应用程序的过程。

from sanic import Sanic
from sanic.views import CompositionView
from sanic.views import HTTPMethodView
from sanic.views import stream as stream_decorator
from sanic.blueprints import Blueprint
from sanic.response import stream, text

bp = Blueprint("blueprint_request_stream")
app = Sanic("request_stream")


class SimpleView(HTTPMethodView):
    
    @stream_decorator
    async def post(self, request):
        result = ""
        while 1:
            body = await request.stream.get()
            if not body:
                break
            result += body.decode("utf-8")
        return text(result)
    
    
@app.post("/stream", stream=True)
async def handler(request):
    async def streaming(response):
        while 1:
            body = await request.stream.get()
            if not body:
                break
            body = body.decode("utf-8").replace("1", "A")
            await response.write(body)
    return stream(streaming)


@bp.put("/bp_stream", stream=True)
async def bp_handler(request):
    result = ""
    while 1:
        body = await request.stream.get()
        if not body:
            break
        result += body.decode("utf-8").replace("1", "A")
    return text(result)


async def post_handler(request):
    result = ""
    while 1:
        body = await request.stream.get()
        if not body:
            break
        result += body.decode("utf-8")
    return text(result)


app.blueprint(bp)
app.add_route(SimpleView.as_view(), "/method_view")
view = CompositionView()
view.add(["POST"], post_handler, stream=True)
app.add_route(view, "/composition_view")

if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)

简单的客户端展示使用客户端代码流应用(测试一下代码)

import requests
# 警告:这是一个繁重的进程
data = ""
for i in range(1, 250000):
    data += str(i)
    
r = requests.post("http://0.0.0.0:8000/stream", data=data)

print(r.text)

1.8,sanic的并发支持 

SANIC支持通过多个工作人员支持启动应用程序。但是,为了确保高效的执行,能够限制每个进程/循环的并发性是很重要的。下面的代码部分提供了一个简单的示例,说明如何在asyncio.semaphore的帮助下限制并发性

from sanic import Sanic
from sanic.response import json

import asyncio
import aiohttp
app = Sanic(__name__)

sem = None


@app.listener("before_server_start")
def init(sanic, loop):
    global sem
    concurrency_per_work = 4
    sem = asyncio.Semaphore(concurrency_per_work,loop=loop)


async def bounded_fetch(session, url):
    # 使用session对象执行url的get请求
    async with sem, session.get(url) as response:
        return await response.json()
    
    
@app.route("/")
async def test(request):
    # 下载并且提供json例子的服务
    url = "http://api.github.com/repos/channelcat/sanic"
    
    async with aiohttp.ClientSession() as session:
        response = await bounded_fetch(session, url)
        return json(response)
    
app.run(host="0.0.0.0", port=8000, workers=2)
原文地址:https://www.cnblogs.com/ljc-0923/p/10392092.html