Python itertools — effektive datamønstre til IoT og SaaS backends
Python itertools til IoT og SaaS: groupby til HCA-gruppering, chain til multi-property queries, islice til paginering, pairwise til forbrugsdelta, product til batch-kombinationer og accumulate til kumulativt forbrug.
Af M-Bus Gateway
itertools er Pythons verktøjskasse til effektiv iteration — lazy evaluation og ingen unødvendig kopiering. Her er mønstrene der bruges i IoT-data processing og SaaS-backends.
groupby — gruppér HCA-aflæsninger
# server/src/distribution/grouping.py
from itertools import groupby
from operator import attrgetter, itemgetter
def group_readings_by_property(readings: list[dict]) -> dict[str, list[dict]]:
"""
Gruppér aflæsninger pr. property_id.
VIGTIGT: groupby kræver sorteret input på grupperingsnøglen.
"""
# Sorter først — groupby virker kun på konsekutive grupper
sorted_readings = sorted(readings, key=itemgetter("property_id"))
result: dict[str, list[dict]] = {}
for property_id, group in groupby(sorted_readings, key=itemgetter("property_id")):
result[property_id] = list(group)
return result
def group_alarms_by_type_and_gateway(
alarms: list[dict],
) -> dict[tuple[str, str], list[dict]]:
"""
Multi-nøgle gruppering: (alarm_type, gateway_id).
Nyttigt til alarm-statistik pr. gateway pr. type.
"""
sorted_alarms = sorted(
alarms,
key=lambda a: (a["alarm_type"], a["gateway_id"])
)
result: dict[tuple[str, str], list[dict]] = {}
for (alarm_type, gateway_id), group in groupby(
sorted_alarms,
key=lambda a: (a["alarm_type"], a["gateway_id"])
):
result[(alarm_type, gateway_id)] = list(group)
return result
chain — kombiner multiple data-kilder
# server/src/queries/multi_property.py
from itertools import chain
from typing import Iterable
def get_all_active_readings(
property_ids: list[str],
db_session,
) -> Iterable[dict]:
"""
Kombiner aflæsninger fra multiple ejendomme til én strøm.
chain(): Lazy — behandler én generator ad gangen, ingen kopi.
"""
generators = [
fetch_property_readings(pid, db_session)
for pid in property_ids
]
# chain.from_iterable er mere effektiv end chain(*generators)
return chain.from_iterable(generators)
def merge_alarm_sources(*alarm_lists: list[dict]) -> list[dict]:
"""
Sammensæt alarmer fra gateway-MQTT, OCR-scan og manuel indtastning.
chain(): Ingen liste-kopiering ved sammensætning.
"""
all_alarms = list(chain(*alarm_lists))
return sorted(all_alarms, key=itemgetter("timestamp"), reverse=True)
from itertools import chain as ichain
def get_settlement_lines_all_types(settlement_id: str, db) -> list[dict]:
"""
Kombiner varme-, vand- og el-afregningslinjer til én liste.
Nyttigt til PDF-generering der viser alle energityper.
"""
heat_lines = fetch_lines(settlement_id, "heat", db)
water_lines = fetch_lines(settlement_id, "water", db)
elec_lines = fetch_lines(settlement_id, "electricity", db)
return list(ichain(heat_lines, water_lines, elec_lines))
islice — paginering og lazy slicing
# server/src/api/pagination.py
from itertools import islice
from typing import Iterator, TypeVar
T = TypeVar("T")
def paginate_iterable(
iterable: Iterator[T],
page: int,
page_size: int,
) -> list[T]:
"""
Offset-baseret paginering uden at loade hele datasættet.
islice(it, start, stop): Skip N elementer, tag næste M.
Nyttigt ved generator-baserede DB-queries.
"""
start = (page - 1) * page_size
stop = start + page_size
return list(islice(iterable, start, stop))
def first_n_readings_above_threshold(
readings: Iterator[dict],
threshold_kwh: float,
n: int = 10,
) -> list[dict]:
"""
Find de første N aflæsninger over en grænse.
islice på en filtreret generator — stopper tidligt.
"""
filtered = (r for r in readings if r["total_kwh"] > threshold_kwh)
return list(islice(filtered, n))
pairwise — forbrugsdelta mellem aflæsninger
# server/src/analytics/consumption_delta.py
from itertools import pairwise # Python 3.10+
from decimal import Decimal
def calculate_monthly_deltas(
readings: list[dict],
) -> list[dict]:
"""
pairwise([A, B, C, D]) → [(A,B), (B,C), (C,D)]
Beregn forbrug pr. periode = Slut - Start.
Kræver at readings er sorteret kronologisk.
"""
sorted_readings = sorted(readings, key=lambda r: r["timestamp"])
deltas = []
for prev, curr in pairwise(sorted_readings):
delta_kwh = Decimal(str(curr["total_kwh"])) - Decimal(str(prev["total_kwh"]))
# Negativ delta = backwards alarm
status = "ok" if delta_kwh >= 0 else "backwards"
deltas.append({
"meter_id": curr["meter_id"],
"period_start": prev["timestamp"],
"period_end": curr["timestamp"],
"delta_kwh": delta_kwh,
"status": status,
})
return deltas
# Python 3.9 og tidligere — manuel pairwise:
def manual_pairwise(iterable):
"""Fallback til Python < 3.10."""
it = iter(iterable)
prev = next(it)
for curr in it:
yield prev, curr
prev = curr
accumulate — kumulativt forbrug
# server/src/analytics/cumulative.py
from itertools import accumulate
from decimal import Decimal
import operator
def cumulative_consumption(
monthly_deltas: list[Decimal],
) -> list[Decimal]:
"""
accumulate([1, 2, 3, 4]) → [1, 3, 6, 10]
Beregn akkumuleret forbrug måned for måned.
Bruges til forbrugsgrafer i lejer-portalen (EU EED 2027).
"""
return list(accumulate(monthly_deltas, operator.add))
def cumulative_cost_with_running_average(
monthly_costs: list[Decimal],
) -> list[tuple[Decimal, Decimal]]:
"""
Returnér (kumulativ sum, løbende gennemsnit) pr. måned.
"""
totals = list(accumulate(monthly_costs))
return [
(total, total / Decimal(i + 1))
for i, total in enumerate(totals)
]
product — batch-kombinationer
# server/src/settlement/batch_planner.py
from itertools import product
def plan_settlement_batch(
property_ids: list[str],
period_ends: list[str], # ["2026-05-31", "2025-05-31"]
) -> list[tuple[str, str]]:
"""
Kartesisk produkt: Alle kombinationer af ejendom × periode.
Bruges til at planlægge batch-generering ved bulk-kørsel.
product(["P1","P2"], ["2026","2025"]):
→ [("P1","2026"), ("P1","2025"), ("P2","2026"), ("P2","2025")]
"""
return list(product(property_ids, period_ends))
def generate_gateway_test_configs(
hosts: list[str],
ports: list[int],
modes: list[str],
) -> list[dict]:
"""
Generer testmatriks for gateway-konfigurationer.
Nyttigt til CI-tests der dækker alle kombinationer.
"""
return [
{"host": h, "port": p, "wmbus_mode": m}
for h, p, m in product(hosts, ports, modes)
]
takewhile og dropwhile — betinget iteration
# server/src/analytics/window.py
from itertools import takewhile, dropwhile
def readings_until_anomaly(
readings: list[dict],
) -> list[dict]:
"""
takewhile: Tag aflæsninger så længe ingen anomali detekteres.
Stopper ved første anomali.
"""
return list(takewhile(
lambda r: r.get("status") == "ok",
readings
))
def readings_after_installation(
readings: list[dict],
installation_date: str,
) -> list[dict]:
"""
dropwhile: Skip aflæsninger indtil installationsdato er nået.
Bruges til at filtrere historik korrekt ved målerskift.
"""
return list(dropwhile(
lambda r: r["timestamp"] < installation_date,
readings
))
Konklusion
itertools giver lazy, hukommelseseffektiv iteration: groupby til HCA-gruppering (kræver sorteret input), chain.from_iterable til multi-property-queries, islice til paginering af generatorer, pairwise til forbrugsdelta (Python 3.10+), accumulate til EU EED-forbrugskurver og product til batch-planlægning. Alle er O(1) hukommelsesforbrug ved iteration — kritisk for store porteføljer.