1.基础用法
1 2 3 4 5 6 7 8 9 10 11
| from fastapi import FastAPI, WebSocket from fastapi.responses import HTMLResponse
app = FastAPI()
@app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() while True: data = await websocket.receive_text() await websocket.send_text(f"Message text was: {data}")
|
- 使用
@app.websocket装饰器声明WebSocket路由,通过WebSocket类处理连接
- 必须调用
await websocket.accept()接受客户端连接请求
- 通过无限循环实现持续通信,支持
receive_text()/receive_bytes()等接收方式
流程图:
graph TD
A[客户端连接] --> B[调用accept接受连接]
B --> C{接收消息循环}
C -->|文本/二进制数据| D[处理业务逻辑]
D --> E[发送响应]
C -->|断开请求| F[关闭连接]
2.连接管理
2.1 多客户端管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| class ConnectionManager: def __init__(self): self.active_connections: list[WebSocket] = []
async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket)
async def broadcast(self, message: str): for connection in self.active_connections: await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/chat/{room_id}") async def chat_room(websocket: WebSocket, room_id: str): await manager.connect(websocket) try: while True: message = await websocket.receive_text() await manager.broadcast(f"[{room_id}]: {message}") except WebSocketDisconnect: manager.disconnect(websocket)
|
- 通过自定义类管理活跃连接,支持广播消息
- 使用
WebSocketDisconnect异常处理客户端断开事件
2.2 路径参数集成
1 2 3 4
| @app.websocket("/user/{user_id}") async def user_channel(websocket: WebSocket, user_id: int): await websocket.accept()
|
3.生产环境建议
部署配置:
- 使用uvicorn作为ASGI服务器:
uvicorn main:app --reload
- 设置
--ws-ping-interval参数维持长连接
安全增强:
- 限制连接速率防止DDoS攻击
- 使用wss协议加密WebSocket通信
调试工具:
- 通过Apifox等工具测试WebSocket接口
- 浏览器开发者工具实时查看WebSocket帧
性能优化:
- 异步处理耗时操作避免阻塞事件循环
- 二进制传输替代文本格式降低带宽消耗
测试命令示例:
1 2 3 4 5
| uvicorn main:app --reload --port 8000
python -m websockets ws://localhost:8000/ws
|