FastAPI og SQLAlchemy async — avancerede mønstre
FastAPI og SQLAlchemy async: cursor-baseret paginering, upsert med ON CONFLICT, advisory locks, COPY bulk-insert, materialized views og connection pool-tuning til en IoT-platform.
Af M-Bus Gateway
SQLAlchemy async til PostgreSQL-baserede IoT-platforme kræver specifikke mønstre for performance og korrekthed. Her er de produktionsklare mønstre.
Async session factory med lifespan
# server/src/db/session.py
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
# Application-wide async engine with a bounded connection pool.
engine = create_async_engine(
    settings.database_url,
    echo=settings.debug,  # log emitted SQL when the app runs in debug mode
    # Pool limits: 20 persistent connections, up to 40 more during bursts.
    pool_size=20,
    max_overflow=40,
    pool_timeout=30,  # wait at most 30 s for a free connection
    # Connection health.
    pool_pre_ping=True,  # verify a connection is alive before handing it out
    pool_recycle=3600,  # replace connections after at most one hour
)
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False, # Undgå lazy load fejl efter commit
autoflush=False, # Eksplicit flush-kontrol
)
@asynccontextmanager
async def get_async_session() -> AsyncIterator[AsyncSession]:
    """Provide a transactional session for programmatic use (not FastAPI ``Depends``).

    Usage::

        async with get_async_session() as session:
            ...

    If the ``async with`` body completes without raising, the session is
    committed. If the body or the commit raises an ``Exception``, the
    transaction is rolled back and the exception is re-raised. The session
    is closed by the inner ``async with`` in both cases.

    Yields:
        A new ``AsyncSession`` from ``AsyncSessionLocal``.
    """
    # Annotated as AsyncIterator: this is an async generator that
    # @asynccontextmanager wraps, not a function returning a session.
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
async def get_session() -> AsyncIterator[AsyncSession]:
    """FastAPI dependency yielding one ``AsyncSession`` per request.

    Use as ``Depends(get_session)``. This dependency neither commits nor
    rolls back; endpoints that write must commit explicitly. The session is
    closed when the dependency's ``async with`` block exits.

    Yields:
        A new ``AsyncSession`` from ``AsyncSessionLocal``.
    """
    # Async generator (yield dependency), hence the AsyncIterator annotation.
    async with AsyncSessionLocal() as session:
        yield session
Cursor-baseret paginering (keyset)
# server/src/readings/repository.py
# Keyset pagination — stabil og hurtig ved store datasæt
from sqlalchemy import select, and_, tuple_
import base64
import json
def encode_cursor(timestamp: datetime, id: uuid.UUID) -> str:
    """Build an opaque, URL-safe keyset-pagination cursor from ``(timestamp, id)``.

    The pair is serialised as JSON and encoded with the URL-safe base64
    alphabet with ``=`` padding stripped. Standard base64 can contain
    ``+`` (decoded as a space when read from a query string), ``/`` and
    ``=``, all of which break a cursor passed unescaped in a URL.

    Args:
        timestamp: Timestamp of the last row on the current page.
        id: Primary key of the last row on the current page.

    Returns:
        The cursor string.
    """
    payload = json.dumps({"ts": timestamp.isoformat(), "id": str(id)})
    return base64.urlsafe_b64encode(payload.encode()).decode().rstrip("=")


def decode_cursor(cursor: str) -> tuple[datetime, uuid.UUID]:
    """Parse a cursor produced by :func:`encode_cursor`.

    Accepts both the URL-safe unpadded form and the older padded
    standard-base64 form. ``urlsafe_b64decode`` maps ``-``/``_`` to
    ``+``/``/`` and leaves existing ``+``/``/`` untouched, so either
    alphabet decodes.

    Args:
        cursor: Client-supplied cursor string.

    Returns:
        ``(timestamp, id)`` of the row the next page should follow.

    Raises:
        ValueError: If the cursor is malformed in any way. Cursors come from
            clients, so every parse failure is normalised to one exception type.
    """
    try:
        # Restore the padding that encode_cursor strips.
        padded = cursor + "=" * (-len(cursor) % 4)
        data = json.loads(base64.urlsafe_b64decode(padded).decode())
        return datetime.fromisoformat(data["ts"]), uuid.UUID(data["id"])
    except (ValueError, KeyError, TypeError, AttributeError) as exc:
        raise ValueError("Invalid pagination cursor") from exc
async def list_readings_cursor(
    session: AsyncSession,
    tenant_id: uuid.UUID,
    meter_installation_id: uuid.UUID,
    cursor: str | None = None,
    limit: int = 100,
) -> tuple[list[Reading], str | None]:
    """Return one page of a meter's readings, newest first, using keyset pagination.

    Rows are ordered by the composite key ``(timestamp, id)`` descending. Each
    page continues strictly after the cursor row, so rows inserted while a
    client is paging do not shift later pages (no OFFSET skew). Soft-deleted
    rows (``deleted_at`` set) are excluded.

    NOTE(review): efficient only with an index covering
    (tenant_id, meter_installation_id, timestamp, id) — confirm it exists.

    Args:
        session: Open async session.
        tenant_id: Tenant whose readings are listed.
        meter_installation_id: Meter installation to list readings for.
        cursor: Opaque cursor from a previous call, or None for the first page.
        limit: Maximum number of rows per page; must be >= 1.

    Returns:
        ``(rows, next_cursor)``, where ``next_cursor`` is None on the last page.

    Raises:
        ValueError: If ``limit`` < 1. Errors raised by ``decode_cursor`` for a
            malformed cursor propagate unchanged.
    """
    # limit <= 0 would hand out a cursor pointing at the probe row (silently
    # skipping it on the next page), and a negative LIMIT is invalid SQL.
    if limit < 1:
        raise ValueError(f"limit must be >= 1, got {limit}")

    stmt = (
        select(Reading)
        .where(
            Reading.tenant_id == tenant_id,
            Reading.meter_installation_id == meter_installation_id,
            Reading.deleted_at.is_(None),
        )
        .order_by(Reading.timestamp.desc(), Reading.id.desc())
        # Fetch one extra row to detect whether another page exists.
        .limit(limit + 1)
    )
    if cursor:
        ts, rid = decode_cursor(cursor)
        # Row-value comparison matching the DESC ordering: strictly after the cursor row.
        stmt = stmt.where(tuple_(Reading.timestamp, Reading.id) < tuple_(ts, rid))

    rows = list((await session.execute(stmt)).scalars().all())
    if len(rows) <= limit:
        return rows, None

    page = rows[:limit]
    last = page[-1]
    return page, encode_cursor(last.timestamp, last.id)
Upsert med ON CONFLICT
# server/src/mqtt/subscriber.py
# MQTT gateway-payload: Upsert aflæsninger (idempotent ved gensendelse)
from sqlalchemy.dialects.postgresql import insert
async def upsert_readings(
    session: AsyncSession,
    readings: list[dict],
    gateway_id: str,
) -> int:
    """Upsert one MQTT gateway payload of readings; idempotent on redelivery.

    Conflicts are detected on ``(meter_installation_id, timestamp)``. For an
    existing row, ``value``, ``received_at`` and ``gateway_id`` are
    overwritten with the incoming values.

    Every row is stamped with ``gateway_id``, overriding any ``gateway_id``
    key already present in the dicts. Rows in the payload that share a
    conflict key are first collapsed to their last occurrence. PostgreSQL
    rejects an ``INSERT ... ON CONFLICT DO UPDATE`` that would update the
    same row twice in one statement, so a single duplicate would otherwise
    fail the whole payload.

    The transaction is committed before returning.

    Args:
        session: Open async session.
        readings: Column-name -> value dicts for ``Reading``; each must
            contain ``meter_installation_id`` and ``timestamp``.
        gateway_id: Gateway that delivered the payload.

    Returns:
        The statement's row count as reported by the driver (0 for no readings).
    """
    if not readings:
        return 0

    # Last occurrence wins, matching what sequential upserts would leave behind.
    unique_rows: dict[tuple, dict] = {}
    for reading in readings:
        row = {**reading, "gateway_id": gateway_id}
        unique_rows[(row["meter_installation_id"], row["timestamp"])] = row

    stmt = insert(Reading).values(list(unique_rows.values()))
    stmt = stmt.on_conflict_do_update(
        index_elements=["meter_installation_id", "timestamp"],
        set_={
            "value": stmt.excluded.value,
            "received_at": stmt.excluded.received_at,
            "gateway_id": stmt.excluded.gateway_id,
        },
    )
    result = await session.execute(stmt)
    await session.commit()
    return result.rowcount
# Håndtering af gateway-dubletter:
# → MQTT QoS 1 er "at least once": samme gateway kan levere samme dags payload flere gange
# → ON CONFLICT opdaterer value, received_at og gateway_id på den eksisterende række
# → Ingen dubletter i readings-tabellen
COPY bulk-insert til historisk import
# server/src/readings/bulk_import.py
# Brug PostgreSQL COPY for max throughput ved historisk dataimport
from asyncpg import Connection
import csv
import io
async def bulk_import_readings(
    conn: Connection,
    readings: list[dict],
    tenant_id: uuid.UUID,
) -> int:
    """Bulk-load historical readings into ``readings`` with PostgreSQL ``COPY ... FROM``.

    Used for historical data migration (Excel/Techem exports). COPY avoids
    per-row INSERT overhead; in the accompanying benchmark it was about 25x
    faster than INSERT for 1,000 rows.

    Each row gets a fresh UUID4 ``id``. All rows in the batch share one
    timezone-aware UTC ``received_at``.

    Args:
        conn: Raw asyncpg connection.
        readings: Dicts with ``meter_installation_id``, ``timestamp`` (an
            object with ``isoformat()``) and ``value``; ``unit`` is optional
            and defaults to ``"kWh"``.
        tenant_id: Tenant stamped on every row.

    Returns:
        Number of rows copied (0 for empty input, without a database round trip).
    """
    from datetime import timezone

    if not readings:
        return 0

    # Aware UTC value computed once per batch (utcnow() is naive and deprecated).
    # NOTE(review): if received_at is timestamptz, a naive string would be
    # interpreted in the session TimeZone rather than UTC.
    received_at = datetime.now(timezone.utc).isoformat()

    text_buffer = io.StringIO()
    writer = csv.writer(text_buffer)
    for r in readings:
        writer.writerow([
            str(uuid.uuid4()),  # id
            str(tenant_id),  # tenant_id
            str(r["meter_installation_id"]),
            r["timestamp"].isoformat(),
            r["value"],
            r.get("unit", "kWh"),
            received_at,  # received_at
        ])
    # asyncpg streams whatever the file object's read() returns as COPY data,
    # which must be bytes, so pass an encoded binary buffer.
    source = io.BytesIO(text_buffer.getvalue().encode("utf-8"))

    status = await conn.copy_to_table(
        "readings",
        source=source,
        columns=["id", "tenant_id", "meter_installation_id",
                 "timestamp", "value", "unit", "received_at"],
        format="csv",
    )
    # copy_to_table returns the command status tag, e.g. "COPY 1000".
    return int(status.split()[-1])
# Benchmark (vejledende; afhænger af hardware, netværkslatens og indekser):
# INSERT INTO (1.000 rækker): ~500 ms
# COPY FROM (1.000 rækker): ~20 ms (25x hurtigere)
# COPY FROM (100.000 rækker): ~500 ms
Advisory locks til concurrency kontrol
# server/src/settlements/service.py
# Advisory lock: Forhindrer dobbelt settlement-generering
from sqlalchemy import text
async def generate_settlement_exclusive(
    session: AsyncSession,
    property_id: uuid.UUID,
    period: SettlementPeriod,
) -> Settlement:
    """Generate a settlement while holding a per-property PostgreSQL advisory lock.

    Only one settlement can be generated per property at a time, across all
    processes sharing the database. ``pg_try_advisory_xact_lock`` is
    non-blocking: if the lock is already held, ConflictError is raised
    immediately. The lock is transaction-scoped and held until the session's
    current transaction ends.
    NOTE(review): if ``_do_generate_settlement`` commits internally, the lock
    is released at that commit — confirm.

    Args:
        session: Open async session; the lock is taken in its transaction.
        property_id: Property to generate the settlement for.
        period: Settlement period.

    Returns:
        The generated settlement.

    Raises:
        ConflictError: If another transaction holds the lock, or a settlement
            already exists for this property and period.
    """
    import hashlib

    # The key must be identical in every worker process. Built-in hash() of a
    # str is salted per interpreter (PYTHONHASHSEED), so two workers would
    # compute different keys and never contend for the same lock.
    # `person` namespaces these keys from other advisory-lock users.
    digest = hashlib.blake2b(
        str(property_id).encode(), digest_size=8, person=b"settlement-lock"
    ).digest()
    # Fold the 8-byte digest into a signed 64-bit int (PostgreSQL bigint range).
    lock_key = int.from_bytes(digest, "big", signed=True)

    # Non-blocking attempt: returns false instead of waiting when the lock is held.
    acquired = await session.scalar(
        text("SELECT pg_try_advisory_xact_lock(:key)"),
        {"key": lock_key},
    )
    if not acquired:
        raise ConflictError(
            "En afregning genereres allerede for denne ejendom. Prøv igen."
        )

    # From here on the lock is held until the transaction finishes.
    existing = await _check_existing_settlement(session, property_id, period)
    if existing:
        raise ConflictError("Afregning for denne periode eksisterer allerede")
    return await _do_generate_settlement(session, property_id, period)
Connection pool tuning til IoT
# Optimal pool-konfiguration for IoT-platform:
# Workload profil:
# - 24/7 gateway-data: ~100 inserts/sek (peak morgen 06:00 UTC)
# - Få samtidige HTTP-brugere (SaaS — ikke consumer-app)
# - Tunge queries: settlement-beregning, portfolio-analyse
engine = create_async_engine(
    settings.database_url,
    # ---- Pool sizing ------------------------------------------------------
    # pool_size: connections kept open permanently (baseline concurrency).
    # Rule of thumb: number of workers x expected share of time spent in the DB.
    pool_size=20,
    # max_overflow: extra connections allowed during bursts, e.g. the 06:00 UTC
    # gateway burst when ~100 gateways report at once. 20 + 80 gives at most
    # 100 connections for this engine.
    # NOTE(review): the ceiling applies per engine/process. With several worker
    # processes the total can exceed PostgreSQL's max_connections — confirm.
    max_overflow=80,
    # pool_timeout: when every connection is busy, wait at most 5 s and then
    # raise TimeoutError instead of waiting indefinitely.
    pool_timeout=5.0,
    # ---- Connection health ------------------------------------------------
    pool_pre_ping=True,  # test each connection before use; catches dropped connections
    pool_recycle=1800,  # re-open connections every 30 minutes to avoid stale ones
    # ---- Per-connection server settings (asyncpg; values in milliseconds) --
    connect_args={
        "server_settings": {
            "statement_timeout": "30000",  # abort statements running longer than 30 s
            "idle_in_transaction_session_timeout": "60000",  # end sessions idle in a transaction > 60 s
        }
    },
    # ---- Execution defaults -----------------------------------------------
    execution_options={
        "isolation_level": "READ COMMITTED",  # PostgreSQL's default level, set explicitly
    },
)
Konklusion
SQLAlchemy async til IoT-platforme kræver fire kritiske mønstre: expire_on_commit=False, så ORM-objekter kan læses efter commit uden lazy load-fejl; keyset-paginering med opak cursor, der giver stabile sider trods samtidige inserts; ON CONFLICT DO UPDATE-upsert til idempotent håndtering af MQTT-payloads; og COPY FROM til ca. 25x hurtigere bulk-import. Advisory locks forhindrer race conditions ved settlement-generering. Pool-størrelsen dimensioneres efter gateway-burstet (kl. 06:00 UTC), og pool_timeout=5s sørger for, at en forespørgsel fejler efter højst 5 sekunder i stedet for at vente ubegrænset på en ledig forbindelse.