M-Bus Gateway
← Tilbage til blog
· Python· async· context managers· asyncio· FastAPI· SQLAlchemy· MQTT· IoT

Python async context managers — mønstre til IoT og SaaS

Async context managers i Python: asynccontextmanager decorator, __aenter__/__aexit__, database transactions, MQTT sessions, HTTP client pooling og resource cleanup.

Af M-Bus Gateway

Async context managers er fundamentale til korrekt ressourcehåndtering i asyncio-baserede IoT-platforme. Her er mønstrene brugt i M-Bus Gateway.


Grundlæggende: asynccontextmanager decorator

from contextlib import asynccontextmanager
from typing import AsyncGenerator

@asynccontextmanager
async def managed_resource(config: dict) -> AsyncGenerator[Resource, None]:
    """
    Opret ressource, yield til caller, ryd op uanset undtagelse.
    """
    resource = await Resource.create(config)
    try:
        yield resource
    finally:
        await resource.close()

# Brug:
async with managed_resource({"timeout": 30}) as r:
    await r.do_something()
# resource.close() kaldes altid — selv ved undtagelse

Database transactions med rollback

# server/src/db/session.py

from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncSession, AsyncEngine

@asynccontextmanager
async def transaction(session: AsyncSession):
    """
    Wrap i atomisk transaktion med automatisk rollback ved fejl.
    Nested transactions bruger savepoints (SQLAlchemy håndterer det).
    """
    async with session.begin():
        try:
            yield session
        except Exception:
            await session.rollback()
            raise
        # commit() kaldes automatisk ved normal exit

# Brug i service:
async def create_property_with_units(session, property_data, units_data):
    async with transaction(session):
        prop = Property(**property_data)
        session.add(prop)
        await session.flush()  # Få prop.id uden commit

        for unit_data in units_data:
            unit = Unit(property_id=prop.id, **unit_data)
            session.add(unit)
        # Alt committes eller alt rulles tilbage

MQTT client lifecycle

# gateway/src/mqtt/client.py

import asyncio
from contextlib import asynccontextmanager
import paho.mqtt.client as mqtt

@asynccontextmanager
async def mqtt_session(host: str, port: int, client_id: str):
    """MQTT forbindelse med TLS og automatisk disconnect."""
    client = mqtt.Client(client_id=client_id, protocol=mqtt.MQTTv5)
    client.tls_set(
        ca_certs="/etc/mbus-gateway/ca.crt",
        certfile=f"/etc/mbus-gateway/{client_id}.crt",
        keyfile=f"/etc/mbus-gateway/{client_id}.key",
    )

    loop = asyncio.get_event_loop()
    connected = asyncio.Event()

    def on_connect(c, userdata, flags, rc, properties):
        if rc == 0:
            connected.set()

    client.on_connect = on_connect
    client.connect(host, port, keepalive=60)
    client.loop_start()

    try:
        await asyncio.wait_for(connected.wait(), timeout=10.0)
        yield client
    finally:
        client.loop_stop()
        client.disconnect()

# Brug:
async with mqtt_session("178.105.90.8", 8883, "GW-0001") as client:
    client.publish("meters/GW-0001/data", payload, qos=1)

HTTP client pool (aiohttp)

# server/src/http/client.py

import aiohttp
from contextlib import asynccontextmanager

@asynccontextmanager
async def http_client(
    timeout_seconds: int = 30,
    max_connections: int = 10,
):
    """aiohttp session med connection pool og timeout."""
    timeout = aiohttp.ClientTimeout(total=timeout_seconds)
    connector = aiohttp.TCPConnector(limit=max_connections, ssl=True)
    async with aiohttp.ClientSession(
        timeout=timeout,
        connector=connector,
        headers={"User-Agent": "mbus-gateway/1.0"},
    ) as session:
        yield session

# Brug i Celery task:
async def fetch_dmi_data(zip_code: str) -> dict:
    async with http_client(timeout_seconds=15) as session:
        async with session.get(
            f"https://dmigw.govcloud.dk/v2/climateData/collections/10kmDKhourlyValues/items",
            params={"parameter": "heating_degree_days", "period": "latest"},
            ssl=True,
        ) as response:
            response.raise_for_status()
            return await response.json()

Cloudflare Tunnel on-demand session

# gateway/src/tunnel/manager.py

import asyncio
import subprocess
from contextlib import asynccontextmanager
from datetime import datetime, timezone, timedelta

MAX_SESSION_SECONDS = 1800  # 30 min

@asynccontextmanager
async def cloudflare_tunnel_session(token: str, gateway_id: str):
    """
    Start Cloudflare Tunnel process, yield URL, stop efter timeout eller exit.
    """
    proc = await asyncio.create_subprocess_exec(
        "cloudflared", "tunnel", "run", "--token", token,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    started_at = datetime.now(timezone.utc)
    deadline = started_at + timedelta(seconds=MAX_SESSION_SECONDS)

    try:
        # Giv tunnel 10 sekunder til at starte
        await asyncio.sleep(10)
        yield {
            "gateway_id": gateway_id,
            "started_at": started_at.isoformat(),
            "expires_at": deadline.isoformat(),
        }

        # Auto-expire: vent til timeout
        remaining = (deadline - datetime.now(timezone.utc)).total_seconds()
        if remaining > 0:
            await asyncio.sleep(remaining)

    finally:
        proc.terminate()
        try:
            await asyncio.wait_for(proc.wait(), timeout=5.0)
        except asyncio.TimeoutError:
            proc.kill()

aenter / aexit klasse-baseret

# server/src/workers/batch_processor.py

class BatchProcessor:
    """
    Samler readings i buffer og flusher til DB atomisk.
    Brugt som context manager for at garantere flush ved exit.
    """

    def __init__(self, session: AsyncSession, batch_size: int = 100):
        self._session = session
        self._batch_size = batch_size
        self._buffer: list = []

    async def __aenter__(self) -> "BatchProcessor":
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
        if exc_type is None:
            # Normal exit: flush resterende buffered items
            await self._flush()
        else:
            # Fejl: discard buffer (DB transaction rulles tilbage af caller)
            self._buffer.clear()
        return False  # Propager undtagelse

    async def add(self, reading: dict) -> None:
        self._buffer.append(reading)
        if len(self._buffer) >= self._batch_size:
            await self._flush()

    async def _flush(self) -> None:
        if not self._buffer:
            return
        self._session.add_all([Reading(**r) for r in self._buffer])
        await self._session.flush()
        self._buffer.clear()

# Brug:
async with transaction(session):
    async with BatchProcessor(session, batch_size=50) as processor:
        for raw_reading in incoming_readings:
            await processor.add(raw_reading)
    # flush sker automatisk ved exit fra BatchProcessor
# commit sker automatisk ved exit fra transaction

Nested context managers med AsyncExitStack

# server/src/workers/tasks/daily_send.py

from contextlib import AsyncExitStack

async def daily_data_send(gateway_id: str) -> None:
    """
    Kombinér database session, MQTT og HTTP klient i ét atomisk flow.
    AsyncExitStack rydder op i LIFO-rækkefølge.
    """
    async with AsyncExitStack() as stack:
        session = await stack.enter_async_context(get_session())
        client = await stack.enter_async_context(
            mqtt_session(MQTT_HOST, MQTT_PORT, gateway_id)
        )
        http = await stack.enter_async_context(http_client())

        # Alle ressourcer er nu tilgængelige
        readings = await fetch_readings_for_today(session, gateway_id)
        payload = encode_payload(readings)
        client.publish(f"meters/{gateway_id}/data", payload, qos=1)

        # Rapportér success til platform API
        await http.post(
            f"{PLATFORM_URL}/api/v1/gateways/{gateway_id}/heartbeat",
            json={"status": "sent", "count": len(readings)},
        )
    # Alle ressourcer lukkes i LIFO rækkefølge: http → mqtt → session

Test med async context managers

# server/tests/conftest.py

import pytest_asyncio
from contextlib import asynccontextmanager

@pytest_asyncio.fixture
async def db_session():
    """Test fixture: session med automatisk rollback."""
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
        async with AsyncSession(bind=conn) as session:
            yield session
            await session.rollback()  # Aldrig commit i tests

@asynccontextmanager
async def mock_mqtt():
    """In-memory MQTT mock til unit tests."""
    messages = []
    class MockClient:
        def publish(self, topic, payload, qos=0):
            messages.append({"topic": topic, "payload": payload})
    yield MockClient(), messages

Konklusion

Async context managers via @asynccontextmanager eller __aenter__/__aexit__ garanterer ressourceoprydning selv ved exceptions. AsyncExitStack kombinerer multiple context managers i LIFO-rækkefølge. I M-Bus Gateway platformen bruges mønstret til: database transactions med rollback, MQTT session lifecycle, Cloudflare Tunnel on-demand og batch-processing med automatisk flush.

Se FastAPI multi-tenant guide eller asyncpg guide.