· Python· asyncio· TaskGroup· timeout· async· concurrency· IoT· gateway
Avancerede asyncio mønstre i Python — timeout, cancel og task groups
Avancerede asyncio-mønstre i Python: TaskGroup, asyncio.timeout(), fejlhåndtering med asyncio.gather(), CancelledError, Semaphore til rate limiting og async generator-pipelines.
Af M-Bus Gateway
Python asyncio har avancerede primitiver der gør concurrent I/O-kode mere robust. Her er de vigtigste mønstre.
TaskGroup (Python 3.11+)
# gateway/src/main.py
import asyncio
async def main() -> None:
    """Run all long-lived gateway services inside one ``asyncio.TaskGroup``.

    The task group is used here in place of ``asyncio.gather()`` for its
    error handling: when one task fails with an exception, all remaining
    tasks are cancelled automatically.
    """
    services = (
        run_wmbusmeters,      # wM-Bus listening
        run_heartbeat,        # MQTT heartbeat every 5 min
        run_daily_sender,     # daily send at 06:00
        run_cmd_listener,     # MQTT commands
        run_watchdog_feeder,  # hardware watchdog
    )
    async with asyncio.TaskGroup() as tg:
        for service in services:
            tg.create_task(service())
    # The ``async with`` block only exits once every task has finished. If
    # any task raised, leaving the block raises an ExceptionGroup that
    # contains all of the errors.
async def run_wmbusmeters() -> None:
    """Keep the wmbusmeters subprocess running indefinitely.

    Each time ``start_wmbusmeters_subprocess()`` returns or raises, the loop
    waits five seconds and starts it again. Failures are logged with the
    exception attached. Cancellation is not caught, so the task can still be
    stopped from outside.
    """
    while True:
        try:
            await start_wmbusmeters_subprocess()
        except Exception as exc:
            # CancelledError derives from BaseException, so it is not
            # swallowed here and the task remains cancellable.
            logger.error("wmbusmeters_crashed", exc_info=exc)
        # Back off before every restart (crash or normal return) so that a
        # subprocess which exits immediately cannot cause a tight respawn loop.
        await asyncio.sleep(5)
asyncio.timeout (Python 3.11+)
# server/src/mqtt/subscriber.py
import asyncio
async def wait_for_gateway_ack(
    gateway_id: str,
    command_id: str,
    timeout_seconds: float = 30.0,
) -> bool:
    """Wait for a gateway's ACK of a command, giving up after a timeout.

    Args:
        gateway_id: Gateway expected to acknowledge the command.
        command_id: Command being acknowledged.
        timeout_seconds: Maximum number of seconds to wait for the ACK.

    Returns:
        The ``success`` attribute of the received ACK, or ``False`` when no
        ACK arrived before the timeout expired (a warning is logged then).
    """
    ack_key = f"{gateway_id}:{command_id}"
    try:
        # asyncio.timeout() is used as a cleaner alternative to
        # asyncio.wait_for().
        async with asyncio.timeout(timeout_seconds):
            ack = await ack_queue.get(ack_key)
    except asyncio.TimeoutError:
        logger.warning(
            "gateway_ack_timeout",
            gateway_id=gateway_id,
            command_id=command_id,
            timeout=timeout_seconds,
        )
        return False
    return ack.success
# Usage (inside a coroutine):
success = await wait_for_gateway_ack(gateway_id, cmd_id, timeout_seconds=30)
if not success:
    # No successful ACK was received: record the command as failed.
    await mark_command_failed(session, cmd_id)
gather() med fejlhåndtering
# server/src/portfolio/service.py
async def fetch_portfolio_data(
    session: AsyncSession,
    tenant_id: UUID,
) -> PortfolioData:
    """Fetch the portfolio figures for a tenant concurrently.

    The five queries run under ``asyncio.gather(return_exceptions=True)``,
    so a failure in one query does not abort the others. Any query that
    failed or was cancelled is replaced by a neutral default, i.e. the
    function returns what it has (degraded mode) instead of raising.

    Args:
        session: Database session passed to every query helper.
        tenant_id: Tenant whose portfolio is summarised.

    Returns:
        A ``PortfolioData`` where each failed figure is ``0`` (counts and
        active gateways) or ``{}`` (settlement status).
    """
    # NOTE(review): all five queries share one `session`. If this is a
    # SQLAlchemy AsyncSession, it is not safe for concurrent use by several
    # tasks; confirm, or give each query its own session.
    results = await asyncio.gather(
        _get_property_count(session, tenant_id),
        _get_unit_count(session, tenant_id),
        _get_active_gateways(session, tenant_id),
        _get_alarm_count(session, tenant_id),
        _get_settlement_status(session, tenant_id),
        return_exceptions=True,
    )

    def _or_default(result, default):
        # With return_exceptions=True a cancelled child shows up as a
        # CancelledError instance, which derives from BaseException rather
        # than Exception, so test against the base class.
        return default if isinstance(result, BaseException) else result

    property_count, unit_count, gateways, alarms, settlements = results
    return PortfolioData(
        property_count=_or_default(property_count, 0),
        unit_count=_or_default(unit_count, 0),
        active_gateways=_or_default(gateways, 0),
        alarm_count=_or_default(alarms, 0),
        settlement_status=_or_default(settlements, {}),
    )
Semaphore til rate limiting
# server/src/workers/tasks/email.py
import asyncio
# At most 10 e-mail sends in flight at once. This caps concurrency, not
# requests per second (Brevo rate limit: 100 req/s).
EMAIL_SEMAPHORE = asyncio.Semaphore(10)


async def send_settlement_emails_batch(
    settlements: list[Settlement],
) -> dict[str, bool]:
    """Send settlement e-mails concurrently, bounded by ``EMAIL_SEMAPHORE``.

    Args:
        settlements: Settlements to send an e-mail for.

    Returns:
        Mapping of ``str(settlement.id)`` to ``True`` when the send
        succeeded and ``False`` when it raised. An empty input yields an
        empty dict.
    """

    async def send_one(settlement: Settlement) -> tuple[str, bool]:
        settlement_id = str(settlement.id)
        async with EMAIL_SEMAPHORE:
            try:
                await send_settlement_email(settlement)
            except Exception as exc:
                # Attach the exception so the log shows why the send failed.
                logger.error(
                    "email_failed",
                    settlement_id=settlement_id,
                    exc_info=exc,
                )
                return settlement_id, False
        return settlement_id, True

    # send_one() turns failures into (id, False), so gather() can run with
    # its default return_exceptions=False.
    results = await asyncio.gather(*(send_one(s) for s in settlements))
    return dict(results)
Async generator pipeline
# gateway/src/wmbus/pipeline.py
import asyncio
from typing import AsyncIterator
async def read_telegrams(device: str) -> AsyncIterator[str]:
    """Yield JSON telegrams, one at a time, from a ``wmbusmeters`` subprocess.

    ``wmbusmeters --format=json <device>`` is started with stdout piped.
    Each output line is decoded as UTF-8 (undecodable bytes are replaced),
    stripped, and yielded only when it starts with ``{``.

    The subprocess is terminated (if still running) and reaped when the
    generator finishes, is closed early, or is cancelled.

    Args:
        device: Device argument passed to wmbusmeters.

    Yields:
        The raw JSON text of each telegram line.
    """
    proc = await asyncio.create_subprocess_exec(
        "wmbusmeters", "--format=json", device,
        stdout=asyncio.subprocess.PIPE,
    )
    try:
        async for line in proc.stdout:
            raw = line.decode("utf-8", errors="replace").strip()
            if raw.startswith("{"):
                yield raw
    finally:
        # Runs on normal EOF as well as on aclose()/cancellation, so the
        # child process is neither leaked nor left unreaped.
        if proc.returncode is None:
            try:
                proc.terminate()
            except ProcessLookupError:
                # The process exited between the check and the signal.
                pass
        await proc.wait()
async def parse_telegrams(
    source: AsyncIterator[str],
) -> AsyncIterator[WMBusReading]:
    """Second pipeline stage: turn raw JSON telegrams into readings.

    Every item from ``source`` is passed to ``parse_wmbus_json``. Falsy
    results and readings whose ``status`` is not ``"OK"`` are dropped.
    """
    async for raw_telegram in source:
        parsed = parse_wmbus_json(raw_telegram)
        if not (parsed and parsed.status == "OK"):
            continue
        yield parsed
async def filter_known_meters(
    source: AsyncIterator[WMBusReading],
    known_ids: set[str],
) -> AsyncIterator[WMBusReading]:
    """Third pipeline stage: drop readings from unknown meters.

    Args:
        source: Upstream readings.
        known_ids: Meter ids that are allowed through.

    Yields:
        Each reading whose ``meter_id`` is in ``known_ids``, in input order.
    """
    async for candidate in source:
        if candidate.meter_id not in known_ids:
            continue
        yield candidate
async def run_pipeline(device: str, known_ids: set[str]) -> None:
    """Chain the generator stages and store every reading that survives.

    Stages: ``read_telegrams`` -> ``parse_telegrams`` ->
    ``filter_known_meters``. Each stage pulls from the previous one only
    when asked for its next item, so the chain advances no faster than
    ``store_reading`` completes (backpressure comes for free).
    """
    raw_telegrams = read_telegrams(device)
    parsed_readings = parse_telegrams(raw_telegrams)
    known_readings = filter_known_meters(parsed_readings, known_ids)
    async for reading in known_readings:
        await store_reading(reading)
CancelledError håndtering
# gateway/src/mqtt/client.py
async def mqtt_send_loop(client: mqtt.Client, gateway_id: str) -> None:
    """Send the daily payload at 06:00 UTC, repeating until cancelled.

    On cancellation (e.g. at shutdown) the loop logs the event, disconnects
    ``client`` and re-raises ``CancelledError``.
    """
    try:
        while True:
            # Work out how long to wait until the next 06:00 UTC.
            current = datetime.now(timezone.utc)
            target = current.replace(hour=6, minute=0, second=0, microsecond=0)
            if target <= current:
                # Today's 06:00 has already passed; aim for tomorrow's.
                target = target + timedelta(days=1)
            delay = target - current
            await asyncio.sleep(delay.total_seconds())
            await send_daily_payload(client, gateway_id)
    except asyncio.CancelledError:
        # Clean up before exiting.
        logger.info("mqtt_send_loop_cancelled")
        client.disconnect()
        # Always re-raise CancelledError so the cancellation is not swallowed.
        raise
Event-baseret kommunikation
# gateway/src/events.py
import asyncio
from dataclasses import dataclass
from typing import Any
@dataclass
class GatewayEvent:
    """Event passed between gateway components."""

    # Event kind that subscribers dispatch on, e.g. "reading".
    type: str
    # Payload of the event; for "reading" events the publisher passes the reading.
    data: Any
class EventBus:
    """In-process fan-out bus: each published event goes to every subscriber.

    Every subscriber owns an unbounded ``asyncio.Queue``. ``publish`` puts
    the event on all queues registered at that moment.
    """

    def __init__(self) -> None:
        self._queues: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        """Register a new subscriber and return the queue it should read."""
        q: asyncio.Queue = asyncio.Queue()
        self._queues.append(q)
        return q

    def unsubscribe(self, q: asyncio.Queue) -> None:
        """Stop delivering events to ``q``.

        Without this, queues of finished subscribers would stay registered
        and keep accumulating events. Unknown or already removed queues are
        ignored, so the call is safe to repeat.
        """
        try:
            self._queues.remove(q)
        except ValueError:
            pass

    async def publish(self, event: GatewayEvent) -> None:
        """Put ``event`` on the queue of every current subscriber."""
        # Iterate over a snapshot so that subscribe()/unsubscribe() calls
        # made while a put is pending cannot disturb the loop.
        for q in tuple(self._queues):
            await q.put(event)
# Module-level bus instance used by the publisher and subscriber below.
bus = EventBus()
# Publisher (wmbusmeters listener):
async def on_reading(reading: WMBusReading) -> None:
    """Publish ``reading`` on the bus as a ``"reading"`` event."""
    event = GatewayEvent(type="reading", data=reading)
    await bus.publish(event)
# Subscriber (MQTT sender):
async def mqtt_event_handler() -> None:
    """Consume bus events forever, buffering the data of ``"reading"`` events.

    Events of any other type are ignored.
    """
    events = bus.subscribe()
    while True:
        event = await events.get()
        if event.type != "reading":
            continue
        buffer.append(event.data)
Konklusion
asyncio.TaskGroup giver struktureret concurrency med automatisk oprydning ved fejl. asyncio.timeout() er renere end asyncio.wait_for(). Async generator-pipelines giver naturlig backpressure, fordi hvert led først henter det næste element, når forbrugeren beder om det. CancelledError skal altid re-raises, når oprydningen er færdig. asyncio.Semaphore begrænser antallet af samtidige operationer mod eksterne API'er.
Se Python async context managers guide eller FastAPI background tasks guide.