FastAPI安全系列(二) 基于Password和Bearer Token的OAuth2 .0认证

一、获取username和password

 后台获取前台提交的username和password,可以使用FastAPI的安全实用性工具获取username和password。

 OAuth2规定客户端必需将username和password字段作为表单数据发送(不可使用JSON)。而且规范了字段必须这样命名。

from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordRequestForm

app = FastAPI()


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    pass

OAuth2PasswordRequestForm是一个类依赖项,声明了如下的请求表单:

  • username
  • password
  • 可选scope字段,是一个由空格分隔的字符串组成的大字符串
  • 可选的grant_type字段
  • 可选的client_id字段
  • 可选的 client_secret字段

现在使用表单中的username从(伪)数据库中获取用户数据,如果没有这个用户就返回一个错误。若是获取到这个用户,先进行Pydantic UserInDB模型,然后进行密码密文校验。

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# 伪数据库
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}


def fake_hash_password(password: str):
    return "fakehashed" + password


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)  # 从数据库中取出用户信息
    if not user_dict:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password!")
    user = UserInDB(**user_dict)  # 进行用户模型校验
    hashed_password = fake_hash_password(form_data.password)  # 将表单密码进行hash加密
    if not hashed_password == user.hashed_password:  # 表单加密后的密码与数据库中的密码进行比对
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password!")
    return {"access_token": user.username, "token_type": "bearer"}

最后返回一个JSON数据的token令牌,该JSON对象包含:

  •  token_type   因为实例中使用了Bearer令牌,所以令牌类型为Bearer
  • access_token   令牌字符串

二、获取活跃用户信息

这里已经完成了令牌的获取功能,与之前的获取当前用户的功能进行合并:

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

oauth2_schema = OAuth2PasswordBearer(tokenUrl="/token") # 执行生成token的地址

# 伪数据库
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}


def fake_hash_password(password: str):
    return "fakehashed" + password


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)  # 从数据库中取出用户信息
    if not user_dict:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password!")
    user = UserInDB(**user_dict)  # 进行用户模型校验
    hashed_password = fake_hash_password(form_data.password)  # 将表单密码进行hash加密
    if not hashed_password == user.hashed_password:  # 表单加密后的密码与数据库中的密码进行比对
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password!")
    return {"access_token": user.username, "token_type": "bearer"}


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def fake_decode_token(token: str):
    user = get_user(fake_users_db, token)
    return user


async def get_current_user(token: str = Depends(oauth2_schema)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authenticate credentials",
            headers={"WWW-Authentiacate": "Bearer"}
        )
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
    return current_user


@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

在生成token的基础上进行活跃用户信息的获取:

  • 首先需要将生成token的地址进行指向实际生成token的路径操作函数
  • 获取活跃用户请求时就会去获取认证的token
  • 一旦获取到token就与正常的API请求一样

具体的表现是在交互文档中:

  • 先点击右上角的Authorize按钮进行认证,此时后台会生成一个JSON对象,内含access_token字段
  • 前台获取到 这个access_token,将其存到某处
  • 点击下面的路径操作,如获取活跃用户的API,会在请求头中携带该token进行认证,以及获取相应的用户信息
作者:iveBoy
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须在文章页面给出原文连接,否则保留追究法律责任的权利。
原文地址:https://www.cnblogs.com/shenjianping/p/14868391.html