M-Bus Gateway
← Tilbage til blog
FastAPI · SQLAlchemy · async · PostgreSQL · Python · backend · database · IoT

FastAPI og SQLAlchemy async — avancerede mønstre

FastAPI og SQLAlchemy async: cursor-baseret paginering, upsert med ON CONFLICT, COPY bulk-insert, advisory locks og connection pool-tuning til en IoT-platform.

Af M-Bus Gateway

SQLAlchemy async på PostgreSQL-baserede IoT-platforme kræver specifikke mønstre for både performance og korrekthed. Her er de produktionsklare mønstre.


Async session factory med lifespan

# server/src/db/session.py

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

engine = create_async_engine(
    settings.database_url,
    echo=settings.debug,
    # Pool sizing: 20 connections kept open, up to 40 more during bursts.
    pool_size=20,
    max_overflow=40,
    # Wait at most 30 s for a free connection from the pool.
    pool_timeout=30,
    # Check each connection on checkout so broken ones are replaced.
    pool_pre_ping=True,
    # Replace connections once they are older than one hour (3600 s).
    pool_recycle=3600,
)

# Application-wide factory for AsyncSession objects bound to `engine`.
# - expire_on_commit=False: loaded attributes stay usable after commit instead
#   of triggering an implicit refresh (lazy load), which fails under asyncio.
# - autoflush=False: pending changes are only sent on explicit flush/commit.
AsyncSessionLocal = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    autoflush=False,
    expire_on_commit=False,
)


@asynccontextmanager
async def get_async_session() -> AsyncIterator[AsyncSession]:
    """Provide a session for programmatic use (not FastAPI ``Depends``).

    Usage::

        async with get_async_session() as session:
            ...

    The session is committed when the ``async with`` body finishes normally.
    If the body (or the commit itself) raises an ``Exception``, the session
    is rolled back and the exception is re-raised. The session is closed on
    exit in every case by the outer ``async with``.
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


async def get_session() -> AsyncIterator[AsyncSession]:
    """FastAPI ``Depends`` dependency that yields one session per request.

    No automatic commit: handlers must call ``await session.commit()``
    themselves. The session is closed when FastAPI finalizes the dependency.
    """
    async with AsyncSessionLocal() as session:
        yield session

Cursor-baseret paginering (keyset)

# server/src/readings/repository.py
# Keyset pagination — stabil og hurtig ved store datasæt

from sqlalchemy import select, and_, tuple_
import base64
import json


def encode_cursor(timestamp: datetime, id: uuid.UUID) -> str:
    """Encode a keyset position as an opaque, URL-safe cursor string.

    The payload is the JSON object ``{"ts": <ISO-8601>, "id": <UUID>}``,
    base64url-encoded with the trailing ``=`` padding removed. The result
    contains no ``+``, ``/`` or ``=``, so it can go into a query string
    without percent-encoding.
    """
    payload = json.dumps({"ts": timestamp.isoformat(), "id": str(id)})
    return base64.urlsafe_b64encode(payload.encode()).decode().rstrip("=")


def decode_cursor(cursor: str) -> tuple[datetime, uuid.UUID]:
    """Decode a cursor produced by :func:`encode_cursor`.

    Cursors in the older standard-base64 format (padded, possibly containing
    ``+``/``/``) are accepted too, so previously issued cursors keep working.

    Raises:
        ValueError: if the cursor is not valid base64, not the expected JSON
            object, or carries an invalid timestamp/UUID. Cursors come from
            clients, so every malformed input maps to this single type.
    """
    try:
        # Restore padding stripped by encode_cursor; 0 for padded input.
        padded = cursor + "=" * (-len(cursor) % 4)
        # urlsafe_b64decode maps '-'/'_' to '+'/'/' and leaves '+'/'/' as is,
        # so it decodes both alphabets.
        data = json.loads(base64.urlsafe_b64decode(padded))
        return datetime.fromisoformat(data["ts"]), uuid.UUID(data["id"])
    except (ValueError, TypeError, KeyError, AttributeError) as exc:
        raise ValueError("Invalid pagination cursor") from exc


async def list_readings_cursor(
    session: AsyncSession,
    tenant_id: uuid.UUID,
    meter_installation_id: uuid.UUID,
    cursor: str | None = None,
    limit: int = 100,
) -> tuple[list[Reading], str | None]:
    """Return one page of non-deleted readings, newest first (keyset pagination).

    Rows are ordered by ``(timestamp, id)`` descending; ``id`` breaks ties
    between readings with the same timestamp. Unlike OFFSET pagination, rows
    inserted while a client is paging do not shift later pages.

    Args:
        session: Async SQLAlchemy session.
        tenant_id: Tenant the readings must belong to.
        meter_installation_id: Meter installation to list readings for.
        cursor: Opaque cursor from a previous call; ``None`` or ``""`` for
            the first page. Malformed cursors raise whatever
            ``decode_cursor`` raises.
        limit: Maximum number of rows to return; must be at least 1.

    Returns:
        ``(rows, next_cursor)``; ``next_cursor`` is ``None`` when no further
        rows exist.

    Raises:
        ValueError: if ``limit`` is less than 1.
    """
    if limit < 1:
        # limit=0 used to return an empty page *plus* a next cursor, and
        # negative limits are meaningless (PostgreSQL rejects LIMIT < 0).
        raise ValueError(f"limit must be >= 1, got {limit}")

    stmt = (
        select(Reading)
        .where(
            Reading.tenant_id == tenant_id,
            Reading.meter_installation_id == meter_installation_id,
            Reading.deleted_at.is_(None),
        )
        .order_by(Reading.timestamp.desc(), Reading.id.desc())
        # Fetch one extra row: if it comes back, a next page exists.
        .limit(limit + 1)
    )

    if cursor:
        ts, rid = decode_cursor(cursor)
        # Row-value comparison matching the DESC sort order: only rows
        # strictly after the last row of the previous page.
        stmt = stmt.where(
            tuple_(Reading.timestamp, Reading.id) < tuple_(ts, rid)
        )

    rows = list((await session.execute(stmt)).scalars().all())

    if len(rows) <= limit:
        return rows, None

    page = rows[:limit]
    last = page[-1]
    return page, encode_cursor(last.timestamp, last.id)

Upsert med ON CONFLICT

# server/src/mqtt/subscriber.py
# MQTT gateway-payload: Upsert aflæsninger (idempotent ved gensendelse)

from sqlalchemy.dialects.postgresql import insert


async def upsert_readings(
    session: AsyncSession,
    readings: list[dict],
    gateway_id: str,
) -> int:
    """Upsert readings idempotently (safe under MQTT QoS 1 redelivery).

    On conflict on ``(meter_installation_id, timestamp)`` the existing row's
    ``value``, ``received_at`` and ``gateway_id`` are overwritten. The
    session is committed before returning.

    Args:
        session: Async SQLAlchemy session.
        readings: Row dicts for the ``Reading`` table. Each must contain
            ``meter_installation_id`` and ``timestamp`` (the conflict key).
            Rows without a ``gateway_id`` key are stamped with *gateway_id*.
        gateway_id: Identifier of the gateway that delivered the payload.

    Returns:
        The driver-reported rowcount (inserted plus updated rows).
    """
    if not readings:
        return 0

    # PostgreSQL rejects an INSERT ... ON CONFLICT DO UPDATE that touches the
    # same conflict key twice in one statement ("cannot affect row a second
    # time"). Collapse duplicates within the batch; the last occurrence wins,
    # as it would if the rows had been upserted one after another.
    unique: dict[tuple, dict] = {}
    for row in readings:
        key = (row["meter_installation_id"], row["timestamp"])
        # Previously `gateway_id` was accepted but never used; row-level
        # values still take precedence.
        unique[key] = {"gateway_id": gateway_id, **row}
    rows = list(unique.values())

    stmt = insert(Reading).values(rows)
    stmt = stmt.on_conflict_do_update(
        index_elements=["meter_installation_id", "timestamp"],
        set_={
            "value": stmt.excluded.value,
            "received_at": stmt.excluded.received_at,
            "gateway_id": stmt.excluded.gateway_id,
        },
    )
    result = await session.execute(stmt)
    await session.commit()
    return result.rowcount


# Gateway-duplicate håndtering:
# → Samme gateway sender samme dag-payload (MQTT QoS 1)
# → ON CONFLICT opdaterer value + received_at
# → Ingen dubletter i readings-tabel

COPY bulk-insert til historisk import

# server/src/readings/bulk_import.py
# Brug PostgreSQL COPY for max throughput ved historisk dataimport

from asyncpg import Connection
import csv
import io


async def bulk_import_readings(
    conn: Connection,
    readings: list[dict],
    tenant_id: uuid.UUID,
) -> int:
    """Bulk-load readings into ``readings`` with PostgreSQL ``COPY FROM``.

    Intended for historical data migration (Excel/Techem). COPY is much
    faster than row-by-row INSERT for large datasets. Every row gets a fresh
    UUID ``id``, the given *tenant_id*, and one shared UTC ``received_at``
    taken at the start of the import.

    Args:
        conn: Raw asyncpg connection (not a SQLAlchemy session).
        readings: Dicts with ``meter_installation_id``, ``timestamp``
            (a ``datetime``), ``value`` and optional ``unit`` (default
            ``"kWh"``).
        tenant_id: Tenant that owns all imported rows.

    Returns:
        Number of rows copied, parsed from the server's ``COPY <n>`` status.
    """
    from datetime import timezone

    if not readings:
        return 0

    # One timezone-aware timestamp for the whole batch (utcnow() is naive,
    # deprecated, and was re-evaluated for every row).
    received_at = datetime.now(timezone.utc).isoformat()

    text_buffer = io.StringIO()
    writer = csv.writer(text_buffer)
    for r in readings:
        writer.writerow([
            str(uuid.uuid4()),                 # id
            str(tenant_id),                    # tenant_id
            str(r["meter_installation_id"]),
            r["timestamp"].isoformat(),
            r["value"],
            r.get("unit", "kWh"),
            received_at,                       # received_at
        ])

    # asyncpg's copy_to_table reads bytes from file-like sources; a StringIO
    # would hand it str chunks. Encode the CSV text once.
    source = io.BytesIO(text_buffer.getvalue().encode("utf-8"))
    status = await conn.copy_to_table(
        "readings",
        source=source,
        columns=["id", "tenant_id", "meter_installation_id",
                 "timestamp", "value", "unit", "received_at"],
        format="csv",
    )

    # copy_to_table returns the command status string, e.g. "COPY 1000".
    return int(status.split()[-1])


# Benchmark:
# INSERT INTO (1.000 rows): ~500ms
# COPY FROM (1.000 rows):    ~20ms (25x hurtigere)
# COPY FROM (100.000 rows): ~500ms

Advisory locks til concurrency-kontrol

# server/src/settlements/service.py
# Advisory lock: Forhindrer dobbelt settlement-generering

from sqlalchemy import text


def _settlement_lock_key(property_id: uuid.UUID) -> int:
    """Map a property to a stable signed 64-bit PostgreSQL advisory-lock key."""
    import hashlib

    # Built-in hash() of a str is salted per process (PYTHONHASHSEED), so two
    # workers would derive different keys for the same property and never
    # block each other. A cryptographic digest is identical in every process.
    # The "settlement:" prefix separates this key space from other
    # advisory-lock users that might hash the same UUIDs.
    digest = hashlib.blake2b(
        f"settlement:{property_id}".encode(), digest_size=8
    ).digest()
    # pg_try_advisory_xact_lock(bigint) takes a signed 64-bit integer.
    return int.from_bytes(digest, "big", signed=True)


async def generate_settlement_exclusive(
    session: AsyncSession,
    property_id: uuid.UUID,
    period: SettlementPeriod,
) -> Settlement:
    """Generate a settlement while holding a per-property advisory lock.

    A transaction-scoped PostgreSQL advisory lock keyed on the property
    prevents concurrent settlement generation for the same property across
    all processes.

    Raises:
        ConflictError: if another transaction holds the lock for this
            property, or if ``_check_existing_settlement`` reports an
            existing settlement for *period*.
    """
    lock_key = _settlement_lock_key(property_id)

    # Non-blocking attempt: returns false immediately if the lock is taken.
    acquired = await session.scalar(
        text("SELECT pg_try_advisory_xact_lock(:key)"),
        {"key": lock_key},
    )
    if not acquired:
        raise ConflictError(
            "En afregning genereres allerede for denne ejendom. Prøv igen."
        )

    # The xact lock is released automatically at COMMIT/ROLLBACK of the
    # current transaction. NOTE(review): if _do_generate_settlement commits
    # internally, the lock is released at that point — confirm.
    existing = await _check_existing_settlement(session, property_id, period)
    if existing:
        raise ConflictError("Afregning for denne periode eksisterer allerede")

    return await _do_generate_settlement(session, property_id, period)

Connection pool-tuning til IoT

# Optimal pool-konfiguration for IoT-platform:

# Workload profil:
# - 24/7 gateway-data: ~100 inserts/sek (peak morgen 06:00 UTC)
# - Få samtidige HTTP-brugere (SaaS — ikke consumer-app)
# - Tunge queries: settlement-beregning, portfolio-analyse

engine = create_async_engine(
    settings.database_url,

    # Pool size: connections kept open permanently *per engine*, i.e. per
    # worker process. Rule of thumb: workers x expected DB-time ratio.
    pool_size=20,

    # Extra connections opened during bursts (e.g. the 06:00 UTC gateway
    # burst) and closed again when returned to the pool.
    # NOTE: 20 + 80 = up to 100 connections *per process*. With N workers the
    # database must allow N x 100 connections (PostgreSQL's default
    # max_connections is 100) — size this against the server limit.
    max_overflow=80,

    # Fail fast: raise sqlalchemy.exc.TimeoutError after 5 s of waiting for
    # a free connection instead of queueing for a long time.
    pool_timeout=5.0,

    # Ping on checkout; transparently replaces connections dropped by the
    # network.
    pool_pre_ping=True,

    # Recycle connections older than 30 minutes to avoid stale connections.
    pool_recycle=1800,

    # READ COMMITTED is PostgreSQL's default; set explicitly for clarity.
    execution_options={
        "isolation_level": "READ COMMITTED",
    },

    # Server-side guards via asyncpg server_settings (values in milliseconds):
    connect_args={
        "server_settings": {
            # Abort any single statement after 30 s. NOTE(review): this also
            # applies to COPY bulk imports and settlement queries — confirm
            # they fit, or raise it per transaction with SET LOCAL.
            "statement_timeout": "30000",
            # Terminate sessions left idle inside an open transaction for 60 s.
            "idle_in_transaction_session_timeout": "60000",
        }
    },
)

Konklusion

SQLAlchemy async til IoT-platforme kræver fire kritiske mønstre: expire_on_commit=False for sessions uden lazy load-fejl, keyset-paginering med opak cursor til stabile sider ved samtidige inserts, ON CONFLICT DO UPDATE-upsert til idempotent håndtering af MQTT-payloads og COPY FROM til 25x hurtigere bulk-import. Advisory locks forhindrer race conditions ved settlement-generering — forudsat at lås-nøglen er deterministisk på tværs af processer (Pythons indbyggede hash() på strenge er det ikke). Pool-størrelsen dimensioneres til gateway-burstet kl. 06:00 UTC; husk, at pool_size + max_overflow gælder pr. worker-proces. pool_timeout=5 s får anmodninger til at fejle hurtigt i stedet for at blokere længe.

Se også vores asyncpg-guide og guiden til SQLAlchemy async-mønstre.