· fastapi· websocket· SSE· realtid· IoT· MQTT· EventSource· React· alarm
FastAPI WebSocket og SSE — realtidsdata til IoT dashboards
FastAPI WebSocket og Server-Sent Events til IoT: gateway status live-opdatering, alarm-push, MQTT-bridge til browser, connection manager med tenant-isolation og React EventSource integration.
Af M-Bus Gateway
Gateway-status, MQTT-alarmer og live-aflæsninger kræver realtidskommunikation. Her er mønstrene for WebSocket og SSE i FastAPI med MQTT-bridge.
SSE vs. WebSocket — hvornår hvad?
Valg af realtidsprotokol:
Server-Sent Events (SSE):
✅ Envejs: Server → Browser
✅ Automatisk reconnect (browser-built-in)
✅ HTTP/1.1 kompatibel (ingen upgrade)
✅ Enkel: EventSource API i browser
✅ Firewall-venlig (standard HTTP)
→ Brug til: Alarmer, status-opdateringer, live KPI
WebSocket:
✅ Tovejs kommunikation
✅ Lav latens (ingen HTTP overhead pr. besked)
✅ Binær data understøttet
→ Brug til: OTA fremskridtsbar, interaktiv kommandokonsol, chat
→ Ulempe: Kræver WS-proxy-konfiguration (nginx)
Platform-valg:
SSE: Alarm-feed, gateway status, dashboard live-updates
WebSocket: OTA progress, remote terminal (Cloudflare Tunnel)
Polling: Dashboards (30s interval via TanStack Query)
SSE alarm-feed
# server/src/api/routers/sse.py
import asyncio
import json
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from server.src.auth.deps import get_current_user
from server.src.models import User
router = APIRouter(prefix="/api/v1/sse", tags=["SSE"])
# Alarm-kø pr. tenant_id
_alarm_queues: dict[str, list[asyncio.Queue]] = {}
def publish_alarm(tenant_id: str, alarm: dict) -> None:
"""Publicér alarm til alle SSE-klienter for en tenant."""
queues = _alarm_queues.get(tenant_id, [])
for queue in queues:
try:
queue.put_nowait(alarm)
except asyncio.QueueFull:
pass # Klient har ikke lyttet — skip
@router.get("/alarms")
async def alarm_stream(
current_user: User = Depends(get_current_user),
) -> StreamingResponse:
"""
SSE endpoint til alarm-feed.
Brug: EventSource('/api/v1/sse/alarms', { withCredentials: true })
"""
tenant_id = str(current_user.tenant_id)
queue: asyncio.Queue[dict] = asyncio.Queue(maxsize=100)
# Registrér klientens queue
if tenant_id not in _alarm_queues:
_alarm_queues[tenant_id] = []
_alarm_queues[tenant_id].append(queue)
async def event_generator():
try:
# Send heartbeat hvert 30. sekund (forhindrer proxy timeout)
while True:
try:
alarm = await asyncio.wait_for(queue.get(), timeout=30.0)
yield f"data: {json.dumps(alarm)}\n\n"
except asyncio.TimeoutError:
yield ": heartbeat\n\n" # SSE comment — ingen event i browser
finally:
_alarm_queues.get(tenant_id, []).remove(queue)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # Nginx: Deaktiver buffering
},
)
MQTT → SSE bridge
# server/src/mqtt/alarm_bridge.py
# Videresend MQTT-alarmer til SSE-klienter
import json
import paho.mqtt.client as mqtt
from server.src.api.routers.sse import publish_alarm
from server.src.db.gateway_lookup import get_tenant_for_gateway
import structlog
log = structlog.get_logger()
def on_alarm_message(client: mqtt.Client, userdata, msg: mqtt.MQTTMessage) -> None:
"""
Modtag alarm fra MQTT broker og videresend til SSE-klienter.
Topic format: meters/{gateway_id}/alarm
"""
try:
# Ekstraher gateway_id fra topic
parts = msg.topic.split("/")
if len(parts) != 3 or parts[0] != "meters" or parts[2] != "alarm":
return
gateway_id = parts[1]
alarm_data = json.loads(msg.payload.decode("utf-8"))
# Slå tenant_id op for gateway (cachet opslag)
tenant_id = get_tenant_for_gateway(gateway_id)
if not tenant_id:
log.warning("alarm_unknown_gateway", gateway_id=gateway_id)
return
# Publicér til SSE-klienter
publish_alarm(tenant_id, {
"type": "alarm",
"gateway_id": gateway_id,
"alarm_type": alarm_data.get("alarm_type", "unknown"),
"meter_id": alarm_data.get("meter_id"),
"message": alarm_data.get("message", ""),
"timestamp": alarm_data.get("timestamp"),
})
except Exception:
log.exception("alarm_bridge_error", topic=msg.topic)
# Subscribér ved server-opstart (i lifespan):
# client.subscribe("meters/+/alarm", qos=1)
# client.on_message = on_alarm_message
WebSocket — OTA fremskridtsbar
# server/src/api/routers/ota_ws.py
import asyncio
import json
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
from server.src.auth.deps import get_current_user_ws
from server.src.models import User
router = APIRouter(prefix="/api/v1/ws", tags=["WebSocket"])
@router.websocket("/ota/{gateway_id}")
async def ota_progress_ws(
websocket: WebSocket,
gateway_id: str,
current_user: User = Depends(get_current_user_ws),
) -> None:
"""
WebSocket til OTA-fremskridtsbar.
Client sender: {"action": "start_ota", "version": "1.2.3"}
Server sender: {"progress": 45, "status": "downloading"}
"""
await websocket.accept()
try:
# Modtag start-kommando fra browser
data = await asyncio.wait_for(websocket.receive_json(), timeout=10.0)
if data.get("action") != "start_ota":
await websocket.close(code=1008)
return
version = data.get("version", "latest")
# Simulér OTA-fremskridt (erstat med faktisk MQTT-kommando)
for progress in range(0, 101, 10):
await websocket.send_json({
"progress": progress,
"status": "downloading" if progress < 80 else "installing",
"version": version,
})
await asyncio.sleep(0.5)
await websocket.send_json({"progress": 100, "status": "complete"})
except WebSocketDisconnect:
pass # Klient disconnectede — normal afslutning
except asyncio.TimeoutError:
await websocket.close(code=1008, reason="timeout")
React EventSource integration
// ui/src/hooks/useAlarmStream.ts
import { useEffect, useRef, useCallback } from "react";
import { useQueryClient } from "@tanstack/react-query";
interface AlarmEvent {
type: string;
gateway_id: string;
alarm_type: string;
meter_id?: string;
message: string;
timestamp: string;
}
export function useAlarmStream(onAlarm?: (alarm: AlarmEvent) => void) {
const esRef = useRef<EventSource | null>(null);
const queryClient = useQueryClient();
const connect = useCallback(() => {
if (esRef.current) return;
const es = new EventSource("/api/v1/sse/alarms", { withCredentials: true });
es.onmessage = (event) => {
try {
const alarm: AlarmEvent = JSON.parse(event.data);
onAlarm?.(alarm);
// Invalider alarm-cache så badge opdateres
queryClient.invalidateQueries({ queryKey: ["alarms"] });
} catch {
// Parse-fejl — ignorer
}
};
es.onerror = () => {
es.close();
esRef.current = null;
// Browser reconnector automatisk efter kort pause
setTimeout(connect, 5000);
};
esRef.current = es;
}, [onAlarm, queryClient]);
useEffect(() => {
connect();
return () => {
esRef.current?.close();
esRef.current = null;
};
}, [connect]);
}
// Brug i PortalLayout:
// useAlarmStream((alarm) => toast.warning(`Alarm: ${alarm.message}`));
Nginx-konfiguration til SSE og WebSocket
# nginx/conf.d/mbus.conf — Nginx proxy til SSE og WS
upstream mbus_server {
server server:8000;
}
server {
# SSE endpoint — vigtig: deaktiver buffering
location /api/v1/sse/ {
proxy_pass http://mbus_server;
proxy_http_version 1.1;
proxy_set_header Connection ""; # Keep-alive
proxy_set_header X-Real-IP $remote_addr;
proxy_buffering off; # KRITISK for SSE
proxy_cache off;
proxy_read_timeout 3600s; # Lang timeout (ikke 60s default)
chunked_transfer_encoding on;
}
# WebSocket endpoint — kræver Upgrade header
location /api/v1/ws/ {
proxy_pass http://mbus_server;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade; # KRITISK for WS
proxy_set_header Connection "upgrade";
proxy_set_header X-Real-IP $remote_addr;
proxy_read_timeout 3600s;
}
# Alle andre endpoints — standard proxy
location /api/ {
proxy_pass http://mbus_server;
proxy_http_version 1.1;
proxy_set_header X-Real-IP $remote_addr;
}
}
Konklusion
SSE er det rigtige valg til envejs alarm-push og live-status — simpelt, firewall-venligt og med automatisk browser-reconnect. WebSocket bruges kun til tovejs-interaktion (OTA, remote terminal). MQTT→SSE bridge i serveren kobler gateway-telegrammer direkte til browser-klienter. Nginx-konfiguration med proxy_buffering off er kritisk for SSE at fungere bag en reverse proxy.