Python contextlib — avancerede context managers til IoT og SaaS
Python contextlib mønstre til IoT og SaaS: @contextmanager, AsyncExitStack, suppress, nullcontext, redirect_stdout og closing — med eksempler fra gateway, MQTT og database-håndtering.
Af M-Bus Gateway
contextlib giver kraftfulde mønstre til ressourcehåndtering. with-statements er hverdagskost, men AsyncExitStack, suppress og nullcontext løser avancerede IoT-mønstre elegant.
@contextmanager — enkel resource-håndtering
# gateway/src/mqtt/connection.py
from contextlib import contextmanager, asynccontextmanager
import paho.mqtt.client as mqtt
import structlog
log = structlog.get_logger()
@contextmanager
def mqtt_connection(host: str, port: int, client_id: str):
    """Yield a connected MQTT client and tear it down when the block exits.

    The client is created with ``clean_session=False`` and connected with a
    60-second keepalive. Its network loop is started with ``loop_start()``
    before the client is handed to the ``with`` body. On exit, whether normal
    or through an exception, the loop is stopped and the client disconnected.

    Args:
        host: Broker host name or address.
        port: Broker TCP port.
        client_id: MQTT client identifier passed to ``mqtt.Client``.

    Yields:
        The connected ``mqtt.Client`` instance.

    Raises:
        Whatever ``client.connect()`` raises when the connection attempt
        fails; in that case no teardown is attempted.
    """
    client = mqtt.Client(client_id=client_id, clean_session=False)
    # Connect before entering the try block: if the attempt fails there is
    # nothing to tear down, and logging "mqtt_disconnected" for a client that
    # never connected would be misleading.
    client.connect(host, port, keepalive=60)
    try:
        client.loop_start()
        log.info("mqtt_connected", host=host, port=port)
        yield client
    finally:
        client.loop_stop()
        client.disconnect()
        log.info("mqtt_disconnected")
# Brug:
# with mqtt_connection("178.105.90.8", 8883, "GW-0001") as client:
# client.publish("meters/GW-0001/status", payload)
@asynccontextmanager
async def managed_db_transaction(db):
    """Run the ``async with`` body inside a transaction opened by ``db.begin()``.

    Yields ``db`` itself. When the body raises an ``Exception`` the
    transaction object is rolled back explicitly and the exception is
    re-raised. On success nothing is done here; committing is left to the
    exit of the ``db.begin()`` context.

    NOTE(review): if ``db.begin()`` already rolls back on error, the explicit
    rollback below is redundant - confirm against the database layer in use.
    """
    async with db.begin() as txn:
        try:
            yield db
        except Exception:
            await txn.rollback()
            raise
AsyncExitStack — dynamisk ressourcehåndtering
# gateway/src/main.py
# AsyncExitStack: Opret og frigiv et variabelt antal ressourcer
import asyncio
from contextlib import AsyncExitStack
from gateway.src.mqtt.client import MQTTClient
from gateway.src.wmbus.listener import WmbusListener
from gateway.src.api.server import APIServer
from gateway.src.cellular.manager import CellularManager
async def run_gateway(config: dict) -> None:
    """Start all gateway components and keep them alive until the task ends.

    Every component is entered on a single ``AsyncExitStack``, so whatever
    was started is released in reverse (LIFO) order when this coroutine
    exits. The coroutine then awaits a future that nothing in this function
    resolves, so it only ends through cancellation or an exception.

    Args:
        config: Gateway settings. ``mqtt_host``, ``mqtt_port`` and
            ``gateway_id`` are always read; ``apn`` is read only when
            ``use_cellular`` is truthy.
    """
    async with AsyncExitStack() as stack:
        # Registered first so that, under LIFO unwinding, it runs last:
        # "shutdown complete" is logged only after every component entered
        # below has been released.
        stack.callback(lambda: log.info("gateway_shutdown_complete"))

        # Cellular link is optional and only started when configured.
        if config.get("use_cellular"):
            await stack.enter_async_context(CellularManager(config["apn"]))

        # wM-Bus listener is always started.
        await stack.enter_async_context(
            WmbusListener(device="/dev/wmbus", mode="c1")
        )

        # MQTT client.
        await stack.enter_async_context(
            MQTTClient(
                host=config["mqtt_host"],
                port=config["mqtt_port"],
                gateway_id=config["gateway_id"],
            )
        )

        # Local API server.
        await stack.enter_async_context(APIServer(port=8080))

        # Everything is up: park on a future that is never resolved here.
        await asyncio.get_running_loop().create_future()
    # Release order on exit: api, mqtt, listener, cellular (if started),
    # and finally the shutdown-complete log callback.
suppress — ignorer forventede exceptions
# gateway/src/storage/cleanup.py
from contextlib import suppress
from pathlib import Path
def cleanup_old_logs(log_dir: Path, keep_days: int = 7) -> int:
    """Delete ``*.log`` files in *log_dir* that are older than *keep_days* days.

    A file's age is taken from its modification time. Files that vanish or
    cannot be accessed while being inspected or removed (for example because
    a parallel cleanup got there first) are skipped silently.

    Args:
        log_dir: Directory whose direct ``*.log`` children are examined.
        keep_days: Files modified within this many days are kept.

    Returns:
        The number of files that were deleted.
    """
    from datetime import datetime, timedelta, timezone

    # Both sides of the comparison are timezone-aware UTC. Mixing a naive UTC
    # "now" with a naive local-time mtime would shift the cutoff by the
    # machine's UTC offset.
    cutoff = datetime.now(timezone.utc) - timedelta(days=keep_days)
    deleted = 0
    for log_file in log_dir.glob("*.log"):
        # stat() and unlink() can both fail if another process removed the
        # file in the meantime.
        with suppress(FileNotFoundError, PermissionError):
            mtime = datetime.fromtimestamp(
                log_file.stat().st_mtime, tz=timezone.utc
            )
            if mtime < cutoff:
                log_file.unlink()
                deleted += 1
    return deleted
# Server-side: Undertryk velkendte harmløse fejl
from contextlib import suppress
import asyncio
async def send_heartbeat(mqtt_client, gateway_id: str = "GW-0001") -> None:
    """Publish a best-effort status heartbeat for *gateway_id*.

    The payload ``{"status": "ok"}`` is published to
    ``meters/<gateway_id>/status``. A failing publish never propagates to the
    caller; it is logged as a warning so the failure is not lost entirely.

    Args:
        mqtt_client: Object exposing an awaitable ``publish(topic, payload)``.
        gateway_id: Gateway identifier used in the topic. Defaults to the
            previously hard-coded ``"GW-0001"``.
    """
    import logging

    topic = f"meters/{gateway_id}/status"
    try:
        await mqtt_client.publish(topic, b'{"status": "ok"}')
    except Exception as exc:
        # Best effort: the caller continues whether or not the publish
        # succeeded, but the failure is recorded instead of being dropped.
        logging.getLogger(__name__).warning(
            "heartbeat publish to %s failed: %r", topic, exc
        )
nullcontext — betinget context manager
# server/src/tracing/context.py
from contextlib import nullcontext
from typing import AsyncContextManager
import structlog
log = structlog.get_logger()
async def process_telegram(
    telegram: dict,
    trace: bool = False,
) -> None:
    """Parse and store *telegram*, optionally inside an OpenTelemetry span.

    ``nullcontext()`` stands in for the span when tracing is off, so the
    actual work is written once instead of being duplicated in an if/else.

    Args:
        telegram: The telegram handed on to ``parse_and_store``.
        trace: When true, the work runs inside a span named
            ``"process_telegram"`` from the ``"mbus-gateway"`` tracer.
            OpenTelemetry is imported only in that case.
    """
    from typing import ContextManager

    tracer_ctx: ContextManager
    if trace:
        from opentelemetry import trace as otel_trace

        tracer = otel_trace.get_tracer("mbus-gateway")
        tracer_ctx = tracer.start_as_current_span("process_telegram")
    else:
        tracer_ctx = nullcontext()

    # start_as_current_span() returns a regular (synchronous) context
    # manager, so it must be entered with plain `with`; `async with` would
    # fail in trace mode. nullcontext() supports plain `with` as well.
    with tracer_ctx:
        await parse_and_store(telegram)
# Anden typisk brug: Betinget lock
async def update_reading(reading_id: str, acquire_lock: bool = True) -> None:
    """Run ``do_update`` for *reading_id*, optionally while holding a lock.

    Args:
        reading_id: Identifier passed through to ``do_update``.
        acquire_lock: When true, the update runs inside the context manager
            returned by ``some_async_lock()``; otherwise inside a no-op
            ``nullcontext()``.
    """
    if acquire_lock:
        guard = some_async_lock()
    else:
        # NOTE(review): nullcontext() only works with `async with` on
        # Python 3.10 and later - confirm the minimum supported version.
        guard = nullcontext()
    async with guard:
        await do_update(reading_id)
closing — wrap objekter der mangler __exit__
# server/src/integrations/legacy_client.py
from contextlib import closing, asynccontextmanager
import socket
def check_mqtt_port(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return whether a TCP connection to ``host:port`` can be established.

    ``closing()`` guarantees ``close()`` is called on the connected socket.
    It is the general tool for objects that offer ``close()`` but no
    ``__exit__``; socket objects do support ``with`` directly, so it is used
    here for illustration.

    Args:
        host: Host name or address to probe. Both IPv4 and IPv6 targets are
            handled because the connection goes through
            ``socket.create_connection``.
        port: TCP port to probe.
        timeout: Seconds to wait for a connection attempt.

    Returns:
        ``True`` if the connection succeeded, ``False`` on any ``OSError``
        (which includes timeouts, refused connections and resolution errors).
    """
    try:
        with closing(socket.create_connection((host, port), timeout=timeout)):
            return True
    except OSError:
        # socket.timeout and ConnectionRefusedError are OSError subclasses,
        # so a single clause covers every connection failure.
        return False
# Generator-baseret brug (closing lukker filen, når generatoren er udtømt eller lukkes):
def read_large_csv(file_path: str):
    """Lazily yield the rows of a CSV file as dicts, one at a time.

    The file is opened as UTF-8 with ``newline=""`` and read through
    ``csv.DictReader``, so the whole file is never loaded into memory.
    ``closing()`` closes the file handle when the ``with`` block is left.

    Args:
        file_path: Path of the CSV file to read.

    Yields:
        One dict per data row, as produced by ``csv.DictReader``.
    """
    import csv

    source = open(file_path, newline="", encoding="utf-8")
    with closing(source) as handle:
        yield from csv.DictReader(handle)
redirect_stdout — capture output fra CLI-tools
# server/src/tools/capture.py
# redirect_stdout: Fang stdout fra kode der skriver direkte til sys.stdout
from contextlib import redirect_stdout
from io import StringIO
def capture_wmbusmeters_output(args: list[str]) -> str:
    """Run ``wmbusmeters`` with *args* and return a printed report as a string.

    The report is produced with ``print()`` while ``redirect_stdout`` is
    active, which is what routes it into the returned string. The subprocess
    itself writes to a pipe (``capture_output=True``), not to ``sys.stdout``.

    Args:
        args: Command-line arguments for ``wmbusmeters``. When empty,
            ``--version`` is used.

    Returns:
        A header line, the command's stdout and its return code.

    NOTE(review): the arguments go to the process as a list (no shell). If
    they can originate from a request, restrict them to an allow-list.
    """
    import subprocess

    # Honour the caller's arguments; previously they were ignored and the
    # command was always "--version".
    command = ["wmbusmeters", *(args or ["--version"])]
    output = StringIO()
    with redirect_stdout(output):
        print("=== wmbusmeters diagnostik ===")
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
        )
        print(result.stdout)
        print("Return code:", result.returncode)
    return output.getvalue()
# Brug i test:
def test_distribution_summary_output():
    """Check that ``print_summary`` writes the amount ``1200.00`` to stdout."""
    from server.src.distribution.formatter import print_summary

    summary = {"U-01": "1200.00", "U-02": "980.00"}
    captured = StringIO()
    with redirect_stdout(captured):
        print_summary(summary)
    printed = captured.getvalue()
    assert "1200.00" in printed
ExitStack — synkron variant
# server/src/export/batch_exporter.py
from contextlib import ExitStack
from pathlib import Path
import csv
import tempfile
def export_portfolio_csv(
    data: list[dict],
    output_dir: Path,
) -> list[Path]:
    """Write *data* to one CSV file per property and return the file paths.

    Rows are grouped by their ``"property_id"`` value. The first row seen for
    a property opens ``<output_dir>/<property_id>.csv`` (UTF-8 with BOM) and
    fixes that file's columns to the row's keys. All files are held open on
    one ``ExitStack`` and closed together, also when an exception occurs.

    Args:
        data: Rows to export; every row must contain ``"property_id"``.
        output_dir: Target directory. It is created if it does not exist.

    Returns:
        The created file paths, in order of first appearance of each property.

    Raises:
        KeyError: If a row has no ``"property_id"`` key.
        ValueError: Raised by ``csv.DictWriter`` when a later row contains
            keys that were not in the first row of its property.
    """
    # Create the target directory up front so a fresh export location works.
    output_dir.mkdir(parents=True, exist_ok=True)

    writers: dict[str, csv.DictWriter] = {}
    output_files: list[Path] = []
    with ExitStack() as stack:
        for row in data:
            prop_id = row["property_id"]
            writer = writers.get(prop_id)
            if writer is None:
                out_path = output_dir / f"{prop_id}.csv"
                handle = stack.enter_context(
                    open(out_path, "w", newline="", encoding="utf-8-sig")
                )
                writer = csv.DictWriter(handle, fieldnames=list(row.keys()))
                writer.writeheader()
                writers[prop_id] = writer
                output_files.append(out_path)
            writer.writerow(row)
    # Leaving the ExitStack has closed every file that was opened.
    return output_files
Konklusion
contextlib løser fem kritiske mønstre: @contextmanager/@asynccontextmanager til MQTT/DB lifecycle, AsyncExitStack til et dynamisk antal ressourcer (varierer med konfigurationen), suppress til velkendte harmløse fejl, nullcontext til betingede context managers uden if/else-duplikering, og ExitStack til batch-fileksport. Alle er i brug i gateway og server.
Se Python async context managers guide eller Python asyncio patterns guide.