sanic官方文档解析之Example(二)

1,通过docker部署sanic项目

通过Docker和Docker Compose部署SANIC应用程序是一项很容易实现的任务,下面的示例提供了示例simple_server.py的部署

# Dockerfile: image that installs Sanic and runs simple_server.py.
FROM python:3.5
# MAINTAINER is deprecated; the maintainer is recorded as a label instead.
LABEL maintainer="Channel Cat <channelcat@gmail.com>"

# COPY is the preferred instruction for a plain copy of local files.
COPY . /code
RUN pip3 install git+https://github.com/channelcat/sanic

EXPOSE 8000

WORKDIR /code

CMD ["python", "simple_server.py"]
# docker-compose.yml: builds the "sanic" service from the current directory
# and maps host port 8000 to container port 8000.
version: '2'
services:
  sanic:
    build: .
    ports:
      - "8000:8000"

2,监控和错误处理

Sanic通过sanic.handlers.ErrorHandler提供了一个可扩展的、最低限度的全局异常处理程序实现。此示例演示如何扩展它以启用某些自定义行为

"""
使用Sanic的错误处理程序框架拦截未捕获异常的示例。
这对希望使用Sentry、Airbrake等服务的开发者可能有用,
或者用于通过自定义系统来记录和监视生产环境中的意外错误。
首先,我们创建自己的类,继承自sanic.handlers中的ErrorHandler,
然后把该类的一个实例交给我们的Sanic应用实例。
在这个类的default处理方法中,我们可以执行任何操作,
包括将异常发送到外部服务
"""
from sanic.handlers import ErrorHandler
from sanic.exceptions import SanicException
"""
与CustomHandler类相关的导入和代码

(通常情况下,这将在单独的文件中)
"""


class CustomHandler(ErrorHandler):
    """Error handler that prints non-Sanic exceptions before delegating.

    Exceptions that are not ``SanicException`` instances are printed (this is
    the place where logging or an external reporting service could be used);
    every exception is then passed on to the parent class's ``default``.
    """

    def default(self, request, exception):
        """Print unexpected exceptions, then return the parent's response."""
        is_sanic_error = isinstance(exception, SanicException)
        if not is_sanic_error:
            # The exception object is available here for logging/reporting.
            print(exception)
        # Finish handling by returning the response built by the parent class.
        response = super().default(request, exception)
        return response


"""
这是一个普通的Sanic服务器,
只是把服务器的错误处理程序设置为CustomHandler的一个实例
"""

from sanic import Sanic

# Create the application and replace its error handler with a CustomHandler
# instance.
app = Sanic(__name__)

handler = CustomHandler()
app.error_handler = handler


@app.route("/")
async def test(request):
    """Always raise a ``SanicException`` to simulate an unexpected failure."""
    # Something went wrong while serving the request; the exception is meant
    # to flow to the application's error handler.
    failure = SanicException("You Broke It!")
    raise failure

if __name__ == '__main__':
    # Serve on all interfaces; the port must be an integer port number,
    # not a boolean.
    app.run(host="0.0.0.0", port=8000)

3,使用外部服务提供监控

import logging
import socket
from os import getenv
from platform import node
from uuid import getnode as get_mac

from logdna import LogDNAHandler
from sanic import Sanic
from sanic.response import json
from sanic.request import Request

# Logger registered under the name "logdna", set to INFO level.
log = logging.getLogger("logdna")
log.setLevel(logging.INFO)


def get_my_ip_address(remote_server="google.com"):
    """Return the local IP address used to reach ``remote_server``.

    A UDP socket is connected to port 80 of ``remote_server`` and the local
    end of that socket is read back with ``getsockname()``.

    Args:
        remote_server: Host name or address to connect the socket to.

    Returns:
        The first element of ``getsockname()``, i.e. the local address.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect((remote_server, 80))
        return s.getsockname()[0]


def get_mac_address():
    """Return the value of ``get_mac()`` as colon-separated hex digit pairs."""
    # Twelve lowercase hex digits, left-padded with zeros.
    digits = hex(get_mac())[2:].zfill(12)
    # Pair each even-indexed digit with the digit that follows it.
    pairs = zip(digits[0::2], digits[1::2])
    return ":".join(high + low for high, low in pairs)


# Options passed to LogDNAHandler: app name, hostname, IP and MAC address.
logdna_options = {
    "app": __name__,
    "index_meta": True,
    "hostname": node(),
    "ip": get_my_ip_address(),
    "mac": get_mac_address()
}

# Handler created with the API key read from the LOGDNA_API_KEY environment
# variable.
logdna_handler = LogDNAHandler(getenv("LOGDNA_API_KEY"), options=logdna_options)

# Module logger at INFO level that sends records through the LogDNA handler.
logdna = logging.getLogger(__name__)
logdna.setLevel(logging.INFO)
logdna.addHandler(logdna_handler)

app = Sanic(__name__)


@app.middleware
def log_request(request: Request):
    """Log the URL of the incoming request through the ``logdna`` logger."""
    message = "I was Here with a new Request to URL: {}".format(request.url)
    logdna.info(message)


@app.route("/")
def default(request):
    """Return a fixed JSON body for the root URL."""
    payload = {"response": "I was here"}
    return json(payload)


if __name__ == "__main__":
    # getenv() returns a string when PORT is set, so convert to int to pass
    # the same type to app.run() in both cases.
    app.run(
        host="0.0.0.0",
        port=int(getenv("PORT", 8080))
    )

4,RayGun

from os import getenv

from raygun4py.raygunprovider import RaygunSender

from sanic import Sanic
from sanic.exceptions import SanicException
from sanic.handlers import ErrorHandler


class RaygunExceptionReporter(ErrorHandler):
    """Error handler that sends every exception to Raygun before delegating."""

    def __init__(self, raygun_api_key=None):
        """Create the Raygun sender.

        Args:
            raygun_api_key: API key for ``RaygunSender``; when ``None`` the
                ``RAYGUN_API_KEY`` environment variable is read instead.
        """
        super().__init__()
        api_key = (
            getenv("RAYGUN_API_KEY")
            if raygun_api_key is None
            else raygun_api_key
        )
        self.sender = RaygunSender(api_key)

    def default(self, request, exception):
        """Send ``exception`` via the sender, then return the parent's response."""
        self.sender.send_exception(exception=exception)
        response = super().default(request, exception)
        return response


# Create the application with a RaygunExceptionReporter as its error handler.
raygun_error_reporter = RaygunExceptionReporter()
app = Sanic(__name__, error_handler=raygun_error_reporter)


@app.route("/raise")
async def test(request):
    """Always raise a ``SanicException`` for the /raise URL."""
    failure = SanicException('You Broke It!')
    raise failure


if __name__ == '__main__':
    # getenv() returns a string when PORT is set, so convert to int to pass
    # the same type to app.run() in both cases.
    app.run(
        host="0.0.0.0",
        port=int(getenv("PORT", 8080))
    )

5,Rollbar

import rollbar

from sanic.handlers import ErrorHandler
from sanic import Sanic
from sanic.exceptions import SanicException
from os import getenv

# Initialise rollbar with the token read from the ROLLBAR_API_KEY environment
# variable.
rollbar.init(getenv("ROLLBAR_API_KEY"))


class RollbarExceptionHandler(ErrorHandler):
    """Error handler that reports each exception's text to Rollbar."""

    def default(self, request, exception):
        """Report ``str(exception)`` to Rollbar, then delegate to the parent."""
        message = str(exception)
        rollbar.report_message(message)
        response = super().default(request, exception)
        return response


# Create the application with a RollbarExceptionHandler as its error handler.
app = Sanic(__name__, error_handler=RollbarExceptionHandler())


@app.route("/raise")
def create_error(request):
    """Always raise a ``SanicException`` for the /raise URL."""
    failure = SanicException("I was here and I don't like where I am")
    raise failure


if __name__ == "__main__":
    # getenv() returns a string when PORT is set, so convert to int to pass
    # the same type to app.run() in both cases.
    app.run(
        host="0.0.0.0",
        port=int(getenv("PORT", 8080))
    )

6,Sentry

from os import getenv

from sentry_sdk import init as sentry_init
from sentry_sdk.integrations.sanic import SanicIntegration

from sanic import Sanic
from sanic.response import json

# Initialise the Sentry SDK with the DSN from the SENTRY_DSN environment
# variable and with its Sanic integration.
sentry_init(
    dsn=getenv("SENTRY_DSN"),
    integrations=[SanicIntegration()],
)

app = Sanic(__name__)


# noinspection PyUnusedLocal
@app.route("/working")
async def working_path(request):
    """Return a fixed JSON body for the /working URL."""
    payload = {"response": "Working API Response"}
    return json(payload)


# noinspection PyUnusedLocal
@app.route("/raise-error")
async def raise_error(request):
    """Always raise a plain ``Exception`` for the /raise-error URL."""
    failure = Exception("Testing Sentry Integration")
    raise failure


if __name__ == '__main__':
    # getenv() returns a string when PORT is set, so convert to int to pass
    # the same type to app.run() in both cases.
    app.run(
        host="0.0.0.0",
        port=int(getenv("PORT", 8080))
    )

7,安全

下面的示例代码展示了一个简单的基于装饰器的认证和授权机制,可用于保护你的Sanic API端点

# -*- coding: utf-8 -*-

from sanic import Sanic
from functools import wraps
from sanic.response import json

# Name the application explicitly, as the other examples in this file do.
app = Sanic(__name__)


def check_request_for_authorization_status(request):
    """Return whether ``request`` is authorized.

    Placeholder: real checks (for example cookies or a session) belong here.
    This implementation ignores ``request`` and always returns ``True``.
    """
    return True


def authorized():
    """Build a decorator that only runs the handler for authorized requests.

    The wrapped handler is awaited when
    ``check_request_for_authorization_status(request)`` is truthy; otherwise a
    JSON ``{'status': 'not_authorized'}`` response with status 403 is returned.
    """
    def decorator(f):
        @wraps(f)
        async def decorated_function(request, *args, **kwargs):
            # Guard clause: reject the request when the check fails.
            if not check_request_for_authorization_status(request):
                return json({'status': 'not_authorized'}, 403)

            # Authorized: run the handler and pass its response through.
            return await f(request, *args, **kwargs)
        return decorated_function
    return decorator


@app.route("/")
@authorized()
async def test(request):
    """Return the 'authorized' JSON status; wrapped by ``authorized()``."""
    body = {'status': 'authorized'}
    return json(body)

# Start the server on all interfaces, port 8000, when run as a script.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)

8,sanic的Websocket(提供了一种增加路由的方法)

<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket demo</title>
    </head>
    <body>
        <script>
            // Open a socket to the /feed endpoint on the page's own host/port.
            var ws = new WebSocket('ws://' + document.domain + ':' + location.port + '/feed'),
                messages = document.createElement('ul');
            // Append every received message to the list.
            ws.onmessage = function (event) {
                var messages = document.getElementsByTagName('ul')[0],
                    message = document.createElement('li'),
                    content = document.createTextNode('Received: ' + event.data);
                message.appendChild(content);
                messages.appendChild(message);
            };
            document.body.appendChild(messages);
            // Once per second, send a message and append it to the list.
            window.setInterval(function() {
                // Declared locally so it does not become an implicit global.
                var data = 'bye!';
                ws.send(data);
                var messages = document.getElementsByTagName('ul')[0],
                    message = document.createElement('li'),
                    content = document.createTextNode('Sent: ' + data);
                message.appendChild(content);
                messages.appendChild(message);
            }, 1000);
        </script>
    </body>
</html>
from sanic import Sanic
from sanic.response import file

# Application object that the routes below are registered on.
app = Sanic(__name__)


@app.route('/')
async def index(request):
    """Serve the ``websocket.html`` file for the root URL."""
    page = await file('websocket.html')
    return page


@app.websocket('/feed')
async def feed(request, ws):
    """Loop forever: send 'hello!' over the socket, then print the reply."""
    while True:
        outgoing = 'hello!'
        print('Sending: ' + outgoing)
        await ws.send(outgoing)
        # Wait for the next message from the other end before sending again.
        incoming = await ws.recv()
        print('Received: ' + incoming)


# Start the server on all interfaces, port 8000, with debug enabled.
if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000, debug=True)

9,虚拟主机(vhosts):

from sanic import response
from sanic import Sanic
from sanic.blueprints import Blueprint

# Usage
# curl -H "Host: example.com" localhost:8000
# curl -H "Host: sub.example.com" localhost:8000
# curl -H "Host: bp.example.com" localhost:8000/question
# curl -H "Host: bp.example.com" localhost:8000/answer

# Name the application explicitly, as the other examples in this file do.
app = Sanic(__name__)
# Blueprint created for the host "bp.example.com".
bp = Blueprint("bp", host="bp.example.com")


@app.route('/', host=["example.com",
                      "somethingelse.com",
                      "therestofyourdomains.com"])
async def hello(request):
    """Root URL for the listed hosts."""
    body = "Some defaults"
    return response.text(body)


@app.route('/', host="sub.example.com")
async def hello(request):
    """Root URL for sub.example.com."""
    body = "42"
    return response.text(body)


@bp.route("/question")
async def hello(request):
    """Blueprint route /question."""
    body = "What is the meaning of life?"
    return response.text(body)


@bp.route("/answer")
async def hello(request):
    """Blueprint route /answer."""
    body = "42"
    return response.text(body)

# Register the blueprint's routes on the application.
app.blueprint(bp)

# Start the server on all interfaces, port 8000, when run as a script.
if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)

10,支持并行测试运行的单元测试

下面的示例演示如何使用pytest-xdist插件提供的并行测试执行支持来启动并运行Sanic应用程序的单元测试

"""

SANIC服务器的pytest xdist示例

安装测试工具:

$ pip install pytest pytest-xdist

使用xdist参数运行:

$ pytest examples/pytest_xdist.py -n 8  # 8个worker进程

"""
import re
from sanic import Sanic
from sanic.response import text
from sanic.testing import PORT as PORT_BASE, SanicTestClient
import pytest


@pytest.fixture(scope="session")
def test_port(worker_id):
    """Return a port derived from ``worker_id``.

    The port is ``PORT_BASE`` plus the first run of digits found in
    ``worker_id``, or plus 0 when it contains no digits.
    """
    match = re.search(r'[0-9]+', worker_id)
    offset = int(match.group(0)) if match else 0
    return PORT_BASE + offset


@pytest.fixture(scope="session")
def app():
    """Session-scoped fixture: a Sanic app with one route, '/', returning 'OK'."""
    sanic_app = Sanic()

    @sanic_app.route('/')
    async def index(request):
        return text('OK')

    return sanic_app


@pytest.fixture(scope="session")
def client(app, test_port):
    """Session-scoped fixture: a ``SanicTestClient`` for ``app`` on ``test_port``."""
    test_client = SanicTestClient(app, test_port)
    return test_client


@pytest.mark.parametrize('run_id', range(100))
def test_index(client, run_id):
    """GET '/' through the test client and check the status and body."""
    # Only the response half of the returned pair is inspected.
    _, response = client._sanic_endpoint_test('get', '/')
    assert (response.status, response.text) == (200, 'OK')

11,修改请求对象

 

Sanic中的请求对象是一种dict对象,这意味着request对象可以作为常规dict对象进行操作。

from sanic import Sanic
from sanic.response import text
from random import randint

# Name the application explicitly, as the other examples in this file do.
app = Sanic(__name__)


@app.middleware('request')
def append_request(request):
    """Store a random integer from 0 to 100 under the 'num' key of the request."""
    value = randint(0, 100)
    request['num'] = value


@app.get('/pop')
def pop_handler(request):
    """Pop the 'num' key from the request and return it as a text response."""
    # Remove the key from the request object.
    num = request.pop('num')
    # The popped value is an int; text() is given a str body explicitly.
    return text(str(num))


@app.get('/key_exist')
def key_exist_handler(request):
    """Report whether the 'num' key is present on the request."""
    # Check whether the key exists.
    if 'num' in request:
        return text('num exist in request')

    return text('num does not exist in request')


if __name__ == "__main__":
    # Only start the server when run as a script, like the other examples.
    app.run(host="0.0.0.0", port=8000, debug=True)
原文地址:https://www.cnblogs.com/ljc-0923/p/10392095.html