WebSockets 接口¶
FastAPI 支持使用 WebSocket。
WebSocket 客户端¶
生产环境¶
生产环境下,您可能要使用 React、Vue.js、Angular 等现代前端框架。
并且使用 WebSocket 与后端通信时,还可能要使用前端工具。
本地移动应用也有可能在本地代码(native code)中使用 WebSocket 与后端直接通信。
当然您还有可能以其他方式与 WebSocket 端点通信。
但在本例中,我们使用非常简单的 HTML 文档,该文档只包含了一些 JavaScript,所有内容都在一个长字符串中。
这种方式肯定不好,在生产环境中最好不要这样做。
生产环境下要使用前端框架等方式。
但本例的这种方式可以用最简单的形式,让我们集中精力了解服务器端的 WebSocket,而且还能正常运行:
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message text was: {data}")
创建 websocket
¶
在 FastAPI 应用中创建 websocket
:
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message text was: {data}")
技术细节
您也可以使用 from starlette.websockets import WebSocket
。
FastAPI 的 WebSocket
只是为开发者提供的快捷方式,但它其实直接继承自 Starlette。
等待信息与发送信息¶
在 WebSocket 路由里可以使用 await
接收或发送信息。
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message text was: {data}")
并可以接收和发送二进制、文本、JSON 等格式的数据。
运行¶
如果文件名为 main.py
,则以如下命令运行应用:
$ uvicorn main:app --reload
<span style="color: green;">INFO</span>: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
打开浏览器 http://127.0.0.1:8000。
可以看到如下简单页面:
在输入框中输入并发送信息:
FastAPI 应用使用 WebSocket 返回如下响应:
发送与接收信息:
所有操作都使用同一个 WebSocket 连接。
使用 Depends
等函数¶
在 WebSocket 端点中,从 fastapi
导入并使用以下函数:
Depends
Security
Cookie
Header
Path
Query
此处与其他 FastAPI 端点/路径操作的工作方式相同:
from typing import Union
from fastapi import (
Cookie,
Depends,
FastAPI,
Query,
WebSocket,
WebSocketException,
status,
)
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<label>Item ID: <input type="text" id="itemId" autocomplete="off" value="foo"/></label>
<label>Token: <input type="text" id="token" autocomplete="off" value="some-key-token"/></label>
<button onclick="connect(event)">Connect</button>
<hr>
<label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = null;
function connect(event) {
var itemId = document.getElementById("itemId")
var token = document.getElementById("token")
ws = new WebSocket("ws://localhost:8000/items/" + itemId.value + "/ws?token=" + token.value);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
event.preventDefault()
}
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
async def get_cookie_or_token(
websocket: WebSocket,
session: Union[str, None] = Cookie(default=None),
token: Union[str, None] = Query(default=None),
):
if session is None and token is None:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
return session or token
@app.websocket("/items/{item_id}/ws")
async def websocket_endpoint(
websocket: WebSocket,
item_id: str,
q: Union[int, None] = None,
cookie_or_token: str = Depends(get_cookie_or_token),
):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(
f"Session cookie or query token value is: {cookie_or_token}"
)
if q is not None:
await websocket.send_text(f"Query parameter q is: {q}")
await websocket.send_text(f"Message text was: {data}, for item ID: {item_id}")
说明
Websocket 不会真的触发 HTTPException
,因此最好直接关闭 WebSocket 连接。
此时,可以使用规范定义的有效代码中的关闭代码。
今后, 还有可能添加可在任意位置触发的 WebSocketException
及其异常处理器。但这些都取决于 Starlette 的 PR #527。
WebSocket 与依赖项¶
如果文件名为 main.py
,则以如下命令运行服务:
$ uvicorn main:app --reload
<span style="color: green;">INFO</span>: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
打开浏览器 http://127.0.0.1:8000。
设置:
- Item ID, 在路径中使用
- Token,用作查询参数
提示
注意,查询 token
由依赖项处理。
使用该依赖项可以连接 WebSocket,然后发送与接收信息:
处理断开连接与多个客户端¶
WebSocket 连接关闭时, await websocket.receive_text()
触发 WebSocketDisconnect
异常,然后就可以像本例一样捕获和处理异常。
from typing import List
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<h2>Your ID: <span id="ws-id"></span></h2>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var client_id = Date.now()
document.querySelector("#ws-id").textContent = client_id;
var ws = new WebSocket(`ws://localhost:8000/ws/${client_id}`);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.send_personal_message(f"You wrote: {data}", websocket)
await manager.broadcast(f"Client #{client_id} says: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client #{client_id} left the chat")
尝试以下操作:
- 用多个浏览器标签页打开应用
- 在多个标签页中写入信息
- 然后关闭其中一个标签页
此时会触发 WebSocketDisconnect
异常,其他客户端会接收到如下信息:
Client #1596980209979 left the chat
提示
上述应用是最简单的例子,只是为了演示如何处理与广播信息给多个 WebSocket 连接。
注意,所有操作都是在内存的单列表中处理,因此它只在进程运行时工作,且只使用单进程工作。
如果需要为 FastAPI 集成一些由 Redis、PostgreSQL 等数据库支持的、且更成熟稳定的功能,请参阅编码/广播。
更多信息¶
更多选项详见 Starlette 文档: