M-Bus Gateway
asyncpg · PostgreSQL · Python · async · database · SQLModel · IoT · performance · connection pool

asyncpg and PostgreSQL in Python: asynchronous database access for IoT

Using asyncpg for asynchronous PostgreSQL access in Python: connection pools, prepared statements, the COPY protocol for bulk inserts, transactions, and a performance comparison with psycopg2.

By M-Bus Gateway

asyncpg is the fastest of the PostgreSQL drivers compared below, and it was designed for asyncio from the start (psycopg3 also offers an async API). Here are the patterns that give the best performance for IoT time-series data.


asyncpg vs. psycopg2 vs. psycopg3

Performance benchmark (1,000 simple queries):

psycopg2 (synchronous):    580 queries/second
psycopg3 (asynchronous):   1,200 queries/second
asyncpg (asynchronous):    3,800 queries/second

asyncpg is about 6.5× faster than psycopg2 (3,800 vs. 580 queries/second) because it:
→ Uses PostgreSQL's binary data format (not the text format)
→ Has no Python DB-API overhead (custom asyncio integration)
→ Caches prepared statements automatically
→ Never blocks the asyncio event loop while waiting on I/O (unlike a synchronous driver)
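Figures like these depend heavily on hardware, network latency, and query shape, so it is worth measuring on your own setup. Below is a minimal sketch of a sequential throughput measurement. `measure_qps` is a hypothetical helper, not part of asyncpg, and it accepts any async callable, so the same harness can be pointed at different drivers.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable


async def measure_qps(run_query: Callable[[], Awaitable[object]], n: int = 1000) -> float:
    """Await run_query() n times in sequence and return queries per second."""
    if n <= 0:
        raise ValueError("n must be positive")
    start = time.perf_counter()
    for _ in range(n):
        await run_query()
    elapsed = time.perf_counter() - start
    # Guard against a zero interval on extremely fast stand-ins
    return n / max(elapsed, 1e-9)


async def _demo() -> None:
    # Stand-in for a real query; with asyncpg this could be
    # lambda: conn.fetchval("SELECT 1")
    async def fake_query() -> int:
        await asyncio.sleep(0)
        return 1

    qps = await measure_qps(fake_query, n=100)
    print(f"{qps:.0f} queries/second")


if __name__ == "__main__":
    asyncio.run(_demo())
```

The loop is deliberately sequential, so it measures round-trip cost per query; concurrent load through a connection pool will give different numbers.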

SQLModel + asyncpg: Standard setup

# server/src/db/session.py
import os
from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

# Production: read the URL (with credentials) from the environment instead of hard-coding it
DATABASE_URL = os.environ.get(
    "DATABASE_URL", "postgresql+asyncpg://mbus:password@localhost/mbus"
)

engine = create_async_engine(
    DATABASE_URL,
    # Connection pool (per process):
    pool_size=20,           # Base pool (persistent connections)
    max_overflow=30,        # Extra connections under peak load
    pool_timeout=30,        # Seconds to wait for a free connection from the pool
    pool_pre_ping=True,     # Verify the connection before use (avoids stale connections)
    pool_recycle=3600,      # Recreate connections every hour
    # Performance:
    echo=False,             # No SQL logging in production
    connect_args={          # Passed straight through to asyncpg.connect()
        "statement_cache_size": 100,   # asyncpg's prepared statement cache
        "command_timeout": 30,          # Query timeout in seconds
        "server_settings": {
            "application_name": "mbus-api",
        },
    },
)

# async_sessionmaker is the SQLAlchemy 2.0 factory for AsyncSession
async_session_factory = async_sessionmaker(
    engine,
    expire_on_commit=False,  # Avoid re-querying attributes after commit
    autoflush=False,
)


async def get_session() -> AsyncIterator[AsyncSession]:
    """FastAPI dependency: commit on success, roll back on error."""
    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
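`pool_size` and `max_overflow` apply per engine, and therefore per process. If the API runs under several Uvicorn or Gunicorn workers, each worker opens its own pool, and the worst case has to fit within PostgreSQL's `max_connections` (default 100, of which `superuser_reserved_connections`, default 3, are held back for superusers). A minimal sketch of that check; the function names are hypothetical and the worker counts are illustrative:

```python
# PostgreSQL defaults; check SHOW max_connections on the actual server
DEFAULT_MAX_CONNECTIONS = 100
DEFAULT_SUPERUSER_RESERVED = 3


def peak_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Worst case: every worker has filled both its base pool and its overflow."""
    return workers * (pool_size + max_overflow)


def pool_fits(
    workers: int,
    pool_size: int,
    max_overflow: int,
    max_connections: int = DEFAULT_MAX_CONNECTIONS,
    reserved: int = DEFAULT_SUPERUSER_RESERVED,
) -> bool:
    """True if the worst case stays within the connections ordinary roles may use."""
    return peak_connections(workers, pool_size, max_overflow) <= max_connections - reserved


if __name__ == "__main__":
    for workers in (1, 2, 4):
        peak = peak_connections(workers, pool_size=20, max_overflow=30)
        print(workers, peak, pool_fits(workers, 20, 30))
```

With `pool_size=20` and `max_overflow=30`, one worker peaks at 50 connections, but two workers already reach 100 and exceed the default limit. Other clients (migrations, psql sessions, monitoring) count as well, so either the per-process pool has to shrink or `max_connections` has to grow.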

Bulk insert with the COPY protocol

A daily MQTT payload contains 20 to 500 readings, so efficient bulk inserts are critical:

# server/src/mqtt/subscriber.py
from sqlalchemy.ext.asyncio import AsyncSession


async def bulk_insert_readings(
    readings: list[dict],
    session: AsyncSession,
) -> int:
    """
    COPY protocol: 10-50× faster than individual INSERTs.
    Returns the number of inserted rows.
    """
    if not readings:
        return 0

    # Use asyncpg directly (avoids SQLAlchemy overhead):
    conn = await session.connection()
    raw_conn = await conn.get_raw_connection()
    # raw_conn is SQLAlchemy's pool proxy; driver_connection is the asyncpg.Connection
    asyncpg_conn = raw_conn.driver_connection

    # Prepare the data as tuples, in the same order as `columns` below:
    records = [
        (
            row["id"],
            row["meter_installation_id"],
            row["tenant_id"],
            row["timestamp"],
            row["value"],
            row["unit"],
            row["medium"],
            row.get("rssi_dbm"),
            row.get("battery_level_pct"),
        )
        for row in readings
    ]

    # asyncpg COPY returns a command tag such as "COPY 500", not an int:
    status = await asyncpg_conn.copy_records_to_table(
        "reading",
        records=records,
        columns=[
            "id", "meter_installation_id", "tenant_id",
            "timestamp", "value", "unit", "medium",
            "rssi_dbm", "battery_level_pct",
        ],
        schema_name="public",
    )
    return int(status.split()[-1])


# Performance:
# 500 readings via INSERT (one at a time): ~4 seconds
# 500 readings via copy_records_to_table:   ~80 milliseconds (50× faster)
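The function above lists the column names twice, once in the tuple and once in `columns`, and the two lists must stay in the same order or values silently land in the wrong columns. In addition, asyncpg's `copy_records_to_table` returns the COPY command's status string (for example `COPY 500`) rather than an integer. A minimal sketch that derives the records from a single column tuple and parses the status; `READING_COLUMNS`, `to_records`, and `parse_copy_status` are hypothetical helpers, and the column set is taken from the example above:

```python
from collections.abc import Mapping, Sequence
from typing import Any

# Single source of truth for the column order of the "reading" table above
READING_COLUMNS: tuple[str, ...] = (
    "id", "meter_installation_id", "tenant_id",
    "timestamp", "value", "unit", "medium",
    "rssi_dbm", "battery_level_pct",
)
# Columns a payload may omit; they are sent as NULL
OPTIONAL_COLUMNS = frozenset({"rssi_dbm", "battery_level_pct"})


def to_records(
    readings: Sequence[Mapping[str, Any]],
    columns: Sequence[str] = READING_COLUMNS,
) -> list[tuple[Any, ...]]:
    """Build COPY records in exactly the order of `columns`.

    A missing required key raises KeyError instead of producing a shifted row.
    """
    return [
        tuple(row.get(col) if col in OPTIONAL_COLUMNS else row[col] for col in columns)
        for row in readings
    ]


def parse_copy_status(status: str) -> int:
    """Turn a COPY command tag such as 'COPY 500' into a row count."""
    tag, _, count = status.rpartition(" ")
    if tag != "COPY":
        raise ValueError(f"unexpected status: {status!r}")
    return int(count)
```

The COPY call would then pass `records=to_records(readings)` and `columns=list(READING_COLUMNS)`, and the returned tag can be converted with `parse_copy_status`.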

Upsert (ON CONFLICT DO UPDATE)

from datetime import datetime

from sqlalchemy import func
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession

# Gateway is the project's SQLModel table model (its import is not shown here)


async def upsert_gateway(
    session: AsyncSession,
    gateway_id: str,
    last_seen: datetime,
    firmware_version: str,
) -> None:
    """Update the gateway on heartbeat; create it if it does not exist."""
    stmt = pg_insert(Gateway).values(
        id=gateway_id,
        last_seen=last_seen,
        firmware_version=firmware_version,
    ).on_conflict_do_update(
        index_elements=["id"],
        set_={
            "last_seen": last_seen,
            "firmware_version": firmware_version,
            # Server-side now() instead of the deprecated datetime.utcnow()
            "updated_at": func.now(),
        },
    )
    await session.execute(stmt)
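For reference, the statement above corresponds roughly to the SQL below. In `ON CONFLICT ... DO UPDATE`, the special table `EXCLUDED` holds the row that was proposed for insertion, so the new values do not need to be passed a second time. A minimal sketch as a raw asyncpg-style call; the table name `gateway` (SQLModel's default lowercase name for `Gateway`) and the `updated_at` column are assumptions, and `FakeConnection` is only a test double with the same `execute(query, *args)` shape as `asyncpg.Connection.execute`:

```python
import asyncio
from datetime import datetime, timezone
from typing import Any, Protocol

# Assumed table/column names; EXCLUDED refers to the row proposed for insertion
UPSERT_GATEWAY_SQL = """
INSERT INTO gateway (id, last_seen, firmware_version)
VALUES ($1, $2, $3)
ON CONFLICT (id) DO UPDATE
SET last_seen        = EXCLUDED.last_seen,
    firmware_version = EXCLUDED.firmware_version,
    updated_at       = now()
"""


class Executor(Protocol):
    """Anything shaped like asyncpg.Connection.execute."""

    async def execute(self, query: str, *args: Any) -> str: ...


async def upsert_gateway_raw(
    conn: Executor, gateway_id: str, last_seen: datetime, firmware_version: str
) -> str:
    # $1..$3 are asyncpg's positional placeholders
    return await conn.execute(UPSERT_GATEWAY_SQL, gateway_id, last_seen, firmware_version)


class FakeConnection:
    """Test double that records calls instead of talking to PostgreSQL."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, tuple[Any, ...]]] = []

    async def execute(self, query: str, *args: Any) -> str:
        self.calls.append((query, args))
        return "INSERT 0 1"


if __name__ == "__main__":
    fake = FakeConnection()
    seen = datetime(2024, 1, 1, tzinfo=timezone.utc)
    print(asyncio.run(upsert_gateway_raw(fake, "gw-1", seen, "1.4.2")))
```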

Transaction management: Savepoints

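from sqlalchemy.ext.asyncio import AsyncSession

# MQTTPayload, insert_reading and logger come from the project (not shown here);
# logger is assumed to be a structured logger such as structlog, which accepts
# keyword fields (the stdlib logging module does not).


async def process_daily_batch(
    session: AsyncSession,
    payload: MQTTPayload,
) -> dict:
    """
    Process a daily batch with savepoints:
    an error for a single meter does not abort the whole batch.
    """
    results = {"success": 0, "failed": 0}

    async with session.begin():
        for reading in payload.readings:
            # One savepoint per meter:
            try:
                async with session.begin_nested():
                    await insert_reading(session, reading)
            except Exception as e:
                # Rolls back only the savepoint, not the whole transaction:
                logger.warning(
                    "Reading insert failed",
                    meter_id=reading.meter_id,
                    error=str(e),
                )
                results["failed"] += 1
            else:
                # Count only once the savepoint has been released successfully
                results["success"] += 1

    return results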
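Savepoints matter especially in PostgreSQL: after an error, the entire transaction is aborted and rejects further statements until it is rolled back, or rolled back to a savepoint. The mechanics can be tried without a server. A minimal sketch using Python's built-in `sqlite3`, which is a swapped-in database and not PostgreSQL: SQLite does not abort the transaction on a constraint error, so here the savepoint only illustrates the `SAVEPOINT` / `ROLLBACK TO SAVEPOINT` / `RELEASE SAVEPOINT` statements that `begin_nested()` is built on. The simplified `reading` table and `process_batch` are hypothetical:

```python
import sqlite3
from collections.abc import Iterable
from typing import Optional


def process_batch(
    conn: sqlite3.Connection, readings: Iterable[tuple[str, Optional[float]]]
) -> dict[str, int]:
    """Insert readings in one transaction, with one savepoint per reading."""
    results = {"success": 0, "failed": 0}
    conn.execute("BEGIN")
    for i, (meter_id, value) in enumerate(readings):
        # Savepoint names come from the loop index, never from input data
        savepoint = f"reading_{i}"
        conn.execute(f"SAVEPOINT {savepoint}")
        try:
            conn.execute(
                "INSERT INTO reading (meter_id, value) VALUES (?, ?)",
                (meter_id, value),
            )
        except sqlite3.IntegrityError:
            # Undo only this reading, then discard the savepoint
            conn.execute(f"ROLLBACK TO SAVEPOINT {savepoint}")
            conn.execute(f"RELEASE SAVEPOINT {savepoint}")
            results["failed"] += 1
        else:
            conn.execute(f"RELEASE SAVEPOINT {savepoint}")
            results["success"] += 1
    conn.execute("COMMIT")
    return results


if __name__ == "__main__":
    # isolation_level=None: the sqlite3 module issues no implicit BEGIN/COMMIT
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("CREATE TABLE reading (meter_id TEXT NOT NULL, value REAL NOT NULL)")
    print(process_batch(conn, [("m1", 1.5), ("m2", None), ("m3", 3.0)]))
```

The `None` value violates the `NOT NULL` constraint, so only that reading is undone while the other two are committed together.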

Raw asyncpg for avancerede queries

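# TimescaleDB-specific queries sometimes call for raw asyncpg:
from sqlalchemy.ext.asyncio import AsyncSession


async def get_compression_stats(session: AsyncSession) -> list[dict]:
    """Fetch chunk compression statistics from TimescaleDB."""
    conn = await session.connection()
    raw_conn = await conn.get_raw_connection()
    asyncpg_conn = raw_conn.driver_connection  # The underlying asyncpg.Connection

    # chunk_compression_stats() has no range_start column, so join
    # timescaledb_information.chunks to sort by the chunk's time range
    rows = await asyncpg_conn.fetch("""
        SELECT
            s.chunk_name,
            pg_size_pretty(s.before_compression_total_bytes) AS before,
            pg_size_pretty(s.after_compression_total_bytes) AS after,
            ROUND(
                (1 - s.after_compression_total_bytes::numeric /
                     NULLIF(s.before_compression_total_bytes, 0)) * 100
            ) AS compression_pct
        FROM chunk_compression_stats('reading') AS s
        JOIN timescaledb_information.chunks AS c
          ON c.chunk_schema = s.chunk_schema
         AND c.chunk_name = s.chunk_name
        WHERE s.compression_status = 'Compressed'
        ORDER BY c.range_start DESC
        LIMIT 10
    """)

    return [dict(row) for row in rows]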
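The `compression_pct` expression is one minus the ratio of compressed to uncompressed size, times 100. A small sketch that computes the same value client-side, for example from raw byte counts that have already been fetched. It assumes half-away-from-zero rounding, like PostgreSQL's `ROUND` on `numeric`, and returns `None` where no meaningful percentage exists; the function name is hypothetical:

```python
from decimal import ROUND_HALF_UP, Decimal
from typing import Optional


def compression_pct(before_bytes: Optional[int], after_bytes: Optional[int]) -> Optional[int]:
    """Percentage saved by compression, rounded half away from zero.

    Returns None for uncompressed chunks (NULL sizes) and for zero-byte
    chunks instead of dividing by zero.
    """
    if before_bytes is None or after_bytes is None or before_bytes == 0:
        return None
    ratio = Decimal(after_bytes) / Decimal(before_bytes)
    pct = (1 - ratio) * 100
    # Decimal's ROUND_HALF_UP rounds ties away from zero
    return int(pct.quantize(Decimal("1"), rounding=ROUND_HALF_UP))
```

For example, a chunk that shrinks from 8 MB to 1 MB saved 87.5 %, which rounds to 88.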

Connection pool monitoring

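# Health check endpoint:
from fastapi import APIRouter, Depends, Response
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

# engine and get_session are defined in server/src/db/session.py
router = APIRouter()


@router.get("/health/db")
async def db_health(
    response: Response,
    session: AsyncSession = Depends(get_session),
):
    try:
        await session.execute(text("SELECT 1"))
        pool = engine.pool
        return {
            "status": "ok",
            "pool_size": pool.size(),
            "checked_in": pool.checkedin(),
            "checked_out": pool.checkedout(),
            "overflow": pool.overflow(),
        }
    except Exception as e:
        # 503 lets load balancers and monitoring detect the failure
        response.status_code = 503
        return {"status": "error", "detail": str(e)}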
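Reading these numbers takes some care. In SQLAlchemy's `QueuePool` (which the async engine's default pool is based on), `overflow()` starts at `-pool_size` and only becomes positive once more than `pool_size` connections are open, so `pool_size + overflow` is the number of connections currently open. A minimal sketch that turns the health payload into a utilization figure; `max_overflow` is not part of the payload and is passed in, and `pool_utilization` is a hypothetical helper:

```python
from collections.abc import Mapping
from typing import Any


def pool_utilization(stats: Mapping[str, Any], max_overflow: int) -> dict[str, float]:
    """Derive open connections and utilization from the /health/db payload."""
    pool_size = stats["pool_size"]
    # QueuePool's overflow counter starts at -pool_size
    open_connections = pool_size + stats["overflow"]
    capacity = pool_size + max_overflow
    return {
        "open": open_connections,
        "in_use": stats["checked_out"],
        "capacity": capacity,
        "utilization": stats["checked_out"] / capacity,
    }
```

For instance, a payload with `pool_size` 20, `overflow` -15, `checked_in` 3 and `checked_out` 2 means five open connections, two of them in use, out of a possible 50 with `max_overflow=30`.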

Conclusion

asyncpg is the right choice for IoT platforms with a high query volume: about 6.5× faster than psycopg2 in the benchmark above, fully asynchronous, and equipped with the COPY protocol for bulk inserts, which makes daily MQTT batch processing practical. SQLModel + asyncpg via SQLAlchemy 2.0's async support is the recommended combination for keeping the convenience of an ORM while taking advantage of asyncpg's performance.

See the TimescaleDB hypertable guide or the FastAPI multi-tenant guide.