M-Bus Gateway
← Tilbage til blog
· python· sqlalchemy· postgresql· timescaledb· bulk-insert· upsert· window-functions· CTE· IoT· SaaS

Python SQLAlchemy Core — avancerede mønstre til IoT og SaaS

SQLAlchemy Core mønstre til IoT og SaaS: text() raw SQL, bulk insert via COPY, upsert ON CONFLICT, window functions til RSSI-trend, CTE til hierarkisk data og connection pool tuning.

Af M-Bus Gateway

SQLAlchemy ORM er praktisk til CRUD, men IoT-data kræver Core SQL til bulk-operationer, window functions og TimescaleDB-specifikke queries. Her er mønstrene.


text() — raw SQL når ORM ikke er nok

# server/src/db/queries.py
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from decimal import Decimal


async def get_portfolio_heat_summary(
    tenant_id: str,
    period_start: str,
    period_end: str,
    db: AsyncSession,
) -> list[dict]:
    """Return one aggregated heat summary row per property of a tenant.

    Each returned dict carries the keys ``property_id``, ``address``,
    ``meter_count``, ``total_kwh``, ``avg_rssi`` and ``last_reading``.
    Soft-deleted properties and units, and removed meter installations,
    are excluded. Because every join is a LEFT JOIN, a property without
    readings in the period still appears, with NULL aggregates, and is
    sorted last (``ORDER BY total_kwh DESC NULLS LAST``).

    The query is written as raw SQL through ``text()``; the original
    author notes that it is too complex to express conveniently via the ORM.

    Args:
        tenant_id: Tenant whose properties are summarised.
        period_start: Lower bound of the reading timestamp (inclusive,
            since the SQL uses ``BETWEEN``).
        period_end: Upper bound of the reading timestamp (inclusive).
        db: Async session the query is executed on.

    Returns:
        A list of plain dicts, one per property.
    """
    bind_params = {
        "tenant_id": tenant_id,
        "period_start": period_start,
        "period_end": period_end,
    }

    summary_sql = text("""
        SELECT
            p.id AS property_id,
            p.address,
            COUNT(DISTINCT mi.id) AS meter_count,
            SUM(r.value_kwh) AS total_kwh,
            AVG(r.rssi_dbm) AS avg_rssi,
            MAX(r.timestamp) AS last_reading
        FROM properties p
        LEFT JOIN units u ON u.property_id = p.id AND u.deleted_at IS NULL
        LEFT JOIN meter_installations mi ON mi.unit_id = u.id AND mi.removed_at IS NULL
        LEFT JOIN readings r ON r.meter_installation_id = mi.id
            AND r.timestamp BETWEEN :period_start AND :period_end
        WHERE p.tenant_id = :tenant_id
            AND p.deleted_at IS NULL
        GROUP BY p.id, p.address
        ORDER BY total_kwh DESC NULLS LAST
    """)

    outcome = await db.execute(summary_sql, bind_params)
    # Convert each row mapping to a plain dict so callers get ordinary dicts.
    return [dict(mapping) for mapping in outcome.mappings().all()]

Bulk INSERT via asyncpg COPY

# server/src/db/bulk_insert.py
# asyncpg COPY er 10-50x hurtigere end individuelle INSERT

import asyncpg
from datetime import datetime


async def bulk_insert_readings(
    readings: list[dict],
    dsn: str,
) -> int:
    """Bulk-load readings into the ``readings`` table using asyncpg COPY.

    Every reading must provide ``meter_installation_id``, ``timestamp`` and
    ``value_kwh``; ``rssi_dbm`` and ``battery_pct`` are optional and are
    stored as NULL when missing. All rows of one call share a single
    ``received_at`` value taken when the call starts.

    Args:
        readings: Reading dicts, e.g.
            ``{"meter_installation_id": str, "timestamp": datetime,
            "value_kwh": float, ...}``.
        dsn: PostgreSQL connection string handed to ``asyncpg.connect``.

    Returns:
        The number of readings submitted (``0`` for an empty batch, in
        which case no connection is opened).

    Raises:
        KeyError: If a reading lacks one of the required keys. This is
            raised before any connection is opened.
    """
    if not readings:
        # Nothing to copy: avoid opening a connection for an empty batch.
        return 0

    from datetime import timezone

    # datetime.utcnow() is deprecated; this yields the same naive-UTC value.
    # Computed once so the whole batch gets one consistent receive time.
    # NOTE(review): the value stays naive on purpose to keep the bound type
    # unchanged -- confirm whether received_at is timestamp or timestamptz.
    received_at = datetime.now(timezone.utc).replace(tzinfo=None)

    records = [
        (
            r["meter_installation_id"],
            r["timestamp"],
            r["value_kwh"],
            r.get("rssi_dbm"),
            r.get("battery_pct"),
            received_at,
        )
        for r in readings
    ]

    conn = await asyncpg.connect(dsn)
    try:
        await conn.copy_records_to_table(
            "readings",
            records=records,
            columns=[
                "meter_installation_id",
                "timestamp",
                "value_kwh",
                "rssi_dbm",
                "battery_pct",
                "received_at",
            ],
        )
    finally:
        # Always release the connection, even when COPY fails.
        await conn.close()
    return len(records)


# Alternativ: INSERT ON CONFLICT DO NOTHING (idempotent)
async def upsert_readings_safe(readings: list[dict], db: AsyncSession) -> None:
    """Insert readings and skip any row that conflicts with an existing one.

    A conflict is detected on the ``(meter_installation_id, timestamp)``
    pair, so delivering the same batch twice (for example a duplicated
    gateway delivery) adds no duplicate rows. An empty batch is a no-op:
    nothing is executed and nothing is committed.

    Args:
        readings: Row dicts passed to ``insert(Reading).values(...)``.
        db: Async session used to execute and commit the statement.
    """
    from sqlalchemy.dialects.postgresql import insert as pg_insert

    if readings:
        conflict_columns = ["meter_installation_id", "timestamp"]
        statement = (
            pg_insert(Reading)
            .values(readings)
            .on_conflict_do_nothing(index_elements=conflict_columns)
        )
        await db.execute(statement)
        await db.commit()

Window functions — RSSI-trend og forbrug

# server/src/analytics/window_queries.py
from sqlalchemy import text


async def rssi_trend_by_gateway(
    gateway_id: str,
    days: int,
    db: AsyncSession,
) -> list[dict]:
    """Return the daily RSSI trend for the meters behind one gateway.

    Readings newer than ``days`` days are averaged per calendar day. Each
    returned dict has the keys ``day``, ``avg_rssi``, ``rolling_7d_avg``
    and ``week_delta``. The original author uses the result to detect
    degrading signal strength.

    Both window expressions are row-based (``ROWS BETWEEN 6 PRECEDING AND
    CURRENT ROW`` and ``LAG(avg_rssi, 7)``): they cover the previous
    result rows, which equal the previous calendar days only when no day
    is missing from the data.

    Args:
        gateway_id: Value matched against ``gateways.gateway_id``.
        days: Size of the look-back window in days.
        db: Async session the query is executed on.

    Returns:
        A list of plain dicts ordered by day, oldest first.
    """
    bind_params = {"gateway_id": gateway_id, "days": days}

    trend_sql = text("""
        WITH daily_rssi AS (
            SELECT
                DATE_TRUNC('day', r.timestamp) AS day,
                AVG(r.rssi_dbm) AS avg_rssi
            FROM readings r
            JOIN meter_installations mi ON mi.id = r.meter_installation_id
            JOIN gateways g ON g.id = mi.gateway_id
            WHERE g.gateway_id = :gateway_id
                AND r.timestamp > NOW() - MAKE_INTERVAL(days => :days)
            GROUP BY 1
        )
        SELECT
            day,
            avg_rssi,
            AVG(avg_rssi) OVER (
                ORDER BY day
                ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
            ) AS rolling_7d_avg,
            avg_rssi - LAG(avg_rssi, 7) OVER (ORDER BY day) AS week_delta
        FROM daily_rssi
        ORDER BY day
    """)

    outcome = await db.execute(trend_sql, bind_params)
    return [dict(mapping) for mapping in outcome.mappings().all()]


async def cumulative_consumption_by_unit(
    unit_id: str,
    db: AsyncSession,
) -> list[dict]:
    """Return monthly and running-total consumption for one unit.

    Readings from the last 12 months, across all meter installations of
    the unit, are summed per calendar month. Each returned dict has the
    keys ``month``, ``monthly_kwh`` and ``cumulative_kwh``, where
    ``cumulative_kwh`` is the running sum of the monthly totals in month
    order.

    Per the original author's note, this feeds the accumulated-consumption
    view of a tenant portal (EU EED 2027).

    Args:
        unit_id: Unit whose meter installations are included.
        db: Async session the query is executed on.

    Returns:
        A list of plain dicts ordered by month, oldest first.
    """
    bind_params = {"unit_id": unit_id}

    consumption_sql = text("""
        SELECT
            DATE_TRUNC('month', timestamp) AS month,
            SUM(value_kwh) AS monthly_kwh,
            SUM(SUM(value_kwh)) OVER (ORDER BY DATE_TRUNC('month', timestamp)) AS cumulative_kwh
        FROM readings r
        JOIN meter_installations mi ON mi.id = r.meter_installation_id
        WHERE mi.unit_id = :unit_id
            AND timestamp > NOW() - INTERVAL '12 months'
        GROUP BY 1
        ORDER BY 1
    """)

    outcome = await db.execute(consumption_sql, bind_params)
    return [dict(mapping) for mapping in outcome.mappings().all()]

CTE — hierarkisk data og fordelingsresultat

# server/src/distribution/cte_queries.py
from sqlalchemy import text


async def get_distribution_with_occupancy_history(
    annual_input_id: str,
    db: AsyncSession,
) -> list[dict]:
    """Return distribution results joined with the matching occupancies.

    Two CTEs are combined: ``distribution`` selects the amounts stored for
    the given annual input, and ``occupancy_in_period`` selects the
    non-deleted occupancies (with tenant name and e-mail) for the same
    annual input. They are LEFT JOINed on ``unit_id``, so a unit without
    an occupancy still yields a row with NULL tenant columns, and a unit
    with several occupancies yields one row per occupancy.

    Per the original author's note, the result backs the tenant history
    shown in a billing audit.

    Args:
        annual_input_id: Annual input that selects both the distribution
            results and the occupancies.
        db: Async session the query is executed on.

    Returns:
        A list of plain dicts ordered by ``unit_id``.
    """
    bind_params = {"annual_input_id": annual_input_id}

    audit_sql = text("""
        WITH distribution AS (
            SELECT
                dr.unit_id,
                dr.fixed_amount,
                dr.variable_amount,
                dr.total_amount,
                dr.period_days
            FROM distribution_results dr
            WHERE dr.annual_input_id = :annual_input_id
        ),
        occupancy_in_period AS (
            SELECT
                o.unit_id,
                u.name AS tenant_name,
                u.email AS tenant_email,
                o.move_in_date,
                o.move_out_date,
                o.floor_area_m2
            FROM occupancies o
            JOIN users u ON u.id = o.user_id
            WHERE o.annual_input_id = :annual_input_id
                AND o.deleted_at IS NULL
        )
        SELECT
            d.unit_id,
            d.fixed_amount,
            d.variable_amount,
            d.total_amount,
            d.period_days,
            o.tenant_name,
            o.tenant_email,
            o.move_in_date,
            o.move_out_date,
            o.floor_area_m2
        FROM distribution d
        LEFT JOIN occupancy_in_period o ON o.unit_id = d.unit_id
        ORDER BY d.unit_id
    """)

    outcome = await db.execute(audit_sql, bind_params)
    return [dict(mapping) for mapping in outcome.mappings().all()]

Connection pool tuning til IoT burst

# server/src/db/engine.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine
from server.src.config import get_settings


def create_engine() -> AsyncEngine:
    """Build the async engine with a pool sized for bursty IoT traffic.

    Per the original author's description, the load is 1-5 concurrent
    requests on normal days, with a burst at 06:00 UTC when all 50
    gateways send data at once. The pool keeps 10 connections and may
    grow by 20 more, i.e. at most 30 concurrent connections; further
    requests wait up to ``pool_timeout`` seconds for a free one.

    PostgreSQL session parameters (``statement_timeout``,
    ``application_name``) are passed through asyncpg's ``server_settings``
    mapping, because ``asyncpg.connect()`` does not accept them as
    top-level keyword arguments.

    NOTE(review): ``command_timeout`` and ``server_settings`` are asyncpg
    connect arguments, so this assumes ``settings.database_url`` uses the
    ``postgresql+asyncpg`` driver -- confirm.

    Returns:
        The configured ``AsyncEngine``.
    """
    settings = get_settings()

    return create_async_engine(
        str(settings.database_url),
        # Pool size: 10 normally, bursting to 30.
        pool_size=10,
        max_overflow=20,          # Up to 30 concurrent connections in total
        pool_timeout=30,          # Max seconds to wait for a free connection
        pool_recycle=3600,        # Recreate connections every 60 min (avoids server-side timeouts)
        pool_pre_ping=True,       # Test the connection before use (avoids stale connections)

        # Driver-level (asyncpg) connection arguments
        connect_args={
            "command_timeout": 60,
            # Session-level PostgreSQL settings must go through server_settings.
            "server_settings": {
                "statement_timeout": "30000",   # 30 seconds per statement (value in ms)
                "application_name": "mbus-server",
            },
        },

        # SQL echo logging only in development
        echo=settings.environment == "development",
    )

TimescaleDB-specifik: chunks og retention

# server/src/db/timescale_setup.py
# Kør ved migrations — TimescaleDB hypertable-opsætning

from sqlalchemy import text


async def setup_timescaledb_hypertable(db: AsyncSession) -> None:
    """Turn ``readings`` into a TimescaleDB hypertable and attach policies.

    Four statements are executed in order, followed by a single commit:

    1. ``create_hypertable`` on ``readings.timestamp`` with 7-day chunks.
    2. ``add_retention_policy`` dropping raw data older than 10 years.
    3. ``ALTER TABLE`` enabling compression, segmented by
       ``meter_installation_id`` and ordered by ``timestamp DESC``.
    4. ``add_compression_policy`` for chunks older than 30 days.

    Steps 1, 2 and 4 pass ``if_not_exists => TRUE``; the ``ALTER TABLE``
    in step 3 carries no such guard. The original author intends this to
    be called once, from migrations.

    Args:
        db: Async session used to execute the statements and commit.
    """
    # 1. Convert to a hypertable (one chunk per 7 days).
    make_hypertable = text("""
        SELECT create_hypertable(
            'readings',
            'timestamp',
            chunk_time_interval => INTERVAL '7 days',
            if_not_exists => TRUE
        )
    """)

    # 2. Retention policy: drop raw data older than 10 years.
    retention_policy = text("""
        SELECT add_retention_policy(
            'readings',
            INTERVAL '10 years',
            if_not_exists => TRUE
        )
    """)

    # 3. Compression: enable it on the table, then schedule it for chunks
    #    older than 30 days.
    enable_compression = text("""
        ALTER TABLE readings SET (
            timescaledb.compress,
            timescaledb.compress_segmentby = 'meter_installation_id',
            timescaledb.compress_orderby = 'timestamp DESC'
        )
    """)
    compression_policy = text("""
        SELECT add_compression_policy(
            'readings',
            INTERVAL '30 days',
            if_not_exists => TRUE
        )
    """)

    # Order matters: the hypertable must exist before policies are added,
    # and compression must be enabled before its policy is scheduled.
    setup_steps = (
        make_hypertable,
        retention_policy,
        enable_compression,
        compression_policy,
    )
    for step in setup_steps:
        await db.execute(step)

    await db.commit()

Konklusion

SQLAlchemy ORM dækker 80% af databaseoperationerne, men IoT-data kræver text() til komplekse aggregeringer, asyncpg COPY til bulk-insert af gateway-telemetri (10-50x hurtigere), window functions til RSSI-trend og kumulativt forbrug, og CTE til hierarkisk fordelingsresultat med lejerhistorik. Connection pool tuning til IoT-burst (kl. 06:00 UTC) sikrer, at alle 50 gateways kan levere data i samme tidsrum: op til 30 forbindelser (pool_size 10 + max_overflow 20) betjenes samtidigt, mens de øvrige requests venter i kø i op til 30 sekunder (pool_timeout).

Se asyncpg PostgreSQL guide eller TimescaleDB hypertable guide.