M-Bus Gateway
← Tilbage til blog
· fastapi· lifespan· startup· shutdown· MQTT· database· Celery· health-check· produktion

FastAPI lifespan — avanceret startup og shutdown til produktion

FastAPI lifespan til produktion: database connection pools, MQTT klient, Celery worker, graceful shutdown med timeout, health checks, dependency injection fra app.state og test med ASGITransport.

Af M-Bus Gateway

FastAPI's lifespan context manager er det eneste korrekte mønster til startup/shutdown i produktion. Her er de avancerede mønstre.


Basal lifespan med app.state

# server/src/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
import structlog

log = structlog.get_logger()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Manage process-wide resources for the lifetime of the application.

    Startup (everything before ``yield``) creates the database engine,
    connects the MQTT subscriber and verifies the database connection.
    The engine and the subscriber are published on ``app.state`` so request
    handlers can reach them.

    Shutdown releases the resources in reverse order of acquisition. The
    cleanup lives in ``finally`` blocks so it also runs when a later startup
    step raises or when an exception is thrown into the generator at
    ``yield``; a plain ``yield`` followed by cleanup code would skip the
    cleanup in both cases and leak the engine / MQTT connection.
    """
    log.info("server_starting")

    from server.src.db.engine import create_engine
    from server.src.mqtt.subscriber import MQTTSubscriber

    # 1. Database connection pool
    engine = create_engine()
    app.state.engine = engine
    try:
        # 2. MQTT subscriber
        mqtt = MQTTSubscriber(app.state)
        await mqtt.connect()
        app.state.mqtt = mqtt
        try:
            # 3. Verify critical dependencies; an exception here aborts
            #    startup and falls through to the cleanup below.
            await verify_database_connection(engine)

            log.info("server_started")
            yield    # ← application is serving requests here
        finally:
            # Reached on normal shutdown and on failure after connect().
            log.info("server_stopping")
            await mqtt.disconnect()
    finally:
        # The engine is always disposed last (it was created first).
        await engine.dispose()
        log.info("server_stopped")


# ASGI application object; startup/shutdown is delegated to `lifespan` above.
app = FastAPI(title="M-Bus Gateway API", version="1.0.0", lifespan=lifespan)

Graceful shutdown med timeout

# server/src/main.py (udvidet)
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Lifespan variant whose shutdown phase is bounded by a timeout.

    After ``yield`` the MQTT disconnect and the engine disposal are started
    as concurrent tasks and given at most 30 seconds in total. Tasks still
    running after that are cancelled; tasks that finished with an exception
    are logged.

    ``asyncio.wait`` never raises the exceptions of the tasks it waits for,
    so each finished task is inspected explicitly. Without that, a failing
    disconnect/dispose would go unreported.
    """
    # ... startup ...
    try:
        yield
    finally:
        # Runs on normal shutdown and when an exception is thrown in at yield.
        log.info("graceful_shutdown_started")

        shutdown_tasks = [
            asyncio.create_task(app.state.mqtt.disconnect(), name="mqtt-disconnect"),
            asyncio.create_task(app.state.engine.dispose(), name="db-dispose"),
        ]

        try:
            # At most 30 seconds for the whole graceful shutdown.
            done, pending = await asyncio.wait(shutdown_tasks, timeout=30.0)

            for task in pending:
                log.warning("shutdown_task_timeout", task=task.get_name())
                task.cancel()

            # Wait for the cancelled tasks to actually finish.
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)

            # Surface failures of the tasks that completed in time.
            for task in done:
                if task.cancelled():
                    continue
                error = task.exception()
                if error is not None:
                    log.error(
                        "shutdown_task_failed",
                        task=task.get_name(),
                        error=repr(error),
                    )

        except Exception:
            log.exception("shutdown_error")

        log.info("graceful_shutdown_complete")


async def verify_database_connection(engine) -> None:
    """
    Fail-fast startup probe for the database.

    Opens one connection on ``engine`` and executes ``SELECT 1``. Nothing is
    caught here, so any connection or query error propagates to the caller.
    """
    from sqlalchemy import text

    probe = text("SELECT 1")
    async with engine.connect() as connection:
        await connection.execute(probe)

    log.info("database_connection_verified")

app.state i dependency injection

# server/src/api/deps.py
from fastapi import Depends, Request
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy.ext.asyncio import AsyncEngine
from typing import AsyncGenerator


async def get_db_session(request: Request) -> AsyncGenerator[AsyncSession, None]:
    """
    Yield a database session bound to the engine stored on ``app.state``.

    A new ``AsyncSession`` is opened for each call and its context manager
    is exited once the consumer is done with the yielded session.
    """
    async with AsyncSession(request.app.state.engine) as db_session:
        yield db_session


def get_mqtt_client(request: Request):
    """Return the MQTT client stored as ``app.state.mqtt``."""
    shared_state = request.app.state
    return shared_state.mqtt


def get_settings(request: Request):
    """Return the settings object stored as ``app.state.settings``."""
    shared_state = request.app.state
    return shared_state.settings


# Brug i router:
from fastapi import APIRouter
router = APIRouter()


@router.get("/gateways/{gateway_id}/command")
async def send_command(
    gateway_id: str,
    command: str,
    mqtt=Depends(get_mqtt_client),
    db: AsyncSession = Depends(get_db_session),
):
    """
    Publish ``command`` to the command topic of one gateway.

    ``gateway_id`` comes straight from the URL and becomes one level of the
    MQTT topic ``meters/<gateway_id>/cmd``. It is therefore rejected with
    HTTP 422 when it is empty or contains characters that are not allowed in
    (or would change the structure of) a publish topic: the wildcards ``+``
    and ``#``, the level separator ``/`` and the NUL character.

    Returns ``{"status": "sent"}`` once ``mqtt.publish`` has completed.
    """
    from fastapi import HTTPException

    # NOTE(review): this endpoint has a side effect but is registered as GET;
    # the method is kept here for compatibility with existing callers.
    if not gateway_id or any(ch in gateway_id for ch in "+#/\x00"):
        raise HTTPException(status_code=422, detail="invalid gateway_id")

    # `db` is injected but not used by this handler.
    await mqtt.publish(f"meters/{gateway_id}/cmd", command)
    return {"status": "sent"}

Health check endpoint

# server/src/api/routers/health.py
import asyncio

from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
from sqlalchemy import text

router = APIRouter(prefix="/api/v1", tags=["Health"])


async def _probe_database(engine) -> None:
    """Acquire a connection from ``engine`` and run ``SELECT 1``."""
    async with engine.connect() as conn:
        await conn.execute(text("SELECT 1"))


@router.get("/health")
async def health_check(request: Request) -> JSONResponse:
    """
    Report whether the server's dependencies are usable.

    Runs a database probe and asks the MQTT client whether it is connected.
    Responds with HTTP 200 and ``"status": "ok"`` when every check is
    ``"ok"``, otherwise with HTTP 503 and ``"status": "degraded"``. The
    individual results are returned under ``"checks"``.
    """
    checks: dict[str, str] = {}

    # Database check. The 2 s timeout wraps the whole probe, so it also
    # bounds the time spent acquiring a connection, not only the query.
    try:
        await asyncio.wait_for(
            _probe_database(request.app.state.engine),
            timeout=2.0,
        )
        checks["database"] = "ok"
    except Exception as e:
        # Only the exception type is exposed, not its message.
        checks["database"] = f"error: {type(e).__name__}"

    # MQTT check
    try:
        mqtt = request.app.state.mqtt
        checks["mqtt"] = "ok" if mqtt.is_connected() else "disconnected"
    except Exception as e:
        checks["mqtt"] = f"error: {type(e).__name__}"

    # Overall status
    all_ok = all(v == "ok" for v in checks.values())
    status_code = 200 if all_ok else 503

    return JSONResponse(
        content={"status": "ok" if all_ok else "degraded", "checks": checks},
        status_code=status_code,
    )

Celery beat via lifespan

# server/src/main.py (Celery integration)
from contextlib import asynccontextmanager
import threading


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Start Celery beat in a background thread when running in production.

    The thread is created only if ``get_settings().is_production`` is
    truthy. It is a daemon thread, so it does not keep the process alive at
    exit. The shutdown phase only logs; nothing here stops or joins the
    thread.
    """
    from server.src.config import get_settings

    beat_thread = None
    if get_settings().is_production:
        from server.src.workers.celery_app import celery_app

        def _run_beat():
            # Thread body: blocks inside Celery beat's run().
            celery_app.Beat(loglevel="info").run()

        beat_thread = threading.Thread(
            target=_run_beat,
            name="celery-beat",
            daemon=True,
        )
        beat_thread.start()
        log.info("celery_beat_started")

    yield

    if beat_thread is not None:
        # NOTE(review): the beat thread is not stopped explicitly; as a
        # daemon thread it ends when the process exits.
        log.info("celery_beat_stopping")

Test med ASGITransport og lifespan

# server/tests/conftest.py
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from server.src.main import app, lifespan


@pytest_asyncio.fixture
async def test_client():
    """
    Test client that runs against the app with its real lifespan active.

    ``ASGITransport`` calls the ASGI app directly without a network, but it
    does not send lifespan events. The lifespan context manager is therefore
    entered explicitly around the client, so startup has completed before
    the first request and shutdown runs after the test.
    """
    async with lifespan(app):
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="http://test",
        ) as client:
            yield client


@pytest_asyncio.fixture
async def test_client_with_overrides(override_settings):
    """
    Test client with ``get_db_session`` overridden before the lifespan starts.

    The override returns a single ``AsyncMock`` in place of a real session.
    The lifespan is entered explicitly because ``ASGITransport`` does not
    send lifespan events. All dependency overrides are cleared in a
    ``finally`` block so they cannot leak into later tests, even when the
    client or the lifespan shutdown raises.
    """
    from server.src.api.deps import get_db_session
    from unittest.mock import AsyncMock

    mock_session = AsyncMock()
    app.dependency_overrides[get_db_session] = lambda: mock_session

    try:
        async with lifespan(app):
            async with AsyncClient(
                transport=ASGITransport(app=app),
                base_url="http://test",
            ) as client:
                yield client
    finally:
        app.dependency_overrides.clear()


# Alternativ: Test UDEN lifespan (hurtigere tests):
@pytest_asyncio.fixture
async def fast_test_client():
    """
    Test client that does not enter the application's lifespan.

    The fixture only builds an ``AsyncClient`` on top of ``ASGITransport``;
    the lifespan context manager is never entered here, so startup and
    shutdown are skipped. Use ``dependency_overrides`` to supply mock
    resources.
    """
    transport = ASGITransport(app=app)
    client = AsyncClient(transport=transport, base_url="http://test")
    async with client:
        yield client

Lifespan vs. @app.on_event (deprecated)

# FORKERT — deprecated siden FastAPI 0.93:
# @app.on_event("startup")
# async def startup():
#     app.state.engine = create_engine()
#
# @app.on_event("shutdown")
# async def shutdown():
#     await app.state.engine.dispose()

# KORREKT — lifespan pattern:
# @asynccontextmanager
# async def lifespan(app: FastAPI):
#     app.state.engine = create_engine()
#     yield
#     await app.state.engine.dispose()
#
# app = FastAPI(lifespan=lifespan)

# Fordele ved lifespan:
# 1. Enkelt sted til al startup/shutdown-logik
# 2. Korrekt exception-håndtering, når yield pakkes ind i try/finally (sker ikke automatisk)
# 3. Test-support med ASGITransport
# 4. Ingen globale variabler nødvendige
# 5. Type-checking virker korrekt

Konklusion

FastAPI lifespan er det eneste korrekte mønster til ressourcestyring i produktion. app.state er dependency injection-carrier. Graceful shutdown med asyncio.wait og timeout forhindrer hang ved deployment. Health check på /api/v1/health giver Kubernetes/Docker readiness-svar. Bemærk, at httpx' ASGITransport(app=app) ikke selv udløser lifespan-events: start lifespan eksplicit i test-fixturen (fx async with lifespan(app)), så testene kører mod den faktiske startup/shutdown — ingen mock af hele applikationen nødvendigt.

Se FastAPI dependency injection guide eller FastAPI middleware guide.