M-Bus Gateway
← Tilbage til blog
· Python· asyncio· TaskGroup· concurrency· FastAPI· IoT· backend

Python asyncio TaskGroup — struktureret concurrency fra Python 3.11

asyncio.TaskGroup i Python 3.11: struktureret concurrency, fejlhåndtering med ExceptionGroup, vs gather/create_task, IoT gateway parallel-opgaver og FastAPI lifespan integration.

Af M-Bus Gateway

asyncio.TaskGroup (Python 3.11+) erstatter det fejlbehæftede asyncio.gather() med struktureret concurrency. Her er de praktiske mønstre til IoT-platforme.


TaskGroup vs. gather — hvorfor skifte

# GAMMELT: asyncio.gather — problematisk fejlhåndtering
results = await asyncio.gather(
    fetch_gateway_data(gw1),
    fetch_gateway_data(gw2),
    fetch_gateway_data(gw3),
    return_exceptions=True,    # Skjuler fejl som resultater
)
# Problem 1: return_exceptions=True blander fejl og resultater
# Problem 2: Uden return_exceptions afbrydes alle tasks ved første fejl
#            men de øvrige tasks forsætter at køre ukontrolleret

# NY: asyncio.TaskGroup — struktureret
async with asyncio.TaskGroup() as tg:
    t1 = tg.create_task(fetch_gateway_data(gw1))
    t2 = tg.create_task(fetch_gateway_data(gw2))
    t3 = tg.create_task(fetch_gateway_data(gw3))
# Blokerer indtil alle tasks er færdige
# Ved fejl: alle tasks annulleres + ExceptionGroup raises
results = [t1.result(), t2.result(), t3.result()]

ExceptionGroup — håndter delfejl

# IoT: Rapportér data fra 10 gateways parallelt

async def report_all_gateways(gateway_ids: list[str]) -> dict:
    results = {}
    errors = {}

    async with asyncio.TaskGroup() as tg:
        tasks = {
            gw_id: tg.create_task(report_single_gateway(gw_id))
            for gw_id in gateway_ids
        }
    # Kaster ExceptionGroup hvis en eller flere tasks fejler

    # BRUG try/except*:  (Python 3.11 except* syntax)
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = {
                gw_id: tg.create_task(report_single_gateway(gw_id))
                for gw_id in gateway_ids
            }
    except* StripeError as eg:
        for exc in eg.exceptions:
            log.error("Stripe rapportering fejlet", error=str(exc))
    except* MQTTError as eg:
        for exc in eg.exceptions:
            log.warning("MQTT fejl", error=str(exc))

    return {gw_id: t.result() for gw_id, t in tasks.items()
            if not t.cancelled() and t.exception() is None}

Gateway: Parallel data-indsamling

# gateway/src/wmbus/listener.py

async def collect_all_meter_data(meter_ids: list[str]) -> list[Reading]:
    """
    Anmod om opdaterede aflæsninger fra alle kendte målere parallelt.
    Bruges inden daglig MQTT-send for at sikre friske data.
    """
    readings: list[Reading] = []

    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(request_meter_reading(meter_id))
                for meter_id in meter_ids
            ]
    except* MeterTimeoutError:
        log.warning("Nogle målere svarede ikke — bruger cached data")

    for task in tasks:
        if not task.cancelled() and task.exception() is None:
            readings.append(task.result())

    return readings


async def request_meter_reading(meter_id: str) -> Reading:
    """Timeout pr. måler: 30 sekunder."""
    return await asyncio.wait_for(
        _listen_for_meter(meter_id),
        timeout=30.0,
    )

Server: Parallel Celery-lignende processing

# server/src/workers/parallel_processor.py

async def process_settlement_batch(property_ids: list[uuid.UUID]) -> None:
    """
    Generer årsafregninger for flere ejendomme parallelt.
    Max 5 samtidige (semaphore) for at beskytte DB.
    """
    sem = asyncio.Semaphore(5)

    async def process_one(property_id: uuid.UUID) -> None:
        async with sem:
            await generate_settlement(property_id)

    try:
        async with asyncio.TaskGroup() as tg:
            for pid in property_ids:
                tg.create_task(process_one(pid))
    except* SettlementError as eg:
        failed = [str(e) for e in eg.exceptions]
        log.error("Batch settlement fejl", failures=failed)
        raise

FastAPI lifespan med TaskGroup

# server/src/main.py

from contextlib import asynccontextmanager
import asyncio

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: Start baggrundstjenester
    async with asyncio.TaskGroup() as tg:
        mqtt_task = tg.create_task(start_mqtt_subscriber())
        dmi_task = tg.create_task(prefetch_degree_days())
    # Alle startup-tasks afsluttet

    yield  # App kører

    # Shutdown: Afslut tasks pænt
    # TaskGroup-tasks er allerede færdige (de var startup-tasks)
    # For persistente background tasks — brug create_task + cancel:
    background = asyncio.create_task(run_mqtt_loop())
    try:
        yield  # ← Brug kun ét yield i lifespan
    finally:
        background.cancel()
        await asyncio.gather(background, return_exceptions=True)

Timeout + cancellation

# Parallel med individuelle timeouts:

async def fetch_with_timeout(url: str, timeout: float) -> dict:
    async with asyncio.timeout(timeout):    # Python 3.11 asyncio.timeout
        async with httpx.AsyncClient() as client:
            resp = await client.get(url)
            return resp.json()


async def fetch_multiple_apis() -> list[dict]:
    results = []
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(fetch_with_timeout(url, 5.0))
                for url in API_ENDPOINTS
            ]
    except* TimeoutError as eg:
        log.warning(f"{len(eg.exceptions)} API kald timeout")

    return [t.result() for t in tasks
            if not t.cancelled() and t.exception() is None]


# asyncio.timeout erstatter asyncio.wait_for() i Python 3.11:
async with asyncio.timeout(10.0):
    result = await long_running_operation()

Sammenligning: TaskGroup vs. alternativer

# 1. asyncio.gather — simpel, men begrænset fejlhåndtering:
results = await asyncio.gather(*coros, return_exceptions=True)

# 2. asyncio.TaskGroup — struktureret, anbefalet fra 3.11:
async with asyncio.TaskGroup() as tg:
    tasks = [tg.create_task(c) for c in coros]

# 3. anyio.create_task_group — kompatibel med trio og asyncio:
async with anyio.create_task_group() as tg:
    for c in coros:
        tg.start_soon(c)

# 4. Semaphore + gather — rate-limiting parallel:
sem = asyncio.Semaphore(5)
async def limited(c): 
    async with sem: return await c
results = await asyncio.gather(*[limited(c) for c in coros])

# Platform valg: TaskGroup + except* til produktionskode
# gather+return_exceptions kun til simple scripts

Konklusion

asyncio.TaskGroup med except* syntax giver struktureret concurrency i Python 3.11+: alle child-tasks afbrydes ved fejl, og ExceptionGroup gør det muligt at håndtere delfejl pr. type. Til IoT-platforme med mange parallelle gateway-kald eller batch-afregninger er TaskGroup + Semaphore standarden. asyncio.timeout() erstatter asyncio.wait_for() og kan nestes med TaskGroup.

Se Python asyncio advanced patterns eller FastAPI lifespan guide.