· Python· async· context managers· asyncio· FastAPI· SQLAlchemy· MQTT· IoT
Python async context managers — mønstre til IoT og SaaS
Async context managers i Python: asynccontextmanager decorator, __aenter__/__aexit__, database transactions, MQTT sessions, HTTP client pooling og resource cleanup.
Af M-Bus Gateway
Async context managers er fundamentale for korrekt ressourcehåndtering i asyncio-baserede IoT-platforme. Her er de mønstre, der bruges i M-Bus Gateway.
Grundlæggende: asynccontextmanager decorator
from contextlib import asynccontextmanager
from typing import AsyncGenerator
@asynccontextmanager
async def managed_resource(config: dict) -> AsyncGenerator[Resource, None]:
    """Create a ``Resource``, lend it to the caller, and close it afterwards.

    Args:
        config: Settings forwarded unchanged to ``Resource.create``.

    Yields:
        The resource returned by ``Resource.create(config)``.

    The resource is closed when the ``async with`` block ends, whether it
    finished normally or raised.
    """
    handle = await Resource.create(config)
    try:
        yield handle
    finally:
        # Runs on normal exit and when the caller's block raises.
        await handle.close()
# Usage:
async with managed_resource({"timeout": 30}) as r:
await r.do_something()
# resource.close() is always called — even when an exception is raised
Database transactions med rollback
# server/src/db/session.py
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncSession, AsyncEngine
@asynccontextmanager
async def transaction(session: AsyncSession):
    """Run the enclosed block as one atomic unit of work on ``session``.

    If the session has no transaction in progress, a new one is begun and
    committed when the block exits normally. If a transaction is already in
    progress (an earlier statement autobegan one, or this helper is nested
    inside itself), a SAVEPOINT is opened instead, so a failure undoes only
    the work done inside this block and the outer transaction stays under
    the control of whoever started it.

    Args:
        session: The ``AsyncSession`` the unit of work runs on.

    Yields:
        The same ``session`` object.

    Raises:
        Whatever the enclosed block raises; the transaction (or savepoint)
        is rolled back before the exception propagates.
    """
    # session.begin() refuses to start a second transaction on a session that
    # already has one, so nested use has to go through begin_nested().
    if session.in_transaction():
        scope = session.begin_nested()
    else:
        scope = session.begin()

    # The transaction object is itself an async context manager: it commits
    # (or releases the savepoint) on clean exit and rolls back on exception.
    # No explicit session.rollback() is needed here, and calling it inside a
    # savepoint would discard the caller's outer transaction as well.
    async with scope:
        yield session
# Usage in a service:
async def create_property_with_units(session, property_data, units_data):
    """Persist one ``Property`` and its ``Unit`` rows in a single transaction.

    Args:
        session: Session handed to ``transaction`` and used for all writes.
        property_data: Keyword arguments for the ``Property`` constructor.
        units_data: Iterable of keyword-argument dicts, one per ``Unit``.
    """
    async with transaction(session):
        new_property = Property(**property_data)
        session.add(new_property)
        # Flush so new_property.id is populated without committing yet.
        await session.flush()
        session.add_all(
            Unit(property_id=new_property.id, **fields) for fields in units_data
        )
    # Either everything is committed or everything is rolled back.
MQTT client lifecycle
# gateway/src/mqtt/client.py
import asyncio
from contextlib import asynccontextmanager
import paho.mqtt.client as mqtt
@asynccontextmanager
async def mqtt_session(host: str, port: int, client_id: str):
    """Open a TLS-secured MQTT v5 connection and tear it down on exit.

    Args:
        host: Broker host name or address.
        port: Broker port.
        client_id: MQTT client id; also selects the client certificate and
            key under ``/etc/mbus-gateway/``.

    Yields:
        The connected paho ``Client``.

    Raises:
        asyncio.TimeoutError: If no successful CONNACK arrives within
            10 seconds.
    """
    client = mqtt.Client(client_id=client_id, protocol=mqtt.MQTTv5)
    client.tls_set(
        ca_certs="/etc/mbus-gateway/ca.crt",
        certfile=f"/etc/mbus-gateway/{client_id}.crt",
        keyfile=f"/etc/mbus-gateway/{client_id}.key",
    )

    loop = asyncio.get_running_loop()
    connected = asyncio.Event()

    def on_connect(c, userdata, flags, rc, properties):
        # paho runs callbacks on the network thread started by loop_start().
        # asyncio.Event is not thread-safe, so the set() call is handed to
        # the event loop instead of being made from that thread directly.
        if rc == 0:
            loop.call_soon_threadsafe(connected.set)

    client.on_connect = on_connect
    # NOTE(review): connect() is synchronous and blocks the event loop while
    # the TCP/TLS handshake runs — consider moving it to an executor.
    client.connect(host, port, keepalive=60)
    client.loop_start()
    try:
        await asyncio.wait_for(connected.wait(), timeout=10.0)
        yield client
    finally:
        # Disconnect while the network loop is still running so it can
        # process the DISCONNECT, then stop the loop even if that call fails.
        try:
            client.disconnect()
        finally:
            client.loop_stop()
# Usage:
async with mqtt_session("178.105.90.8", 8883, "GW-0001") as client:
client.publish("meters/GW-0001/data", payload, qos=1)
HTTP client pool (aiohttp)
# server/src/http/client.py
import aiohttp
from contextlib import asynccontextmanager
@asynccontextmanager
async def http_client(
    timeout_seconds: int = 30,
    max_connections: int = 10,
):
    """Provide an ``aiohttp.ClientSession`` that is closed on exit.

    Args:
        timeout_seconds: Total timeout applied to each request.
        max_connections: Upper bound on simultaneous connections in the pool.

    Yields:
        The open session, preconfigured with the ``mbus-gateway/1.0``
        User-Agent header.
    """
    session = aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=timeout_seconds),
        connector=aiohttp.TCPConnector(limit=max_connections, ssl=True),
        headers={"User-Agent": "mbus-gateway/1.0"},
    )
    # Entering the session returns the session itself; leaving closes it.
    async with session:
        yield session
# Usage in a Celery task:
async def fetch_dmi_data(zip_code: str) -> dict:
    """Fetch the latest heating-degree-day data from the DMI climate API.

    Args:
        zip_code: Postal code of the location of interest.

    Returns:
        The decoded JSON body of the response.

    Raises:
        aiohttp.ClientResponseError: If the API answers with an error status.
    """
    # NOTE(review): zip_code is accepted but never sent with the request, so
    # every caller gets the same unfiltered result — confirm which query
    # parameter the API expects for location filtering.
    # Plain string: the URL has no placeholders, so no f-prefix is needed.
    url = "https://dmigw.govcloud.dk/v2/climateData/collections/10kmDKhourlyValues/items"
    query = {"parameter": "heating_degree_days", "period": "latest"}

    async with http_client(timeout_seconds=15) as session:
        async with session.get(url, params=query, ssl=True) as response:
            response.raise_for_status()
            return await response.json()
Cloudflare Tunnel on-demand session
# gateway/src/tunnel/manager.py
import asyncio
import subprocess
from contextlib import asynccontextmanager
from datetime import datetime, timezone, timedelta
MAX_SESSION_SECONDS = 1800  # 30 min


@asynccontextmanager
async def cloudflare_tunnel_session(token: str, gateway_id: str):
    """Run a ``cloudflared`` tunnel for the duration of the ``async with`` block.

    The tunnel process is stopped as soon as the block exits, or once
    ``MAX_SESSION_SECONDS`` have elapsed since start, whichever comes first.
    Leaving the block never waits for the deadline.

    Args:
        token: Tunnel token passed to ``cloudflared tunnel run --token``.
        gateway_id: Identifier echoed back in the yielded session info.

    Yields:
        A dict with ``gateway_id``, ``started_at`` and ``expires_at``
        (ISO 8601 timestamps in UTC).
    """
    proc = await asyncio.create_subprocess_exec(
        "cloudflared", "tunnel", "run", "--token", token,
        # Nothing reads the child's output. With PIPE the child could block
        # once the OS pipe buffer fills, so the output is discarded instead.
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL,
    )
    started_at = datetime.now(timezone.utc)
    deadline = started_at + timedelta(seconds=MAX_SESSION_SECONDS)

    def _signal(send) -> None:
        # Deliver terminate/kill unless the process has already exited.
        if proc.returncode is not None:
            return
        try:
            send()
        except ProcessLookupError:
            # The process exited between the check and the signal.
            pass

    async def _expire_at_deadline() -> None:
        # Watchdog: enforce the session limit while the caller's block runs.
        remaining = (deadline - datetime.now(timezone.utc)).total_seconds()
        if remaining > 0:
            await asyncio.sleep(remaining)
        _signal(proc.terminate)

    watchdog = asyncio.create_task(_expire_at_deadline())
    try:
        # Give the tunnel 10 seconds to start.
        await asyncio.sleep(10)
        yield {
            "gateway_id": gateway_id,
            "started_at": started_at.isoformat(),
            "expires_at": deadline.isoformat(),
        }
    finally:
        watchdog.cancel()
        _signal(proc.terminate)
        try:
            await asyncio.wait_for(proc.wait(), timeout=5.0)
        except asyncio.TimeoutError:
            # Graceful shutdown took too long: force it and reap the process.
            _signal(proc.kill)
            await proc.wait()
__aenter__ / __aexit__ klasse-baseret
# server/src/workers/batch_processor.py
class BatchProcessor:
    """Buffer readings and write them to the database in batches.

    Intended to be used as an async context manager: readings still buffered
    when the block ends normally are flushed; if the block raises, the
    buffer is discarded and the exception propagates.
    """

    def __init__(self, session: AsyncSession, batch_size: int = 100):
        # session: receives the Reading rows; batch_size: flush threshold.
        self._session = session
        self._batch_size = batch_size
        self._buffer: list = []

    async def __aenter__(self) -> "BatchProcessor":
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
        if exc_type is not None:
            # Failure: drop pending readings. The surrounding DB transaction
            # is expected to be rolled back by the caller.
            self._buffer.clear()
            return False  # never suppress the exception
        # Clean exit: write whatever is still buffered.
        await self._flush()
        return False

    async def add(self, reading: dict) -> None:
        """Queue one reading; flush once the buffer reaches ``batch_size``."""
        self._buffer.append(reading)
        if len(self._buffer) < self._batch_size:
            return
        await self._flush()

    async def _flush(self) -> None:
        """Turn buffered dicts into ``Reading`` rows and flush the session."""
        if self._buffer:
            rows = [Reading(**item) for item in self._buffer]
            self._session.add_all(rows)
            await self._session.flush()
            self._buffer.clear()
# Usage:
async with transaction(session):
async with BatchProcessor(session, batch_size=50) as processor:
for raw_reading in incoming_readings:
await processor.add(raw_reading)
# the flush happens automatically on exit from BatchProcessor
# the commit happens automatically on exit from transaction
Nested context managers med AsyncExitStack
# server/src/workers/tasks/daily_send.py
from contextlib import AsyncExitStack
async def daily_data_send(gateway_id: str) -> None:
    """Publish today's readings over MQTT and report it to the platform API.

    A database session, an MQTT connection and an HTTP client are opened on
    one ``AsyncExitStack``, which closes them in reverse order of entry when
    the function finishes or fails.

    Args:
        gateway_id: Gateway whose readings are sent; also used as the MQTT
            client id and in the topic and heartbeat URL.

    Raises:
        aiohttp.ClientResponseError: If the heartbeat request is answered
            with an error status.
    """
    async with AsyncExitStack() as stack:
        session = await stack.enter_async_context(get_session())
        client = await stack.enter_async_context(
            mqtt_session(MQTT_HOST, MQTT_PORT, gateway_id)
        )
        http = await stack.enter_async_context(http_client())

        # All resources are available from here on.
        readings = await fetch_readings_for_today(session, gateway_id)
        payload = encode_payload(readings)
        # NOTE(review): the publish result is not inspected, so "sent" is
        # reported even if the broker never acknowledges the message —
        # confirm whether delivery should be awaited first.
        client.publish(f"meters/{gateway_id}/data", payload, qos=1)

        # Report success to the platform API. The response is used as a
        # context manager so its connection is released, and an error status
        # is raised instead of being silently ignored.
        async with http.post(
            f"{PLATFORM_URL}/api/v1/gateways/{gateway_id}/heartbeat",
            json={"status": "sent", "count": len(readings)},
        ) as response:
            response.raise_for_status()
    # All resources are closed in LIFO order: http → mqtt → session
Test med async context managers
# server/tests/conftest.py
import pytest_asyncio
from contextlib import asynccontextmanager
@pytest_asyncio.fixture
async def db_session():
    """Test fixture: yield a database session and roll it back at teardown.

    The schema is created on a connection obtained from ``engine.begin()``,
    and the session is bound to that same connection.
    """
    # NOTE(review): the session shares the connection-level transaction
    # opened by engine.begin() — confirm that rolling back the session at
    # teardown leaves that outer block in the state the test suite expects.
    async with engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)
        async with AsyncSession(bind=connection) as db:
            yield db
            # Tests never commit: undo whatever the test wrote.
            await db.rollback()
@asynccontextmanager
async def mock_mqtt():
    """In-memory MQTT stand-in for unit tests.

    Yields:
        A ``(client, messages)`` tuple. Every ``client.publish`` call appends
        ``{"topic": ..., "payload": ...}`` to ``messages``, in call order.
    """
    messages: list = []

    class MockClient:
        # Accepts the same optional arguments as the real client's publish()
        # (payload default, retain, properties) so code under test that
        # passes them does not fail only under the mock. Only topic and
        # payload are recorded.
        def publish(self, topic, payload=None, qos=0, retain=False, properties=None):
            messages.append({"topic": topic, "payload": payload})

    yield MockClient(), messages
Konklusion
Async context managers via @asynccontextmanager eller __aenter__/__aexit__ garanterer ressourceoprydning selv ved exceptions. AsyncExitStack kombinerer multiple context managers i LIFO-rækkefølge. I M-Bus Gateway platformen bruges mønstret til: database transactions med rollback, MQTT session lifecycle, Cloudflare Tunnel on-demand og batch-processing med automatisk flush.