Python asyncio TaskGroup — struktureret concurrency fra Python 3.11
asyncio.TaskGroup i Python 3.11: struktureret concurrency, fejlhåndtering med ExceptionGroup, vs gather/create_task, IoT gateway parallel-opgaver og FastAPI lifespan integration.
Af M-Bus Gateway
asyncio.TaskGroup (Python 3.11+) erstatter det fejlbehæftede asyncio.gather() med struktureret concurrency. Her er de praktiske mønstre til IoT-platforme.
TaskGroup vs. gather — hvorfor skifte
# GAMMELT: asyncio.gather — problematisk fejlhåndtering
results = await asyncio.gather(
fetch_gateway_data(gw1),
fetch_gateway_data(gw2),
fetch_gateway_data(gw3),
return_exceptions=True, # Skjuler fejl som resultater
)
# Problem 1: return_exceptions=True blander fejl og resultater
# Problem 2: Uden return_exceptions afbrydes alle tasks ved første fejl
# men de øvrige tasks forsætter at køre ukontrolleret
# NY: asyncio.TaskGroup — struktureret
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(fetch_gateway_data(gw1))
t2 = tg.create_task(fetch_gateway_data(gw2))
t3 = tg.create_task(fetch_gateway_data(gw3))
# Blokerer indtil alle tasks er færdige
# Ved fejl: alle tasks annulleres + ExceptionGroup raises
results = [t1.result(), t2.result(), t3.result()]
ExceptionGroup — håndter delfejl
# IoT: Rapportér data fra 10 gateways parallelt
async def report_all_gateways(gateway_ids: list[str]) -> dict:
results = {}
errors = {}
async with asyncio.TaskGroup() as tg:
tasks = {
gw_id: tg.create_task(report_single_gateway(gw_id))
for gw_id in gateway_ids
}
# Kaster ExceptionGroup hvis en eller flere tasks fejler
# BRUG try/except*: (Python 3.11 except* syntax)
try:
async with asyncio.TaskGroup() as tg:
tasks = {
gw_id: tg.create_task(report_single_gateway(gw_id))
for gw_id in gateway_ids
}
except* StripeError as eg:
for exc in eg.exceptions:
log.error("Stripe rapportering fejlet", error=str(exc))
except* MQTTError as eg:
for exc in eg.exceptions:
log.warning("MQTT fejl", error=str(exc))
return {gw_id: t.result() for gw_id, t in tasks.items()
if not t.cancelled() and t.exception() is None}
Gateway: Parallel data-indsamling
# gateway/src/wmbus/listener.py
async def collect_all_meter_data(meter_ids: list[str]) -> list[Reading]:
"""
Anmod om opdaterede aflæsninger fra alle kendte målere parallelt.
Bruges inden daglig MQTT-send for at sikre friske data.
"""
readings: list[Reading] = []
try:
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(request_meter_reading(meter_id))
for meter_id in meter_ids
]
except* MeterTimeoutError:
log.warning("Nogle målere svarede ikke — bruger cached data")
for task in tasks:
if not task.cancelled() and task.exception() is None:
readings.append(task.result())
return readings
async def request_meter_reading(meter_id: str) -> Reading:
"""Timeout pr. måler: 30 sekunder."""
return await asyncio.wait_for(
_listen_for_meter(meter_id),
timeout=30.0,
)
Server: Parallel Celery-lignende processing
# server/src/workers/parallel_processor.py
async def process_settlement_batch(property_ids: list[uuid.UUID]) -> None:
"""
Generer årsafregninger for flere ejendomme parallelt.
Max 5 samtidige (semaphore) for at beskytte DB.
"""
sem = asyncio.Semaphore(5)
async def process_one(property_id: uuid.UUID) -> None:
async with sem:
await generate_settlement(property_id)
try:
async with asyncio.TaskGroup() as tg:
for pid in property_ids:
tg.create_task(process_one(pid))
except* SettlementError as eg:
failed = [str(e) for e in eg.exceptions]
log.error("Batch settlement fejl", failures=failed)
raise
FastAPI lifespan med TaskGroup
# server/src/main.py
from contextlib import asynccontextmanager
import asyncio
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: Start baggrundstjenester
async with asyncio.TaskGroup() as tg:
mqtt_task = tg.create_task(start_mqtt_subscriber())
dmi_task = tg.create_task(prefetch_degree_days())
# Alle startup-tasks afsluttet
yield # App kører
# Shutdown: Afslut tasks pænt
# TaskGroup-tasks er allerede færdige (de var startup-tasks)
# For persistente background tasks — brug create_task + cancel:
background = asyncio.create_task(run_mqtt_loop())
try:
yield # ← Brug kun ét yield i lifespan
finally:
background.cancel()
await asyncio.gather(background, return_exceptions=True)
Timeout + cancellation
# Parallel med individuelle timeouts:
async def fetch_with_timeout(url: str, timeout: float) -> dict:
async with asyncio.timeout(timeout): # Python 3.11 asyncio.timeout
async with httpx.AsyncClient() as client:
resp = await client.get(url)
return resp.json()
async def fetch_multiple_apis() -> list[dict]:
results = []
try:
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(fetch_with_timeout(url, 5.0))
for url in API_ENDPOINTS
]
except* TimeoutError as eg:
log.warning(f"{len(eg.exceptions)} API kald timeout")
return [t.result() for t in tasks
if not t.cancelled() and t.exception() is None]
# asyncio.timeout erstatter asyncio.wait_for() i Python 3.11:
async with asyncio.timeout(10.0):
result = await long_running_operation()
Sammenligning: TaskGroup vs. alternativer
# 1. asyncio.gather — simpel, men begrænset fejlhåndtering:
results = await asyncio.gather(*coros, return_exceptions=True)
# 2. asyncio.TaskGroup — struktureret, anbefalet fra 3.11:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(c) for c in coros]
# 3. anyio.create_task_group — kompatibel med trio og asyncio:
async with anyio.create_task_group() as tg:
for c in coros:
tg.start_soon(c)
# 4. Semaphore + gather — rate-limiting parallel:
sem = asyncio.Semaphore(5)
async def limited(c):
async with sem: return await c
results = await asyncio.gather(*[limited(c) for c in coros])
# Platform valg: TaskGroup + except* til produktionskode
# gather+return_exceptions kun til simple scripts
Konklusion
asyncio.TaskGroup med except* syntax giver struktureret concurrency i Python 3.11+: alle child-tasks afbrydes ved fejl, og ExceptionGroup gør det muligt at håndtere delfejl pr. type. Til IoT-platforme med mange parallelle gateway-kald eller batch-afregninger er TaskGroup + Semaphore standarden. asyncio.timeout() erstatter asyncio.wait_for() og kan nestes med TaskGroup.
Se Python asyncio advanced patterns eller FastAPI lifespan guide.