· SQLAlchemy· asyncpg· FastAPI· Python· PostgreSQL· async· ORM· TimescaleDB
SQLAlchemy async mønstre — asyncpg og FastAPI produktion
SQLAlchemy 2.0 async mønstre: connection pooling, eager loading, upserts, bulk operations, cursor pagination og common pitfalls med asyncpg og FastAPI.
Af M-Bus Gateway
SQLAlchemy 2.0 async API er fundamentalt anderledes end sync-versionen. Her er produktionsmønstrene fra M-Bus Gateway platformen.
Async engine og session factory
# server/src/db/engine.py
from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
# Module-wide async engine (asyncpg driver) that the session factory binds to.
engine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost:5432/mbus",
    echo=False,  # switch to True to log the emitted SQL while debugging
    pool_pre_ping=True,  # test each connection before it is used
    pool_recycle=3600,  # replace pooled connections after 3600 s (60 min)
    pool_size=10,
    max_overflow=20,
    connect_args=dict(
        server_settings=dict(
            application_name="mbus-server",
            statement_timeout="30000",  # 30 seconds max per query
        ),
    ),
)
# Factory producing AsyncSession objects bound to the shared engine.
AsyncSessionFactory = async_sessionmaker(
    bind=engine,
    autoflush=False,
    expire_on_commit=False,  # avoids lazy-load errors on attribute access after commit
    class_=AsyncSession,
)
async def get_session() -> AsyncIterator[AsyncSession]:
    """Yield one ``AsyncSession`` for the caller (used via ``Depends``).

    This is an async generator, so it is annotated as an iterator of
    sessions rather than as a bare ``AsyncSession``. The session is created
    from ``AsyncSessionFactory`` and the ``async with`` block is exited once
    the consumer is finished with it.

    Yields:
        The session opened for the current caller.
    """
    async with AsyncSessionFactory() as session:
        yield session
Eager loading (undgå N+1)
# SQLAlchemy 2.0: selectinload vs. joinedload
from sqlalchemy.orm import selectinload, joinedload
from sqlmodel import select
# ❌ N+1 problem — one extra query per property to fetch its units:
stmt = select(Property).where(Property.tenant_id == tenant_id)
properties = (await session.execute(stmt)).scalars().all()
for p in properties:
units = p.units # Triggers one lazy load per property (N in total); see Pitfall 2 in this file for how this fails under async
# ✅ selectinload — 2 queries in total:
stmt = (
select(Property)
.where(Property.tenant_id == tenant_id)
.options(selectinload(Property.units))
)
properties = (await session.execute(stmt)).scalars().all()
# p.units is now available without any further queries
# ✅ joinedload — 1 query with a JOIN (good at low cardinality):
stmt = (
select(Property)
.options(joinedload(Property.config)) # one-to-one relation
.where(Property.tenant_id == tenant_id)
)
Upserts med ON CONFLICT
# server/src/mqtt/subscriber.py
from sqlalchemy.dialects.postgresql import insert
async def upsert_reading(
    session: AsyncSession,
    reading: dict,
) -> None:
    """Insert a reading, or overwrite the stored row on a duplicate key.

    Duplicates are detected through the ``uq_reading_installation_timestamp``
    constraint (installation + timestamp), which keeps the operation
    idempotent for MQTT payloads that may be retransmitted.

    Args:
        session: Session used to execute the statement. Nothing is committed
            here; committing is left to the caller.
        reading: Payload with the required keys ``installation_id``,
            ``timestamp`` and ``value`` and the optional keys ``rssi_dbm``
            and ``battery_level_pct``.

    Raises:
        KeyError: If a required key is missing from ``reading``.
    """
    stmt = insert(Reading).values(
        meter_installation_id=reading["installation_id"],
        timestamp=reading["timestamp"],
        value=reading["value"],
        rssi_dbm=reading.get("rssi_dbm"),
        battery_level_pct=reading.get("battery_level_pct"),
        received_at=datetime.now(timezone.utc),
    )
    # ON CONFLICT: there is no WHERE clause, so a conflicting row is always
    # overwritten with the incoming values. Every non-key column written by
    # the INSERT is listed here; battery_level_pct used to be missing, which
    # left a stale battery level behind after a retransmission.
    stmt = stmt.on_conflict_do_update(
        constraint="uq_reading_installation_timestamp",
        set_={
            "value": stmt.excluded.value,
            "rssi_dbm": stmt.excluded.rssi_dbm,
            "battery_level_pct": stmt.excluded.battery_level_pct,
            "received_at": stmt.excluded.received_at,
        },
    )
    await session.execute(stmt)
Bulk insert med COPY
# server/src/db/bulk.py
import asyncpg
async def bulk_insert_readings(
    conn: asyncpg.Connection,
    readings: list[dict],
) -> int:
    """Bulk-insert readings into the ``reading`` table with asyncpg COPY.

    The original notes describe COPY as 10-50x faster than one INSERT per
    row, and the intended use as the daily MQTT payload of 50-200 readings.

    Args:
        conn: Connection on which the COPY is executed.
        readings: One dict per row with the required keys
            ``meter_installation_id``, ``timestamp`` and ``value`` and the
            optional keys ``rssi_dbm`` and ``battery_level_pct``.

    Returns:
        The number of rows copied, parsed from the COPY status string. An
        empty ``readings`` list returns 0 without touching the connection.

    Raises:
        KeyError: If a required key is missing from one of the dicts.
    """
    if not readings:
        return 0

    # One receive time for the whole batch instead of a clock read per row.
    received_at = datetime.now(timezone.utc)
    rows = [
        (
            r["meter_installation_id"],
            r["timestamp"],
            r["value"],
            r.get("rssi_dbm"),
            r.get("battery_level_pct"),
            received_at,
        )
        for r in readings
    ]
    status = await conn.copy_records_to_table(
        "reading",
        records=rows,
        columns=[
            "meter_installation_id",
            "timestamp",
            "value",
            "rssi_dbm",
            "battery_level_pct",
            "received_at",
        ],
    )
    # asyncpg returns the command status string (e.g. "COPY 150"), not an
    # int, so the row count is taken from its last whitespace-separated part.
    return int(status.rsplit(" ", 1)[-1])
# Usage from FastAPI with asyncpg directly:
async def process_gateway_payload(pool: asyncpg.Pool, readings: list) -> None:
    """Store one gateway payload inside a single transaction.

    A connection is acquired from ``pool`` and ``bulk_insert_readings`` is
    run inside ``conn.transaction()``. The row count it returns is not
    needed here, so it is not bound to a local.

    Args:
        pool: Pool from which the connection is acquired.
        readings: Rows forwarded unchanged to ``bulk_insert_readings``.
    """
    async with pool.acquire() as conn, conn.transaction():
        await bulk_insert_readings(conn, readings)
Cursor-baseret paginering
# server/src/readings/router.py
from datetime import datetime
from typing import Optional
@router.get("/readings")
async def list_readings(
    installation_id: UUID,
    before: Optional[datetime] = None,  # cursor (exclusive)
    limit: int = 100,
    user: TokenPayload = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Return one page of readings for an installation, newest first.

    Cursor-based pagination — stable under concurrent inserts. The reading
    timestamp serves as the cursor (unique enough for readings).

    Args:
        installation_id: Installation whose readings are listed.
        before: Exclusive cursor; only readings with an earlier timestamp
            are returned. ``None`` starts from the newest reading.
        limit: Requested page size. Values below 1 are treated as 1.
        user: Resolved through ``get_current_user``; not read in the body.
        session: Database session supplied by ``get_session``.

    Returns:
        A dict with ``items`` (the dumped readings), ``next_cursor`` (ISO
        timestamp of the last item, or ``None`` on the last page) and
        ``has_next``.
    """
    # A non-positive limit used to leave ``items`` empty while ``has_next``
    # could still be True, so ``items[-1]`` raised IndexError.
    page_size = max(limit, 1)

    stmt = select(Reading).where(Reading.meter_installation_id == installation_id)
    if before is not None:
        stmt = stmt.where(Reading.timestamp < before)
    # Fetch one extra row to detect whether a next page exists.
    stmt = stmt.order_by(Reading.timestamp.desc()).limit(page_size + 1)

    rows = (await session.execute(stmt)).scalars().all()
    has_next = len(rows) > page_size
    items = rows[:page_size]
    return {
        "items": [r.model_dump() for r in items],
        "next_cursor": items[-1].timestamp.isoformat() if has_next else None,
        "has_next": has_next,
    }
Subquery og scalar subquery
# Dashboard: Seneste aflæsning pr. installation
from sqlalchemy import func, select as sa_select
# Correlated scalar subquery: timestamp of the newest reading for the
# MeterInstallation row of the enclosing query.
latest_reading_subq = (
    sa_select(func.max(Reading.timestamp))
    .where(Reading.meter_installation_id == MeterInstallation.id)
    .correlate(MeterInstallation)
    .scalar_subquery()
)
# One row per non-deleted installation of the tenant, ordered so that
# installations without any reading (NULL) come first, then oldest first.
stmt = (
    select(
        MeterInstallation.id,
        MeterInstallation.unit_id,
        latest_reading_subq.label("last_reading_at"),
    )
    .where(MeterInstallation.tenant_id == tenant_id)
    .where(MeterInstallation.deleted_at.is_(None))
    .order_by(latest_reading_subq.asc().nulls_first())
)
# Filter stale installations (latest reading older than 48 h).
# This is a per-row filter on a query with no GROUP BY, so it goes in WHERE;
# HAVING without GROUP BY turns the query into a single aggregate group and
# PostgreSQL then rejects the non-aggregated selected columns.
# NOTE(review): installations with no reading at all have a NULL subquery
# result and are dropped by this comparison — confirm that is intended.
stmt = stmt.where(
    latest_reading_subq < datetime.now(timezone.utc) - timedelta(hours=48)
)
Common pitfalls
# ❌ Pitfall 1: Using the session after commit
async with session.begin():
obj = await session.get(Property, prop_id)
await session.commit()
# The original note names DetachedInstanceError for the next line when
# expire_on_commit=True.
# NOTE(review): under AsyncSession, reading an expired attribute may instead
# surface as MissingGreenlet (implicit I/O) — confirm.
print(obj.name) # DetachedInstanceError! expire_on_commit=True
# ✅ Fix: expire_on_commit=False in the sessionmaker (see the session factory earlier in this file)
# OR: read the attributes before committing
# ❌ Pitfall 2: Lazy loading in an async context
stmt = select(Property)
prop = (await session.execute(stmt)).scalar_one()
units = prop.units # MissingGreenlet error! Lazy loading is not possible here
# ✅ Fix: Use selectinload explicitly
# ❌ Pitfall 3: session.execute() without await
result = session.execute(stmt) # Returns a coroutine, not a result!
# ✅ Fix: Always await
result = await session.execute(stmt)
# ❌ Pitfall 4: Brug add_all med allerede-persisterede objekter
# ved bulk update — bruger individuelle UPDATE statements
session.add_all(objects_to_update) # N UPDATE statements
# ✅ Fix: Brug bulk_update_mappings for store batches
await session.execute(
update(Reading).where(Reading.id == bindparam("_id")),
[{"_id": r.id, "value": r.value} for r in readings],
)
TimescaleDB hypertable specifics
# SQLModel + TimescaleDB: model for the ``reading`` table, which is meant to
# be a hypertable
class Reading(SQLModel, table=True):
    """One meter reading, keyed by installation and timestamp.

    Both timestamp columns are timezone-aware. ``received_at`` used to be a
    plain ``datetime`` field (a naive DateTime column), while the writers in
    this file pass ``datetime.now(timezone.utc)``; asyncpg refuses
    timezone-aware values for a column without time zone.
    """

    __tablename__ = "reading"
    __table_args__ = (
        # The primary key includes the time column (a requirement for a
        # TimescaleDB hypertable).
        PrimaryKeyConstraint("meter_installation_id", "timestamp"),
        # Named unique constraint targeted by the ON CONFLICT upsert.
        UniqueConstraint(
            "meter_installation_id",
            "timestamp",
            name="uq_reading_installation_timestamp",
        ),
    )

    meter_installation_id: UUID = Field(foreign_key="meter_installation.id")
    timestamp: datetime = Field(
        sa_column=Column(DateTime(timezone=True), nullable=False)
    )
    value: float
    rssi_dbm: float | None = None
    battery_level_pct: float | None = None
    # Timezone-aware, matching ``timestamp`` and the UTC values written to it.
    received_at: datetime = Field(
        sa_column=Column(DateTime(timezone=True), nullable=False)
    )

    # NOTE: Reading has NO tenant_id of its own (inherited via meter_installation)
    # NOTE: Reading has NO deleted_at (append-only time-series data)
Konklusion
SQLAlchemy 2.0 async kræver eksplicitte await overalt, eager loading via selectinload/joinedload for at undgå N+1, og expire_on_commit=False for at tilgå attributter efter commit. ON CONFLICT DO UPDATE upserts sikrer idempotens ved MQTT-retransmission. asyncpg COPY giver 10-50× bedre bulk insert performance. TimescaleDB hypertables kræver at primærnøglen inkluderer timestamp-kolonnen.