M-Bus Gateway
← Tilbage til blog
· Python · generator · iterator · async · IoT · MQTT · memory · backend

Python generatorer og iteratorer — memory-efficient IoT databehandling

Python-generatorer til IoT-databehandling: yield, async generators, uendelige datastrømme fra wM-Bus (wmbusmeters), chunked database reads, lazy evaluation og sammenligning med list comprehensions.

Af M-Bus Gateway

Generatorer kan processere millioner af aflæsninger uden at loade dem alle i RAM på én gang. Her er de praktiske mønstre til IoT-datastrømme og database-queries.


Generatorer vs. lists

# Problem: building a list pulls all ~1 million readings into RAM at once:
all_readings = [row for row in db.query("SELECT * FROM readings")]  # rough estimate: 2+ GB RAM

# Løsning: Generator — processerer én ad gangen:
def reading_generator(db):
    """Yield one ``Reading`` per row streamed from the readings table.

    Rows are pulled lazily from ``db.stream`` as the caller iterates; this
    generator only ever holds the current row. (Whether the driver buffers
    rows internally is up to ``db.stream``.)
    """
    rows = db.stream("SELECT * FROM readings")
    yield from (Reading(**row) for row in rows)

# Consumption: handle one reading at a time.
for current in reading_generator(db):
    process(current)  # the previous reading is dropped when the next is bound

Async generators til wM-Bus-streams (wmbusmeters-subprocess)

# gateway/src/mqtt/listener.py

async def wmbus_telegram_stream(
    device_path: str = "/dev/wmbus",
) -> AsyncIterator[dict]:
    """Yield decoded wM-Bus telegrams from a ``wmbusmeters`` subprocess.

    Runs ``wmbusmeters --no-conf --json --listento=c1t1 <device_path>`` and
    yields each stdout line that decodes to a JSON object. There is no
    built-in end condition: iteration continues until the subprocess closes
    its stdout or the consumer stops iterating.

    Lines that are not valid UTF-8, not valid JSON, or not a JSON object are
    skipped rather than ending the stream.

    The subprocess is terminated and reaped in ``finally``. If the consumer
    breaks out early, that cleanup only runs once the generator is closed;
    use ``async with contextlib.aclosing(wmbus_telegram_stream()) as s:`` to
    make it happen immediately.

    Args:
        device_path: Receiver device passed to wmbusmeters.

    Yields:
        One ``dict`` per decoded telegram.
    """
    proc = await asyncio.create_subprocess_exec(
        "wmbusmeters", "--no-conf", "--json",
        "--listento=c1t1", device_path,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,
    )

    try:
        # StreamReader iteration stops at EOF and never yields b"", so no
        # emptiness check is needed.
        async for line in proc.stdout:
            try:
                telegram = json.loads(line.decode().strip())
            except (UnicodeDecodeError, json.JSONDecodeError):
                continue  # non-JSON or undecodable output: skip the line
            if isinstance(telegram, dict):
                yield telegram
    finally:
        # If wmbusmeters already exited on its own (e.g. it crashed and
        # stdout hit EOF), terminate() raises ProcessLookupError, which would
        # propagate to the consumer. Only signal a process that is still live.
        if proc.returncode is None:
            try:
                proc.terminate()
            except ProcessLookupError:
                pass  # exited between the returncode check and terminate()
        await proc.wait()


# Forbrug:
async def listen_and_store():
    """Consume wM-Bus telegrams and persist each parsed reading.

    After each telegram is stored, ``shutdown_event`` is checked and the loop
    stops if it is set. The stream is wrapped in ``contextlib.aclosing`` so
    that leaving the loop (break or exception) closes the async generator
    right away. This runs its ``finally`` cleanup now, instead of whenever
    the event loop's async-generator finalizer gets to it.

    Note: the shutdown flag is only checked after a telegram arrives. With no
    incoming traffic this coroutine keeps waiting; cancel the task to stop it
    unconditionally.
    """
    from contextlib import aclosing

    async with aclosing(wmbus_telegram_stream()) as telegrams:
        async for telegram in telegrams:
            reading = parse_telegram(telegram)
            await store_reading(reading)

            # Stop on shutdown signal:
            if shutdown_event.is_set():
                break

Chunked database reads

# server/src/readings/repository.py

async def stream_readings_for_settlement(
    meter_installation_id: uuid.UUID,
    period_start: date,
    period_end: date,
    session: AsyncSession,
    chunk_size: int = 1000,
) -> AsyncIterator[Reading]:
    """Stream one meter installation's readings for a period, chunk by chunk.

    Rows are fetched ``chunk_size`` at a time with LIMIT/OFFSET, so the whole
    period is never loaded into memory at once. Example: one installation
    with 2 readings/day over 365 days is 730 rows (48 installations would be
    35,040 rows in total).

    Args:
        meter_installation_id: Installation whose readings are streamed.
        period_start: First day of the period (inclusive, from 00:00).
        period_end: Last day of the period (inclusive, the whole day).
        session: Async SQLAlchemy session used for every chunk query.
        chunk_size: Rows per query; must be >= 1.

    Yields:
        ``Reading`` rows ordered by timestamp, with ties broken by primary key.

    Raises:
        ValueError: If ``chunk_size`` < 1 (raised on first iteration).
    """
    from datetime import timedelta

    from sqlalchemy import inspect as sa_inspect

    if chunk_size < 1:
        # LIMIT 0 would silently yield nothing (a zero settlement); a
        # negative LIMIT is an SQL error.
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    # Reading.timestamp is a datetime; comparing it with ``<= period_end`` (a
    # date) cuts the last day off at midnight. Use an exclusive bound on the
    # following day instead.
    end_exclusive = period_end + timedelta(days=1)
    # OFFSET paging needs a total order; otherwise rows that share a
    # timestamp can be skipped or returned twice across pages.
    ordering = (Reading.timestamp, *sa_inspect(Reading).primary_key)

    offset = 0
    while True:
        stmt = (
            select(Reading)
            .where(
                Reading.meter_installation_id == meter_installation_id,
                Reading.timestamp >= period_start,
                Reading.timestamp < end_exclusive,
            )
            .order_by(*ordering)
            .limit(chunk_size)
            .offset(offset)
        )

        rows = (await session.execute(stmt)).scalars().all()

        for row in rows:
            yield row

        if len(rows) < chunk_size:
            break  # short (or empty) page: nothing left

        offset += chunk_size


# Forbrug i afregningsmodul:
async def calculate_settlement(meter_installation_id, period, session):
    """Total the readings of one installation over ``period``.

    Streams readings between ``period.start`` and ``period.end`` and returns
    ``(total_kwh, reading_count)``. The total starts from ``Decimal("0")``;
    this presumably expects ``reading.value`` to be Decimal/int-compatible
    (a float would raise TypeError) — confirm against the Reading model.
    """
    readings = stream_readings_for_settlement(
        meter_installation_id, period.start, period.end, session
    )
    count = 0
    total = Decimal("0")
    async for item in readings:
        total += item.value
        count += 1
    return total, count

Generator pipelines

# Kæd generatorer som pipeline:

def raw_telegram_reader(file_path: str):
    """Source stage: yield each line of a telegram log file, stripped.

    The file is decoded explicitly as UTF-8 (JSON is UTF-8 by spec) rather
    than with the platform's locale encoding. Undecodable bytes become
    U+FFFD, so a corrupt line reaches the parser as invalid JSON and is
    skipped there, instead of aborting the whole pipeline with
    UnicodeDecodeError.
    """
    with open(file_path, encoding="utf-8", errors="replace") as f:
        for line in f:
            yield line.strip()


def parse_telegrams(raw_lines):
    """Filter stage: decode each line as JSON, dropping lines that are not JSON."""
    for raw in raw_lines:
        try:
            decoded = json.loads(raw)
        except json.JSONDecodeError:
            pass  # not JSON: skip this line
        else:
            yield decoded


def filter_by_meter(telegrams, meter_id: str):
    """Filter stage: pass through only telegrams whose ``"id"`` equals *meter_id*."""
    yield from (t for t in telegrams if t.get("id") == meter_id)


def extract_values(telegrams):
    """Transform stage: reduce each telegram to meter_id, value and timestamp.

    ``value`` is ``total_m3`` when the telegram has that key with a non-None
    value, otherwise ``heat_kwh`` (``None`` if neither is present). A reading
    of ``0`` / ``0.0`` is a real value and is kept. A truthiness-based
    ``or`` fallback would treat it as missing and substitute ``heat_kwh``.

    Raises:
        KeyError: If a telegram has no ``"id"``.
    """
    for telegram in telegrams:
        value = telegram.get("total_m3")
        if value is None:
            value = telegram.get("heat_kwh")
        yield {
            "meter_id": telegram["id"],
            "value": value,
            "timestamp": telegram.get("timestamp"),
        }


# Pipeline — ét element processeres komplet fra kilde til slut:
def process_meter_log(log_path: str, meter_id: str):
    """Yield extracted readings for *meter_id* from the log at *log_path*.

    Chains read -> parse -> filter -> extract. The whole chain is lazy: the
    log file is not opened and no stage runs until the caller requests the
    first reading, and lines are then pulled through the stages one at a
    time on demand. ``yield from`` also forwards ``close()`` down the chain,
    so stopping early closes the log file deterministically.
    """
    yield from extract_values(
        filter_by_meter(
            parse_telegrams(raw_telegram_reader(log_path)),
            meter_id=meter_id,
        )
    )

itertools til aflæsningsanalyse

import itertools
from statistics import mean

# Gruppér aflæsninger efter dato:
from itertools import groupby

def group_readings_by_date(readings: list[Reading]) -> dict:
    """Bucket readings by the calendar day of their timestamp.

    Returns a dict mapping each ``date`` to the list of readings on that day.
    Keys are in ascending date order, and each list keeps input order
    (``sorted`` is stable).
    """
    def day_of(reading):
        return reading.timestamp.date()

    return {
        day: list(members)
        for day, members in groupby(sorted(readings, key=day_of), key=day_of)
    }


# Find løb af manglende aflæsninger (> 48t gap):
def find_reading_gaps(
    readings: list[Reading], max_gap_hours: float = 48
) -> list[tuple]:
    """Find consecutive readings that are further apart than *max_gap_hours*.

    Readings are sorted by timestamp first, so input order does not matter.

    Args:
        readings: Readings carrying a ``timestamp`` datetime.
        max_gap_hours: Gaps strictly longer than this many hours are
            reported. Defaults to 48, the previously hard-coded threshold.

    Returns:
        ``(earlier_timestamp, later_timestamp, gap_hours)`` tuples in
        chronological order; empty when there are fewer than two readings.
    """
    ordered = sorted(readings, key=lambda r: r.timestamp)
    gaps = []
    for earlier, later in itertools.pairwise(ordered):
        hours_gap = (later.timestamp - earlier.timestamp).total_seconds() / 3600
        if hours_gap > max_gap_hours:
            gaps.append((earlier.timestamp, later.timestamp, hours_gap))
    return gaps


# Sliding window for moving average:
def moving_average(readings: list[float], window: int = 7) -> list[float]:
    """Return the simple moving average of each full *window*-sized slice.

    Produces ``len(readings) - window + 1`` means (one per start index), or an
    empty list when there are fewer readings than *window*.
    """
    starts = range(len(readings) - window + 1)
    return list(map(mean, (readings[s:s + window] for s in starts)))

Celery task med generator

# Batch-eksport til CSV via generator (ingen RAM-spike):

@shared_task(name="exports.readings_csv")
def export_readings_csv(property_id: str, period_start: str, period_end: str):
    """Export a property's readings for a period to S3 as a single CSV file.

    Rows come from ``_stream_readings_sync`` through a generator. They are
    written into an in-memory text buffer, which is flushed to the S3
    multipart writer whenever it exceeds ~5 MiB of text, so the complete CSV
    is never held in memory at once.

    The file is encoded as UTF-8 with a BOM, and the BOM is emitted only at
    the very start of the file. Encoding every flushed part with
    ``utf-8-sig`` would inject a stray U+FEFF at the start of each part and
    corrupt the first row of every chunk after the first.

    Args:
        property_id: Property whose readings are exported; part of the S3 key.
        period_start: Period start passed to the reading query; part of the S3 key.
        period_end: Period end passed to the reading query.
    """
    import csv
    import io

    from server.src.db import get_sync_session

    # Flush threshold in characters. buffer.tell() is a character position,
    # and UTF-8 output is at least that many bytes. Presumably sized for S3's
    # 5 MiB minimum multipart part size — confirm against s3_multipart_writer.
    part_threshold = 5 * 1024 * 1024

    with get_sync_session() as session:
        def reading_rows():
            # Header first, then one row per reading, produced lazily.
            yield ["meter_id", "timestamp", "value", "unit"]
            for reading in _stream_readings_sync(session, property_id,
                                                  period_start, period_end):
                yield [
                    str(reading.meter_installation_id),
                    reading.timestamp.isoformat(),
                    str(reading.value),
                    reading.unit,
                ]

        buffer = io.StringIO()
        writer = csv.writer(buffer)
        # Only the first part written carries the BOM; later parts are plain UTF-8.
        encoding = "utf-8-sig"

        # NOTE(review): the key uses only period_start, so two exports with the
        # same start but different period_end overwrite each other — confirm intent.
        s3_key = f"exports/{property_id}/readings-{period_start}.csv"
        with s3_multipart_writer(s3_key) as s3_writer:
            for row in reading_rows():
                writer.writerow(row)
                if buffer.tell() > part_threshold:
                    s3_writer.write(buffer.getvalue().encode(encoding))
                    encoding = "utf-8"
                    buffer.seek(0)
                    buffer.truncate()

            if buffer.tell() > 0:
                s3_writer.write(buffer.getvalue().encode(encoding))

Konklusion

Generatorer processerer IoT-datastrømme fra wM-Bus-modtagere og database-queries uden at holde hele datasættet i RAM. En async generator omkring wmbusmeters-subprocessen giver en clean, uendelig stream med oprydning i finally-blokken. Luk den eksplicit med contextlib.aclosing, så subprocessen stoppes med det samme ved break og ikke først når event loopens finalizer når til den. Pipeline-chaining med filtre og transformers holder koden modulær. itertools.pairwise er den effektive løsning til gap-detection i tidsseriedata. Celery CSV-export via streaming generator med S3 multipart-upload undgår RAM-spikes selv ved millioner af aflæsninger, forudsat at den underliggende database-query også streamer.

Se også vores asyncpg-guide eller vores guide til Celery med Redis.