Fastapi之OAuth2登录认证

登录认证概述

登录是很多系统的基本功能, 有些页面(如用户信息页面)需要登录之后才能进行访问. 实现这一功能的方案大体为:

  1. 首先进行登录, 登录成功后, 给前端(浏览器)返回一个值"xxxx"(session或者token)
  2. 前端(浏览器)去访问需要登录的页面(如用户信息页面)时, 会带上上面值"xxxx"
  3. (后端)服务器根据传入的值"xxxx"获取到这个值对应的用户是哪一个, 那么就返回这个用户的信息

上面的方案再根据 返回值"xxx" 的方式不同, 可以细分为两种:

  • session:
  1. 用户登录后, 在服务器这一方会生成一个随机值session_id, 把session_id和用户的唯一标识(如user_id)映射起来保存, 可以保存在数据库中或者内存中, 然后给前端返回这个session_id
  2. 前端下次请求时, 带上这个session_id, 后端可以根据这个session_id找到对应的user_id, 即可知道请求的是哪个用户的信息了

但是这种方式由于需要服务器端保存这个session_id, 因此会衍生出一些问题, 如:

a. 保存session_id需要耗费服务器资源

b. 当业务量比较大需要多台机器进行负载均衡, 统一提供服务时, 多台机器的session_id需要进行同步, 否则跨机器访问时就获取不到用户了

于是就慢慢发展出了第二种方式: token

  • token:
  1. 用户登录后, 服务器端根据user_id和秘钥(盐值)进行签名加密, 直接返回给前端(浏览器), 不进行保存操作
  2. 前端下次请求时, 带上这个token, 服务器端对这个token进行解密, 获取到解密后的user_id, 即可知道请求的是哪个用户的信息了

基于token的登录认证解决了基于session方式带来的问题, 且扩张性更强, 已经是现在的首选方案了

OAuth2

既然采用了上述的token方案, 那么可以考虑的再具体一点, 如何进行token的加密和解密? 后端返回token的格式是什么? 前端访问时token是放在url参数中, 还是请求头中或者请求体中? 前端携带token访问时的格式是什么?

OAuth2就是对上述具体问题的一套规范的解决方案, 当然它不只是解决上面的问题, 也可以解决第三方应用的授权问题等.

在FastAPI中, 提供了多种认证解决方案工具, 其中也包括了OAuth2, 可以使用OAuth2PasswordBearer类来实现OAuth2的功能, 使用的是OAuth2中的一种认证方案, 通过bearer token来携带token, 具体做法就是:

在请求头中添加参数Authorization, 其值为Bearertoken中间使用空格连接形成的字符串, 如Bearer your_token_string, 注意, AuthorizationBearer都是规范中固定的写法, 不可修改

OAuth2PasswordBearer

简单使用如下:

from fastapi import FastAPI, Depends
# 导入OAuth2PasswordBearer
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()
# 实例化oauth2, tokenUrl暂时随便给一个值, 后面会讲解其用法
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='xxx')

# 定义一个API, 这个API需要登录后才可以进行访问
# 那么就需要设置一个参数, 这个参数依赖于上面的oauth2_scheme
@app.get('/')
async def test(s: str = Depends(oauth2_scheme)):
    return {'hello': s}

启动项目, 来到FastAPI的文档页面http://127.0.0.1:8000/docs

image-20210516082837964

可以看到多了这个testapi后面多了一把锁, 直接执行访问这个api, 返回401错误

image-20210516083202640

在响应头中, 也可以看到需要Bearer认证
点击fastapi文档网页中的锁, 会弹出一个账户密码的输入框, 这是fastapi文档集成的一个类似于登录页面的界面

image-20210516205223985

在这个界面可以看到Token URLxxx, 即为上面实例化OAuth2PasswordBearer时传入的tokenUrl参数

oauth2_scheme = OAuth2PasswordBearer(tokenUrl='xxx')

这个参数的作用是当输入完用户密码后, 点击Authorize按钮后, 会请求127.0.0.1:8000/xxx这个网址, 把用户密码通过Form表单的形式传递给这个请求, 即这个/xxx就是我们网站的后台登录接口.

一般登录接口我们可以通过json格式将用户名密码放在请求体中传入, 但是OAuth2规范中要求用户名密码需要通过Form表单格式application/x-www-form-urlencoded传入, 当然项目中可以结合实际情况来决定通过哪种形式来传递用户名密码.

我们随便输入用户名密码, 点击Authorize按钮, 发现它报错说Auth Error Error: Not Found, 这是因为我们还没有实现这个表单格式的登录接口, 查看后台日志可以看到确实也请求了/xxx这个接口

INFO:     127.0.0.1:53600 - "POST /xxx HTTP/1.1" 404 Not Found

也就是说这个参数tokenUrl只是为了方便fastapi的文档网页的认证使用, 在前后端实际项目中并不会起到作用. 我们这里先跳过这个登录接口的实现, 继续回到接口认证的逻辑.我们这里先跳过这个登录接口的实现, 继续回到接口认证的逻辑.

前面试了直接访问需要登录才能访问的接口时, 会报401错误, 是因为我们还没有带上认证的信息, 而OAuth2 Bearer的认证信息需要放到请求头中,

由于在fastapi文档网页中不能修改请求头, 我们可以来到postman或者其他api调试工具中, 例如, 在postman中, 提供了一个专门的认证选项(Authorization), 提供了一些常见的认证可供选择

image-20210516083824792

这里我们选择Bearer Token认证, 在右边会出现Token的输入框, 我们随便输入一个值, 点击发送, 可以看到返回的不再是401, 而是接口的正常返回值

image-20210516094248990

我们也可以来到请求头的标签页, 可以看到postman自动添加了一个参数Authorization, 其值为Bearer hahahadadw, 说明在前面的认证选项中设置了认证后, postman会自动帮我们转换成对应实际格式

我们如果不使用postman的认证选项, 自己手动在请求头中添加相同的Authorization参数, 也是可以正常访问的

image-20210516094200193

回顾一下我们的api代码

oauth2_scheme = OAuth2PasswordBearer(tokenUrl='xxx')

@app.get('/')
async def test(s: str = Depends(oauth2_scheme)):
    return {'hello': s}

结合输出, 我们可以看到oauth2_scheme帮我们做的事情就是把请求头中的Authorization中的token字符串提取出来, 并返回, api参数中的s接收这个返回值

函数中接下来的操作就是首先对这个s值进行验证, 如果s是有效的, 即token是正确的, 那么就继续进行后续的逻辑, 如果token不正确, 则返回错误信息.

那么这个token的生成规则和验证规则, 不属于OAuth2的范畴, 我们一般使用JWT

JWT

JWT全称是Json Web Token, 具体原理可以自行百度或者查看我的博客:

python-JWT(Json Web Token)-pyjwt - Alex-GCX - 博客园 (cnblogs.com)

在python中我们一般使用pyjwt这个包操作JWT, 我上面的博客使用的就是这个包, Fastapi官网使用的是python-jose这个包, 因为它提供了 PyJWT 的所有功能,以及之后与其他工具进行集成时你可能需要的一些其他功能。

Python-jose需要一个额外的加密后端。这里我们使用的是推荐的后端:pyca/cryptography

pip install python-jose[cryptography]

登录接口

token一般都是在登录之后生成的, 因此我们需要做一个简单的登录接口, 在登录接口中生成jwt token, 这里我们还是使用请求体json形式传入用户名和密码

from datetime import datetime, timedelta

from fastapi import FastAPI, Depends, Body
from typing import Optional
from jose import JWTError, jwt

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "ed970259a19edfedf1010199c7002d183bd15bcaec612481b29bac1cb83d8137"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

def get_user_id(user_name: str, password: str):
    return 123

def create_jwt_token(data: dict, expire_delta: Optional[timedelta] = None):
    # 如果传入了过期时间, 那么就是用该时间, 否则使用默认的时间
    expire = datetime.utcnow() + expire_delta if expire_delta else datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # 需要加密的数据data必须为一个字典类型, 在数据中添加过期时间键值对, 键exp的名称是固定写法
    data.update({'exp': expire})
    # 进行jwt加密
    token = jwt.encode(claims=data, key=SECRET_KEY, algorithm=ALGORITHM)
    return token

@app.post('/login/')
async def login(user_name: str = Body(...), password: str = Body(...)):
    # 校验用户密码逻辑暂时省略, 这里我们不校验, 认为都是用户密码都是对的, 返回一个固定user_id
    user_id = get_user_id(user_name, password)
    # 使用user_id生成jwt token
    data = {'user_id': user_id}
    token = create_jwt_token(data)
    return {'token': token}

登录接口为/login/

  1. 接受两个参数user_namepassword, 这两个参数从请求体中传入
  2. 通过get_user_id函数获取对应的user_id, 这里简单返回一个固定值
  3. user_id创建成一个字典, 调用create_jwt_token生成jwt token
  4. 返回token给前端

create_jwt_token方法中介绍了jwt的简单使用:

  1. 加密的数据必须为字典类型, 如果需要设置token的有效时间, 那么需要在字典中添加一个键值对, 键名为固定的exp, 值为有效期的截止时间, 是一个日期类型.
  2. 使用jwt.encode()方法进行加密, 该方法需要传入三个参数:
    • claims: 需要加密的字典类型的数据
    • key: 加密需要使用的秘钥, 也叫做盐值
    • algorithm: 加密的算法, 默认为ALGORITHMS.HS256

在fastapi的文档页面进行测试, 在请求体中随便输入一个用户名密码, 能够返回一个token字符串

image-20210516171922691

验证前端传入的token

在上面OAuth2PasswordBearer这一节中, 定义了一个需要用户登录才能访问的接口

@app.get('/')
async def test(s: str = Depends(oauth2_scheme)):
    return {'hello': s}

接下来需要对其进行改造, 添加验证token的逻辑

from fastapi import FastAPI, Depends, Body, HTTPException, status
from jose import JWTError, jwt

@app.get('/')
async def test(token: str = Depends(oauth2_scheme)):
    # 定义一个验证异常的返回
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="认证失败",
        # 根据OAuth2规范, 认证失败需要在响应头中添加如下键值对
        headers={'WWW-Authenticate': "Bearer"}
    )
    # 验证token
    try:
        # 解密token, 返回被加密的字典
        payload = jwt.decode(token=token, key=SECRET_KEY, algorithms=[ALGORITHM])
        print(f'payload: {payload}')
        # 从字典中获取user_id数据
        user_id = payload.get('user_id')
        print(f'user_id: {user_id}')
        # 若没有user_id, 则返回认证异常
        if not user_id:
            raise credentials_exception
    except JWTError as e:
        print(f'认证异常: {e}')
        # 如果解密过程出现异常, 则返回认证异常
        raise credentials_exception
    # 解密成功, 返回token中包含的user_id
    return {'hello': user_id}

使用jwt.decode()进行解密, 获取加密前的数据payload, 在payload中获取user_id

在postman中进行测试, 首先再随便传入一个token

image-20210516173804331

可以看到返回认证失败, 后台打印结果为

认证异常: Not enough segments

说明token的格式是错误的, 这次传入上面登录后生成的token

image-20210516174346787

可以看到正常返回了结果, 后台打印为:

payload: {'user_id': 123, 'exp': 1621159181}
user_id: 123

至此就完成了最简单的OAuth2登录认证流程

小结

  1. 登录认证分为基于session和基于token, 目前最常用的是基于token的方式

  2. OAuth2中有一个基于token认证的规范, 常用的是Bearer token方式, 即在请求头中添加参数Authorization, 其值为Bearertoken中间使用空格连接形成的字符串, 如Bearer your_token_string, 注意, AuthorizationBearer都是规范中固定的写法, 不可修改

  3. Fastapi中集成了OAuth的Bearer方式, 通过OAuth2PasswordBearer类实现, 这个类主要做的就是把请求头中的token信息提取出来, 具体对token的加密和解密需要我们自己实现

  4. 一般使用JWTtoken进行加密jwt.encode(claims=data, key=SECRET_KEY, algorithm=ALGORITHM)和解密jwt.decode(token=token, key=SECRET_KEY, algorithms=[ALGORITHM])

  5. 在OAuth2中还有一些规范, 如

    • 登录接口传入用户名和密码时, 不能使用json格式, 而是使用表单形式application/x-www-form-urlencoded进行提交, 并且用户名必须为username, 密码必须为password, fastapi官网的案例也是使用表单形式提交, 需要安装python-multipart这个包. 在登录接口参数中, 使用了OAuth2PasswordRequestForm这个类接收用户名和密码
    • 登录接口返回token时的json格式为: {"access_token": xxxxx, "token_type": "bearer"}
    • 如果token验证失败, 需要在请求头中添加键值对: {'WWW-Authenticate': "Bearer"}
    • 这些规范我们可以看实际情况选择是否遵守

    下面是官方案例的简单登录接口:

    from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
    
    @app.post("/token")
    async def login(form_data: OAuth2PasswordRequestForm = Depends()):
        # 从form_data中获取用户名和密码
        user_name = form_data.username
        password = form_data.password
        # 校验用户名和密码校验是否正确
        .....
    	# 返回token
        return {"access_token": 'xxxxxx', "token_type": "bearer"}
    
原文地址:https://www.cnblogs.com/gcxblogs/p/14774897.html