M-Bus Gateway
← Tilbage til blog
· python· itertools· groupby· chain· islice· pairwise· IoT· SaaS· backend· data

Python itertools — effektive datamønstre til IoT og SaaS backends

Python itertools til IoT og SaaS: groupby til HCA-gruppering, chain til multi-property queries, islice til paginering, pairwise til forbrugsdelta, product til batch-kombinationer og accumulate til kumulativt forbrug.

Af M-Bus Gateway

itertools er Pythons verktøjskasse til effektiv iteration — lazy evaluation og ingen unødvendig kopiering. Her er mønstrene der bruges i IoT-data processing og SaaS-backends.


groupby — gruppér HCA-aflæsninger

# server/src/distribution/grouping.py
from itertools import groupby
from operator import attrgetter, itemgetter


def group_readings_by_property(readings: list[dict]) -> dict[str, list[dict]]:
    """
    Gruppér aflæsninger pr. property_id.
    VIGTIGT: groupby kræver sorteret input på grupperingsnøglen.
    """
    # Sorter først — groupby virker kun på konsekutive grupper
    sorted_readings = sorted(readings, key=itemgetter("property_id"))

    result: dict[str, list[dict]] = {}
    for property_id, group in groupby(sorted_readings, key=itemgetter("property_id")):
        result[property_id] = list(group)

    return result


def group_alarms_by_type_and_gateway(
    alarms: list[dict],
) -> dict[tuple[str, str], list[dict]]:
    """
    Multi-nøgle gruppering: (alarm_type, gateway_id).
    Nyttigt til alarm-statistik pr. gateway pr. type.
    """
    sorted_alarms = sorted(
        alarms,
        key=lambda a: (a["alarm_type"], a["gateway_id"])
    )

    result: dict[tuple[str, str], list[dict]] = {}
    for (alarm_type, gateway_id), group in groupby(
        sorted_alarms,
        key=lambda a: (a["alarm_type"], a["gateway_id"])
    ):
        result[(alarm_type, gateway_id)] = list(group)

    return result

chain — kombiner multiple data-kilder

# server/src/queries/multi_property.py
from itertools import chain
from typing import Iterable


def get_all_active_readings(
    property_ids: list[str],
    db_session,
) -> Iterable[dict]:
    """
    Kombiner aflæsninger fra multiple ejendomme til én strøm.
    chain(): Lazy — behandler én generator ad gangen, ingen kopi.
    """
    generators = [
        fetch_property_readings(pid, db_session)
        for pid in property_ids
    ]
    # chain.from_iterable er mere effektiv end chain(*generators)
    return chain.from_iterable(generators)


def merge_alarm_sources(*alarm_lists: list[dict]) -> list[dict]:
    """
    Sammensæt alarmer fra gateway-MQTT, OCR-scan og manuel indtastning.
    chain(): Ingen liste-kopiering ved sammensætning.
    """
    all_alarms = list(chain(*alarm_lists))
    return sorted(all_alarms, key=itemgetter("timestamp"), reverse=True)


from itertools import chain as ichain


def get_settlement_lines_all_types(settlement_id: str, db) -> list[dict]:
    """
    Kombiner varme-, vand- og el-afregningslinjer til én liste.
    Nyttigt til PDF-generering der viser alle energityper.
    """
    heat_lines = fetch_lines(settlement_id, "heat", db)
    water_lines = fetch_lines(settlement_id, "water", db)
    elec_lines = fetch_lines(settlement_id, "electricity", db)
    return list(ichain(heat_lines, water_lines, elec_lines))

islice — paginering og lazy slicing

# server/src/api/pagination.py
from itertools import islice
from typing import Iterator, TypeVar

T = TypeVar("T")


def paginate_iterable(
    iterable: Iterator[T],
    page: int,
    page_size: int,
) -> list[T]:
    """
    Offset-baseret paginering uden at loade hele datasættet.
    islice(it, start, stop): Skip N elementer, tag næste M.
    Nyttigt ved generator-baserede DB-queries.
    """
    start = (page - 1) * page_size
    stop = start + page_size
    return list(islice(iterable, start, stop))


def first_n_readings_above_threshold(
    readings: Iterator[dict],
    threshold_kwh: float,
    n: int = 10,
) -> list[dict]:
    """
    Find de første N aflæsninger over en grænse.
    islice på en filtreret generator — stopper tidligt.
    """
    filtered = (r for r in readings if r["total_kwh"] > threshold_kwh)
    return list(islice(filtered, n))

pairwise — forbrugsdelta mellem aflæsninger

# server/src/analytics/consumption_delta.py
from itertools import pairwise    # Python 3.10+
from decimal import Decimal


def calculate_monthly_deltas(
    readings: list[dict],
) -> list[dict]:
    """
    pairwise([A, B, C, D]) → [(A,B), (B,C), (C,D)]
    Beregn forbrug pr. periode = Slut - Start.
    Kræver at readings er sorteret kronologisk.
    """
    sorted_readings = sorted(readings, key=lambda r: r["timestamp"])

    deltas = []
    for prev, curr in pairwise(sorted_readings):
        delta_kwh = Decimal(str(curr["total_kwh"])) - Decimal(str(prev["total_kwh"]))

        # Negativ delta = backwards alarm
        status = "ok" if delta_kwh >= 0 else "backwards"

        deltas.append({
            "meter_id": curr["meter_id"],
            "period_start": prev["timestamp"],
            "period_end": curr["timestamp"],
            "delta_kwh": delta_kwh,
            "status": status,
        })

    return deltas


# Python 3.9 og tidligere — manuel pairwise:
def manual_pairwise(iterable):
    """Fallback til Python < 3.10."""
    it = iter(iterable)
    prev = next(it)
    for curr in it:
        yield prev, curr
        prev = curr

accumulate — kumulativt forbrug

# server/src/analytics/cumulative.py
from itertools import accumulate
from decimal import Decimal
import operator


def cumulative_consumption(
    monthly_deltas: list[Decimal],
) -> list[Decimal]:
    """
    accumulate([1, 2, 3, 4]) → [1, 3, 6, 10]
    Beregn akkumuleret forbrug måned for måned.
    Bruges til forbrugsgrafer i lejer-portalen (EU EED 2027).
    """
    return list(accumulate(monthly_deltas, operator.add))


def cumulative_cost_with_running_average(
    monthly_costs: list[Decimal],
) -> list[tuple[Decimal, Decimal]]:
    """
    Returnér (kumulativ sum, løbende gennemsnit) pr. måned.
    """
    totals = list(accumulate(monthly_costs))
    return [
        (total, total / Decimal(i + 1))
        for i, total in enumerate(totals)
    ]

product — batch-kombinationer

# server/src/settlement/batch_planner.py
from itertools import product


def plan_settlement_batch(
    property_ids: list[str],
    period_ends: list[str],    # ["2026-05-31", "2025-05-31"]
) -> list[tuple[str, str]]:
    """
    Kartesisk produkt: Alle kombinationer af ejendom × periode.
    Bruges til at planlægge batch-generering ved bulk-kørsel.

    product(["P1","P2"], ["2026","2025"]):
    → [("P1","2026"), ("P1","2025"), ("P2","2026"), ("P2","2025")]
    """
    return list(product(property_ids, period_ends))


def generate_gateway_test_configs(
    hosts: list[str],
    ports: list[int],
    modes: list[str],
) -> list[dict]:
    """
    Generer testmatriks for gateway-konfigurationer.
    Nyttigt til CI-tests der dækker alle kombinationer.
    """
    return [
        {"host": h, "port": p, "wmbus_mode": m}
        for h, p, m in product(hosts, ports, modes)
    ]

takewhile og dropwhile — betinget iteration

# server/src/analytics/window.py
from itertools import takewhile, dropwhile


def readings_until_anomaly(
    readings: list[dict],
) -> list[dict]:
    """
    takewhile: Tag aflæsninger så længe ingen anomali detekteres.
    Stopper ved første anomali.
    """
    return list(takewhile(
        lambda r: r.get("status") == "ok",
        readings
    ))


def readings_after_installation(
    readings: list[dict],
    installation_date: str,
) -> list[dict]:
    """
    dropwhile: Skip aflæsninger indtil installationsdato er nået.
    Bruges til at filtrere historik korrekt ved målerskift.
    """
    return list(dropwhile(
        lambda r: r["timestamp"] < installation_date,
        readings
    ))

Konklusion

itertools giver lazy, hukommelseseffektiv iteration: groupby til HCA-gruppering (kræver sorteret input), chain.from_iterable til multi-property-queries, islice til paginering af generatorer, pairwise til forbrugsdelta (Python 3.10+), accumulate til EU EED-forbrugskurver og product til batch-planlægning. Alle er O(1) hukommelsesforbrug ved iteration — kritisk for store porteføljer.

Se Python collections guide eller Python functools guide.