FastAPI lifespan — avanceret startup og shutdown til produktion
FastAPI lifespan til produktion: database connection pools, MQTT klient, Celery worker, graceful shutdown med timeout, health checks, dependency injection fra app.state og test med ASGITransport.
Af M-Bus Gateway
FastAPI's lifespan context manager er det eneste korrekte mønster til startup/shutdown i produktion. Her er de avancerede mønstre.
Basal lifespan med app.state
# server/src/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
import structlog
log = structlog.get_logger()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Alle ressourcer initialiseres her — én gang ved opstart.
app.state: FastAPI's dependency injection carrier.
Yield adskiller startup (øverst) fra shutdown (nedenfor).
"""
log.info("server_starting")
# 1. Database connection pool
from server.src.db.engine import create_engine
engine = create_engine()
app.state.engine = engine
# 2. MQTT subscriber
from server.src.mqtt.subscriber import MQTTSubscriber
mqtt = MQTTSubscriber(app.state)
await mqtt.connect()
app.state.mqtt = mqtt
# 3. Verify critical dependencies
await verify_database_connection(engine)
log.info("server_started")
yield # ← Application er i live her
# Shutdown (omvendt rækkefølge)
log.info("server_stopping")
await mqtt.disconnect()
await engine.dispose()
log.info("server_stopped")
app = FastAPI(title="M-Bus Gateway API", version="1.0.0", lifespan=lifespan)
Graceful shutdown med timeout
# server/src/main.py (udvidet)
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# ... startup ...
yield
# Graceful shutdown med timeout
log.info("graceful_shutdown_started")
shutdown_tasks = [
asyncio.create_task(app.state.mqtt.disconnect(), name="mqtt-disconnect"),
asyncio.create_task(app.state.engine.dispose(), name="db-dispose"),
]
try:
# Max 30 sekunder til graceful shutdown
done, pending = await asyncio.wait(shutdown_tasks, timeout=30.0)
for task in pending:
log.warning("shutdown_task_timeout", task=task.get_name())
task.cancel()
# Vent på at cancelled tasks afslutter
if pending:
await asyncio.gather(*pending, return_exceptions=True)
except Exception:
log.exception("shutdown_error")
log.info("graceful_shutdown_complete")
async def verify_database_connection(engine) -> None:
"""
Fail-fast: Verificér DB-forbindelse ved opstart.
Server starter IKKE hvis DB er utilgængelig.
"""
from sqlalchemy import text
async with engine.connect() as conn:
await conn.execute(text("SELECT 1"))
log.info("database_connection_verified")
app.state i dependency injection
# server/src/api/deps.py
from fastapi import Depends, Request
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy.ext.asyncio import AsyncEngine
from typing import AsyncGenerator
async def get_db_session(request: Request) -> AsyncGenerator[AsyncSession, None]:
"""
Hent database session fra app.state engine.
Opretter ny session pr. request — frigives automatisk.
"""
engine: AsyncEngine = request.app.state.engine
async with AsyncSession(engine) as session:
yield session
def get_mqtt_client(request: Request):
"""Hent MQTT-klient fra app.state."""
return request.app.state.mqtt
def get_settings(request: Request):
"""Hent indlæste settings fra app.state."""
return request.app.state.settings
# Brug i router:
from fastapi import APIRouter
router = APIRouter()
@router.get("/gateways/{gateway_id}/command")
async def send_command(
gateway_id: str,
command: str,
mqtt=Depends(get_mqtt_client),
db: AsyncSession = Depends(get_db_session),
):
await mqtt.publish(f"meters/{gateway_id}/cmd", command)
return {"status": "sent"}
Health check endpoint
# server/src/api/routers/health.py
from fastapi import APIRouter, Request
from sqlalchemy import text
import asyncio
router = APIRouter(prefix="/api/v1", tags=["Health"])
@router.get("/health")
async def health_check(request: Request) -> dict:
"""
Liveness check: Er serveren oppe?
Readiness check: Kan serveren behandle requests?
Kubernetes/Docker: /health returnerer 200 = server OK
"""
checks = {}
# Database check
try:
async with request.app.state.engine.connect() as conn:
await asyncio.wait_for(
conn.execute(text("SELECT 1")),
timeout=2.0,
)
checks["database"] = "ok"
except Exception as e:
checks["database"] = f"error: {type(e).__name__}"
# MQTT check
try:
mqtt = request.app.state.mqtt
checks["mqtt"] = "ok" if mqtt.is_connected() else "disconnected"
except Exception as e:
checks["mqtt"] = f"error: {type(e).__name__}"
# Overall status
all_ok = all(v == "ok" for v in checks.values())
status_code = 200 if all_ok else 503
from fastapi.responses import JSONResponse
return JSONResponse(
content={"status": "ok" if all_ok else "degraded", "checks": checks},
status_code=status_code,
)
Celery worker via lifespan
# server/src/main.py (Celery integration)
from contextlib import asynccontextmanager
import threading
@asynccontextmanager
async def lifespan(app: FastAPI):
# Start Celery beat i separat thread (kun i produktion)
from server.src.config import get_settings
settings = get_settings()
celery_thread = None
if settings.is_production:
from server.src.workers.celery_app import celery_app
celery_thread = threading.Thread(
target=lambda: celery_app.Beat(loglevel="info").run(),
daemon=True, # Dræbes automatisk ved main-process stop
name="celery-beat",
)
celery_thread.start()
log.info("celery_beat_started")
yield
if celery_thread:
# Beat thread er daemon — ingen eksplicit join nødvendig
log.info("celery_beat_stopping")
Test med ASGITransport og lifespan
# server/tests/conftest.py
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from server.src.main import app, lifespan
@pytest_asyncio.fixture
async def test_client():
"""
Test-klient der kører den faktiske lifespan.
ASGITransport: Direkte ASGI-kald uden netværk.
"""
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
yield client
@pytest_asyncio.fixture
async def test_client_with_overrides(override_settings):
"""
Override dependencies inden lifespan starter.
Nyttigt til at mock database og MQTT i tests.
"""
from server.src.api.deps import get_db_session
from unittest.mock import AsyncMock
mock_session = AsyncMock()
app.dependency_overrides[get_db_session] = lambda: mock_session
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
yield client
app.dependency_overrides.clear()
# Alternativ: Test UDEN lifespan (hurtigere tests):
@pytest_asyncio.fixture
async def fast_test_client():
"""
Kør lifespan=False: Spring startup/shutdown over.
Brug dependency_overrides til at levere mock-ressourcer.
"""
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
yield client
Lifespan vs. @app.on_event (deprecated)
# FORKERT — deprecated siden FastAPI 0.93:
# @app.on_event("startup")
# async def startup():
# app.state.engine = create_engine()
#
# @app.on_event("shutdown")
# async def shutdown():
# await app.state.engine.dispose()
# KORREKT — lifespan pattern:
# @asynccontextmanager
# async def lifespan(app: FastAPI):
# app.state.engine = create_engine()
# yield
# await app.state.engine.dispose()
#
# app = FastAPI(lifespan=lifespan)
# Fordele ved lifespan:
# 1. Enkelt sted til al startup/shutdown-logik
# 2. Korrekt exception-håndtering (yield = try/finally)
# 3. Test-support med ASGITransport
# 4. Ingen globale variabler nødvendige
# 5. Type-checking virker korrekt
Konklusion
FastAPI lifespan er det eneste korrekte mønster til ressourcestyring i produktion. app.state er dependency injection-carrier. Graceful shutdown med asyncio.wait og timeout forhindrer hang ved deployment. Health check på /api/v1/health giver Kubernetes/Docker readiness-svar. Test med ASGITransport(app=app) kører den faktiske lifespan — ingen mock af hele applikationen nødvendigt.
Se FastAPI dependency injection guide eller FastAPI middleware guide.