十个可直接复制粘贴的模式,用 FastAPI 向阅读器推送数据——顺滑、安全、低延迟。

用 FastAPI 构建及时看板。十种流式模式——SSE、WebSocket、NDJSON、chunked responses、backpressure、fan-out、caching 及 security——配套可运行代码。
看板不是被“一次刷新”杀死的,而是死在无数个刷新按钮。及时不是“更快的轮询(poll)”,而是关在push(推送)。办事器端用 FastAPI,对于端是现代阅读器,你可以流式传输数值而不是整页——无需重构技能栈。下面是我经常使用的十个实用模式,让图体现于就动起来,而不是等 30 秒。
1.Server-Sent Events (SSE) - 最简朴的“单向推送”当你的看板只需要从办事器→阅读器的更新时,SSE 是最轻量的选择。运行于 HTTP/1.1 之上,对于代办署理友爱,而且自带主动重连。
# server.pyfrom fastapi import FastAPIfrom fastapi.responses import StreamingResponseimport asyncio, json, timeapp = FastAPI()asyncdefsse_gen(): whileTrue: payload = {"t": time.time(), "cpu": 0.37} yieldf"data: {json.dumps(payload)}\n\n" await asyncio.sleep(1)@app.get("/metrics")asyncdefmetrics(): return StreamingResponse(sse_gen(), media_type="text/event-stream")
// client.jsconst ev = new EventSource("/metrics");ev.onmessage = e => { const { t, cpu } = JSON.parse(e.data); updateChart(t, cpu);};
利用场景:你想以极低成本向图表及计数器做推送。
2.WebSockets - 双向节制通道对于在筛选、及时搜刮或者协作式看板,你会但愿阅读器能措辞(回传)。
# server.pyfrom fastapi import FastAPI, WebSocketapp = FastAPI()@app.websocket("/ws")async def ws(ws: WebSocket): await ws.accept() await ws.send_json({"hello": "client"}) while True: msg = await ws.receive_json() # e.g., { "cmd": "subscribe", "ticker": "AAPL" } await ws.send_json({"ok": True, "echo": msg})
提醒:用 WebSocket 传节制动静;把重数据经由过程 SSE 或者 NDJSON(见下)来流式传输,以分散存眷点。
3.NDJSON over chunked HTTP - “布衣级 Streaming API”无需花梢和谈;只是于一个无界相应里用换行分开的 JSON。
# server.pyfrom fastapi.responses import StreamingResponseimport asyncio, jsonasyncdefndjson(): for i inrange(5_000): yield json.dumps({"i": i}) + "\n" await asyncio.sleep(0.01)@app.get("/stream")asyncdefstream(): return StreamingResponse(ndjson(), media_type="application/x-ndjson")
// client (modern Fetch streams)const res = awaitfetch("/stream");const reader = res.body.getReader();const dec = newTextDecoder();let buf = "";while (true) {const { value, done } = await reader.read();if (done) break; buf += dec.decode(value, { stream: true });let idx;while ((idx = buf.indexOf("\n")) >= 0) { const line = buf.slice(0, idx); buf = buf.slice(idx + 1); handle(JSON.parse(line)); }}
很合适:表格数据流、日记尾部(log tail)、向图表回填数据(backfill)。
4.BackgroundTasks + Stream - 不要壅闭事务轮回快速推送;逐步计较。于连结流相应的同时把重活卸载出去。
from fastapi import BackgroundTasksasyncdefcompute_and_queue(q): # put rows into an asyncio.Queue without blocking client for row inawait slow_db_scan(): await q.put(row) await q.put(None) # poison pill@app.get("/orders")asyncdeforders(bg: BackgroundTasks): q: asyncio.Queue = asyncio.Queue(maxsize=1000) asyncdefgen(): whileTrue: item = await q.get() if item isNone: break yield json.dumps(item) + "\n" bg.add_task(compute_and_queue, q) return StreamingResponse(gen(), media_type="application/x-ndjson")
收益:经由过程行列步队巨细实现不变延迟与 backpressure。
5.Redis Pub/Sub fan-out - 一处出产,多处看板让 worker 向某个通道发布;每一个已经毗连客户端都能收到统一份数据包,无需反复事情。
import aioredis, asyncio, jsonfrom fastapi import FastAPIfrom fastapi.responses import StreamingResponseapp = FastAPI()@app.get("/prices")asyncdefprices(): asyncdefgen(): r = aioredis.from_url("redis://localhost") pub = r.pubsub() await pub.subscribe("ticks") asyncfor msg in pub.listen(): if msg["type"] == "message": yieldf"data: {msg['data'].decode()}\n\n" return StreamingResponse(gen(), media_type="text/event-stream")
模式:出产者只写一次;由 Redis fan-out 到 N 个客户端。
6.Postgres LISTEN/NOTIFY - 数据库原闹事件很是合适绑定到瓜葛事务(新定单、功课状况)的看板。
# server.pyimport asyncpg, asyncio, jsonfrom fastapi.responses import StreamingResponseasyncdefpg_events(): conn = await asyncpg.connect(dsn="postgres://user:pass@localhost/db") await conn.add_listener("order_updates", lambda *a: None) try: whileTrue: msg = await conn.connection.notifies.get() # async queue yieldf"data: {json.dumps({'payload': msg.payload})}\n\n" finally: await conn.close()@app.get("/orders/sse")asyncdeforders_sse(): return StreamingResponse(pg_events(), media_type="text/event-stream")
SQL 侧:
NOTIFY order_updates, json_build_object('id', NEW.id, 'status', NEW.status)::text;
利益:无需轮询表;数据库直接把事务推出来。
7.Backpressure & rate limiting - 让图表更顺滑你的阅读器画不动每一秒 5 万个点。要末于办事器端做撙节(throttle),要末归并事务。
# throttle generatorasync def rate_limited(gen, max_hz=20): interval = 1.0 / max_hz last = 0 async for item in gen: now = asyncio.get_event_loop().time() if now - last < interval: continue last = now yield item
可与客户端小型 ring buffer 联合,只绘制近来的 N 个点。Smooth > raw(顺滑赛过原始)。
8.Heartbeats, retries, and idle timeouts流式传输会断。要让问题显性化,并主动恢复。
# server heartbeatasync def sse_gen(): try: while True: yield "event: ping\ndata: {}\n\n" await asyncio.sleep(10) finally: # metrics, cleanup, etc. pass
// clientev.addEventListener("ping", () => markHealthy());ev.onerror = () => markUnhealthy(); // EventSource auto-reconnects
留意:把代办署理超时设置患上高在心跳距离。
一次认证,按 topic 授权。短生命周期的 JWT + 办事器端校验。
from fastapi import Depends, HTTPExceptionfrom fastapi.security import OAuth2PasswordBeareroauth2 = OAuth2PasswordBearer(tokenUrl="token")defuser_from_token(token=Depends(oauth2)): # verify & decode; return user/tenancy return {"sub": "u1", "org": "acme"}@app.get("/tenant/{topic}")asyncdeftenant_stream(topic: str, user=Depends(user_from_token)): ifnot can_read(user, topic): raise HTTPException(403, "forbidden") return StreamingResponse(sse_gen_for(topic, user["org"]), media_type="text/event-stream")
实践:按 org/project 限制定阅规模,防止数据泄露。
10.Edge caching for initial state; stream the deltas用缓存的 snapshot 即刻“灌水”看板,然后经由过程 SSE/WebSocket 推送变动。
# initial snapshot route (cacheable)@app.get("/snapshot")async def snapshot(): data = await read_aggregate() return JSONResponse(data, headers={"Cache-Control": "public, max-age=5"})
客户端流程:先哀求 /snapshot → 当即衬着 → 打开 /metrics 的 SSE 获取及时更新。
成果:更好的 LCP,同时具有真实的及时举动。
Micro-checklist(让你的流有“高级感”)按需选择和谈:SSE(单向)、WebSocket(双向)、NDJSON(批量流)。连结数据包小且有类型:偏向固定的 JSON 布局;不要“年夜杂烩”的 “any” blob。掩护事务轮回:把重活卸载到使命或者 worker;利用 backpressure。让掉败显性化:心跳 + UI 康健唆使。器量指标:记载 TTFB(time-to-first-byte)、包速度、断连次数及 p95 延迟。一个快速、真正的案例某团队上线了一个生意业务看板,每一 2 秒轮询一次,每一次都重渲整张表。咱们改成 Redis Pub/Sub → SSE,于客户端为图表插手 ring buffer,并于初次衬着时提供缓存的 snapshot。中位“首个数字呈现”时间降至 300 ms 如下,中端条记本的 CPU 占用降落 40%,带宽降低 60%——同时更新更快。用户再也不问“此刻是及时的吗?”
结语及时不是一个特征,而是一种交付气势派头。用 FastAPI,你可以从很小最先——SSE 做推送、一个心跳及一个 snapshot——然后跟着产物发展,慢慢进级到 WebSocket、fan-out 及数据库原闹事件。选择能带来顺滑运动及可猜测成本的“起码原语(primitives)”。你的图表(以和你的运维团队)城市感激你。
招呼步履(CTA):想要一个最小的起步模板,包罗 SSE、WebSocket 节制通道、Redis fan-out,以和带 Fetch streams 的 React 客户端吗?评论 “stream-kit”,我会分享堆栈链接。
本文转载自AI年夜模子不雅察站,作者:AI研究生
-本文由开云·Kaiyun(中国)官方网站-科技股份有限公司-www.kaiyun.com(kaiyun.com)技术部原创提供,更多官方资讯请认准本站(dysp777.com)。