M-Bus Gateway
← Tilbage til blog
· Python· asyncio· TaskGroup· timeout· async· concurrency· IoT· gateway

Avancerede asyncio mønstre i Python — timeout, cancel og task groups

Avancerede Python asyncio mønstre: TaskGroup, timeout(), asyncio.gather fejlhåndtering, CancelledError, Semaphore til rate limiting og async generator pipelines.

Af M-Bus Gateway

Python asyncio har avancerede primitiver der gør concurrent I/O-kode mere robust. Her er de vigtigste mønstre.


TaskGroup (Python 3.11+)

# gateway/src/main.py

import asyncio

async def main() -> None:
    """Run all long-lived gateway services inside a single TaskGroup.

    Every entry in ``services`` is started as its own task. If any task
    raises, the TaskGroup cancels the remaining tasks and the failures
    propagate out of the ``async with`` block as an ExceptionGroup.
    """
    services = (
        run_wmbusmeters,       # wM-Bus listening
        run_heartbeat,         # MQTT heartbeat every 5 minutes
        run_daily_sender,      # daily send at 06:00
        run_cmd_listener,      # MQTT commands
        run_watchdog_feeder,   # hardware watchdog
    )
    async with asyncio.TaskGroup() as tg:
        for service in services:
            tg.create_task(service())

    # Reached only once every task has finished without raising; a failing
    # task surfaces as an ExceptionGroup raised from the block above.

async def run_wmbusmeters(restart_delay: float = 5.0) -> None:
    """Keep the wmbusmeters subprocess running indefinitely.

    Awaits ``start_wmbusmeters_subprocess()`` in an endless loop. When it
    raises, the error is logged and the loop pauses before the next attempt.

    Args:
        restart_delay: Seconds to wait after a crash before restarting.
            Defaults to 5, the previously hard-coded value.
    """
    while True:
        try:
            await start_wmbusmeters_subprocess()
        except Exception as exc:
            # asyncio.CancelledError derives from BaseException, so task
            # cancellation is not swallowed here and still ends the loop.
            logger.error("wmbusmeters_crashed", exc_info=exc)
            await asyncio.sleep(restart_delay)
        # NOTE(review): a normal return restarts immediately with no delay —
        # confirm that is intended if the subprocess can exit cleanly.

asyncio.timeout (Python 3.11+)

# server/src/mqtt/subscriber.py

import asyncio

async def wait_for_gateway_ack(
    gateway_id: str,
    command_id: str,
    timeout_seconds: float = 30.0,
) -> bool:
    """Wait for a gateway's ACK of a command, giving up after a timeout.

    Args:
        gateway_id: Gateway the command was sent to.
        command_id: Command whose ACK is awaited.
        timeout_seconds: Maximum time to wait for the ACK.

    Returns:
        The ACK's ``success`` flag, or False if no ACK arrived in time.
    """
    ack_key = f"{gateway_id}:{command_id}"
    try:
        # asyncio.timeout() cancels the pending get() when the deadline
        # passes and converts the cancellation into a TimeoutError.
        async with asyncio.timeout(timeout_seconds):
            ack = await ack_queue.get(ack_key)
    except asyncio.TimeoutError:
        logger.warning(
            "gateway_ack_timeout",
            gateway_id=gateway_id,
            command_id=command_id,
            timeout=timeout_seconds,
        )
        return False
    return ack.success

# Usage (illustrative — the `await` expressions must run inside a coroutine):
success = await wait_for_gateway_ack(gateway_id, cmd_id, timeout_seconds=30)
if not success:
    # The ACK timed out or reported failure, so record the command as failed.
    await mark_command_failed(session, cmd_id)

gather() med fejlhåndtering

# server/src/portfolio/service.py

def _result_or_default(result, default):
    """Return ``result`` unless gather() captured an exception in its place."""
    # BaseException rather than Exception: asyncio.CancelledError derives
    # from BaseException and is also handed back as a result by gather()
    # when return_exceptions=True, so it must not leak through as data.
    return default if isinstance(result, BaseException) else result


async def fetch_portfolio_data(
    session: AsyncSession,
    tenant_id: UUID,
) -> PortfolioData:
    """Fetch the portfolio figures for a tenant concurrently.

    The five queries run via ``asyncio.gather(..., return_exceptions=True)``
    so a failure in one does not abort the others. Failed queries fall back
    to a default (0 for counts, ``{}`` for the settlement status), i.e. the
    function returns whatever data it could get (degraded mode).

    Args:
        session: Database session passed to every query.
        tenant_id: Tenant whose portfolio is fetched.

    Returns:
        A PortfolioData built from the query results or their defaults.
    """
    # NOTE(review): all five queries share one session and run concurrently.
    # If this is SQLAlchemy's AsyncSession, it is documented as not safe for
    # use from concurrent tasks — confirm, or give each query its own session.
    results = await asyncio.gather(
        _get_property_count(session, tenant_id),
        _get_unit_count(session, tenant_id),
        _get_active_gateways(session, tenant_id),
        _get_alarm_count(session, tenant_id),
        _get_settlement_status(session, tenant_id),
        return_exceptions=True,
    )
    property_count, unit_count, gateways, alarms, settlements = results

    return PortfolioData(
        property_count=_result_or_default(property_count, 0),
        unit_count=_result_or_default(unit_count, 0),
        active_gateways=_result_or_default(gateways, 0),
        alarm_count=_result_or_default(alarms, 0),
        settlement_status=_result_or_default(settlements, {}),
    )

Semaphore til rate limiting (begrænsning af samtidighed)

# server/src/workers/tasks/email.py

import asyncio

# At most 10 email sends in flight at once (Brevo rate limit: 100 req/s).
# NOTE: a semaphore caps concurrency, not the number of requests per second.
EMAIL_SEMAPHORE = asyncio.Semaphore(10)

async def send_settlement_emails_batch(
    settlements: list[Settlement],
) -> dict[str, bool]:
    """Send settlement emails concurrently, bounded by EMAIL_SEMAPHORE.

    Args:
        settlements: Settlements to send one email each for.

    Returns:
        Mapping of ``str(settlement.id)`` to True if the send succeeded and
        False if it raised.
    """

    async def send_one(settlement: Settlement) -> tuple[str, bool]:
        """Send one email while holding a semaphore slot; never raises Exception."""
        settlement_id = str(settlement.id)
        async with EMAIL_SEMAPHORE:
            try:
                await send_settlement_email(settlement)
            except Exception as exc:
                # Attach the exception so the failure cause ends up in the log.
                logger.error(
                    "email_failed",
                    settlement_id=settlement_id,
                    exc_info=exc,
                )
                return settlement_id, False
        return settlement_id, True

    # send_one() converts failures into (id, False), so gather()'s default
    # return_exceptions=False is sufficient here.
    results = await asyncio.gather(*(send_one(s) for s in settlements))
    return dict(results)

Async generator pipeline

# gateway/src/wmbus/pipeline.py

import asyncio
from typing import AsyncIterator

async def read_telegrams(device: str) -> AsyncIterator[str]:
    """Yield one raw JSON telegram at a time from a wmbusmeters subprocess.

    Starts ``wmbusmeters --format=json <device>`` and reads its stdout line
    by line. Lines are decoded as UTF-8 (invalid bytes replaced) and
    stripped; only lines starting with ``{`` are yielded.

    The subprocess is terminated (if still running) and reaped when the
    generator finishes, is closed early, or is cancelled, so no orphaned
    process is left behind.

    Args:
        device: Device argument passed to wmbusmeters.
    """
    proc = await asyncio.create_subprocess_exec(
        "wmbusmeters", "--format=json", device,
        stdout=asyncio.subprocess.PIPE,
    )
    try:
        async for line in proc.stdout:
            raw = line.decode("utf-8", errors="replace").strip()
            if raw.startswith("{"):
                yield raw
    finally:
        if proc.returncode is None:
            try:
                proc.terminate()
            except ProcessLookupError:
                # The process exited between the check and the signal.
                pass
        # Reap the child so it does not linger as a zombie.
        await proc.wait()

async def parse_telegrams(
    source: AsyncIterator[str],
) -> AsyncIterator[WMBusReading]:
    """Second stage: parse raw JSON telegrams into readings.

    Each item from ``source`` is passed to ``parse_wmbus_json``. Results that
    are falsy or whose ``status`` is not ``"OK"`` are dropped.
    """
    async for telegram in source:
        parsed = parse_wmbus_json(telegram)
        if not parsed or parsed.status != "OK":
            continue
        yield parsed

async def filter_known_meters(
    source: AsyncIterator[WMBusReading],
    known_ids: set[str],
) -> AsyncIterator[WMBusReading]:
    """Third stage: pass through only readings from known meters.

    A reading is yielded when its ``meter_id`` is in ``known_ids``; all
    other readings are skipped.
    """
    async for candidate in source:
        if candidate.meter_id not in known_ids:
            continue
        yield candidate

async def run_pipeline(device: str, known_ids: set[str]) -> None:
    """Chain the generator stages and store every reading that comes out.

    Each stage pulls from the previous one only when the next item is
    requested, so reading proceeds no faster than ``store_reading`` consumes.
    """
    telegrams = read_telegrams(device)
    readings = parse_telegrams(telegrams)
    known_readings = filter_known_meters(readings, known_ids)

    async for item in known_readings:
        await store_reading(item)

CancelledError håndtering

# gateway/src/mqtt/client.py

async def mqtt_send_loop(client: mqtt.Client, gateway_id: str) -> None:
    """Send the daily payload at 06:00 UTC, forever.

    Sleeps until the next 06:00 UTC, sends the payload, and repeats. On
    cancellation the client is disconnected and the CancelledError is
    re-raised so the task is reported as cancelled.
    """
    try:
        while True:
            now = datetime.now(timezone.utc)
            today_at_six = now.replace(hour=6, minute=0, second=0, microsecond=0)
            # If 06:00 has already passed (or is right now), aim for tomorrow.
            if today_at_six > now:
                next_run = today_at_six
            else:
                next_run = today_at_six + timedelta(days=1)

            delay = next_run - now
            await asyncio.sleep(delay.total_seconds())

            await send_daily_payload(client, gateway_id)

    except asyncio.CancelledError:
        # Clean up before exiting.
        logger.info("mqtt_send_loop_cancelled")
        client.disconnect()
        raise  # Always re-raise CancelledError.

Event-baseret kommunikation

# gateway/src/events.py

import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class GatewayEvent:
    """An event passed between gateway components via the EventBus."""

    # Event kind used by subscribers to dispatch, e.g. "reading".
    type: str
    # Payload carried by the event; its shape depends on ``type``.
    data: Any

class EventBus:
    """In-process fan-out bus: each published event goes to every subscriber.

    Every subscriber owns an ``asyncio.Queue``; ``publish`` puts the event on
    all currently registered queues. Subscribers that stop consuming should
    call ``unsubscribe`` so their queue no longer accumulates events.
    """

    def __init__(self) -> None:
        self._queues: list[asyncio.Queue] = []

    def subscribe(self, maxsize: int = 0) -> asyncio.Queue:
        """Register a new subscriber and return its queue.

        Args:
            maxsize: Queue capacity; 0 (the default) means unbounded. With a
                bounded queue, ``publish`` waits while the queue is full.
        """
        q: asyncio.Queue = asyncio.Queue(maxsize)
        self._queues.append(q)
        return q

    def unsubscribe(self, q: asyncio.Queue) -> None:
        """Stop delivering events to ``q``; unknown queues are ignored."""
        try:
            self._queues.remove(q)
        except ValueError:
            pass

    async def publish(self, event: "GatewayEvent") -> None:
        """Put ``event`` on the queue of every current subscriber."""
        # Iterate over a snapshot: put() can suspend on a full bounded queue,
        # and subscribers may be added or removed in the meantime.
        for q in tuple(self._queues):
            await q.put(event)

# Shared bus instance used by the publisher and subscriber below.
bus = EventBus()

# Publisher (wmbusmeters listener):
async def on_reading(reading: WMBusReading) -> None:
    """Publish a received reading on the bus as a "reading" event."""
    event = GatewayEvent(type="reading", data=reading)
    await bus.publish(event)

# Subscriber (MQTT sender):
async def mqtt_event_handler() -> None:
    """Consume bus events forever, buffering the data of "reading" events."""
    events = bus.subscribe()
    while True:
        event = await events.get()
        if event.type != "reading":
            # Other event kinds are not handled by this subscriber.
            continue
        buffer.append(event.data)

Konklusion

asyncio.TaskGroup giver struktureret concurrency med automatisk cleanup ved fejl. asyncio.timeout() er renere end wait_for(). Async generator pipelines giver backpressure-kontrol. CancelledError skal altid re-raises efter oprydning. asyncio.Semaphore begrænser antallet af samtidige operationer mod eksterne API'er.

Se Python async context managers guide eller FastAPI background tasks guide.