0%

FastAPI自学教程(44) - 后台任务机制

1.基础用法

1
2
3
4
5
6
7
8
9
10
11
12
13
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def write_log(message: str):
with open("log.txt", mode="a") as log:
log.write(message)

@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
message = f"message to {email}\n"
background_tasks.add_task(write_log, message)
return {"message": "Notification sent in background"}
  • 使用BackgroundTasks参数声明后台任务容器,通过add_task()添加任务函数
  • 请求流程:
      graph TD
      A[客户端请求] --> B[执行路径操作函数]
      B --> C[添加后台任务]
      C --> D[返回HTTP响应]
      D --> E[异步执行后台任务]
  • 特点:
    • 任务在HTTP响应返回后异步执行
    • 支持任意参数传递(如message传递给write_log
    • 自动处理函数同步/异步类型

2.依赖注入组合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from typing import Annotated
from fastapi import Depends

def get_query(background_tasks: BackgroundTasks, q: str | None = None):
if q:
message = f"Query: {q}\n"
background_tasks.add_task(write_log, message)
return q

@app.post("/advanced-notification/{email}")
async def advanced_notification(
email: str,
background_tasks: BackgroundTasks,
query: Annotated[str, Depends(get_query)]
):
background_tasks.add_task(write_log, f"Email: {email}\n")
return {"status": "multi-task executed"}
  • 在依赖项中预添加任务,实现任务分层管理
  • 支持类型注解Annotated声明依赖关系链
  • 典型应用场景:
    • 请求预处理日志记录
    • 参数验证后触发关联任务
    • 多模块任务聚合执行

3.高级配置技巧

3.1 类方法集成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class NotificationService:
@staticmethod
def sms_task(phone: str, msg: str):
# 模拟短信发送逻辑
with open("sms_log.txt", "a") as f:
f.write(f"{phone}:{msg}\n")

@app.post("/send-sms")
async def send_sms(
phone: str,
msg: str,
tasks: BackgroundTasks
):
tasks.add_task(NotificationService.sms_task, phone, msg)
return {"result": "短信任务已排队"}
  • 支持类静态方法作为任务函数
  • 可通过self参数访问实例属性(需初始化类实例)

3.2 异常处理机制

1
2
3
4
5
6
7
8
9
10
11
12
def safe_write_log(message: str):
try:
with open("log.txt", "a") as f:
f.write(message)
except Exception as e:
with open("error.log", "a") as f:
f.write(f"Log Error: {str(e)}\n")

@app.post("/safe-notify")
async def safe_notify(tasks: BackgroundTasks):
tasks.add_task(safe_write_log, "安全日志条目\n")
return {"status": "error-handled"}
  • 任务函数内部需自行实现异常捕获
  • 错误日志建议写入独立文件避免循环失败

4.生产环境注意事项

  1. 任务时效性:后台任务应在合理时间内完成,避免堆积
  2. 资源限制
    • 文件操作需注意并发写入冲突
    • 数据库连接建议使用独立连接池
  3. 监控建议
    • 记录任务开始/结束时间戳
    • 使用APM工具监控任务执行状态
  4. 分布式扩展
    • 单机任务队列适用于轻量级场景
    • 高并发需求建议集成Celery

测试命令示例:

1
curl -X POST "http://localhost:8000/send-notification/user@example.com"