【FastAPI 学习 十】使用Redis

在FastAPI中 使用Redis

本来是想直接使用redsi的, 但是查阅资料都是使用aioredis, 所谓一步异步就得处处异步

FastAPI官网关于异步的解释描述 https://fastapi.tiangolo.com/async/
建议要使用FastAPI的人,都看看作者关于异步的描述

思路

redis_cli对象挂载到FastAPI app 对象上面,然后在视图函数中使用默认的回调参数request对象获取
参考链接
https://github.com/tiangolo/fastapi/issues/1694
https://github.com/tiangolo/fastapi/issues/1742
https://github.com/leonh/redis-streams-fastapi-chat/blob/master/chat.py

测试代码

安装aioredis

pip intsall aioredis

完整代码,新建一个test_aioredis.py文件

from aioredis import create_redis_pool, Redis

from fastapi import FastAPI, Request, Query

app = FastAPI()


async def get_redis_pool() -> Redis:
    redis = await create_redis_pool(f"redis://:root12345@172.16.137.129:6379/0?encoding=utf-8")
    return redis


@app.on_event('startup')
async def startup_event():
    """
    获取链接
    :return:
    """
    app.state.redis = await get_redis_pool()


@app.on_event('shutdown')
async def shutdown_event():
    """
    关闭
    :return:
    """
    app.state.redis.close()
    await app.state.redis.wait_closed()


@app.get("/test", summary="测试redis")
async def test_redis(request: Request, num: int=Query(123, title="参数num")):
    # 等待redis写入  await异步变同步 如果不关心结果可以不用await,但是这里下一步要取值,必须得先等存完值 后再取值
    await request.app.state.redis.set("aa", num)
    # 等待 redis读取
    v = await request.app.state.redis.get("aa")
    print(v, type(v))
    return {"msg": v}


if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app='test_aioredis:app', host="127.0.0.1", port=8080, reload=True, debug=True)

上面只是一个文件, 如何在项目中组织了? 很多博客,都是单文件演示,不够友好。

FastAPI项目中组织

仿照 flask 注册挂载redis

def create_app():
    """
    生成FatAPI对象
    :return:
    """
    app = FastAPI()

    # 其余的一些全局配置可以写在这里 多了可以考虑拆分到其他文件夹
    # 跨域设置
    # 注册路由
    # 注册捕获全局异常
    # 请求拦截

    # 挂载redis
    register_redis(app)
    return app

def register_redis(app: FastAPI) -> None:
    """
    把redis挂载到app对象上面
    :param app:
    :return:
    """

    @app.on_event('startup')
    async def startup_event():
        """
        获取链接
        :return:
        """
        app.state.redis = await create_redis_pool(settings.REDIS_URL)

    @app.on_event('shutdown')
    async def shutdown_event():
        """
        关闭
        :return:
        """
        app.state.redis.close()
        await app.state.redis.wait_closed()

使用

这个就和上面例子一样,直接使用。

@app.get("/test", summary="测试redis")
async def test_redis(request: Request, num: int=Query(123, title="参数num")):
    # 等待redis写入  await异步变同步 如果不关心结果可以不用await,但是这里下一步要取值,必须得先等存完值 后再取值
    await request.app.state.redis.set("aa", num)
    # 等待 redis读取
    v = await request.app.state.redis.get("aa")
    print(v, type(v))
    return {"msg": v}

总结

最后推荐每个使用FastAPI的开发者都看下这个
FastAPI官网关于异步的解释描述 https://fastapi.tiangolo.com/async/

话说这个async await语法糖和js ES6特别像

具体完整GitHub代码

见个人网站https://www.charmcode.cn/article/2020-07-29_fastapi_redis

原文地址:https://www.cnblogs.com/CharmCode/p/14191119.html