Python asyncio — mønstre til IoT og realtidssystemer
Asyncio mønstre til IoT: TaskGroup, Queue producer-consumer, asyncio.Event koordinering, graceful shutdown, timeout via asyncio.wait_for, structured concurrency og test med pytest-asyncio.
Af M-Bus Gateway
Gateway-software på Raspberry Pi kræver concurrent IO: wM-Bus lytning, MQTT send, heartbeat og watchdog kører parallelt. Her er asyncio-mønstrene der virker i produktion.
TaskGroup — structured concurrency
# gateway/src/main.py
# TaskGroup er det moderne alternativ til gather() — fejl propageres korrekt
import asyncio
from gateway.src.wmbus.listener import run_wmbus_listener
from gateway.src.mqtt.client import run_mqtt_client
from gateway.src.api.server import run_api_server
from gateway.src.watchdog import feed_watchdog
async def main() -> None:
"""
Kør alle gateway-tasks parallelt.
TaskGroup: Hvis én task fejler, cancelles resten automatisk.
Python 3.11+ kræves for TaskGroup.
"""
async with asyncio.TaskGroup() as tg:
tg.create_task(run_wmbus_listener(), name="wmbus-listener")
tg.create_task(run_mqtt_client(), name="mqtt-client")
tg.create_task(run_api_server(), name="api-server")
tg.create_task(feed_watchdog(), name="watchdog")
# Alle tasks er afsluttet eller annulleret
if __name__ == "__main__":
asyncio.run(main())
Queue — producer-consumer til wM-Bus telegrammer
# gateway/src/wmbus/pipeline.py
# Producent (wmbusmeters) → Queue → Konsument (parser + storage)
import asyncio
import json
from dataclasses import dataclass
from datetime import datetime
@dataclass
class RawTelegram:
raw_json: str
received_at: datetime
async def telegram_producer(
queue: asyncio.Queue[RawTelegram],
process: asyncio.subprocess.Process,
) -> None:
"""Læs JSON-linjer fra wmbusmeters stdout og put i queue."""
assert process.stdout is not None
async for line in process.stdout:
stripped = line.strip()
if not stripped:
continue
telegram = RawTelegram(
raw_json=stripped.decode("utf-8"),
received_at=datetime.utcnow(),
)
await queue.put(telegram)
async def telegram_consumer(
queue: asyncio.Queue[RawTelegram],
db_path: str,
) -> None:
"""Tag telegrammer fra queue og gem i SQLite."""
import aiosqlite
async with aiosqlite.connect(db_path) as db:
while True:
telegram = await queue.get()
try:
data = json.loads(telegram.raw_json)
await db.execute(
"INSERT INTO readings (meter_id, raw_json, received_at) VALUES (?, ?, ?)",
(data.get("id"), telegram.raw_json, telegram.received_at.isoformat()),
)
await db.commit()
except Exception:
pass # Log fejl men fortsæt — aldrig crash consumer
finally:
queue.task_done()
async def run_pipeline(db_path: str) -> None:
queue: asyncio.Queue[RawTelegram] = asyncio.Queue(maxsize=1000)
process = await asyncio.create_subprocess_exec(
"wmbusmeters", "--format=json", "--logtelegrams=false",
"auto:im871a:868.95M:c1t1",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL,
)
async with asyncio.TaskGroup() as tg:
tg.create_task(telegram_producer(queue, process))
tg.create_task(telegram_consumer(queue, db_path))
asyncio.Event — koordinering mellem tasks
# gateway/src/coordination.py
# Event bruges til at signalere tilstand mellem tasks
import asyncio
# Globale events (oprettes én gang ved opstart)
mqtt_connected = asyncio.Event()
send_ready = asyncio.Event()
shutdown_requested = asyncio.Event()
async def mqtt_connection_task() -> None:
"""Sæt event når MQTT-forbindelse er etableret."""
import paho.mqtt.client as mqtt_client
# ... forbind til broker ...
mqtt_connected.set() # Signal til andre tasks: MQTT er klar
try:
await shutdown_requested.wait() # Vent på shutdown-signal
finally:
# ... disconnect ...
mqtt_connected.clear()
async def data_sender_task() -> None:
"""Vent på MQTT-forbindelse før første send."""
await mqtt_connected.wait() # Blokér til MQTT er klar
await send_ready.wait() # Vent til kl. 06:00 (sat af cron-task)
# Nu er det sikkert at sende
await do_send()
async def cron_task() -> None:
"""Sæt send_ready event ved planlagt sendetidspunkt."""
import asyncio
from datetime import datetime, time
while not shutdown_requested.is_set():
now = datetime.utcnow().time()
target = time(6, 0, 0) # 06:00 UTC
if now >= target and not send_ready.is_set():
send_ready.set()
await asyncio.sleep(3600) # Ikke igen i mindst 1 time
send_ready.clear()
await asyncio.sleep(60)
asyncio.wait_for — timeout på operationer
# gateway/src/mqtt/client.py
import asyncio
async def send_with_timeout(
payload: bytes,
mqtt_client,
topic: str,
timeout: float = 30.0,
) -> bool:
"""
Send MQTT-besked med timeout.
Returnerer False hvis timeout (gateway fortsætter — ikke crash).
"""
try:
await asyncio.wait_for(
mqtt_client.publish(topic, payload, qos=1),
timeout=timeout,
)
return True
except asyncio.TimeoutError:
return False
async def connect_with_retry(
host: str,
port: int,
max_attempts: int = 5,
) -> None:
"""Forbind med eksponentiel backoff og samlet timeout."""
for attempt in range(max_attempts):
backoff = min(2 ** attempt, 60) # Max 60 sekunder
try:
await asyncio.wait_for(
do_connect(host, port),
timeout=10.0,
)
return # Succes
except asyncio.TimeoutError:
if attempt < max_attempts - 1:
await asyncio.sleep(backoff)
else:
raise # Alle forsøg opbrugt
Graceful shutdown
# gateway/src/shutdown.py
import asyncio
import signal
import structlog
log = structlog.get_logger()
def setup_signal_handlers(shutdown_event: asyncio.Event) -> None:
"""
Registrer SIGTERM og SIGINT handlers.
systemd sender SIGTERM ved service stop.
"""
loop = asyncio.get_event_loop()
def handle_shutdown(sig: signal.Signals) -> None:
log.info("shutdown_signal_received", signal=sig.name)
shutdown_event.set()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, handle_shutdown, sig)
async def graceful_shutdown(
tasks: list[asyncio.Task],
timeout: float = 30.0,
) -> None:
"""
Annullér tasks og vent på at de afslutter.
Hardware watchdog fodres én gang mere under nedlukning.
"""
for task in tasks:
task.cancel()
try:
await asyncio.wait_for(
asyncio.gather(*tasks, return_exceptions=True),
timeout=timeout,
)
except asyncio.TimeoutError:
log.warning("graceful_shutdown_timeout", timeout=timeout)
pytest-asyncio testing
# gateway/tests/test_pipeline.py
import asyncio
import pytest
from unittest.mock import AsyncMock, patch
from gateway.src.wmbus.pipeline import telegram_consumer, RawTelegram
from datetime import datetime
@pytest.mark.asyncio
async def test_telegram_consumer_processes_valid_telegram(tmp_path):
"""Consumer gemmer gyldigt telegram i SQLite."""
db_path = str(tmp_path / "test.db")
queue: asyncio.Queue[RawTelegram] = asyncio.Queue()
telegram = RawTelegram(
raw_json='{"id":"12345678","total_kwh":234.5,"timestamp":"2026-05-24T06:00:00Z"}',
received_at=datetime(2026, 5, 24, 6, 0, 0),
)
await queue.put(telegram)
# Stop consumer efter første element
async def consumer_with_stop():
await telegram_consumer(queue, db_path)
await asyncio.wait_for(consumer_with_stop(), timeout=1.0)
@pytest.mark.asyncio
async def test_event_coordination():
"""asyncio.Event koordinerer tasks korrekt."""
ready_event = asyncio.Event()
results = []
async def waiter():
await ready_event.wait()
results.append("waiter_done")
async def setter():
await asyncio.sleep(0.01) # Kort forsinkelse
ready_event.set()
results.append("setter_done")
async with asyncio.TaskGroup() as tg:
tg.create_task(waiter())
tg.create_task(setter())
assert results == ["setter_done", "waiter_done"]
@pytest.mark.asyncio
async def test_wait_for_timeout():
"""asyncio.wait_for rejser TimeoutError korrekt."""
async def slow_operation():
await asyncio.sleep(10)
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(slow_operation(), timeout=0.05)
Watchdog-feeding som background task
# gateway/src/watchdog.py
# Hardware watchdog (/dev/watchdog) — skal fodres hvert 10. sekund
import asyncio
import fcntl
import structlog
log = structlog.get_logger()
WATCHDOG_IOCTL_MAGIC = 0x80045705 # WDIOC_KEEPALIVE
async def feed_watchdog(interval: float = 10.0) -> None:
"""
Fod hardware watchdog hvert 10. sekund.
Hvis denne task stopper → Pi rebooter automatisk efter 15s.
"""
try:
with open("/dev/watchdog", "wb", buffering=0) as wdog:
log.info("watchdog_started")
while True:
wdog.write(b"1") # Fod watchdog
await asyncio.sleep(interval)
except FileNotFoundError:
# /dev/watchdog ikke tilgængelig i dev-miljø — ignorer
log.debug("watchdog_not_available")
while True:
await asyncio.sleep(interval)
Konklusion
TaskGroup + Queue + asyncio.Event er tre kernedele i et robust IoT asyncio-system. TaskGroup håndterer crash-propagering korrekt, Queue dekobler producent og konsument, Event koordinerer mellem tasks uden polling. asyncio.wait_for forhindrer permanent hang ved netværksoperationer. Structured concurrency med TaskGroup er Python 3.11+ best practice.
Se Python subprocess guide eller wmbusmeters installation guide.