M-Bus Gateway
← Tilbage til blog
· python· asyncio· IoT· concurrency· TaskGroup· Queue· graceful-shutdown· pytest-asyncio

Python asyncio — mønstre til IoT og realtidssystemer

Asyncio mønstre til IoT: TaskGroup, Queue producer-consumer, asyncio.Event koordinering, graceful shutdown, timeout via asyncio.wait_for, structured concurrency og test med pytest-asyncio.

Af M-Bus Gateway

Gateway-software på Raspberry Pi kræver concurrent IO: wM-Bus lytning, MQTT send, heartbeat og watchdog kører parallelt. Her er asyncio-mønstrene der virker i produktion.


TaskGroup — structured concurrency

# gateway/src/main.py
# TaskGroup er det moderne alternativ til gather() — fejl propageres korrekt

import asyncio
from gateway.src.wmbus.listener import run_wmbus_listener
from gateway.src.mqtt.client import run_mqtt_client
from gateway.src.api.server import run_api_server
from gateway.src.watchdog import feed_watchdog


async def main() -> None:
    """
    Kør alle gateway-tasks parallelt.
    TaskGroup: Hvis én task fejler, cancelles resten automatisk.
    Python 3.11+ kræves for TaskGroup.
    """
    async with asyncio.TaskGroup() as tg:
        tg.create_task(run_wmbus_listener(), name="wmbus-listener")
        tg.create_task(run_mqtt_client(), name="mqtt-client")
        tg.create_task(run_api_server(), name="api-server")
        tg.create_task(feed_watchdog(), name="watchdog")

    # Alle tasks er afsluttet eller annulleret


if __name__ == "__main__":
    asyncio.run(main())

Queue — producer-consumer til wM-Bus telegrammer

# gateway/src/wmbus/pipeline.py
# Producent (wmbusmeters) → Queue → Konsument (parser + storage)

import asyncio
import json
from dataclasses import dataclass
from datetime import datetime


@dataclass
class RawTelegram:
    raw_json: str
    received_at: datetime


async def telegram_producer(
    queue: asyncio.Queue[RawTelegram],
    process: asyncio.subprocess.Process,
) -> None:
    """Læs JSON-linjer fra wmbusmeters stdout og put i queue."""
    assert process.stdout is not None

    async for line in process.stdout:
        stripped = line.strip()
        if not stripped:
            continue
        telegram = RawTelegram(
            raw_json=stripped.decode("utf-8"),
            received_at=datetime.utcnow(),
        )
        await queue.put(telegram)


async def telegram_consumer(
    queue: asyncio.Queue[RawTelegram],
    db_path: str,
) -> None:
    """Tag telegrammer fra queue og gem i SQLite."""
    import aiosqlite

    async with aiosqlite.connect(db_path) as db:
        while True:
            telegram = await queue.get()
            try:
                data = json.loads(telegram.raw_json)
                await db.execute(
                    "INSERT INTO readings (meter_id, raw_json, received_at) VALUES (?, ?, ?)",
                    (data.get("id"), telegram.raw_json, telegram.received_at.isoformat()),
                )
                await db.commit()
            except Exception:
                pass    # Log fejl men fortsæt — aldrig crash consumer
            finally:
                queue.task_done()


async def run_pipeline(db_path: str) -> None:
    queue: asyncio.Queue[RawTelegram] = asyncio.Queue(maxsize=1000)

    process = await asyncio.create_subprocess_exec(
        "wmbusmeters", "--format=json", "--logtelegrams=false",
        "auto:im871a:868.95M:c1t1",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,
    )

    async with asyncio.TaskGroup() as tg:
        tg.create_task(telegram_producer(queue, process))
        tg.create_task(telegram_consumer(queue, db_path))

asyncio.Event — koordinering mellem tasks

# gateway/src/coordination.py
# Event bruges til at signalere tilstand mellem tasks

import asyncio

# Globale events (oprettes én gang ved opstart)
mqtt_connected = asyncio.Event()
send_ready = asyncio.Event()
shutdown_requested = asyncio.Event()


async def mqtt_connection_task() -> None:
    """Sæt event når MQTT-forbindelse er etableret."""
    import paho.mqtt.client as mqtt_client
    # ... forbind til broker ...
    mqtt_connected.set()    # Signal til andre tasks: MQTT er klar

    try:
        await shutdown_requested.wait()    # Vent på shutdown-signal
    finally:
        # ... disconnect ...
        mqtt_connected.clear()


async def data_sender_task() -> None:
    """Vent på MQTT-forbindelse før første send."""
    await mqtt_connected.wait()    # Blokér til MQTT er klar
    await send_ready.wait()        # Vent til kl. 06:00 (sat af cron-task)

    # Nu er det sikkert at sende
    await do_send()


async def cron_task() -> None:
    """Sæt send_ready event ved planlagt sendetidspunkt."""
    import asyncio
    from datetime import datetime, time

    while not shutdown_requested.is_set():
        now = datetime.utcnow().time()
        target = time(6, 0, 0)    # 06:00 UTC

        if now >= target and not send_ready.is_set():
            send_ready.set()
            await asyncio.sleep(3600)    # Ikke igen i mindst 1 time
            send_ready.clear()

        await asyncio.sleep(60)

asyncio.wait_for — timeout på operationer

# gateway/src/mqtt/client.py
import asyncio


async def send_with_timeout(
    payload: bytes,
    mqtt_client,
    topic: str,
    timeout: float = 30.0,
) -> bool:
    """
    Send MQTT-besked med timeout.
    Returnerer False hvis timeout (gateway fortsætter — ikke crash).
    """
    try:
        await asyncio.wait_for(
            mqtt_client.publish(topic, payload, qos=1),
            timeout=timeout,
        )
        return True
    except asyncio.TimeoutError:
        return False


async def connect_with_retry(
    host: str,
    port: int,
    max_attempts: int = 5,
) -> None:
    """Forbind med eksponentiel backoff og samlet timeout."""
    for attempt in range(max_attempts):
        backoff = min(2 ** attempt, 60)    # Max 60 sekunder
        try:
            await asyncio.wait_for(
                do_connect(host, port),
                timeout=10.0,
            )
            return    # Succes
        except asyncio.TimeoutError:
            if attempt < max_attempts - 1:
                await asyncio.sleep(backoff)
            else:
                raise    # Alle forsøg opbrugt

Graceful shutdown

# gateway/src/shutdown.py
import asyncio
import signal
import structlog

log = structlog.get_logger()


def setup_signal_handlers(shutdown_event: asyncio.Event) -> None:
    """
    Registrer SIGTERM og SIGINT handlers.
    systemd sender SIGTERM ved service stop.
    """
    loop = asyncio.get_event_loop()

    def handle_shutdown(sig: signal.Signals) -> None:
        log.info("shutdown_signal_received", signal=sig.name)
        shutdown_event.set()

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, handle_shutdown, sig)


async def graceful_shutdown(
    tasks: list[asyncio.Task],
    timeout: float = 30.0,
) -> None:
    """
    Annullér tasks og vent på at de afslutter.
    Hardware watchdog fodres én gang mere under nedlukning.
    """
    for task in tasks:
        task.cancel()

    try:
        await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        log.warning("graceful_shutdown_timeout", timeout=timeout)

pytest-asyncio testing

# gateway/tests/test_pipeline.py
import asyncio
import pytest
from unittest.mock import AsyncMock, patch
from gateway.src.wmbus.pipeline import telegram_consumer, RawTelegram
from datetime import datetime


@pytest.mark.asyncio
async def test_telegram_consumer_processes_valid_telegram(tmp_path):
    """Consumer gemmer gyldigt telegram i SQLite."""
    db_path = str(tmp_path / "test.db")
    queue: asyncio.Queue[RawTelegram] = asyncio.Queue()

    telegram = RawTelegram(
        raw_json='{"id":"12345678","total_kwh":234.5,"timestamp":"2026-05-24T06:00:00Z"}',
        received_at=datetime(2026, 5, 24, 6, 0, 0),
    )
    await queue.put(telegram)

    # Stop consumer efter første element
    async def consumer_with_stop():
        await telegram_consumer(queue, db_path)

    await asyncio.wait_for(consumer_with_stop(), timeout=1.0)


@pytest.mark.asyncio
async def test_event_coordination():
    """asyncio.Event koordinerer tasks korrekt."""
    ready_event = asyncio.Event()
    results = []

    async def waiter():
        await ready_event.wait()
        results.append("waiter_done")

    async def setter():
        await asyncio.sleep(0.01)    # Kort forsinkelse
        ready_event.set()
        results.append("setter_done")

    async with asyncio.TaskGroup() as tg:
        tg.create_task(waiter())
        tg.create_task(setter())

    assert results == ["setter_done", "waiter_done"]


@pytest.mark.asyncio
async def test_wait_for_timeout():
    """asyncio.wait_for rejser TimeoutError korrekt."""
    async def slow_operation():
        await asyncio.sleep(10)

    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(slow_operation(), timeout=0.05)

Watchdog-feeding som background task

# gateway/src/watchdog.py
# Hardware watchdog (/dev/watchdog) — skal fodres hvert 10. sekund

import asyncio
import fcntl
import structlog

log = structlog.get_logger()
WATCHDOG_IOCTL_MAGIC = 0x80045705    # WDIOC_KEEPALIVE


async def feed_watchdog(interval: float = 10.0) -> None:
    """
    Fod hardware watchdog hvert 10. sekund.
    Hvis denne task stopper → Pi rebooter automatisk efter 15s.
    """
    try:
        with open("/dev/watchdog", "wb", buffering=0) as wdog:
            log.info("watchdog_started")
            while True:
                wdog.write(b"1")    # Fod watchdog
                await asyncio.sleep(interval)
    except FileNotFoundError:
        # /dev/watchdog ikke tilgængelig i dev-miljø — ignorer
        log.debug("watchdog_not_available")
        while True:
            await asyncio.sleep(interval)

Konklusion

TaskGroup + Queue + asyncio.Event er tre kernedele i et robust IoT asyncio-system. TaskGroup håndterer crash-propagering korrekt, Queue dekobler producent og konsument, Event koordinerer mellem tasks uden polling. asyncio.wait_for forhindrer permanent hang ved netværksoperationer. Structured concurrency med TaskGroup er Python 3.11+ best practice.

Se Python subprocess guide eller wmbusmeters installation guide.