· FastAPI· Python· lifespan· startup· shutdown· async· database· backend
FastAPI lifespan — startup og shutdown events korrekt
FastAPI lifespan context manager: database pool oprettelse, MQTT-forbindelser, Celery-workers, cleanup ved shutdown og korrekt async resource management.
Af M-Bus Gateway
FastAPI's lifespan context manager er den anbefalede måde at håndtere startup og shutdown af ressourcer. Her er mønstrene fra produktion.
Lifespan context manager
# server/src/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
import structlog
log = structlog.get_logger()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Alt inden yield: startup.
Alt efter yield: shutdown (kører selv ved exception).
"""
log.info("Starting M-Bus Gateway API")
# 1. Database connection pool
from server.src.db import init_db, close_db
await init_db()
log.info("Database pool initialized")
# 2. MQTT subscriber (baggrundstask)
from server.src.mqtt.subscriber import MQTTSubscriber
mqtt = MQTTSubscriber()
await mqtt.connect()
log.info("MQTT subscriber connected")
# 3. Redis connection pool
from server.src.cache import init_cache, close_cache
await init_cache()
log.info("Redis cache initialized")
# Applikationen kører her
yield
# Shutdown (kører altid — selv ved SIGTERM)
log.info("Shutting down M-Bus Gateway API")
await mqtt.disconnect()
await close_cache()
await close_db()
log.info("Shutdown complete")
app = FastAPI(
title="M-Bus Gateway API",
version="1.0.0",
lifespan=lifespan,
)
Database pool med SQLAlchemy
# server/src/db.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from server.src.config import get_settings
_engine = None
_session_factory = None
async def init_db() -> None:
global _engine, _session_factory
settings = get_settings()
_engine = create_async_engine(
settings.database_url_str,
pool_size=10,
max_overflow=20,
pool_pre_ping=True, # Test forbindelser ved checkout
pool_recycle=3600, # Genbrug forbindelser hvert 60. min
echo=settings.debug, # SQL logging i dev
)
_session_factory = async_sessionmaker(
_engine,
class_=AsyncSession,
expire_on_commit=False,
)
async def close_db() -> None:
if _engine:
await _engine.dispose()
async def get_session() -> AsyncSession:
"""FastAPI dependency — session pr. request."""
async with _session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
MQTT subscriber som baggrundstask
# server/src/mqtt/subscriber.py
import asyncio
import aiomqtt
import structlog
from server.src.config import get_settings
log = structlog.get_logger()
class MQTTSubscriber:
def __init__(self):
self._client = None
self._task = None
async def connect(self) -> None:
settings = get_settings()
self._client = aiomqtt.Client(
hostname=settings.mqtt_host,
port=settings.mqtt_port,
keepalive=60,
)
await self._client.__aenter__()
await self._client.subscribe("meters/+/data")
await self._client.subscribe("meters/+/status")
await self._client.subscribe("meters/+/alarm")
# Start lyttende baggrundstask
self._task = asyncio.create_task(self._listen())
log.info("MQTT subscriber started")
async def disconnect(self) -> None:
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
if self._client:
await self._client.__aexit__(None, None, None)
log.info("MQTT subscriber disconnected")
async def _listen(self) -> None:
try:
async for message in self._client.messages:
await self._handle_message(message)
except asyncio.CancelledError:
pass
except Exception as exc:
log.error("MQTT listener crashed", exc=str(exc))
async def _handle_message(self, message: aiomqtt.Message) -> None:
from server.src.mqtt.handlers import dispatch_message
try:
await dispatch_message(str(message.topic), message.payload)
except Exception as exc:
log.error("Message handling failed", topic=str(message.topic), exc=str(exc))
State i app.state
# Gem ressourcer i app.state for adgang i andre moduler:
@asynccontextmanager
async def lifespan(app: FastAPI):
# Initialiser og gem i app.state
mqtt = MQTTSubscriber()
await mqtt.connect()
app.state.mqtt = mqtt
from server.src.notifications import NotificationService
app.state.notifications = NotificationService()
yield
await app.state.mqtt.disconnect()
# I router — adgang via Request:
from fastapi import Request
@router.post("/test-alarm")
async def test_alarm(request: Request):
mqtt: MQTTSubscriber = request.app.state.mqtt
await mqtt.publish("meters/GW-0001/alarm", b'{"type":"test"}')
return {"status": "alarm sent"}
Test med lifespan
# server/tests/conftest.py
import pytest
from httpx import AsyncClient, ASGITransport
from server.src.main import app
@pytest.fixture
async def client():
"""AsyncClient der kører lifespan (startup/shutdown)."""
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as ac:
yield ac
# lifespan shutdown kører automatisk ved exit
@pytest.fixture
async def client_no_lifespan():
"""AsyncClient UDEN lifespan — hurtigere til unit tests."""
from fastapi import FastAPI
test_app = FastAPI()
# Registrér kun routes der skal testes
test_app.include_router(your_router)
async with AsyncClient(
transport=ASGITransport(app=test_app),
base_url="http://test",
) as ac:
yield ac
Graceful shutdown med SIGTERM
# For Kubernetes og Docker: Håndtér SIGTERM korrekt
import signal
import asyncio
def setup_signal_handlers(app: FastAPI) -> None:
"""Registrér SIGTERM handler til graceful shutdown."""
loop = asyncio.get_event_loop()
def handle_sigterm(*args):
log.info("SIGTERM received — initiating graceful shutdown")
loop.create_task(shutdown(app))
signal.signal(signal.SIGTERM, handle_sigterm)
async def shutdown(app: FastAPI) -> None:
"""Graceful shutdown: Vent på igangværende requests."""
from server.src.main import lifespan
# FastAPI lifespan håndterer cleanup automatisk
# Tilføj her: Vent på at Celery tasks fuldføres, luk MQTT, osv.
await asyncio.sleep(5) # Grace period for igangværende requests
# docker-compose.yml:
# stop_grace_period: "30s" # Giv containeren 30s til graceful shutdown
# stop_signal: SIGTERM
Konklusion
FastAPI's lifespan context manager sikrer at ressourcer (database pools, MQTT connections, Redis) oprettes ved startup og lukkes korrekt ved shutdown — uanset om shutdown skyldes normal stop eller exception. app.state giver adgang til ressourcerne overalt. Brug AsyncClient med ASGITransport i tests for at køre lifespan i testmiljøet.
Se FastAPI SQLModel multi-tenant guide eller FastAPI dependency injection guide.