0%

FastAPI自学教程(39) - OAuth2密码哈希与JWT令牌验证

1. 基础集成配置

1
2
3
4
5
6
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
  • 使用OAuth2PasswordBearer创建OAuth2密码流认证方案,自动处理Authorization头的Bearer Token
  • 密钥生成:推荐使用openssl rand -hex 32生成高强度密钥
  • JWT配置:设置HS256签名算法与30分钟令牌有效期

流程图:

graph TD
    A[客户端请求] --> B{携带有效JWT?}
    B -->|是| C[解码验证签名]
    B -->|否| D[返回401错误]
    C --> E{令牌未过期?}
    E -->|是| F[提取用户信息]
    E -->|否| D
    F --> G[执行路径操作]

2. 密码哈希处理

1
2
3
4
5
6
7
8
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain_pwd, hashed_pwd):
return pwd_context.verify(plain_pwd, hashed_pwd)

def get_password_hash(password):
return pwd_context.hash(password)
  • 使用passlib库的Bcrypt算法实现密码哈希
  • 哈希验证流程:
    1. 用户注册时生成hashed_password存入数据库
    2. 登录时通过verify_password比对明文密码与哈希值
    3. 防止密码明文泄露,即使数据库被盗也无法逆向破解

3. JWT令牌生成与验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from datetime import datetime, timedelta, timezone
import jwt

def create_access_token(data: dict, expires_delta: timedelta):
to_encode = data.copy()
expire = datetime.now(timezone.utc) + expires_delta
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, ALGORITHM)

async def get_current_user(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
return get_user(fake_db, username)
except InvalidTokenError:
raise HTTPException(401, "无效令牌")
  • 令牌生成:包含用户标识(sub)、签发时间(iat)、过期时间(exp)等标准声明
  • 验证机制:自动检查签名有效性、过期时间和用户状态
  • 用户注入:通过依赖项将解码后的用户对象注入路径操作函数

4. 完整认证流程实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_db, form_data.username, form_data.password)
if not user:
raise HTTPException(401, "用户名或密码错误")
access_token = create_access_token(
data={"sub": user.username},
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
  • 登录流程:验证凭证 → 生成JWT → 返回Bearer令牌
  • 用户状态检查:通过get_current_active_user验证用户是否激活
  • OpenAPI集成:自动生成Swagger UI的Authorize按钮和测试表单

5. 生产环境注意事项

  1. 密钥管理:使用环境变量存储密钥,禁止硬编码
  2. 传输安全:必须启用HTTPS防止中间人攻击
  3. 令牌存储:客户端应使用HttpOnly Secure Cookie存储
  4. 刷新令牌:建议实现令牌刷新机制,减少长期令牌风险
  5. 监控审计:记录异常登录尝试和令牌验证失败事件

测试命令示例:

1
2
3
4
5
# 获取令牌
curl -X POST -d "username=johndoe&password=secret" http://localhost:8000/token

# 访问受保护端点
curl -H "Authorization: Bearer YOUR_TOKEN" http://localhost:8000/users/me