M-Bus Gateway
← Tilbage til blog
· FastAPI· Python· lifespan· startup· shutdown· async· database· backend

FastAPI lifespan — startup og shutdown events korrekt

FastAPI lifespan context manager: database pool oprettelse, MQTT-forbindelser, Celery-workers, cleanup ved shutdown og korrekt async resource management.

Af M-Bus Gateway

FastAPI's lifespan context manager er den anbefalede måde at håndtere startup og shutdown af ressourcer. Her er mønstrene fra produktion.


Lifespan context manager

# server/src/main.py

from contextlib import asynccontextmanager
from fastapi import FastAPI
import structlog

log = structlog.get_logger()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the application-wide resources at startup and release them at shutdown.

    Everything before ``yield`` is startup; the ``finally`` clause is
    shutdown. The cleanup lives in ``finally`` so that it also runs when an
    exception is thrown into the generator at ``yield`` and when a later
    startup step fails after an earlier one has already succeeded. Only
    resources that were actually initialised are released.

    Args:
        app: The FastAPI application this lifespan belongs to (not used here).
    """
    # Imported lazily inside the function, as in the original snippet; doing
    # it up front means an import error surfaces before anything is opened.
    from server.src.cache import close_cache, init_cache
    from server.src.db import close_db, init_db
    from server.src.mqtt.subscriber import MQTTSubscriber

    log.info("Starting M-Bus Gateway API")

    db_ready = False
    cache_ready = False
    mqtt = None
    try:
        # 1. Database connection pool
        await init_db()
        db_ready = True
        log.info("Database pool initialized")

        # 2. MQTT subscriber (background task)
        subscriber = MQTTSubscriber()
        await subscriber.connect()
        mqtt = subscriber  # only remembered once connect() has succeeded
        log.info("MQTT subscriber connected")

        # 3. Redis connection pool
        await init_cache()
        cache_ready = True
        log.info("Redis cache initialized")

        # The application serves requests while suspended here.
        yield
    finally:
        log.info("Shutting down M-Bus Gateway API")
        # Same order as before (MQTT, cache, database). The nesting makes
        # sure a failure in one step does not skip the remaining ones.
        try:
            if mqtt is not None:
                await mqtt.disconnect()
        finally:
            try:
                if cache_ready:
                    await close_cache()
            finally:
                if db_ready:
                    await close_db()
        log.info("Shutdown complete")


# The ASGI application object. Startup and shutdown of shared resources are
# delegated to the `lifespan` context manager passed in below.
app = FastAPI(
    title="M-Bus Gateway API",
    version="1.0.0",
    lifespan=lifespan,
)

Database pool med SQLAlchemy

# server/src/db.py

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from server.src.config import get_settings

_engine = None
_session_factory = None


async def init_db() -> None:
    """Create the module-wide async engine and the session factory bound to it.

    The database URL and the debug flag come from ``get_settings()``. The
    results are stored in the module globals ``_engine`` and
    ``_session_factory``.
    """
    global _engine, _session_factory

    cfg = get_settings()
    url = cfg.database_url_str
    pool_options = {
        "pool_size": 10,
        "max_overflow": 20,
        "pool_pre_ping": True,  # test connections on checkout
        "pool_recycle": 3600,   # recycle connections after 60 minutes
        "echo": cfg.debug,      # SQL logging in dev
    }
    _engine = create_async_engine(url, **pool_options)

    # expire_on_commit=False is passed through unchanged from the original.
    _session_factory = async_sessionmaker(
        _engine, class_=AsyncSession, expire_on_commit=False
    )


async def close_db() -> None:
    """Dispose the module-wide engine and reset the module state.

    Safe to call when no engine exists and safe to call more than once.
    Both globals are cleared so that nothing keeps using a disposed engine
    or a session factory bound to it.
    """
    global _engine, _session_factory

    engine = _engine
    # Clear the references first so the module is back in its
    # "not initialised" state even if dispose() raises.
    _engine = None
    _session_factory = None
    if engine is not None:
        await engine.dispose()


async def get_session() -> AsyncSession:
    """FastAPI dependency that yields one database session per request.

    This is an async generator: the session is yielded to the caller, then
    committed if the caller finished without an exception. On an exception
    the session is rolled back and the exception is re-raised. The
    ``async with`` block closes the session in every case, so no explicit
    ``close()`` is needed.

    Raises:
        RuntimeError: If the session factory has not been created yet,
            i.e. ``init_db()`` has not run.
    """
    if _session_factory is None:
        # Fail with a clear message instead of
        # "TypeError: 'NoneType' object is not callable".
        raise RuntimeError("Database is not initialized; call init_db() first")

    async with _session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

MQTT subscriber som baggrundstask

# server/src/mqtt/subscriber.py

import asyncio
import aiomqtt
import structlog
from server.src.config import get_settings

log = structlog.get_logger()


class MQTTSubscriber:
    """Owns one aiomqtt client and the background task that consumes it.

    ``connect()`` opens the client, subscribes to the meter topics and starts
    the listener task. ``disconnect()`` stops the task and closes the client.
    ``publish()`` sends a message through the same client.
    """

    # Topic filters subscribed to on connect.
    _TOPICS = ("meters/+/data", "meters/+/status", "meters/+/alarm")

    def __init__(self):
        self._client = None  # entered aiomqtt.Client, or None when not connected
        self._task = None    # asyncio.Task running _listen(), or None

    async def connect(self) -> None:
        """Connect to the broker, subscribe, and start the listener task.

        Host and port are read from ``get_settings()``. If a subscription
        fails, the client is closed again before the exception propagates,
        so no half-open connection is left behind.
        """
        settings = get_settings()
        client = aiomqtt.Client(
            hostname=settings.mqtt_host,
            port=settings.mqtt_port,
            keepalive=60,
        )
        # Entered manually because the client must outlive this call;
        # disconnect() performs the matching __aexit__.
        await client.__aenter__()
        try:
            for topic in self._TOPICS:
                await client.subscribe(topic)
        except BaseException:
            await client.__aexit__(None, None, None)
            raise

        self._client = client
        # Start the listening background task.
        self._task = asyncio.create_task(self._listen())
        log.info("MQTT subscriber started")

    async def disconnect(self) -> None:
        """Stop the listener task and close the client.

        The stored references are cleared as they are released, so calling
        this again (or without a prior ``connect()``) does nothing.
        """
        task, self._task = self._task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        client, self._client = self._client, None
        if client is not None:
            await client.__aexit__(None, None, None)
        log.info("MQTT subscriber disconnected")

    async def publish(self, topic: str, payload=None, qos: int = 0, retain: bool = False) -> None:
        """Publish ``payload`` to ``topic`` through the connected client.

        Args:
            topic: MQTT topic to publish to.
            payload: Message body, forwarded unchanged to the client.
            qos: MQTT quality-of-service level.
            retain: Whether the broker should retain the message.

        Raises:
            RuntimeError: If the subscriber is not connected.
        """
        if self._client is None:
            raise RuntimeError("MQTT subscriber is not connected")
        await self._client.publish(topic, payload, qos=qos, retain=retain)

    async def _listen(self) -> None:
        """Consume incoming messages until cancelled or until the stream fails."""
        try:
            async for message in self._client.messages:
                await self._handle_message(message)
        except asyncio.CancelledError:
            # Normal shutdown path: disconnect() cancels this task and
            # handles the CancelledError itself, so let it propagate.
            raise
        except Exception as exc:
            # NOTE(review): the listener ends here and is not restarted;
            # add reconnect logic if messages must survive a broker drop.
            log.error("MQTT listener crashed", exc=str(exc))

    async def _handle_message(self, message: aiomqtt.Message) -> None:
        """Dispatch one message; a handler failure is logged, not raised."""
        from server.src.mqtt.handlers import dispatch_message
        try:
            await dispatch_message(str(message.topic), message.payload)
        except Exception as exc:
            # One bad message must not stop the listener loop.
            log.error("Message handling failed", topic=str(message.topic), exc=str(exc))

State i app.state

# Gem ressourcer i app.state for adgang i andre moduler:

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan variant that publishes shared resources on ``app.state``.

    The MQTT subscriber and the notification service are stored on
    ``app.state`` so request handlers can reach them. The subscriber is
    disconnected in a ``finally`` clause, so it is released both on normal
    shutdown and when an exception occurs after it has connected.

    Args:
        app: The FastAPI application whose ``state`` receives the resources.
    """
    # Imported before anything is opened so an import error cannot leave a
    # connected subscriber behind.
    from server.src.notifications import NotificationService

    mqtt = MQTTSubscriber()
    await mqtt.connect()
    try:
        # Initialise and store on app.state
        app.state.mqtt = mqtt
        app.state.notifications = NotificationService()

        yield
    finally:
        await mqtt.disconnect()


# I router — adgang via Request:
from fastapi import Request

@router.post("/test-alarm")
async def test_alarm(request: Request):
    """Publish a test alarm message via the MQTT object stored on ``app.state``.

    The object is read from ``request.app.state.mqtt`` and must provide an
    awaitable ``publish(topic, payload)``.

    Returns:
        A dict with a fixed ``status`` message once the publish call returns.
    """
    subscriber: MQTTSubscriber = request.app.state.mqtt
    topic = "meters/GW-0001/alarm"
    payload = b'{"type":"test"}'
    await subscriber.publish(topic, payload)
    return {"status": "alarm sent"}

Test med lifespan

# server/tests/conftest.py

import pytest
from httpx import AsyncClient, ASGITransport
from server.src.main import app


@pytest.fixture
async def client():
    """AsyncClient against ``app`` with the lifespan (startup/shutdown) running.

    httpx's ``ASGITransport`` does not send ASGI lifespan events, so the
    application's lifespan context is entered explicitly around the client.
    """
    # NOTE(review): an async fixture needs an async pytest plugin
    # (pytest-asyncio or anyio) to be configured; confirm the project setup.
    async with app.router.lifespan_context(app):
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="http://test",
        ) as ac:
            yield ac
    # Leaving the outer block runs the lifespan shutdown code.


@pytest.fixture
async def client_no_lifespan():
    """AsyncClient on a bare FastAPI app WITHOUT lifespan, meant for quick unit tests."""
    from fastapi import FastAPI

    # Register only the routes that are under test.
    # NOTE(review): `your_router` is a placeholder; substitute the real router.
    bare_app = FastAPI()
    bare_app.include_router(your_router)

    transport = ASGITransport(app=bare_app)
    http_client = AsyncClient(transport=transport, base_url="http://test")
    async with http_client as ac:
        yield ac

Graceful shutdown med SIGTERM

# For Kubernetes og Docker: Håndtér SIGTERM korrekt

import signal
import asyncio

def setup_signal_handlers(app: FastAPI) -> None:
    """Register a SIGTERM handler that starts the graceful-shutdown coroutine.

    A handler that was installed earlier (for example by the ASGI server) is
    still invoked afterwards, so installing this one does not stop the
    process from reacting to SIGTERM the way it did before.

    Args:
        app: The application passed on to ``shutdown()``.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # Called outside a running loop: keep the original lookup.
        loop = asyncio.get_event_loop()

    previous_handler = signal.getsignal(signal.SIGTERM)
    # Strong references; the event loop itself only keeps weak ones to tasks.
    pending = set()

    def start_shutdown():
        task = loop.create_task(shutdown(app))
        pending.add(task)
        task.add_done_callback(pending.discard)

    def handle_sigterm(*args):
        log.info("SIGTERM received — initiating graceful shutdown")
        # Handlers registered with signal.signal() must not touch the loop
        # directly; call_soon_threadsafe hands the work over and wakes it up.
        loop.call_soon_threadsafe(start_shutdown)
        # SIG_DFL, SIG_IGN and None are not callable and are skipped.
        if callable(previous_handler):
            previous_handler(*args)

    signal.signal(signal.SIGTERM, handle_sigterm)


async def shutdown(app: FastAPI, grace_period: float = 5.0) -> None:
    """Graceful shutdown hook: give in-flight requests time to finish.

    Resource cleanup itself is left to the application's lifespan. This
    coroutine only waits. Add further steps here, such as waiting for Celery
    tasks to finish or closing MQTT.

    Args:
        app: The application being shut down (currently unused).
        grace_period: Seconds to wait for in-flight requests; defaults to
            the previously hard-coded 5 seconds.
    """
    await asyncio.sleep(grace_period)


# docker-compose.yml:
# stop_grace_period: "30s"  # Giv containeren 30s til graceful shutdown
# stop_signal: SIGTERM

Konklusion

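FastAPI's lifespan context manager sikrer, at ressourcer (database pools, MQTT connections, Redis) oprettes ved startup og lukkes korrekt ved shutdown — uanset om shutdown skyldes normal stop eller exception, forudsat at oprydningen efter yield ligger i en try/finally-blok. app.state giver adgang til ressourcerne overalt, hvor du har fat i app eller request. I tests kører httpx' ASGITransport ikke lifespan af sig selv: pak AsyncClient ind i app.router.lifespan_context(app) (eller brug LifespanManager fra asgi-lifespan) for at køre startup og shutdown i testmiljøet.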

Se FastAPI SQLModel multi-tenant guide eller FastAPI dependency injection guide.