asyncpg and PostgreSQL in Python: asynchronous database access for IoT
asyncpg for asynchronous PostgreSQL access in Python: connection pools, prepared statements, the COPY protocol for bulk inserts, transactions, and a performance comparison with psycopg2.
By M-Bus Gateway
asyncpg is the fastest of the Python PostgreSQL drivers compared here, and it was built for asyncio from the ground up. Here are the patterns that give the best performance for IoT time-series data.
asyncpg vs. psycopg2 vs. psycopg3
Performance benchmark (1,000 simple queries):
psycopg2 (synchronous): 580 queries/second
psycopg3 (asynchronous): 1,200 queries/second
asyncpg (asynchronous): 3,800 queries/second
asyncpg is about 6.5× faster than psycopg2 because it:
→ Uses PostgreSQL's binary protocol rather than the text protocol
→ Skips the Python DB-API layer (its protocol implementation is written in Cython and integrates directly with asyncio)
→ Caches prepared statements automatically
→ Never blocks the event loop while waiting for I/O
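To measure a queries-per-second figure on your own hardware, a minimal timing harness could look like the sketch below. It is an illustration, not the benchmark behind the numbers above: `measure_qps` is a hypothetical helper, and because it awaits queries one after another on a single connection, it measures sequential per-query latency rather than concurrent throughput.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable


async def measure_qps(run_query: Callable[[], Awaitable[object]], n: int = 1000) -> float:
    """Await `run_query` n times in sequence and return queries per second.

    Hypothetical helper: `run_query` is any zero-argument coroutine function,
    e.g. `lambda: conn.fetchval("SELECT 1")` for an asyncpg connection.
    """
    if n <= 0:
        raise ValueError("n must be positive")
    start = time.perf_counter()
    for _ in range(n):
        await run_query()
    elapsed = time.perf_counter() - start
    # Guard against a zero-length interval on very coarse clocks.
    return n / elapsed if elapsed > 0 else float("inf")


async def _demo() -> None:
    # Against a real database (requires a running PostgreSQL server):
    #   conn = await asyncpg.connect("postgresql://mbus:password@localhost/mbus")
    #   print(await measure_qps(lambda: conn.fetchval("SELECT 1")))
    async def fake_query() -> int:
        await asyncio.sleep(0)  # stands in for a round trip to the server
        return 1

    print(f"{await measure_qps(fake_query, 100):.0f} queries/second (fake query)")


if __name__ == "__main__":
    asyncio.run(_demo())
```

Running several such loops concurrently with `asyncio.gather` over a pool would be the natural next step for measuring throughput instead of latency.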
SQLModel + asyncpg: standard setup
# server/src/db/session.py
import os
from collections.abc import AsyncIterator
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
# Production: read the DSN from the environment instead of hard-coding credentials.
DATABASE_URL = os.environ.get(
    "DATABASE_URL", "postgresql+asyncpg://mbus:password@localhost/mbus"
)
engine = create_async_engine(
    DATABASE_URL,
    # Connection pool:
    pool_size=20,        # Base pool (persistent connections)
    max_overflow=30,     # Extra connections at peak load (up to 50 in total)
    pool_timeout=30,     # Seconds to wait for a free connection from the pool
    pool_pre_ping=True,  # Verify the connection before use (avoids stale connections)
    pool_recycle=3600,   # Recreate connections every hour
    # Performance:
    echo=False,          # No SQL logging in production
    connect_args={
        "statement_cache_size": 100,  # asyncpg's prepared statement cache
        "command_timeout": 30,        # Query timeout in seconds
        "server_settings": {
            "application_name": "mbus-api",  # Visible in pg_stat_activity
        },
    },
)
# SQLAlchemy 2.0: async_sessionmaker produces AsyncSession objects by default.
# (The old autocommit flag no longer exists as an option in 2.0.)
async_session_factory = async_sessionmaker(
    engine,
    expire_on_commit=False,  # Avoid re-querying objects after commit
    autoflush=False,
)
async def get_session() -> AsyncIterator[AsyncSession]:
    """FastAPI dependency: commit on success, roll back on error."""
    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
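With `pool_size=20` and `max_overflow=30`, one process can open up to 50 connections, and every worker process creates its own engine and pool. A quick budget check against PostgreSQL's `max_connections` (default 100, of which `superuser_reserved_connections`, default 3, are held back) could look like this sketch. `max_connections_needed` and `fits_connection_budget` are hypothetical helpers, and the worker count is an assumption you would take from your own deployment.

```python
def max_connections_needed(workers: int, pool_size: int, max_overflow: int) -> int:
    """Worst case: every worker's pool is fully checked out, overflow included."""
    return workers * (pool_size + max_overflow)


def fits_connection_budget(
    workers: int,
    pool_size: int,
    max_overflow: int,
    max_connections: int = 100,  # PostgreSQL default
    reserved: int = 3,           # superuser_reserved_connections default
    other_clients: int = 0,      # e.g. migrations, psql sessions, monitoring
) -> bool:
    """True if the worst case still fits within the server's connection limit."""
    available = max_connections - reserved - other_clients
    return max_connections_needed(workers, pool_size, max_overflow) <= available


if __name__ == "__main__":
    # The configuration above (20 + 30) with one and with two worker processes:
    for workers in (1, 2):
        needed = max_connections_needed(workers, 20, 30)
        print(workers, needed, fits_connection_budget(workers, 20, 30))
```

With the defaults, one worker fits (50 of 97 usable connections) but two workers (100) do not, so scaling out means either shrinking the per-process pool or raising `max_connections` on the server.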
Bulk insert with the COPY protocol
A daily MQTT payload contains 20-500 readings, which makes bulk insert performance critical:
# server/src/mqtt/subscriber.py
from sqlalchemy.ext.asyncio import AsyncSession
async def bulk_insert_readings(
    readings: list[dict],
    session: AsyncSession,
) -> int:
    """
    COPY protocol: 10-50× faster than individual INSERTs.
    Returns the number of inserted rows.
    """
    if not readings:
        return 0
    # Use asyncpg directly (bypasses SQLAlchemy overhead). Going through the
    # session's own connection keeps the COPY inside the session's transaction:
    conn = await session.connection()
    raw_conn = await conn.get_raw_connection()
    asyncpg_conn = raw_conn.driver_connection  # the underlying asyncpg.Connection
    # Prepare the data as tuples, in the same order as `columns` below:
    records = [
        (
            row["id"],
            row["meter_installation_id"],
            row["tenant_id"],
            row["timestamp"],
            row["value"],
            row["unit"],
            row["medium"],
            row.get("rssi_dbm"),
            row.get("battery_level_pct"),
        )
        for row in readings
    ]
    # asyncpg COPY; returns the command status string, e.g. "COPY 500":
    status = await asyncpg_conn.copy_records_to_table(
        "reading",
        records=records,
        columns=[
            "id", "meter_installation_id", "tenant_id",
            "timestamp", "value", "unit", "medium",
            "rssi_dbm", "battery_level_pct",
        ],
        schema_name="public",
    )
    return int(status.split()[-1])
# Performance:
# 500 readings via INSERT (one by one): ~4 seconds
# 500 readings via copy_records_to_table: ~80 milliseconds (50× faster)
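Because COPY matches tuple positions to the `columns` list purely by order, it can help to derive both from one column list, and to read the row count out of the status string that `copy_records_to_table` returns (for example `"COPY 500"`). The pure-Python sketch below shows that idea. `to_records`, `copy_row_count`, `READING_COLUMNS` and `OPTIONAL_COLUMNS` are hypothetical names, and treating the two optional fields as NULL mirrors the `row.get(...)` calls above.

```python
from collections.abc import Iterable, Sequence

# Hypothetical single source of truth for the COPY column order.
READING_COLUMNS: tuple[str, ...] = (
    "id", "meter_installation_id", "tenant_id",
    "timestamp", "value", "unit", "medium",
    "rssi_dbm", "battery_level_pct",
)
# Fields that may be absent from a payload; they are written as NULL.
OPTIONAL_COLUMNS = frozenset({"rssi_dbm", "battery_level_pct"})


def to_records(readings: Iterable[dict], columns: Sequence[str] = READING_COLUMNS) -> list[tuple]:
    """Turn reading dicts into tuples in exactly the order of `columns`."""
    records = []
    for row in readings:
        missing = [c for c in columns if c not in row and c not in OPTIONAL_COLUMNS]
        if missing:
            raise KeyError(f"reading is missing required fields: {missing}")
        records.append(tuple(row.get(c) for c in columns))
    return records


def copy_row_count(status: str) -> int:
    """Parse the command status returned by COPY, e.g. 'COPY 500' -> 500."""
    tag, _, count = status.partition(" ")
    if tag != "COPY" or not count.isdigit():
        raise ValueError(f"unexpected COPY status: {status!r}")
    return int(count)
```

Inside `bulk_insert_readings`, the tuple list could then come from `to_records(readings)`, `columns` from `list(READING_COLUMNS)`, and the return value from `copy_row_count(status)`, so a column added later only has to be listed once.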
Upsert (ON CONFLICT DO UPDATE)
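from datetime import datetime
from sqlalchemy import func
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
# Gateway is the application's table model (defined elsewhere).
async def upsert_gateway(
    session: AsyncSession,
    gateway_id: str,
    last_seen: datetime,
    firmware_version: str,
) -> None:
    """Update the gateway on heartbeat; create it if it does not exist."""
    stmt = pg_insert(Gateway).values(
        id=gateway_id,
        last_seen=last_seen,
        firmware_version=firmware_version,
    )
    stmt = stmt.on_conflict_do_update(
        index_elements=["id"],
        set_={
            # `excluded` is the row that was proposed for insertion:
            "last_seen": stmt.excluded.last_seen,
            "firmware_version": stmt.excluded.firmware_version,
            # Server-side timestamp (datetime.utcnow() is deprecated since Python 3.12):
            "updated_at": func.now(),
        },
    )
    await session.execute(stmt)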
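To see what `ON CONFLICT DO UPDATE` does without a PostgreSQL server, the sketch below runs the same upsert shape against Python's built-in `sqlite3`, which has supported `ON CONFLICT (...) DO UPDATE SET col = excluded.col` since SQLite 3.24. SQLite is only a stand-in here, not what the application uses, and the simplified `gateway` table and the `upsert_gateway_sqlite` and `make_demo_db` helpers are hypothetical.

```python
import sqlite3

UPSERT_SQL = """
INSERT INTO gateway (id, last_seen, firmware_version)
VALUES (?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
    last_seen = excluded.last_seen,
    firmware_version = excluded.firmware_version,
    updated_at = CURRENT_TIMESTAMP
"""


def upsert_gateway_sqlite(conn: sqlite3.Connection, gateway_id: str,
                          last_seen: str, firmware_version: str) -> None:
    """Insert the gateway, or update it in place if the id already exists."""
    conn.execute(UPSERT_SQL, (gateway_id, last_seen, firmware_version))


def make_demo_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE gateway ("
        " id TEXT PRIMARY KEY,"
        " last_seen TEXT NOT NULL,"
        " firmware_version TEXT NOT NULL,"
        " updated_at TEXT)"
    )
    return conn


if __name__ == "__main__":
    db = make_demo_db()
    upsert_gateway_sqlite(db, "gw-001", "2024-01-01T00:00:00Z", "1.0.0")  # first heartbeat: insert
    upsert_gateway_sqlite(db, "gw-001", "2024-01-02T00:00:00Z", "1.1.0")  # later heartbeat: update
    print(db.execute("SELECT id, last_seen, firmware_version FROM gateway").fetchall())
    # [('gw-001', '2024-01-02T00:00:00Z', '1.1.0')]
```

The second heartbeat does not create a new row; it overwrites the existing one and sets `updated_at`, which stays NULL after the first, insert-only heartbeat.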
Transaction management: Savepoints
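import structlog
from sqlalchemy.ext.asyncio import AsyncSession
# Assumption: a structlog logger, since stdlib logging does not accept the
# keyword arguments used below. MQTTPayload and insert_reading are defined
# elsewhere in the application.
logger = structlog.get_logger()
async def process_daily_batch(
    session: AsyncSession,
    payload: MQTTPayload,
) -> dict:
    """
    Process a daily batch with savepoints:
    an error for a single meter does not abort the whole batch.
    The session must not already have a transaction in progress,
    otherwise session.begin() raises.
    """
    results = {"success": 0, "failed": 0}
    async with session.begin():
        for reading in payload.readings:
            # One savepoint per meter:
            try:
                async with session.begin_nested():
                    await insert_reading(session, reading)
            except Exception as e:
                # Only the savepoint is rolled back, not the whole transaction:
                logger.warning(
                    "Reading insert failed",
                    meter_id=reading.meter_id,
                    error=str(e),
                )
                results["failed"] += 1
            else:
                # Counted only after the savepoint has been released:
                results["success"] += 1
    return results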
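Savepoints matter especially in PostgreSQL, where a failed statement puts the entire transaction into an aborted state until it is rolled back; `ROLLBACK TO SAVEPOINT` undoes only the failed part. The sketch below spells out the SQL that `begin_nested()` corresponds to (`SAVEPOINT`, `ROLLBACK TO SAVEPOINT`, `RELEASE SAVEPOINT`), run against the standard library's `sqlite3` as a stand-in so it needs no server. The two-column `reading` table and the helper names are hypothetical simplifications. SQLite is more forgiving than PostgreSQL about failed statements, but the savepoint mechanics are the same.

```python
import sqlite3


def insert_batch_with_savepoints(conn: sqlite3.Connection, rows: list[tuple]) -> dict:
    """Insert rows in one transaction, isolating each row in its own savepoint."""
    results = {"success": 0, "failed": 0}
    conn.execute("BEGIN")
    try:
        for row in rows:
            conn.execute("SAVEPOINT per_reading")
            try:
                conn.execute("INSERT INTO reading (id, value) VALUES (?, ?)", row)
            except sqlite3.IntegrityError:
                # Undo only this row; the outer transaction stays usable.
                conn.execute("ROLLBACK TO SAVEPOINT per_reading")
                conn.execute("RELEASE SAVEPOINT per_reading")
                results["failed"] += 1
            else:
                conn.execute("RELEASE SAVEPOINT per_reading")
                results["success"] += 1
        conn.execute("COMMIT")
    except BaseException:
        conn.execute("ROLLBACK")
        raise
    return results


def make_demo_db() -> sqlite3.Connection:
    # isolation_level=None: the sqlite3 module issues no implicit BEGIN/COMMIT,
    # so the explicit transaction statements above are fully in control.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("CREATE TABLE reading (id INTEGER PRIMARY KEY, value REAL NOT NULL)")
    return conn


if __name__ == "__main__":
    db = make_demo_db()
    batch = [(1, 10.5), (1, 11.0), (2, None), (3, 12.25)]  # duplicate id, NULL value
    print(insert_batch_with_savepoints(db, batch))  # {'success': 2, 'failed': 2}
    print(db.execute("SELECT id, value FROM reading ORDER BY id").fetchall())  # [(1, 10.5), (3, 12.25)]
```

The duplicate id and the NULL value are rejected individually, while the two valid rows are committed together in a single transaction.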
Raw asyncpg for advanced queries
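# TimescaleDB-specific queries sometimes require raw asyncpg:
from sqlalchemy.ext.asyncio import AsyncSession
async def get_compression_stats(session: AsyncSession) -> list[dict]:
    """Fetch compression statistics for the 10 most recent compressed chunks."""
    conn = await session.connection()
    raw_conn = await conn.get_raw_connection()
    asyncpg_conn = raw_conn.driver_connection  # the underlying asyncpg.Connection
    rows = await asyncpg_conn.fetch("""
        SELECT
            s.chunk_name,
            pg_size_pretty(s.before_compression_total_bytes) AS before,
            pg_size_pretty(s.after_compression_total_bytes) AS after,
            ROUND(
                (1 - s.after_compression_total_bytes::float /
                     s.before_compression_total_bytes) * 100
            ) AS compression_pct
        FROM chunk_compression_stats('reading') AS s
        -- chunk_compression_stats() has no range_start; take it from the chunks view:
        JOIN timescaledb_information.chunks AS c
          ON c.chunk_schema = s.chunk_schema AND c.chunk_name = s.chunk_name
        -- Skips uncompressed chunks (NULL sizes) and avoids division by zero:
        WHERE s.before_compression_total_bytes > 0
        ORDER BY c.range_start DESC
        LIMIT 10
    """)
    return [dict(row) for row in rows]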
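If the raw byte counts are fetched instead of the pretty-printed sizes (for example to drive alerting), the same percentage can be computed client-side. A minimal sketch; `compression_pct` is a hypothetical helper, and returning `None` for missing or zero sizes is an assumption that matches the filter in the query above.

```python
from typing import Optional


def compression_pct(before_bytes: Optional[int], after_bytes: Optional[int]) -> Optional[int]:
    """Space saved by compression in whole percent: round((1 - after/before) * 100).

    Returns None when either size is missing or the 'before' size is not positive.
    """
    if before_bytes is None or after_bytes is None or before_bytes <= 0:
        return None
    return round((1 - after_bytes / before_bytes) * 100)


if __name__ == "__main__":
    print(compression_pct(1_000_000, 100_000))  # 90
    print(compression_pct(None, None))           # None
```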
Connection pool monitoring
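# Health check endpoint:
from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
# `engine` and `get_session` are the objects defined in server/src/db/session.py.
router = APIRouter()
@router.get("/health/db")
async def db_health(session: AsyncSession = Depends(get_session)):
    try:
        await session.execute(text("SELECT 1"))
    except Exception as e:
        # 503 lets load balancers and uptime monitors detect the failure:
        return JSONResponse(
            status_code=503,
            content={"status": "error", "detail": str(e)},
        )
    pool = engine.pool
    return {
        "status": "ok",
        "pool_size": pool.size(),
        "checked_in": pool.checkedin(),
        "checked_out": pool.checkedout(),
        "overflow": pool.overflow(),
    }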
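The raw pool counters are easier to alert on once they are turned into a utilization figure. A minimal sketch, assuming capacity is `pool_size + max_overflow` (20 + 30 = 50 with the configuration above) and an arbitrary 80% warning threshold; `pool_utilization` and `pool_state` are hypothetical helpers, and `checked_out` would come from `engine.pool.checkedout()`.

```python
def pool_utilization(checked_out: int, pool_size: int, max_overflow: int) -> float:
    """Fraction of the maximum number of connections currently in use (0.0-1.0)."""
    capacity = pool_size + max_overflow
    if capacity <= 0:
        raise ValueError("pool capacity must be positive")
    return checked_out / capacity


def pool_state(checked_out: int, pool_size: int = 20, max_overflow: int = 30,
               warn_at: float = 0.8) -> str:
    """Classify the pool: 'ok', 'overflow' (beyond pool_size) or 'saturated'."""
    if pool_utilization(checked_out, pool_size, max_overflow) >= warn_at:
        return "saturated"
    if checked_out > pool_size:
        return "overflow"
    return "ok"


if __name__ == "__main__":
    for n in (5, 25, 45):
        print(n, f"{pool_utilization(n, 20, 30):.0%}", pool_state(n))
```

A pool that sits in "overflow" for long periods suggests `pool_size` is too small; "saturated" means requests will soon start waiting up to `pool_timeout` for a connection.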
Conclusion
asyncpg is the right choice for IoT platforms with high query volume: about 6.5× faster than psycopg2 in the benchmark above, fully asynchronous, and with a COPY protocol for bulk inserts that makes daily MQTT batch processing practical. SQLModel + asyncpg via SQLAlchemy 2.0's async API is the recommended combination, keeping the comfort of an ORM while benefiting from asyncpg's performance.
See the TimescaleDB hypertable guide or the FastAPI multi-tenant guide.