M-Bus Gateway
← Tilbage til blog
· python· collections· defaultdict· Counter· deque· namedtuple· IoT· SaaS· backend

Python collections — praktisk guide til IoT og SaaS backends

Python collections til IoT og SaaS: defaultdict til HCA-aggregering, Counter til alarm-statistik, deque til rolling buffer, namedtuple til MQTT-payloads og OrderedDict til konfigurationshistorik.

Af M-Bus Gateway

collections standardbiblioteket løser gentagne mønstre i IoT-backends: aggregering, buffering og prioriteret dataadgang. Her er de mønster der bruges i produktionskoden.


defaultdict — aggregering uden KeyError

# server/src/distribution/aggregator.py
from collections import defaultdict
from decimal import Decimal


def aggregate_hca_readings_by_unit(
    readings: list[dict],
) -> dict[str, Decimal]:
    """
    Aggregér HCA-aflæsninger pr. lejlighedsenhed.
    defaultdict(Decimal): Ingen KeyError ved første tilgang.
    """
    unit_totals: dict[str, Decimal] = defaultdict(Decimal)

    for reading in readings:
        unit_id = reading["unit_id"]
        hca_units = Decimal(str(reading["hca_units"]))
        unit_totals[unit_id] += hca_units

    return dict(unit_totals)    # Konvertér til plain dict ved returnering


def group_alarms_by_gateway(
    alarms: list[dict],
) -> dict[str, list[dict]]:
    """
    Gruppér alarmer pr. gateway — én liste pr. gateway_id.
    defaultdict(list): Ingen [] initialisering nødvendig.
    """
    grouped: dict[str, list[dict]] = defaultdict(list)

    for alarm in alarms:
        grouped[alarm["gateway_id"]].append(alarm)

    return dict(grouped)


def count_readings_by_meter_type(
    readings: list[dict],
) -> dict[str, int]:
    """
    Tæl aflæsninger pr. målertype.
    defaultdict(int): Start fra 0 automatisk.
    """
    counts: dict[str, int] = defaultdict(int)

    for r in readings:
        counts[r["meter_type"]] += 1

    return dict(counts)

Counter — statistik og top-N

# server/src/analytics/alarm_stats.py
from collections import Counter
from datetime import datetime, timedelta


def alarm_frequency_analysis(
    alarms: list[dict],
    top_n: int = 10,
) -> dict:
    """
    Analyser alarm-hyppighed for portefølje.
    Counter er optimeret til tælle-operationer.
    """
    # Tæl pr. alarm-type
    type_counter = Counter(a["alarm_type"] for a in alarms)

    # Tæl pr. gateway
    gateway_counter = Counter(a["gateway_id"] for a in alarms)

    # Tæl pr. fabrikat (første 3 tegn af meter_id er fabrikant-kode)
    manufacturer_counter = Counter(
        a["meter_id"][:3]
        for a in alarms
        if a.get("meter_id") and len(a["meter_id"]) >= 3
    )

    return {
        "top_alarm_types": type_counter.most_common(top_n),
        # → [("DRY_ALARM", 23), ("BATTERY_LOW", 18), ...]
        "top_gateways_by_alarms": gateway_counter.most_common(5),
        "alarm_by_manufacturer": dict(manufacturer_counter),
        "total_alarms": sum(type_counter.values()),
        "unique_alarm_types": len(type_counter),
    }


def detect_alarm_spike(
    recent_alarms: list[dict],
    baseline_rate: float = 5.0,    # Alarmer pr. time under normale forhold
) -> bool:
    """
    Detektér unormal alarm-rate via Counter.
    Returnerer True hvis seneste time har > 3× baseline.
    """
    one_hour_ago = datetime.utcnow() - timedelta(hours=1)
    recent = Counter(
        a["gateway_id"]
        for a in recent_alarms
        if datetime.fromisoformat(a["timestamp"]) > one_hour_ago
    )
    total_recent = sum(recent.values())
    return total_recent > baseline_rate * 3

deque — rolling buffer til gateway-telemetri

# gateway/src/storage/ram_buffer.py
from collections import deque
from dataclasses import dataclass
from datetime import datetime
import threading


@dataclass
class TelegramEntry:
    meter_id: str
    raw_json: str
    received_at: datetime


class TelegramRingBuffer:
    """
    RAM-buffer til dagens telegrammer.
    deque(maxlen=N): Automatisk overskrivning af ældste elementer.
    Thread-safe via threading.Lock.
    """

    def __init__(self, maxlen: int = 10000):
        self._buffer: deque[TelegramEntry] = deque(maxlen=maxlen)
        self._lock = threading.Lock()

    def push(self, entry: TelegramEntry) -> None:
        with self._lock:
            self._buffer.append(entry)

    def drain(self) -> list[TelegramEntry]:
        """Tøm bufferen og returnér indholdet som liste."""
        with self._lock:
            items = list(self._buffer)
            self._buffer.clear()
            return items

    def peek_latest(self, n: int = 10) -> list[TelegramEntry]:
        """Se de N seneste elementer uden at fjerne dem."""
        with self._lock:
            return list(self._buffer)[-n:]

    def size(self) -> int:
        with self._lock:
            return len(self._buffer)


# deque som FIFO-kommandokø til MQTT:
command_queue: deque[dict] = deque(maxlen=50)

def enqueue_command(cmd: dict) -> None:
    command_queue.appendleft(cmd)    # appendleft = prioritér nyeste

def pop_next_command() -> dict | None:
    try:
        return command_queue.pop()
    except IndexError:
        return None

namedtuple — strukturerede MQTT-payloads

# gateway/src/mqtt/payloads.py
from collections import namedtuple
from typing import NamedTuple    # Moderne alternativ med type hints


# Klassisk namedtuple:
GatewayStatus = namedtuple(
    "GatewayStatus",
    ["gateway_id", "timestamp", "rssi", "battery_pct", "reading_count"],
)

# Moderne NamedTuple med type hints (anbefalet i ny kode):
class MeterReading(NamedTuple):
    meter_id: str
    meter_type: str    # "heat" | "hca" | "water" | "electricity"
    total_kwh: float
    timestamp: str
    rssi: int | None = None
    battery_pct: int | None = None

    def is_valid(self) -> bool:
        return self.total_kwh >= 0 and bool(self.meter_id)


# NamedTuple er immutabel → hashbar → kan bruges som dict-nøgle:
readings_by_meter: dict[MeterReading, int] = {}

# Tuple-unpacking virker:
meter_id, meter_type, total_kwh, *rest = MeterReading(
    "KAM-001", "heat", 1234.5, "2026-05-24T06:00:00Z"
)

OrderedDict — konfigurationshistorik

# server/src/config/history.py
from collections import OrderedDict
from datetime import datetime


class PropertyConfigHistory:
    """
    Gem konfigurationshistorik i rækkefølge.
    OrderedDict bevarer indsættelsesrækkefølge (Python 3.7+ dict gør det også,
    men OrderedDict giver explict semantik + move_to_end).
    """

    def __init__(self, max_entries: int = 100):
        self._history: OrderedDict[str, dict] = OrderedDict()
        self._max_entries = max_entries

    def record_change(self, change_id: str, change: dict) -> None:
        self._history[change_id] = {
            **change,
            "recorded_at": datetime.utcnow().isoformat(),
        }
        # Fjern ældste hvis over grænsen
        while len(self._history) > self._max_entries:
            self._history.popitem(last=False)    # FIFO: Fjern første

    def latest(self, n: int = 10) -> list[dict]:
        """Returnér de N nyeste ændringer."""
        items = list(self._history.values())
        return items[-n:]

    def move_to_top(self, change_id: str) -> None:
        """Flyt en ændring til toppen (nyeste)."""
        self._history.move_to_end(change_id, last=True)

ChainMap — konfigurationsprioritet

# gateway/src/config/chain.py
from collections import ChainMap


def build_effective_config(
    db_config: dict,
    env_config: dict,
    defaults: dict,
) -> dict:
    """
    ChainMap: Første dict har højeste prioritet.
    Opslag søger i rækkefølge: db → env → defaults

    Eksempel:
      db_config = {"MQTT_HOST": "178.105.90.8", "LOG_LEVEL": "DEBUG"}
      env_config = {"LOG_LEVEL": "INFO"}
      defaults = {"MQTT_PORT": 8883, "LOG_LEVEL": "WARNING", "WMBUS_MODE": "c1"}

    Resultat:
      MQTT_HOST: "178.105.90.8" (fra db)
      LOG_LEVEL: "DEBUG"        (db overstyrer env og defaults)
      MQTT_PORT: 8883           (fra defaults — mangler i db og env)
      WMBUS_MODE: "c1"          (fra defaults)
    """
    return dict(ChainMap(db_config, env_config, defaults))

Konklusion

collections løser hverdagslige mønstre mere elegant end plain dict/list: defaultdict eliminerer KeyError ved aggregering, Counter giver statistik med most_common(), deque er O(1) ved begge ender (modsat list), NamedTuple giver strukturerede immutable records med type hints, og ChainMap håndterer konfigurationsprioritet rent. Alle seks typer er brugt i gatewayen og server-backendet.

Se Python functools guide eller Python asyncio patterns guide.