Python collections — praktisk guide til IoT og SaaS backends
Python collections til IoT og SaaS: defaultdict til HCA-aggregering, Counter til alarm-statistik, deque til rolling buffer, namedtuple til MQTT-payloads og OrderedDict til konfigurationshistorik.
Af M-Bus Gateway
collections standardbiblioteket løser gentagne mønstre i IoT-backends: aggregering, buffering og prioriteret dataadgang. Her er de mønster der bruges i produktionskoden.
defaultdict — aggregering uden KeyError
# server/src/distribution/aggregator.py
from collections import defaultdict
from decimal import Decimal
def aggregate_hca_readings_by_unit(
readings: list[dict],
) -> dict[str, Decimal]:
"""
Aggregér HCA-aflæsninger pr. lejlighedsenhed.
defaultdict(Decimal): Ingen KeyError ved første tilgang.
"""
unit_totals: dict[str, Decimal] = defaultdict(Decimal)
for reading in readings:
unit_id = reading["unit_id"]
hca_units = Decimal(str(reading["hca_units"]))
unit_totals[unit_id] += hca_units
return dict(unit_totals) # Konvertér til plain dict ved returnering
def group_alarms_by_gateway(
alarms: list[dict],
) -> dict[str, list[dict]]:
"""
Gruppér alarmer pr. gateway — én liste pr. gateway_id.
defaultdict(list): Ingen [] initialisering nødvendig.
"""
grouped: dict[str, list[dict]] = defaultdict(list)
for alarm in alarms:
grouped[alarm["gateway_id"]].append(alarm)
return dict(grouped)
def count_readings_by_meter_type(
readings: list[dict],
) -> dict[str, int]:
"""
Tæl aflæsninger pr. målertype.
defaultdict(int): Start fra 0 automatisk.
"""
counts: dict[str, int] = defaultdict(int)
for r in readings:
counts[r["meter_type"]] += 1
return dict(counts)
Counter — statistik og top-N
# server/src/analytics/alarm_stats.py
from collections import Counter
from datetime import datetime, timedelta
def alarm_frequency_analysis(
alarms: list[dict],
top_n: int = 10,
) -> dict:
"""
Analyser alarm-hyppighed for portefølje.
Counter er optimeret til tælle-operationer.
"""
# Tæl pr. alarm-type
type_counter = Counter(a["alarm_type"] for a in alarms)
# Tæl pr. gateway
gateway_counter = Counter(a["gateway_id"] for a in alarms)
# Tæl pr. fabrikat (første 3 tegn af meter_id er fabrikant-kode)
manufacturer_counter = Counter(
a["meter_id"][:3]
for a in alarms
if a.get("meter_id") and len(a["meter_id"]) >= 3
)
return {
"top_alarm_types": type_counter.most_common(top_n),
# → [("DRY_ALARM", 23), ("BATTERY_LOW", 18), ...]
"top_gateways_by_alarms": gateway_counter.most_common(5),
"alarm_by_manufacturer": dict(manufacturer_counter),
"total_alarms": sum(type_counter.values()),
"unique_alarm_types": len(type_counter),
}
def detect_alarm_spike(
recent_alarms: list[dict],
baseline_rate: float = 5.0, # Alarmer pr. time under normale forhold
) -> bool:
"""
Detektér unormal alarm-rate via Counter.
Returnerer True hvis seneste time har > 3× baseline.
"""
one_hour_ago = datetime.utcnow() - timedelta(hours=1)
recent = Counter(
a["gateway_id"]
for a in recent_alarms
if datetime.fromisoformat(a["timestamp"]) > one_hour_ago
)
total_recent = sum(recent.values())
return total_recent > baseline_rate * 3
deque — rolling buffer til gateway-telemetri
# gateway/src/storage/ram_buffer.py
from collections import deque
from dataclasses import dataclass
from datetime import datetime
import threading
@dataclass
class TelegramEntry:
meter_id: str
raw_json: str
received_at: datetime
class TelegramRingBuffer:
"""
RAM-buffer til dagens telegrammer.
deque(maxlen=N): Automatisk overskrivning af ældste elementer.
Thread-safe via threading.Lock.
"""
def __init__(self, maxlen: int = 10000):
self._buffer: deque[TelegramEntry] = deque(maxlen=maxlen)
self._lock = threading.Lock()
def push(self, entry: TelegramEntry) -> None:
with self._lock:
self._buffer.append(entry)
def drain(self) -> list[TelegramEntry]:
"""Tøm bufferen og returnér indholdet som liste."""
with self._lock:
items = list(self._buffer)
self._buffer.clear()
return items
def peek_latest(self, n: int = 10) -> list[TelegramEntry]:
"""Se de N seneste elementer uden at fjerne dem."""
with self._lock:
return list(self._buffer)[-n:]
def size(self) -> int:
with self._lock:
return len(self._buffer)
# deque som FIFO-kommandokø til MQTT:
command_queue: deque[dict] = deque(maxlen=50)
def enqueue_command(cmd: dict) -> None:
command_queue.appendleft(cmd) # appendleft = prioritér nyeste
def pop_next_command() -> dict | None:
try:
return command_queue.pop()
except IndexError:
return None
namedtuple — strukturerede MQTT-payloads
# gateway/src/mqtt/payloads.py
from collections import namedtuple
from typing import NamedTuple # Moderne alternativ med type hints
# Klassisk namedtuple:
GatewayStatus = namedtuple(
"GatewayStatus",
["gateway_id", "timestamp", "rssi", "battery_pct", "reading_count"],
)
# Moderne NamedTuple med type hints (anbefalet i ny kode):
class MeterReading(NamedTuple):
meter_id: str
meter_type: str # "heat" | "hca" | "water" | "electricity"
total_kwh: float
timestamp: str
rssi: int | None = None
battery_pct: int | None = None
def is_valid(self) -> bool:
return self.total_kwh >= 0 and bool(self.meter_id)
# NamedTuple er immutabel → hashbar → kan bruges som dict-nøgle:
readings_by_meter: dict[MeterReading, int] = {}
# Tuple-unpacking virker:
meter_id, meter_type, total_kwh, *rest = MeterReading(
"KAM-001", "heat", 1234.5, "2026-05-24T06:00:00Z"
)
OrderedDict — konfigurationshistorik
# server/src/config/history.py
from collections import OrderedDict
from datetime import datetime
class PropertyConfigHistory:
"""
Gem konfigurationshistorik i rækkefølge.
OrderedDict bevarer indsættelsesrækkefølge (Python 3.7+ dict gør det også,
men OrderedDict giver explict semantik + move_to_end).
"""
def __init__(self, max_entries: int = 100):
self._history: OrderedDict[str, dict] = OrderedDict()
self._max_entries = max_entries
def record_change(self, change_id: str, change: dict) -> None:
self._history[change_id] = {
**change,
"recorded_at": datetime.utcnow().isoformat(),
}
# Fjern ældste hvis over grænsen
while len(self._history) > self._max_entries:
self._history.popitem(last=False) # FIFO: Fjern første
def latest(self, n: int = 10) -> list[dict]:
"""Returnér de N nyeste ændringer."""
items = list(self._history.values())
return items[-n:]
def move_to_top(self, change_id: str) -> None:
"""Flyt en ændring til toppen (nyeste)."""
self._history.move_to_end(change_id, last=True)
ChainMap — konfigurationsprioritet
# gateway/src/config/chain.py
from collections import ChainMap
def build_effective_config(
db_config: dict,
env_config: dict,
defaults: dict,
) -> dict:
"""
ChainMap: Første dict har højeste prioritet.
Opslag søger i rækkefølge: db → env → defaults
Eksempel:
db_config = {"MQTT_HOST": "178.105.90.8", "LOG_LEVEL": "DEBUG"}
env_config = {"LOG_LEVEL": "INFO"}
defaults = {"MQTT_PORT": 8883, "LOG_LEVEL": "WARNING", "WMBUS_MODE": "c1"}
Resultat:
MQTT_HOST: "178.105.90.8" (fra db)
LOG_LEVEL: "DEBUG" (db overstyrer env og defaults)
MQTT_PORT: 8883 (fra defaults — mangler i db og env)
WMBUS_MODE: "c1" (fra defaults)
"""
return dict(ChainMap(db_config, env_config, defaults))
Konklusion
collections løser hverdagslige mønstre mere elegant end plain dict/list: defaultdict eliminerer KeyError ved aggregering, Counter giver statistik med most_common(), deque er O(1) ved begge ender (modsat list), NamedTuple giver strukturerede immutable records med type hints, og ChainMap håndterer konfigurationsprioritet rent. Alle seks typer er brugt i gatewayen og server-backendet.
Se Python functools guide eller Python asyncio patterns guide.