Python SQLAlchemy Core — avancerede mønstre til IoT og SaaS
SQLAlchemy Core mønstre til IoT og SaaS: text() raw SQL, bulk insert via COPY, upsert ON CONFLICT, window functions til RSSI-trend, CTE til hierarkisk data og connection pool tuning.
Af M-Bus Gateway
SQLAlchemy ORM er praktisk til CRUD, men IoT-data kræver Core SQL til bulk-operationer, window functions og TimescaleDB-specifikke queries. Her er mønstrene.
text() — raw SQL når ORM ikke er nok
# server/src/db/queries.py
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from decimal import Decimal
async def get_portfolio_heat_summary(
    tenant_id: str,
    period_start: str,
    period_end: str,
    db: AsyncSession,
) -> list[dict]:
    """
    Summarise consumption and radio health per property for one tenant.

    Combines TimescaleDB readings with relational PostgreSQL tables in one
    aggregate query that the ORM cannot express, so raw SQL via text() is
    the right choice here.

    Args:
        tenant_id: Only non-deleted properties of this tenant are included.
        period_start: Inclusive lower bound on reading timestamps, as an
            ISO-8601 string (e.g. "2024-01-01"); a datetime is passed through
            unchanged.
        period_end: Inclusive upper bound (SQL BETWEEN), same format as
            period_start.
        db: Async session the query is executed on.

    Returns:
        One dict per property with the keys property_id, address,
        meter_count, total_kwh, avg_rssi and last_reading, ordered by
        total_kwh descending; properties without readings in the period
        (total_kwh NULL) come last.

    Raises:
        ValueError: If a period bound is a string that is not valid ISO-8601.
    """
    from datetime import datetime  # local: this snippet's imports lack it

    def as_timestamp(value):
        # With the asyncpg driver, timestamp parameters must be datetime
        # objects; plain strings are rejected when the query executes.
        return datetime.fromisoformat(value) if isinstance(value, str) else value

    query = text("""
        SELECT
            p.id AS property_id,
            p.address,
            COUNT(DISTINCT mi.id) AS meter_count,
            SUM(r.value_kwh) AS total_kwh,
            AVG(r.rssi_dbm) AS avg_rssi,
            MAX(r.timestamp) AS last_reading
        FROM properties p
        LEFT JOIN units u ON u.property_id = p.id AND u.deleted_at IS NULL
        LEFT JOIN meter_installations mi ON mi.unit_id = u.id AND mi.removed_at IS NULL
        LEFT JOIN readings r ON r.meter_installation_id = mi.id
            AND r.timestamp BETWEEN :period_start AND :period_end
        WHERE p.tenant_id = :tenant_id
            AND p.deleted_at IS NULL
        GROUP BY p.id, p.address
        ORDER BY total_kwh DESC NULLS LAST
    """)
    params = {
        "tenant_id": tenant_id,
        "period_start": as_timestamp(period_start),
        "period_end": as_timestamp(period_end),
    }
    result = await db.execute(query, params)
    return [dict(row) for row in result.mappings()]
Bulk INSERT via asyncpg COPY
# server/src/db/bulk_insert.py
# asyncpg COPY er typisk 10-50x hurtigere end individuelle INSERT-sætninger
import asyncpg
from datetime import datetime
async def bulk_insert_readings(
    readings: list[dict],
    dsn: str,
) -> int:
    """
    Bulk-insert meter readings into the readings table via asyncpg COPY.

    Args:
        readings: Dicts with the keys "meter_installation_id", "timestamp"
            and "value_kwh" (required) plus optional "rssi_dbm" and
            "battery_pct" (stored as NULL when missing).
        dsn: PostgreSQL connection string; a dedicated connection is opened
            for the call and always closed afterwards.

    Returns:
        The number of readings copied. Empty input returns 0 without opening
        a connection.

    Raises:
        KeyError: If a reading lacks one of the required keys (raised before
            any connection is opened).
    """
    if not readings:
        return 0

    from datetime import timezone

    # One receive time for the whole batch, so rows delivered together share
    # the same received_at. Naive UTC keeps the stored value identical to the
    # previous datetime.utcnow() call, which is deprecated since Python 3.12.
    received_at = datetime.now(timezone.utc).replace(tzinfo=None)

    records = [
        (
            r["meter_installation_id"],
            r["timestamp"],
            r["value_kwh"],
            r.get("rssi_dbm"),
            r.get("battery_pct"),
            received_at,
        )
        for r in readings
    ]

    conn = await asyncpg.connect(dsn)
    try:
        # COPY FROM records: binary COPY protocol, the fastest bulk path.
        await conn.copy_records_to_table(
            "readings",
            records=records,
            columns=[
                "meter_installation_id",
                "timestamp",
                "value_kwh",
                "rssi_dbm",
                "battery_pct",
                "received_at",
            ],
        )
    finally:
        await conn.close()
    return len(records)
# Alternativ: INSERT ON CONFLICT DO NOTHING (idempotent)
async def upsert_readings_safe(
    readings: list[dict],
    db: AsyncSession,
    *,
    batch_size: int = 1000,
) -> None:
    """
    Idempotently insert readings; safe against duplicate delivery from a gateway.

    Rows whose (meter_installation_id, timestamp) already exist are skipped via
    INSERT ... ON CONFLICT DO NOTHING. PostgreSQL requires a unique index or
    constraint on exactly those columns for this conflict target.

    Args:
        readings: Column-name -> value dicts for the readings table.
        db: Async session; all batches run in its current transaction and are
            committed once at the end.
        batch_size: Rows per INSERT statement. A multi-row VALUES insert binds
            one parameter per column per row, and the PostgreSQL wire protocol
            (and asyncpg) allow at most 32767 bind parameters per statement, so
            one unbatched insert of a large delivery fails. The default of 1000
            rows stays below that limit for up to 32 bound columns per row.

    Raises:
        ValueError: If batch_size is less than 1.
    """
    from sqlalchemy.dialects.postgresql import insert

    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    if not readings:
        return

    for start in range(0, len(readings), batch_size):
        # NOTE(review): `Reading` is presumably the ORM model mapped to the
        # readings table; its import is not shown in this snippet - confirm.
        stmt = insert(Reading).values(readings[start:start + batch_size])
        stmt = stmt.on_conflict_do_nothing(
            index_elements=["meter_installation_id", "timestamp"]
        )
        await db.execute(stmt)
    await db.commit()
Window functions — RSSI-trend og forbrug
# server/src/analytics/window_queries.py
from sqlalchemy import text
async def rssi_trend_by_gateway(
    gateway_id: str,
    days: int,
    db: AsyncSession,
) -> list[dict]:
    """
    Compare each day's average RSSI for a gateway with its 7-day moving average.

    Used to detect deteriorating signal strength.

    Args:
        gateway_id: Matched against the gateways.gateway_id column.
        days: Look-back window in days, counted from NOW().
        db: Async session the query is executed on.

    Returns:
        One dict per day that has readings, ordered by day, with keys:
        day, avg_rssi, rolling_7d_avg (mean of the daily averages within the
        7 calendar days ending on that day) and week_delta (avg_rssi minus the
        daily average exactly 7 days earlier, NULL if that day has no data).

    The windows are defined on calendar days, not row counts. Days without
    readings, which are typical when the signal degrades, therefore do not
    stretch the "7-day" window or shift the week-over-week comparison.
    """
    query = text("""
        WITH daily_rssi AS (
            SELECT
                DATE_TRUNC('day', r.timestamp) AS day,
                AVG(r.rssi_dbm) AS avg_rssi
            FROM readings r
            JOIN meter_installations mi ON mi.id = r.meter_installation_id
            JOIN gateways g ON g.id = mi.gateway_id
            WHERE g.gateway_id = :gateway_id
                AND r.timestamp > NOW() - MAKE_INTERVAL(days => :days)
            GROUP BY 1
        )
        SELECT
            d.day,
            d.avg_rssi,
            AVG(d.avg_rssi) OVER (
                ORDER BY d.day
                RANGE BETWEEN INTERVAL '6 days' PRECEDING AND CURRENT ROW
            ) AS rolling_7d_avg,
            d.avg_rssi - prev.avg_rssi AS week_delta
        FROM daily_rssi d
        LEFT JOIN daily_rssi prev ON prev.day = d.day - INTERVAL '7 days'
        ORDER BY d.day
    """)
    result = await db.execute(query, {"gateway_id": gateway_id, "days": days})
    return [dict(row) for row in result.mappings()]
async def cumulative_consumption_by_unit(
    unit_id: str,
    db: AsyncSession,
) -> list[dict]:
    """
    Monthly and cumulative (running-sum) consumption for one unit.

    Feeds the tenant portal's accumulated-consumption view (EU EED 2027).

    The window covers 12 calendar months: the 11 complete months before the
    current one plus the current month to date. Starting at a month boundary
    avoids a partial first month, which a rolling "NOW() - 12 months" cut-off
    would produce.

    Args:
        unit_id: Unit whose meter installations' readings are summed.
        db: Async session the query is executed on.

    Returns:
        One dict per month that has readings, ordered by month, with keys
        month, monthly_kwh and cumulative_kwh.
    """
    query = text("""
        SELECT
            DATE_TRUNC('month', r.timestamp) AS month,
            SUM(r.value_kwh) AS monthly_kwh,
            SUM(SUM(r.value_kwh)) OVER (
                ORDER BY DATE_TRUNC('month', r.timestamp)
            ) AS cumulative_kwh
        FROM readings r
        JOIN meter_installations mi ON mi.id = r.meter_installation_id
        WHERE mi.unit_id = :unit_id
            AND r.timestamp >= DATE_TRUNC('month', NOW()) - INTERVAL '11 months'
        GROUP BY 1
        ORDER BY 1
    """)
    result = await db.execute(query, {"unit_id": unit_id})
    return [dict(row) for row in result.mappings()]
CTE — hierarkisk data og fordelingsresultat
# server/src/distribution/cte_queries.py
from sqlalchemy import text
async def get_distribution_with_occupancy_history(
    annual_input_id: str,
    db: AsyncSession,
) -> list[dict]:
    """
    Join distribution results with the occupancy history for a settlement audit.

    Two CTEs select the distribution results and the non-deleted occupancies
    (with tenant name and e-mail) for the same annual input. They are then
    left-joined per unit.

    Args:
        annual_input_id: Annual input whose results and occupancies are used.
        db: Async session the query is executed on.

    Returns:
        Dicts ordered by unit_id and then chronologically by move_in_date.
        A unit with several occupancies yields one row per occupancy, and its
        distribution amounts are repeated on each of those rows, so do not sum
        the amounts over the returned rows. Units without occupancies appear
        once, with NULL tenant columns.
    """
    query = text("""
        WITH distribution AS (
            SELECT
                dr.unit_id,
                dr.fixed_amount,
                dr.variable_amount,
                dr.total_amount,
                dr.period_days
            FROM distribution_results dr
            WHERE dr.annual_input_id = :annual_input_id
        ),
        occupancy_in_period AS (
            SELECT
                o.unit_id,
                u.name AS tenant_name,
                u.email AS tenant_email,
                o.move_in_date,
                o.move_out_date,
                o.floor_area_m2
            FROM occupancies o
            JOIN users u ON u.id = o.user_id
            WHERE o.annual_input_id = :annual_input_id
                AND o.deleted_at IS NULL
        )
        SELECT
            d.unit_id,
            d.fixed_amount,
            d.variable_amount,
            d.total_amount,
            d.period_days,
            o.tenant_name,
            o.tenant_email,
            o.move_in_date,
            o.move_out_date,
            o.floor_area_m2
        FROM distribution d
        LEFT JOIN occupancy_in_period o ON o.unit_id = d.unit_id
        ORDER BY d.unit_id, o.move_in_date
    """)
    result = await db.execute(query, {"annual_input_id": annual_input_id})
    return [dict(row) for row in result.mappings()]
Connection pool tuning til IoT burst
# server/src/db/engine.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine
from server.src.config import get_settings
def create_engine() -> AsyncEngine:
    """
    Build the application's async engine with a pool sized for IoT bursts.

    Load profile the pool is tuned for:
    - Normal days: 1-5 concurrent requests.
    - 06:00 UTC: all 50 gateways send data at once, causing a burst.
    """
    settings = get_settings()
    return create_async_engine(
        str(settings.database_url),
        # Pool size: 10 persistent connections, up to 20 extra during bursts.
        pool_size=10,
        max_overflow=20,  # up to 30 concurrent connections in total
        pool_timeout=30,  # max seconds to wait for a free connection
        pool_recycle=3600,  # recycle connections every 60 min (avoids server-side timeouts)
        pool_pre_ping=True,  # test each connection before use (avoids stale connections)
        connect_args={
            # asyncpg.connect() keyword: default client-side timeout (seconds)
            # for operations on the connection.
            "command_timeout": 60,
            # PostgreSQL run-time settings must be passed via server_settings:
            # asyncpg.connect() raises TypeError on unknown keywords such as
            # statement_timeout or application_name.
            "server_settings": {
                "statement_timeout": "30000",  # 30 seconds per statement (ms)
                "application_name": "mbus-server",
            },
        },
        # SQL echo logging only in development.
        echo=settings.environment == "development",
    )
TimescaleDB-specifik: chunks og retention
# server/src/db/timescale_setup.py
# Kør ved migrations — TimescaleDB hypertable-opsætning
from sqlalchemy import text
async def setup_timescaledb_hypertable(db: AsyncSession) -> None:
    """
    Turn the readings table into a TimescaleDB hypertable with retention and
    compression policies.

    Meant to run once, when the schema is created (e.g. from a migration).
    Every step is guarded so that calling it again does not fail:
    - The TimescaleDB functions are called with ``if_not_exists => TRUE``.
    - The compression settings are only applied while compression is not yet
      enabled on the hypertable.

    Commits the session at the end.
    """
    # 1. Convert to a hypertable partitioned on "timestamp", one chunk per 7 days.
    await db.execute(text("""
        SELECT create_hypertable(
            'readings',
            'timestamp',
            chunk_time_interval => INTERVAL '7 days',
            if_not_exists => TRUE
        )
    """))

    # 2. Retention policy: drop raw data older than 10 years.
    await db.execute(text("""
        SELECT add_retention_policy(
            'readings',
            INTERVAL '10 years',
            if_not_exists => TRUE
        )
    """))

    # 3. Compression for chunks older than 30 days. ALTER TABLE ... SET
    # (timescaledb.compress ...) has no IF NOT EXISTS form, and TimescaleDB
    # rejects reconfiguring compression while compressed chunks exist. So it
    # is only issued when compression is not enabled yet.
    compression_enabled = (
        await db.execute(text("""
            SELECT compression_enabled
            FROM timescaledb_information.hypertables
            WHERE CAST(quote_ident(hypertable_schema) || '.' || quote_ident(hypertable_name) AS regclass)
                = CAST('readings' AS regclass)
        """))
    ).scalar()
    if not compression_enabled:
        await db.execute(text("""
            ALTER TABLE readings SET (
                timescaledb.compress,
                timescaledb.compress_segmentby = 'meter_installation_id',
                timescaledb.compress_orderby = 'timestamp DESC'
            )
        """))

    await db.execute(text("""
        SELECT add_compression_policy(
            'readings',
            INTERVAL '30 days',
            if_not_exists => TRUE
        )
    """))
    await db.commit()
Konklusion
SQLAlchemy ORM dækker 80% af databaseoperationerne, men IoT-data kræver text() til komplekse aggregeringer, asyncpg COPY til bulk-insert af gateway-telemetri (typisk 10-50x hurtigere end individuelle INSERT-sætninger), window functions til RSSI-trend og kumulativt forbrug, og CTE til hierarkisk fordelingsresultat med lejerhistorik. Connection pool tuning til IoT-burst (kl. 06:00 UTC) sikrer, at alle 50 gateways kan levere data samtidigt.
Se også asyncpg PostgreSQL-guiden eller TimescaleDB hypertable-guiden.