M-Bus Gateway

FastAPI WebSocket and SSE: real-time data for IoT dashboards

FastAPI WebSocket and Server-Sent Events for IoT: live gateway status updates, alarm push, an MQTT bridge to the browser, a connection manager with tenant isolation, and React EventSource integration.

By M-Bus Gateway

Gateway status, MQTT alarms, and live readings require real-time communication. Here are the patterns for WebSocket and SSE in FastAPI with an MQTT bridge.


SSE vs. WebSocket: when to use which?

Choosing a real-time protocol:

Server-Sent Events (SSE):
  ✅ One-way: server → browser
  ✅ Automatic reconnect (built into the browser)
  ✅ Works over plain HTTP/1.1 (no protocol upgrade)
  ✅ Simple: EventSource API in the browser
  ✅ Firewall-friendly (standard HTTP)
  → Use for: alarms, status updates, live KPIs

WebSocket:
  ✅ Two-way communication
  ✅ Low latency (no HTTP overhead per message)
  ✅ Binary data supported
  → Use for: OTA progress bar, interactive command console, chat
  → Drawback: requires WebSocket proxy configuration (nginx)

Platform choices:
  SSE:       Alarm feed, gateway status, dashboard live updates
  WebSocket: OTA progress, remote terminal (Cloudflare Tunnel)
  Polling:   Dashboards (30 s interval via TanStack Query)
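
What "standard HTTP" means in practice: an SSE stream is plain text, and each frame is a few `field: value` lines terminated by a blank line. A minimal sketch of a frame formatter follows. `format_sse` and `heartbeat` are hypothetical helper names used for illustration; they are not part of FastAPI.

```python
import json
from typing import Optional


def format_sse(data: dict, event: Optional[str] = None, event_id: Optional[str] = None) -> str:
    """Serialize one SSE frame; the trailing blank line terminates the frame."""
    lines = []
    if event_id is not None:
        # The browser sends this value back as Last-Event-ID when it reconnects
        lines.append(f"id: {event_id}")
    if event is not None:
        # Named events are dispatched to addEventListener(event, ...)
        lines.append(f"event: {event}")
    # json.dumps (without indent) never emits raw newlines, so one data line suffices
    lines.append(f"data: {json.dumps(data)}")
    return "\n".join(lines) + "\n\n"


def heartbeat() -> str:
    """Comment frame: keeps the connection alive without firing a browser event."""
    return ": heartbeat\n\n"
```

A frame that carries an `event:` field reaches `addEventListener("alarm", ...)` in the browser rather than `onmessage`, so unnamed frames remain the simplest option when a single handler is enough.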

SSE alarm feed

# server/src/api/routers/sse.py
import asyncio
import json
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from server.src.auth.deps import get_current_user
from server.src.models import User

router = APIRouter(prefix="/api/v1/sse", tags=["SSE"])

# Alarm queues per tenant_id (one queue per connected SSE client)
_alarm_queues: dict[str, list[asyncio.Queue]] = {}


def publish_alarm(tenant_id: str, alarm: dict) -> None:
    """
    Publish an alarm to all SSE clients of one tenant.
    Call this from the event loop thread: asyncio.Queue is not thread-safe.
    """
    queues = _alarm_queues.get(tenant_id, [])
    for queue in queues:
        try:
            queue.put_nowait(alarm)
        except asyncio.QueueFull:
            pass    # Client is not keeping up (queue full): drop this alarm


@router.get("/alarms")
async def alarm_stream(
    current_user: User = Depends(get_current_user),
) -> StreamingResponse:
    """
    SSE endpoint for the alarm feed.
    Usage: new EventSource('/api/v1/sse/alarms', { withCredentials: true })
    """
    tenant_id = str(current_user.tenant_id)
    queue: asyncio.Queue[dict] = asyncio.Queue(maxsize=100)

    async def event_generator():
        # Register the client's queue inside the generator so that the
        # cleanup in `finally` is always paired with the registration.
        _alarm_queues.setdefault(tenant_id, []).append(queue)
        try:
            # Send a heartbeat after 30 s without alarms (prevents proxy timeouts)
            while True:
                try:
                    alarm = await asyncio.wait_for(queue.get(), timeout=30.0)
                    yield f"data: {json.dumps(alarm)}\n\n"
                except asyncio.TimeoutError:
                    yield ": heartbeat\n\n"    # SSE comment: no event fires in the browser
        finally:
            clients = _alarm_queues.get(tenant_id, [])
            if queue in clients:
                clients.remove(queue)
            if not clients:
                _alarm_queues.pop(tenant_id, None)    # Drop the empty tenant entry

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",    # Nginx: disable buffering
        },
    )
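
The module-level dictionary of queues can also be wrapped in a small connection manager so that tenant isolation lives in one place. The sketch below is one possible shape, not the platform's actual implementation: `TenantBroadcaster` is a hypothetical name, and dropping the oldest alarm when a queue is full is a swapped-in policy (the listing above drops the newest alarm instead).

```python
import asyncio
from collections import defaultdict


class TenantBroadcaster:
    """Hypothetical helper: fan-out of alarms to per-client queues, keyed by tenant."""

    def __init__(self, maxsize: int = 100) -> None:
        self._maxsize = maxsize
        self._clients: dict[str, set[asyncio.Queue]] = defaultdict(set)

    def subscribe(self, tenant_id: str) -> asyncio.Queue:
        """Create and register a queue for one connected client."""
        queue: asyncio.Queue = asyncio.Queue(maxsize=self._maxsize)
        self._clients[tenant_id].add(queue)
        return queue

    def unsubscribe(self, tenant_id: str, queue: asyncio.Queue) -> None:
        """Remove a client's queue; safe to call more than once."""
        clients = self._clients.get(tenant_id)
        if clients is None:
            return
        clients.discard(queue)
        if not clients:
            del self._clients[tenant_id]    # leave no empty tenant entries behind

    def publish(self, tenant_id: str, alarm: dict) -> int:
        """Deliver to every client of one tenant; returns the number of queues reached."""
        delivered = 0
        for queue in self._clients.get(tenant_id, ()):
            if queue.full():
                queue.get_nowait()          # assumption: drop the oldest alarm to make room
            queue.put_nowait(alarm)
            delivered += 1
        return delivered
```

In the endpoint, `subscribe` would be called at the start of the generator and `unsubscribe` in its `finally` block, so a client from one tenant can never receive another tenant's alarms.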

MQTT → SSE bridge

# server/src/mqtt/alarm_bridge.py
# Forward MQTT alarms to SSE clients

import json
import paho.mqtt.client as mqtt
from server.src.api.routers.sse import publish_alarm
from server.src.db.gateway_lookup import get_tenant_for_gateway
import structlog

log = structlog.get_logger()


def on_alarm_message(client: mqtt.Client, userdata, msg: mqtt.MQTTMessage) -> None:
    """
    Receive an alarm from the MQTT broker and forward it to SSE clients.
    Topic format: meters/{gateway_id}/alarm
    """
    try:
        # Extract gateway_id from the topic
        parts = msg.topic.split("/")
        if len(parts) != 3 or parts[0] != "meters" or parts[2] != "alarm":
            return

        gateway_id = parts[1]
        alarm_data = json.loads(msg.payload.decode("utf-8"))

        # Look up tenant_id for the gateway (cached lookup)
        tenant_id = get_tenant_for_gateway(gateway_id)
        if not tenant_id:
            log.warning("alarm_unknown_gateway", gateway_id=gateway_id)
            return

        # Publish to SSE clients
        publish_alarm(tenant_id, {
            "type": "alarm",
            "gateway_id": gateway_id,
            "alarm_type": alarm_data.get("alarm_type", "unknown"),
            "meter_id": alarm_data.get("meter_id"),
            "message": alarm_data.get("message", ""),
            "timestamp": alarm_data.get("timestamp"),
        })

    except Exception:
        log.exception("alarm_bridge_error", topic=msg.topic)


# Subscribe at server startup (in lifespan):
# client.subscribe("meters/+/alarm", qos=1)
# client.on_message = on_alarm_message
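
One detail worth checking in this bridge: when the paho client runs via `loop_start()`, `on_message` is invoked on paho's network thread, while `asyncio.Queue` is not thread-safe. Assuming that setup, the sketch below shows one way to hand the alarm over to the event loop with `loop.call_soon_threadsafe`. `make_threadsafe_publisher` is a hypothetical helper, and the inner `publish` function is only a stand-in for `publish_alarm`.

```python
import asyncio
import threading


def make_threadsafe_publisher(loop: asyncio.AbstractEventLoop, publish):
    """Wrap `publish` so it can be called from a non-asyncio thread (e.g. an MQTT callback)."""
    def publish_from_thread(tenant_id: str, alarm: dict) -> None:
        # Schedule the call on the event loop thread instead of touching
        # asyncio.Queue objects directly from the MQTT thread.
        loop.call_soon_threadsafe(publish, tenant_id, alarm)
    return publish_from_thread


async def demo() -> tuple:
    received: asyncio.Queue = asyncio.Queue()

    def publish(tenant_id: str, alarm: dict) -> None:    # stand-in for publish_alarm
        received.put_nowait((tenant_id, alarm))

    safe_publish = make_threadsafe_publisher(asyncio.get_running_loop(), publish)

    # Simulate the MQTT network thread delivering one alarm
    worker = threading.Thread(target=safe_publish, args=("tenant-a", {"alarm_type": "leak"}))
    worker.start()
    item = await asyncio.wait_for(received.get(), timeout=2.0)
    worker.join()
    return item
```

In the server, the loop would be captured once in the lifespan handler with `asyncio.get_running_loop()`, and the MQTT callback would call the wrapped publisher instead of `publish_alarm` directly.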

WebSocket: OTA progress bar

# server/src/api/routers/ota_ws.py
import asyncio
import json
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
from server.src.auth.deps import get_current_user_ws
from server.src.models import User

router = APIRouter(prefix="/api/v1/ws", tags=["WebSocket"])


@router.websocket("/ota/{gateway_id}")
async def ota_progress_ws(
    websocket: WebSocket,
    gateway_id: str,
    current_user: User = Depends(get_current_user_ws),
) -> None:
    """
    WebSocket for the OTA progress bar.
    Client sends: {"action": "start_ota", "version": "1.2.3"}
    Server sends: {"progress": 45, "status": "downloading"}
    """
    await websocket.accept()

    try:
        # Receive the start command from the browser
        data = await asyncio.wait_for(websocket.receive_json(), timeout=10.0)
        if data.get("action") != "start_ota":
            await websocket.close(code=1008)
            return

        version = data.get("version", "latest")

        # Simulate OTA progress (replace with the actual MQTT command).
        # Stop at 90 so that 100 % is reported once, by the "complete" message below.
        for progress in range(0, 100, 10):
            await websocket.send_json({
                "progress": progress,
                "status": "downloading" if progress < 80 else "installing",
                "version": version,
            })
            await asyncio.sleep(0.5)

        await websocket.send_json({"progress": 100, "status": "complete"})

    except WebSocketDisconnect:
        pass    # Client disconnected: normal termination
    except asyncio.TimeoutError:
        await websocket.close(code=1008, reason="timeout")
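
To replace the simulated loop with real progress, one option is to let the MQTT side push percentages into an `asyncio.Queue` and relay them to the socket. The sketch below assumes exactly that; `relay_progress`, `status_for`, the `stall_timeout` parameter and the `"stalled"` status are hypothetical and only illustrate the idea. The status thresholds are the ones used in the simulation above.

```python
import asyncio
from typing import AsyncIterator


def status_for(progress: int) -> str:
    """Same thresholds as the simulated loop: below 80 downloading, then installing."""
    if progress >= 100:
        return "complete"
    return "downloading" if progress < 80 else "installing"


async def relay_progress(
    updates: asyncio.Queue, stall_timeout: float = 60.0
) -> AsyncIterator[dict]:
    """Yield one message per progress update until 100 % or until the gateway goes quiet."""
    last = 0
    while True:
        try:
            raw = await asyncio.wait_for(updates.get(), timeout=stall_timeout)
        except asyncio.TimeoutError:
            # Assumption: report a stall instead of waiting forever
            yield {"progress": last, "status": "stalled"}
            return
        last = max(0, min(100, int(raw)))    # clamp untrusted gateway input
        yield {"progress": last, "status": status_for(last)}
        if last >= 100:
            return
```

Inside the handler this would be consumed as `async for message in relay_progress(queue): await websocket.send_json(message)`, which keeps the WebSocket code free of MQTT details.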

React EventSource integration

// ui/src/hooks/useAlarmStream.ts
import { useEffect, useRef, useCallback } from "react";
import { useQueryClient } from "@tanstack/react-query";

interface AlarmEvent {
  type: string;
  gateway_id: string;
  alarm_type: string;
  meter_id?: string;
  message: string;
  timestamp: string;
}

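export function useAlarmStream(onAlarm?: (alarm: AlarmEvent) => void) {
  const esRef = useRef<EventSource | null>(null);
  const retryRef = useRef<ReturnType<typeof setTimeout> | null>(null);
  // Keep the latest callback in a ref so that an inline callback
  // does not re-create the connection on every render.
  const onAlarmRef = useRef(onAlarm);
  const queryClient = useQueryClient();

  useEffect(() => {
    onAlarmRef.current = onAlarm;
  }, [onAlarm]);

  const connect = useCallback(() => {
    if (esRef.current) return;

    const es = new EventSource("/api/v1/sse/alarms", { withCredentials: true });

    es.onmessage = (event) => {
      try {
        const alarm: AlarmEvent = JSON.parse(event.data);
        onAlarmRef.current?.(alarm);
        // Invalidate the alarm cache so the badge updates
        queryClient.invalidateQueries({ queryKey: ["alarms"] });
      } catch {
        // Parse error: ignore
      }
    };

    es.onerror = () => {
      // close() turns off the browser's built-in reconnect,
      // so retry manually after 5 s
      es.close();
      esRef.current = null;
      retryRef.current = setTimeout(connect, 5000);
    };

    esRef.current = es;
  }, [queryClient]);

  useEffect(() => {
    connect();
    return () => {
      // Cancel a pending retry so nothing reconnects after unmount
      if (retryRef.current) clearTimeout(retryRef.current);
      esRef.current?.close();
      esRef.current = null;
    };
  }, [connect]);
}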

// Usage in PortalLayout:
// useAlarmStream((alarm) => toast.warning(`Alarm: ${alarm.message}`));

Nginx configuration for SSE and WebSocket

# nginx/conf.d/mbus.conf: Nginx proxy for SSE and WebSocket

upstream mbus_server {
    server server:8000;
}

server {
    # SSE endpoint: buffering must be disabled
    location /api/v1/sse/ {
        proxy_pass http://mbus_server;
        proxy_http_version 1.1;
        proxy_set_header Connection "";          # Keep-alive
        proxy_set_header X-Real-IP $remote_addr;
        proxy_buffering off;                     # CRITICAL for SSE
        proxy_cache off;
        proxy_read_timeout 3600s;                # Long timeout (default is 60s)
        chunked_transfer_encoding on;
    }

    # WebSocket endpoint: requires the Upgrade header
    location /api/v1/ws/ {
        proxy_pass http://mbus_server;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;  # CRITICAL for WS
        proxy_set_header Connection "upgrade";
        proxy_set_header X-Real-IP $remote_addr;
        proxy_read_timeout 3600s;
    }

    # All other endpoints: standard proxy
    location /api/ {
        proxy_pass http://mbus_server;
        proxy_http_version 1.1;
        proxy_set_header X-Real-IP $remote_addr;
    }
}

Conclusion

SSE is the right choice for one-way alarm push and live status: it is simple, firewall-friendly, and comes with automatic browser reconnect. WebSocket is used only for two-way interaction (OTA, remote terminal). The MQTT→SSE bridge in the server connects gateway telegrams directly to browser clients. Setting proxy_buffering off in Nginx is critical for SSE to work behind a reverse proxy.

See the MQTT QoS guide or the React TanStack Query guide.