M-Bus Gateway
← Tilbage til blog
· asyncpg· PostgreSQL· Python· async· database· performance· backend· IoT

asyncpg connection pool — mønstre til høj-performance PostgreSQL

asyncpg connection pool i Python: tuning af pool-størrelse (min_size/max_size), prepared statements, COPY til bulk insert, LISTEN/NOTIFY, cursors til store datasæt og monitoring.

Af M-Bus Gateway

asyncpg er den hurtigste PostgreSQL-driver til Python. Her er produktionsmønstrene til høj-volumen IoT-data.


Pool-konfiguration

# server/src/db.py

import asyncpg
from server.src.config import get_settings

# Process-wide pool; created by init_pool() (directly or lazily via get_pool()).
_pool: asyncpg.Pool | None = None


async def init_pool() -> asyncpg.Pool:
    """Create the process-wide asyncpg pool and store it in ``_pool``.

    The DSN is ``get_settings().database_url_str`` with ``+asyncpg`` removed
    (presumably a SQLAlchemy-style ``postgresql+asyncpg://`` URL; asyncpg
    itself wants a plain ``postgresql://`` DSN).

    Returns:
        The newly created pool, which is also stored in ``_pool``.
    """
    global _pool
    settings = get_settings()

    _pool = await asyncpg.create_pool(
        dsn=settings.database_url_str.replace("+asyncpg", ""),
        min_size=5,           # connections kept open even when idle
        max_size=20,          # upper bound on concurrent connections
        max_queries=50_000,   # recycle a connection after this many queries
        max_inactive_connection_lifetime=300,  # close idle connections after 5 min
        command_timeout=30,   # default per-query timeout in seconds
        statement_cache_size=200,  # per-connection prepared-statement cache (0 = disabled)
        # Session settings sent in the connection startup packet. A
        # "SET work_mem" issued from the init hook would be undone by the
        # RESET ALL that asyncpg's Connection.reset() runs each time a
        # connection is released back to the pool; startup parameters are the
        # session default that RESET ALL restores, so they persist.
        # Larger work_mem is meant for heavy (TimescaleDB) aggregate queries.
        server_settings={"work_mem": "64MB"},
        init=_init_connection,     # runs once for every newly opened connection
    )
    return _pool


async def _init_connection(conn: asyncpg.Connection) -> None:
    """Per-connection setup, called by the pool for every new connection.

    Registers a text-format codec for ``uuid``: UUID columns are returned as
    plain ``str`` (instead of asyncpg's default ``uuid.UUID`` objects), and
    parameters bound to UUID columns are converted with ``str()``.
    """
    await conn.set_type_codec(
        "uuid",
        encoder=str,
        decoder=str,
        schema="pg_catalog",
        format="text",
    )


async def get_pool() -> asyncpg.Pool:
    """Return the shared pool, creating it with init_pool() on first use.

    NOTE(review): there is no lock around the lazy creation, so coroutines
    that call this concurrently before the pool exists could each create one.
    """
    if _pool is not None:
        return _pool
    return await init_pool()


async def close_pool() -> None:
    """Close the shared pool, if one exists, and forget it.

    ``_pool`` is cleared so that a later get_pool() builds a fresh pool
    instead of returning the closed one.
    """
    global _pool
    pool, _pool = _pool, None  # detach first: a half-closed pool must not be reused
    if pool is not None:
        await pool.close()

COPY til bulk insert af readings

# server/src/mqtt/ingest.py

import asyncpg
from datetime import datetime
from typing import AsyncIterator


async def bulk_insert_readings(
    pool: asyncpg.Pool,
    readings: list[dict],
) -> int:
    """Bulk-load readings with PostgreSQL COPY and return the number of rows copied.

    COPY FROM is typically 10-50x faster than inserting row by row. It is used
    for the daily gateway payload, which can hold 10,000+ readings at once.

    Each reading dict must provide ``meter_installation_id``, ``value``,
    ``unit``, ``meter_type`` and an ISO-8601 ``timestamp`` string.
    ``battery_level_pct`` and ``rssi_dbm`` are optional (NULL when absent).
    COPY has no ON CONFLICT clause, so unlike upsert_readings this does not
    merge rows that already exist.

    Returns:
        Number of rows copied, parsed from the ``"COPY N"`` status string.
        Returns 0 for an empty *readings* list without touching the database.

    Raises:
        KeyError: a required key is missing from a reading.
        ValueError: a ``timestamp`` string is not valid ISO-8601.
    """
    from datetime import timezone

    if not readings:
        return 0

    # One timezone-aware receive time for the whole batch. datetime.utcnow()
    # returns a *naive* value, which asyncpg's timestamptz encoder interprets
    # as local time, and it is deprecated since Python 3.12.
    # NOTE(review): assumes reading.received_at is timestamptz — confirm schema.
    received_at = datetime.now(timezone.utc)

    # Parse/validate the payload before checking out a connection so a bad
    # reading does not hold a pool slot.
    records = [
        (
            r["meter_installation_id"],
            r["value"],
            r["unit"],
            r["meter_type"],
            datetime.fromisoformat(r["timestamp"]),
            r.get("battery_level_pct"),
            r.get("rssi_dbm"),
            received_at,
        )
        for r in readings
    ]

    async with pool.acquire() as conn:
        result = await conn.copy_records_to_table(
            "reading",
            records=records,
            columns=[
                "meter_installation_id", "value", "unit", "meter_type",
                "timestamp", "battery_level_pct", "rssi_dbm", "received_at"
            ],
        )
    return int(result.split()[-1])  # "COPY N" -> N


async def upsert_readings(
    pool: asyncpg.Pool,
    readings: list[dict],
) -> None:
    """Insert readings with executemany, updating rows that already exist.

    Idempotent alternative to COPY: when a row with the same
    (meter_installation_id, timestamp) exists, its value, battery_level_pct
    and rssi_dbm are overwritten (ON CONFLICT ... DO UPDATE). received_at is
    set server-side with NOW().
    """
    upsert_sql = """
            INSERT INTO reading (
                meter_installation_id, value, unit, meter_type,
                timestamp, battery_level_pct, rssi_dbm, received_at
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
            ON CONFLICT (meter_installation_id, timestamp) DO UPDATE
                SET value = EXCLUDED.value,
                    battery_level_pct = EXCLUDED.battery_level_pct,
                    rssi_dbm = EXCLUDED.rssi_dbm
            """
    async with pool.acquire() as connection:
        # Positional parameters $1..$7, in the column order of the INSERT.
        param_rows = [
            (
                reading["meter_installation_id"],
                reading["value"],
                reading["unit"],
                reading["meter_type"],
                datetime.fromisoformat(reading["timestamp"]),
                reading.get("battery_level_pct"),
                reading.get("rssi_dbm"),
            )
            for reading in readings
        ]
        await connection.executemany(upsert_sql, param_rows)

Prepared statements til hyppige queries

# server/src/readings/queries.py

from datetime import datetime  # used by get_monthly_consumption's annotations

import asyncpg

# These query texts are passed to conn.fetch()/fetchrow(). asyncpg prepares
# each distinct query once per connection and reuses it from its statement
# cache (statement_cache_size) on subsequent calls.
GET_LATEST_READING_STMT = """
    SELECT value, unit, timestamp, battery_level_pct
    FROM reading
    WHERE meter_installation_id = $1
      AND deleted_at IS NULL
    ORDER BY timestamp DESC
    LIMIT 1
"""

# Per-month consumption for one installation between $2 and $3 (BETWEEN is
# inclusive on both ends).
# NOTE(review): MAX(value) - MIN(value) assumes `value` is a cumulative meter
# register. Consumption between a month's last reading and the next month's
# first reading is counted in neither month — confirm that is intended.
GET_MONTHLY_CONSUMPTION_STMT = """
    SELECT
        DATE_TRUNC('month', timestamp) AS month,
        MAX(value) - MIN(value) AS consumption
    FROM reading
    WHERE meter_installation_id = $1
      AND timestamp BETWEEN $2 AND $3
      AND deleted_at IS NULL
    GROUP BY 1
    ORDER BY 1
"""


async def get_latest_reading(
    conn: asyncpg.Connection,
    installation_id: str,
) -> asyncpg.Record | None:
    """Fetch the newest non-deleted reading for one meter installation.

    Returns a record with value, unit, timestamp and battery_level_pct, or
    None if the installation has no readings. Repeated calls reuse the
    statement asyncpg prepared and cached on this connection.
    """
    newest = await conn.fetchrow(GET_LATEST_READING_STMT, installation_id)
    return newest


async def get_monthly_consumption(
    conn: asyncpg.Connection,
    installation_id: str,
    start: datetime,
    end: datetime,
) -> list[asyncpg.Record]:
    """Return (month, consumption) rows for one installation, ordered by month.

    Only non-deleted readings with start <= timestamp <= end are considered.
    """
    query_args = (installation_id, start, end)
    return await conn.fetch(GET_MONTHLY_CONSUMPTION_STMT, *query_args)

Cursor til store datasæt

# server/src/export/readings.py

from datetime import datetime
from typing import AsyncIterator

import asyncpg


async def stream_readings_for_export(
    pool: asyncpg.Pool,
    property_id: str,
    period_start: datetime,
    period_end: datetime,
) -> AsyncIterator[dict]:
    """Stream one property's readings, in timestamp order, as dicts.

    Uses a server-side cursor so large result sets (e.g. a CSV export of many
    years of readings) are never loaded into memory at once. Rows are pulled
    from the server 1000 at a time.

    Each yielded dict has the keys timestamp, value, unit, meter_type,
    battery_level_pct, rssi_dbm, unit_id and unit_label. Only non-deleted
    readings with period_start <= timestamp <= period_end are returned.

    A pool connection and an open transaction are held for as long as the
    generator is alive. If a caller may stop iterating early, close the
    generator explicitly (e.g. ``async with contextlib.aclosing(gen):``).
    Otherwise the connection is only returned when the generator is finalized.
    """
    async with pool.acquire() as conn:
        # asyncpg server-side cursors can only be used inside a transaction.
        async with conn.transaction():
            async for record in conn.cursor(
                """
                SELECT r.timestamp, r.value, r.unit, r.meter_type,
                       r.battery_level_pct, r.rssi_dbm,
                       mi.unit_id, u.label AS unit_label
                FROM reading r
                JOIN meter_installation mi ON r.meter_installation_id = mi.id
                JOIN unit u ON mi.unit_id = u.id
                WHERE u.property_id = $1
                  AND r.timestamp BETWEEN $2 AND $3
                  AND r.deleted_at IS NULL
                ORDER BY r.timestamp
                """,
                property_id, period_start, period_end,
                prefetch=1000,  # rows fetched from the server per round trip
            ):
                yield dict(record)

LISTEN/NOTIFY til realtime events

# server/src/realtime/listener.py

import asyncpg
import asyncio
import json
import structlog

log = structlog.get_logger()


async def setup_notifications(pool: asyncpg.Pool) -> asyncpg.Connection:
    """Subscribe to PostgreSQL LISTEN/NOTIFY channels for realtime events.

    Listens on ``gateway_alarm`` and ``settlement_generated``. This complements
    MQTT for internal events that do not originate from a gateway.

    A connection is checked out of *pool* and kept for as long as the
    listeners should stay active. It therefore occupies one of the pool's
    connections until it is released.

    Returns:
        The listening connection. Call ``await pool.release(conn)`` on shutdown
        to stop listening and give the connection back to the pool.

    Raises:
        Whatever ``pool.acquire()`` or ``add_listener()`` raise. If registering
        a listener fails, the connection is released before re-raising.
    """
    conn = await pool.acquire()

    async def handle_notification(
        connection: asyncpg.Connection,
        pid: int,
        channel: str,
        payload: str,
    ) -> None:
        # Payloads are expected to be JSON text (see the pg_notify trigger).
        # Failures are logged (with traceback) rather than propagated out of
        # the listener callback.
        try:
            data = json.loads(payload)
            log.info("PostgreSQL notification", channel=channel, data=data)
            # dispatch_notification is defined elsewhere (not shown here).
            await dispatch_notification(channel, data)
        except Exception as exc:
            log.exception("Notification handler failed", exc=str(exc))

    try:
        await conn.add_listener("gateway_alarm", handle_notification)
        await conn.add_listener("settlement_generated", handle_notification)
    except BaseException:
        # Don't leak the checked-out connection if subscribing fails.
        await pool.release(conn)
        raise

    log.info("PostgreSQL LISTEN active on: gateway_alarm, settlement_generated")
    return conn


# PostgreSQL trigger DDL, executed as part of a migration. Safe to re-run: the
# function uses CREATE OR REPLACE and an existing trigger is dropped before it
# is recreated. Each inserted alarm row sends a JSON payload on the
# 'gateway_alarm' channel.
CREATE_TRIGGER_SQL = """
CREATE OR REPLACE FUNCTION notify_gateway_alarm()
RETURNS TRIGGER AS $$
BEGIN
  PERFORM pg_notify(
    'gateway_alarm',
    json_build_object(
      'gateway_id', NEW.gateway_id,
      'alarm_type', NEW.alarm_type,
      'severity', NEW.severity
    )::text
  );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS after_alarm_insert ON alarm;
CREATE TRIGGER after_alarm_insert
AFTER INSERT ON alarm
FOR EACH ROW EXECUTE FUNCTION notify_gateway_alarm();
"""

Pool monitoring

# server/src/monitoring/db.py

async def get_pool_stats(pool: asyncpg.Pool) -> dict:
    """Snapshot of connection-pool usage for monitoring and alerting.

    ``in_use`` is the number of open connections minus idle ones.
    ``utilization_pct`` expresses that count as a percentage of the pool's
    ``max_size``, rounded to one decimal.
    """
    total = pool.get_size()
    idle = pool.get_idle_size()
    ceiling = pool.get_max_size()
    busy = total - idle
    return {
        "size": total,
        "free": idle,
        "in_use": busy,
        "max_size": ceiling,
        "min_size": pool.get_min_size(),
        "utilization_pct": round(busy / ceiling * 100, 1),
    }

# Alert: Hvis utilization > 80% i > 30 sekunder → hæv max_size i create_pool

Konklusion

asyncpg's COPY-protokol er essentiel til daglig bulk-indsættelse af IoT-readings (10-50× hurtigere end række-for-række INSERT). Server-side cursors forhindrer OOM-fejl ved CSV-eksport af store datasæt. Statement-cachen og en veltunet connection pool sikrer lav latens ved samtidige requests. LISTEN/NOTIFY muliggør realtids-events uden polling.

Se også asyncpg PostgreSQL-guiden eller TimescaleDB hypertable-guiden.