1. 基础集成配置
1 2 3 4 5 6
| from fastapi.security import OAuth2PasswordBearer oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30
|
- 使用
OAuth2PasswordBearer创建OAuth2密码流认证方案,自动处理Authorization头的Bearer Token
- 密钥生成:推荐使用
openssl rand -hex 32生成高强度密钥
- JWT配置:设置HS256签名算法与30分钟令牌有效期
流程图:
graph TD
A[客户端请求] --> B{携带有效JWT?}
B -->|是| C[解码验证签名]
B -->|否| D[返回401错误]
C --> E{令牌未过期?}
E -->|是| F[提取用户信息]
E -->|否| D
F --> G[执行路径操作]
2. 密码哈希处理
1 2 3 4 5 6 7 8
| from passlib.context import CryptContext pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_pwd, hashed_pwd): return pwd_context.verify(plain_pwd, hashed_pwd)
def get_password_hash(password): return pwd_context.hash(password)
|
- 使用
passlib库的Bcrypt算法实现密码哈希
- 哈希验证流程:
- 用户注册时生成
hashed_password存入数据库
- 登录时通过
verify_password比对明文密码与哈希值
- 防止密码明文泄露,即使数据库被盗也无法逆向破解
3. JWT令牌生成与验证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| from datetime import datetime, timedelta, timezone import jwt
def create_access_token(data: dict, expires_delta: timedelta): to_encode = data.copy() expire = datetime.now(timezone.utc) + expires_delta to_encode.update({"exp": expire}) return jwt.encode(to_encode, SECRET_KEY, ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)): try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username = payload.get("sub") return get_user(fake_db, username) except InvalidTokenError: raise HTTPException(401, "无效令牌")
|
- 令牌生成:包含用户标识(sub)、签发时间(iat)、过期时间(exp)等标准声明
- 验证机制:自动检查签名有效性、过期时间和用户状态
- 用户注入:通过依赖项将解码后的用户对象注入路径操作函数
4. 完整认证流程实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @app.post("/token") async def login(form_data: OAuth2PasswordRequestForm = Depends()): user = authenticate_user(fake_db, form_data.username, form_data.password) if not user: raise HTTPException(401, "用户名或密码错误") access_token = create_access_token( data={"sub": user.username}, expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) ) return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me") async def read_users_me(current_user: User = Depends(get_current_active_user)): return current_user
|
- 登录流程:验证凭证 → 生成JWT → 返回Bearer令牌
- 用户状态检查:通过
get_current_active_user验证用户是否激活
- OpenAPI集成:自动生成Swagger UI的Authorize按钮和测试表单
5. 生产环境注意事项
- 密钥管理:使用环境变量存储密钥,禁止硬编码
- 传输安全:必须启用HTTPS防止中间人攻击
- 令牌存储:客户端应使用HttpOnly Secure Cookie存储
- 刷新令牌:建议实现令牌刷新机制,减少长期令牌风险
- 监控审计:记录异常登录尝试和令牌验证失败事件
测试命令示例:
1 2 3 4 5
| curl -X POST -d "username=johndoe&password=secret" http://localhost:8000/token
curl -H "Authorization: Bearer YOUR_TOKEN" http://localhost:8000/users/me
|