Python generatorer og iteratorer — memory-efficient IoT databehandling
Python generatorer til IoT-databehandling: yield, async generators, infinite streams fra MQTT, chunked database reads, lazy evaluation og sammenligning med list comprehensions.
Af M-Bus Gateway
Generatorer processerer millioner af aflæsninger uden at loade dem alle i RAM på én gang — kun det aktuelle element (eller chunk) holdes i hukommelsen. Her er de praktiske mønstre til IoT-datastrømme og database-queries.
Generatorer vs. lists
# Problem: all ~1 million readings are materialised in RAM at once:
all_readings = list(db.query("SELECT * FROM readings"))  # easily 2+ GB of RAM
# Solution: a generator that handles one row at a time:
def reading_generator(db):
    """Lazily yield a ``Reading`` for every row of ``SELECT * FROM readings``.

    Nothing runs until the caller starts iterating, and one ``Reading`` is
    built per step. Whether the rows themselves are buffered depends on
    ``db.stream`` (presumably a server-side cursor — TODO confirm).
    """
    rows = db.stream("SELECT * FROM readings")
    yield from (Reading(**row) for row in rows)
# Usage: consume lazily — each reading can be released before the next one is built.
for reading in reading_generator(db):
    process(reading)  # process one, release one
Async generators til MQTT-streams
# gateway/src/mqtt/listener.py
async def wmbus_telegram_stream(
    device_path: str = "/dev/wmbus",
) -> AsyncIterator[dict]:
    """Yield decoded wM-Bus telegrams from a ``wmbusmeters`` subprocess, indefinitely.

    Starts ``wmbusmeters --no-conf --json --listento=c1t1 <device_path>`` and
    yields every stdout line that decodes to a JSON object. The stream ends
    only when the subprocess closes its stdout or the consumer stops.

    The subprocess is terminated (if still running) and reaped in ``finally``
    when the generator is closed. ``break`` inside ``async for`` does not
    close an async generator immediately, so wrap the stream in
    ``contextlib.aclosing`` to make that cleanup deterministic.

    Args:
        device_path: Device path passed to wmbusmeters.

    Yields:
        One decoded JSON object (dict) per telegram line.
    """
    proc = await asyncio.create_subprocess_exec(
        "wmbusmeters", "--no-conf", "--json",
        "--listento=c1t1", device_path,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,
    )
    try:
        async for line in proc.stdout:
            if not line.strip():
                continue  # blank line: nothing to parse
            try:
                telegram = json.loads(line.decode("utf-8"))
            except (UnicodeDecodeError, json.JSONDecodeError):
                # Skip non-JSON or garbled output instead of ending the stream.
                continue
            if isinstance(telegram, dict):  # honour the AsyncIterator[dict] contract
                yield telegram
    finally:
        # If wmbusmeters already exited (stdout hit EOF), signalling it can
        # raise ProcessLookupError and mask the real reason the loop ended.
        if proc.returncode is None:
            try:
                proc.terminate()
            except ProcessLookupError:
                pass  # exited between the check and the signal
        await proc.wait()
# Usage:
async def listen_and_store():
    """Parse and store every received telegram until ``shutdown_event`` is set.

    The stream is wrapped in ``contextlib.aclosing`` so it is closed as soon as
    the loop exits, whether by the shutdown ``break`` or by an exception from
    ``parse_telegram``/``store_reading``. Its ``finally`` cleanup therefore
    runs before this coroutine returns, not whenever the abandoned async
    generator happens to be finalised.

    NOTE(review): the shutdown flag is only checked after a telegram has been
    stored, so with no incoming traffic shutdown waits for the next telegram.
    """
    from contextlib import aclosing

    async with aclosing(wmbus_telegram_stream()) as telegrams:
        async for telegram in telegrams:
            reading = parse_telegram(telegram)
            await store_reading(reading)
            # Stop on the shutdown signal:
            if shutdown_event.is_set():
                break
Chunked database reads
# server/src/readings/repository.py
async def stream_readings_for_settlement(
    meter_installation_id: uuid.UUID,
    period_start: date,
    period_end: date,
    session: AsyncSession,
    chunk_size: int = 1000,
) -> AsyncIterator[Reading]:
    """Yield one meter installation's readings for a period, oldest first.

    Rows are fetched ``chunk_size`` at a time with LIMIT/OFFSET, so the whole
    period is never loaded into memory at once.

    Sizing example: one installation at 2 readings/day for 365 days is 730
    rows; 48 installations together are 35,040 rows.

    Args:
        meter_installation_id: Installation whose readings are returned.
        period_start: First day of the period (inclusive).
        period_end: Last day of the period (inclusive). A plain ``date``
            covers that entire day; a ``datetime`` is used as an inclusive
            upper bound as given.
        session: Async SQLAlchemy session used to run the queries.
        chunk_size: Rows fetched per query; must be >= 1.

    Raises:
        ValueError: If ``chunk_size`` is less than 1.
    """
    from datetime import datetime, timedelta

    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    if isinstance(period_end, datetime):
        upper_bound = Reading.timestamp <= period_end
    else:
        # A timestamp compared with a bare date is compared with midnight of
        # that day, so `<= period_end` would drop every reading taken later on
        # the last day. Use a half-open bound on the following day instead.
        upper_bound = Reading.timestamp < period_end + timedelta(days=1)

    # OFFSET paging is only stable over a total order: break timestamp ties
    # on the primary key so no row is skipped or repeated between chunks.
    ordering = (Reading.timestamp, *Reading.__mapper__.primary_key)

    offset = 0
    while True:
        stmt = (
            select(Reading)
            .where(
                Reading.meter_installation_id == meter_installation_id,
                Reading.timestamp >= period_start,
                upper_bound,
            )
            .order_by(*ordering)
            .limit(chunk_size)
            .offset(offset)
        )
        rows = (await session.execute(stmt)).scalars().all()
        for row in rows:
            yield row
        if len(rows) < chunk_size:
            break  # short or empty chunk: nothing left to fetch
        offset += chunk_size
# Usage in the settlement module:
async def calculate_settlement(meter_installation_id, period, session):
    """Sum ``reading.value`` over every reading in *period* and count them.

    Returns:
        ``(total, count)``: ``total`` is a ``Decimal`` accumulated from each
        reading's ``value``; ``count`` is the number of readings streamed.

    NOTE(review): if ``value`` holds cumulative meter-register values (e.g. a
    total m3/kWh counter), summing them does not give period consumption; the
    last minus the first reading would. Also, ``Decimal + float`` raises
    TypeError, so this assumes ``value`` is Decimal/int — confirm against the
    model.
    """
    total = Decimal("0")
    count = 0
    readings = stream_readings_for_settlement(
        meter_installation_id, period.start, period.end, session
    )
    async for item in readings:
        total += item.value
        count += 1
    return total, count
Generator pipelines
# Chain generators into a pipeline:
def raw_telegram_reader(file_path: str):
    """Source stage: yield each line of a telegram log file, whitespace-stripped.

    The file is decoded explicitly as UTF-8 (the JSON encoding) rather than
    with the platform's locale encoding. Undecodable bytes become U+FFFD, so a
    single corrupted line cannot abort the whole scan; the JSON stage
    downstream can reject such a line instead. The file is closed when the
    generator is exhausted or closed.
    """
    with open(file_path, encoding="utf-8", errors="replace") as f:
        for line in f:
            yield line.strip()
def parse_telegrams(raw_lines):
    """Filter stage: decode JSON lines, keeping only JSON objects.

    Blank and malformed lines are skipped. So are lines holding valid JSON
    that is not an object (``null``, numbers, arrays), because later stages
    call ``telegram.get(...)`` and would crash on them.
    """
    for line in raw_lines:
        try:
            telegram = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(telegram, dict):
            yield telegram
def filter_by_meter(telegrams, meter_id: str):
    """Filter stage: pass through only telegrams whose ``"id"`` equals *meter_id*."""
    yield from (t for t in telegrams if t.get("id") == meter_id)
def extract_values(telegrams):
    """Transform stage: reduce each telegram to ``meter_id``, ``value`` and ``timestamp``.

    ``value`` is ``total_m3`` when present, otherwise ``heat_kwh`` (``None``
    if neither exists). Presence is tested with ``is not None`` so a genuine
    reading of 0 is kept instead of falling through to the other key.

    Raises:
        KeyError: If a telegram has no ``"id"`` key.
    """
    for telegram in telegrams:
        value = telegram.get("total_m3")
        if value is None:
            value = telegram.get("heat_kwh")
        yield {
            "meter_id": telegram["id"],
            "value": value,
            "timestamp": telegram.get("timestamp"),
        }
# Pipeline: each element passes through every stage before the next line is read:
def process_meter_log(log_path: str, meter_id: str):
    """Yield extracted readings for *meter_id* from the telegram log at *log_path*.

    Stages: read lines -> parse JSON -> filter by meter -> extract values.
    Everything is lazy. Calling this function does no work; the log file is
    opened and lines are pulled through the stages one at a time only as the
    caller iterates.
    """
    lines = raw_telegram_reader(log_path)
    telegrams = parse_telegrams(lines)
    matching = filter_by_meter(telegrams, meter_id=meter_id)
    yield from extract_values(matching)
itertools til aflæsningsanalyse
import itertools
from statistics import mean
# Gruppér aflæsninger efter dato:
from itertools import groupby
def group_readings_by_date(readings: list[Reading]) -> dict:
    """Bucket readings by the calendar day of their ``timestamp``.

    Returns a dict mapping each ``date`` to the list of readings from that
    day. Keys appear in ascending date order, and each list keeps the
    readings' original relative order (``sorted`` is stable).
    """
    def day_of(r):
        return r.timestamp.date()

    ordered = sorted(readings, key=day_of)
    return {day: list(members) for day, members in groupby(ordered, key=day_of)}
# Find runs of missing readings (gaps longer than max_gap_hours, default 48 h):
def find_reading_gaps(
    readings: list[Reading], max_gap_hours: float = 48
) -> list[tuple]:
    """Return every gap between consecutive readings longer than *max_gap_hours*.

    Readings are sorted by ``timestamp`` first, so input order does not
    matter. Each gap is ``(earlier_timestamp, later_timestamp, gap_hours)``.
    Fewer than two readings give no gaps.

    Args:
        readings: Readings exposing a ``timestamp`` attribute.
        max_gap_hours: Longest gap, in hours, still considered normal; only
            strictly longer gaps are reported. Defaults to 48.
    """
    ordered = sorted(readings, key=lambda r: r.timestamp)
    gaps = []
    for r1, r2 in itertools.pairwise(ordered):
        hours_gap = (r2.timestamp - r1.timestamp).total_seconds() / 3600
        if hours_gap > max_gap_hours:
            gaps.append((r1.timestamp, r2.timestamp, hours_gap))
    return gaps
# Sliding window for moving average:
def moving_average(readings: list[float], window: int = 7) -> list[float]:
    """Return the simple moving average of *readings* over *window* consecutive values.

    One average is produced per full window, i.e.
    ``len(readings) - window + 1`` values. An input shorter than *window*
    gives ``[]``. A *window* below 1 ends up averaging an empty slice and
    raises ``statistics.StatisticsError``.

    ``statistics.mean`` is exact but pure Python (O(len * window)); prefer
    NumPy for large series.
    """
    window_count = len(readings) - window + 1
    return [mean(readings[k:k + window]) for k in range(window_count)]
Celery task med generator
# Batch export to CSV via a generator (no RAM spike):
@shared_task(name="exports.readings_csv")
def export_readings_csv(property_id: str, period_start: str, period_end: str):
    """Export a property's readings for a period to S3 as a single CSV file.

    Rows come lazily from ``_stream_readings_sync`` and are written to an
    in-memory text buffer. The buffer is flushed to ``s3_multipart_writer``
    whenever ``buffer.tell()`` exceeds 5 * 1024 * 1024, so the complete CSV
    is never held in memory.

    The file begins with a UTF-8 byte-order mark (presumably so spreadsheet
    tools such as Excel detect UTF-8 — confirm). Only the first chunk is
    encoded with ``utf-8-sig``. Encoding every chunk that way would insert a
    stray BOM (U+FEFF) at the start of each later chunk, i.e. in the middle
    of the CSV data.

    Args:
        property_id: Property whose readings are exported; part of the S3 key.
        period_start: Start of the export period; part of the S3 key.
        period_end: End of the export period.
    """
    import csv
    import io

    from server.src.db import get_sync_session

    flush_threshold = 5 * 1024 * 1024  # 5MB chunks

    with get_sync_session() as session:

        def reading_rows():
            yield ["meter_id", "timestamp", "value", "unit"]  # Header
            for reading in _stream_readings_sync(session, property_id,
                                                 period_start, period_end):
                yield [
                    str(reading.meter_installation_id),
                    reading.timestamp.isoformat(),
                    str(reading.value),
                    reading.unit,
                ]

        # Stream to S3 via multipart (the complete CSV is never in RAM):
        buffer = io.StringIO()
        writer = csv.writer(buffer)
        s3_key = f"exports/{property_id}/readings-{period_start}.csv"
        encoding = "utf-8-sig"  # BOM on the first chunk only; plain UTF-8 after
        with s3_multipart_writer(s3_key) as s3_writer:
            for row in reading_rows():
                writer.writerow(row)
                if buffer.tell() > flush_threshold:
                    s3_writer.write(buffer.getvalue().encode(encoding))
                    encoding = "utf-8"
                    buffer.seek(0)
                    buffer.truncate()
            if buffer.tell() > 0:
                s3_writer.write(buffer.getvalue().encode(encoding))
Konklusion
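Generatorer processerer IoT-datastrømme fra MQTT og database-queries med konstant, lavt RAM-forbrug, fordi kun det aktuelle element eller chunk holdes i hukommelsen. Async generators omkring wmbusmeters-subprocessen giver en ren, uendelig stream, hvor `finally`-blokken rydder op, når generatoren lukkes. Et `break` i `async for` lukker dog ikke generatoren med det samme, så brug `contextlib.aclosing` for at gøre oprydningen deterministisk ved break eller exception. Pipeline-chaining med filtre og transformers holder koden modulær. itertools.pairwise er den effektive løsning til gap-detection i tidsseriedata. Celery CSV-export via streaming generator med S3 multipart-upload undgår RAM-spikes selv ved millioner af aflæsninger.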