M-Bus Gateway
← Tilbage til blog
SQLAlchemy · asyncpg · FastAPI · Python · PostgreSQL · async · ORM · TimescaleDB

SQLAlchemy async-mønstre — asyncpg og FastAPI i produktion

SQLAlchemy 2.0 async-mønstre: connection pooling, eager loading, upserts, bulk-operationer, cursor-baseret paginering og typiske faldgruber med asyncpg og FastAPI.

Af M-Bus Gateway

SQLAlchemy 2.0's async-API er fundamentalt anderledes end sync-versionen: al database-IO skal awaites, og relationer kan ikke lazy-loades implicit. Her er produktionsmønstrene fra M-Bus Gateway-platformen.


Async engine og session factory

# server/src/db/engine.py

import os
from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import (
    create_async_engine,
    AsyncSession,
    async_sessionmaker,
)

# Read the DSN from the environment so credentials are not committed to source;
# the literal below is only a local-development fallback.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+asyncpg://user:password@localhost:5432/mbus",
)

engine = create_async_engine(
    DATABASE_URL,
    pool_size=10,             # Connections kept open in the pool
    max_overflow=20,          # Extra connections allowed under burst load
    pool_pre_ping=True,       # Test connections before handing them out
    pool_recycle=3600,        # Recycle connections every 60 minutes
    echo=False,               # True for SQL debug logging
    connect_args={
        "server_settings": {
            "application_name": "mbus-server",
            "statement_timeout": "30000",  # Max 30 seconds per statement (ms)
        }
    },
)

# Session factory used by the FastAPI dependency.
AsyncSessionFactory = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,   # Keep loaded attributes after commit (no implicit refresh IO)
    autoflush=False,
)

async def get_session() -> AsyncIterator[AsyncSession]:
    """FastAPI dependency that yields one AsyncSession per request.

    This is an async generator, so its return type is an async iterator of
    sessions, not a session. The ``async with`` block closes the session when
    the dependency is torn down. It never commits; endpoints that write must
    commit explicitly.
    """
    async with AsyncSessionFactory() as session:
        yield session

Eager loading (undgå N+1)

# SQLAlchemy 2.0: selectinload vs. joinedload

from sqlalchemy.orm import selectinload, joinedload
from sqlmodel import select

# ❌ N+1 problem — one extra query per property to fetch its units.
# With a sync Session this silently emits N lazy loads; with AsyncSession the
# implicit lazy load is not allowed at all and raises MissingGreenlet.
stmt = select(Property).where(Property.tenant_id == tenant_id)
properties = (await session.execute(stmt)).scalars().all()
for p in properties:
    units = p.units  # Lazy load per property (sync: N queries, async: MissingGreenlet)!

# ✅ selectinload — 2 queries in total (properties, then all their units):
stmt = (
    select(Property)
    .where(Property.tenant_id == tenant_id)
    .options(selectinload(Property.units))
)
properties = (await session.execute(stmt)).scalars().all()
# p.units is now available without further queries

# ✅ joinedload — 1 query with a JOIN (good for low cardinality, e.g. 1-1).
# NOTE: on collection relationships joinedload repeats parent rows, and
# SQLAlchemy 2.0 then requires result.unique() before .scalars(); prefer
# selectinload for collections.
stmt = (
    select(Property)
    .options(joinedload(Property.config))  # 1-1 relation
    .where(Property.tenant_id == tenant_id)
)

Upserts med ON CONFLICT

# server/src/mqtt/subscriber.py

from sqlalchemy.dialects.postgresql import insert

async def upsert_reading(
    session: AsyncSession,
    reading: dict,
) -> None:
    """
    Idempotent upsert: insert a reading, or overwrite it on a duplicate
    (installation_id + timestamp).

    Used for MQTT payloads that may be retransmitted: a retransmission hits
    the unique constraint and replaces the stored measurement fields
    (last write wins) instead of raising an integrity error.

    Keys read from ``reading``: "installation_id", "timestamp", "value"
    (required) and "rssi_dbm", "battery_level_pct" (optional, default None).

    Does not commit; the caller owns the transaction.
    """
    from datetime import datetime, timezone

    stmt = insert(Reading).values(
        meter_installation_id=reading["installation_id"],
        timestamp=reading["timestamp"],
        value=reading["value"],
        rssi_dbm=reading.get("rssi_dbm"),
        battery_level_pct=reading.get("battery_level_pct"),
        received_at=datetime.now(timezone.utc),
    )
    # ON CONFLICT: overwrite every measurement field with the incoming row.
    # There is no WHERE guard, so this is last-write-wins, not "newer only".
    # battery_level_pct is included so a retransmission updates it together
    # with rssi_dbm instead of leaving a stale battery value behind.
    stmt = stmt.on_conflict_do_update(
        constraint="uq_reading_installation_timestamp",
        set_={
            "value": stmt.excluded.value,
            "rssi_dbm": stmt.excluded.rssi_dbm,
            "battery_level_pct": stmt.excluded.battery_level_pct,
            "received_at": stmt.excluded.received_at,
        },
    )
    await session.execute(stmt)

Bulk insert med COPY

# server/src/db/bulk.py

import asyncpg

async def bulk_insert_readings(
    conn: asyncpg.Connection,
    readings: list[dict],
) -> int:
    """
    Bulk-insert readings with asyncpg COPY — 10-50× faster than one INSERT per row.

    Used for daily MQTT payloads with 50-200 readings. Each dict must contain
    "meter_installation_id", "timestamp" and "value"; "rssi_dbm" and
    "battery_level_pct" are optional (None when absent).

    Unlike the ON CONFLICT upsert, COPY cannot skip or merge duplicates: a
    reading whose (meter_installation_id, timestamp) already exists should
    fail with a unique violation and abort the whole batch. Use this only
    for payloads known to be new.

    Returns the number of rows copied.
    """
    from datetime import datetime, timezone

    if not readings:
        return 0  # Nothing to copy; skip the server round-trip.

    # One receive timestamp for the whole batch, taken once rather than per row.
    received_at = datetime.now(timezone.utc)
    rows = [
        (
            r["meter_installation_id"],
            r["timestamp"],
            r["value"],
            r.get("rssi_dbm"),
            r.get("battery_level_pct"),
            received_at,
        )
        for r in readings
    ]

    status = await conn.copy_records_to_table(
        "reading",
        records=rows,
        columns=[
            "meter_installation_id",
            "timestamp",
            "value",
            "rssi_dbm",
            "battery_level_pct",
            "received_at",
        ],
    )
    # copy_records_to_table returns the command status string ("COPY <n>"),
    # not an int; parse the row count to honour the `-> int` contract.
    return int(status.split()[-1])

# Brug fra FastAPI med asyncpg direkte:
async def process_gateway_payload(pool: asyncpg.Pool, readings: list) -> None:
    """
    Store one gateway payload via COPY inside a single transaction.

    A connection is borrowed from the asyncpg pool, and the COPY runs inside
    ``conn.transaction()``, so the batch is committed as a whole or not at all.
    """
    # The second context manager is evaluated after the first is entered,
    # so `conn` is already bound when conn.transaction() is called.
    async with pool.acquire() as conn, conn.transaction():
        await bulk_insert_readings(conn, readings)

Cursor-baseret paginering

# server/src/readings/router.py

from datetime import datetime
from typing import Optional

@router.get("/readings")
async def list_readings(
    installation_id: UUID,
    before: Optional[datetime] = None,  # Cursor (exclusive)
    limit: int = 100,
    user: TokenPayload = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """
    Return one page of readings for an installation, newest first.

    Cursor-based pagination, which stays stable under concurrent inserts.
    The cursor is the ``timestamp`` of the last item on the previous page.
    Within one installation it is unique, because
    (meter_installation_id, timestamp) is the reading's key.

    ``limit`` is clamped to [1, 1000]. A value of 0 or less would otherwise
    crash on ``items[-1]`` or send a negative LIMIT to PostgreSQL, and an
    unbounded value lets one request pull the entire series.
    """
    # NOTE(review): `user` is never checked against the installation's tenant,
    # so any authenticated user can read any installation's readings. Filter via
    # MeterInstallation.tenant_id — confirm the TokenPayload field name.
    # NOTE(review): fastapi.Query(100, ge=1, le=1000) would reject bad values
    # with a 422 instead of clamping.
    max_page_size = 1000
    limit = max(1, min(limit, max_page_size))

    stmt = (
        select(Reading)
        .where(Reading.meter_installation_id == installation_id)
        .order_by(Reading.timestamp.desc())
        .limit(limit + 1)  # Fetch one extra row to detect a next page
    )

    if before is not None:
        stmt = stmt.where(Reading.timestamp < before)

    rows = (await session.execute(stmt)).scalars().all()

    has_next = len(rows) > limit
    items = rows[:limit]

    return {
        "items": [r.model_dump() for r in items],
        "next_cursor": items[-1].timestamp.isoformat() if has_next else None,
        "has_next": has_next,
    }

Subquery og scalar subquery

# Dashboard: latest reading per installation

from datetime import datetime, timedelta, timezone

from sqlalchemy import func, select as sa_select

# Correlated scalar subquery: MAX(timestamp) of this installation's readings,
# or NULL when it has never reported.
latest_reading_subq = (
    sa_select(func.max(Reading.timestamp))
    .where(Reading.meter_installation_id == MeterInstallation.id)
    .correlate(MeterInstallation)
    .scalar_subquery()
)

stmt = (
    select(
        MeterInstallation.id,
        MeterInstallation.unit_id,
        latest_reading_subq.label("last_reading_at"),
    )
    .where(MeterInstallation.tenant_id == tenant_id)
    .where(MeterInstallation.deleted_at.is_(None))
    # Never-reporting installations (NULL) first, then oldest reading first.
    .order_by(latest_reading_subq.asc().nulls_first())
)

# Filter stale installations (no reading within the last 48 h).
# This must be WHERE, not HAVING: the query has no GROUP BY and the subquery
# is a per-row scalar, so HAVING makes PostgreSQL reject the non-aggregated
# columns. Installations that have never reported (subquery is NULL) are stale
# too — a bare `<` comparison would silently drop them.
stale_cutoff = datetime.now(timezone.utc) - timedelta(hours=48)
stmt = stmt.where(
    latest_reading_subq.is_(None) | (latest_reading_subq < stale_cutoff)
)

Common pitfalls

# ❌ Pitfall 1: Brug session efter commit
async with session.begin():
    obj = await session.get(Property, prop_id)
    await session.commit()
print(obj.name)  # DetachedInstanceError! expire_on_commit=True

# ✅ Fix: expire_on_commit=False i sessionmaker (se ovenfor)
# ELLER: tilgå attributter inden commit

# ❌ Pitfall 2: Lazy loading i async context
stmt = select(Property)
prop = (await session.execute(stmt)).scalar_one()
units = prop.units  # MissingGreenlet error! Lazy load ikke muligt

# ✅ Fix: Brug selectinload eksplicit

# ❌ Pitfall 3: session.execute() uden await
result = session.execute(stmt)  # Returner coroutine, ikke result!

# ✅ Fix: Altid await
result = await session.execute(stmt)

# ❌ Pitfall 4: Brug add_all med allerede-persisterede objekter
# ved bulk update — bruger individuelle UPDATE statements
session.add_all(objects_to_update)  # N UPDATE statements

# ✅ Fix: Brug bulk_update_mappings for store batches
await session.execute(
    update(Reading).where(Reading.id == bindparam("_id")),
    [{"_id": r.id, "value": r.value} for r in readings],
)

TimescaleDB hypertable specifics

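# SQLModel + TimescaleDB: the Reading model. The model only creates a regular table; convert it to a hypertable in a migration with SELECT create_hypertable('reading', 'timestamp').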

class Reading(SQLModel, table=True):
    """One meter reading, stored in the ``reading`` table (TimescaleDB hypertable).

    The composite primary key (meter_installation_id, timestamp) identifies a
    reading and includes the time column, which TimescaleDB requires for any
    unique constraint on a hypertable. Both datetime columns are timezone-aware
    (timestamptz).
    """

    __tablename__ = "reading"
    __table_args__ = (
        # Primary key includes time (required for a TimescaleDB hypertable)
        PrimaryKeyConstraint("meter_installation_id", "timestamp"),
        # Named unique constraint targeted by the ON CONFLICT upsert.
        # NOTE(review): it covers exactly the primary-key columns, so it adds a
        # second identical index to maintain on every insert. ON CONFLICT could
        # target the primary key instead (index_elements=[...]).
        UniqueConstraint(
            "meter_installation_id",
            "timestamp",
            name="uq_reading_installation_timestamp",
        ),
    )

    meter_installation_id: UUID = Field(foreign_key="meter_installation.id")
    timestamp: datetime = Field(sa_column=Column(
        DateTime(timezone=True),
        nullable=False,
    ))
    value: float
    rssi_dbm: float | None = None
    battery_level_pct: float | None = None
    # timestamptz: the writers store datetime.now(timezone.utc) (tz-aware).
    # A bare `datetime` annotation maps to a naive TIMESTAMP column, which
    # asyncpg refuses to encode aware datetimes into.
    received_at: datetime = Field(sa_column=Column(
        DateTime(timezone=True),
        nullable=False,
    ))

# NB: Reading har IKKE tenant_id direkte (arves via meter_installation)
# NB: Reading har IKKE deleted_at (append-only tidsseriedata)

Konklusion

SQLAlchemy 2.0 async kræver eksplicit await overalt, eager loading via selectinload/joinedload for at undgå N+1, og expire_on_commit=False for at kunne tilgå attributter efter commit. ON CONFLICT DO UPDATE-upserts sikrer idempotens ved MQTT-retransmission. asyncpg COPY giver 10-50× bedre bulk insert-performance. TimescaleDB hypertables kræver, at primærnøglen og alle andre unikke constraints inkluderer partitioneringskolonnen (her timestamp).

Læs videre i asyncpg-guiden eller TimescaleDB hypertable-guiden.