asyncpg · PostgreSQL · Python · async · database · performance · backend · IoT
asyncpg connection pool — mønstre til høj-performance PostgreSQL
asyncpg connection pool i Python: tuning af min_size/max_size, prepared statements, COPY til bulk insert, LISTEN/NOTIFY, cursors til store datasæt og monitoring.
Af M-Bus Gateway
asyncpg er den hurtigste PostgreSQL-driver til Python. Her er produktionsmønstrene til høj-volumen IoT-data.
Pool-konfiguration
# server/src/db.py
import asyncpg
from server.src.config import get_settings
_pool: asyncpg.Pool | None = None
async def init_pool() -> asyncpg.Pool:
    """Create the module-wide asyncpg pool and store it in ``_pool``.

    The DSN is read from ``get_settings().database_url_str`` with every
    ``"+asyncpg"`` occurrence removed before it is handed to asyncpg
    (presumably a SQLAlchemy-style driver suffix -- confirm against config).

    Returns:
        The newly created pool, which is also kept in the module-level
        ``_pool`` variable.
    """
    global _pool

    dsn = get_settings().database_url_str.replace("+asyncpg", "")
    pool_options = {
        "min_size": 5,  # minimum number of connections in the pool
        "max_size": 20,  # maximum number of connections in the pool
        "max_queries": 50_000,  # queries per connection before it is recycled
        "max_inactive_connection_lifetime": 300,  # close inactive connections after 5 min
        "command_timeout": 30,  # at most 30 seconds per query
        "statement_cache_size": 200,  # prepared-statement cache (0 = disabled)
        "init": _init_connection,  # called for each new connection
    }
    _pool = await asyncpg.create_pool(dsn=dsn, **pool_options)
    return _pool
async def _init_connection(conn: asyncpg.Connection) -> None:
    """Per-connection setup: register the uuid codec and set ``work_mem``.

    Args:
        conn: The connection to configure.
    """

    def _passthrough(value):
        return value

    # Text-format codec for pg_catalog.uuid: outgoing values go through
    # str(), incoming values are handed back unchanged.
    # NOTE(review): with an identity decoder on the text format, uuid columns
    # presumably come back as str rather than uuid.UUID -- confirm intended.
    await conn.set_type_codec(
        "uuid",
        schema="pg_catalog",
        format="text",
        encoder=str,
        decoder=_passthrough,
    )

    # TimescaleDB: set work_mem for heavy queries.
    await conn.execute("SET work_mem = '64MB'")
async def get_pool() -> asyncpg.Pool:
    """Return the module-level pool, creating it on first use.

    Returns:
        The pool stored in ``_pool``; when that is ``None`` the result of
        ``init_pool()`` is returned instead.
    """
    existing = _pool
    if existing is not None:
        return existing
    return await init_pool()
async def close_pool() -> None:
    """Close the module-level pool, if any, and forget it.

    The module reference is cleared before the pool is closed, so a later
    ``get_pool()`` call sees ``None`` and builds a fresh pool instead of
    handing out the closed one. Calling this when no pool exists is a no-op.
    """
    global _pool

    # Detach first: nothing can pick up a pool that is about to be closed.
    pool, _pool = _pool, None
    if pool is not None:
        await pool.close()
COPY til bulk insert af readings
# server/src/mqtt/ingest.py
import asyncpg
from datetime import datetime
from typing import AsyncIterator
async def bulk_insert_readings(
    pool: asyncpg.Pool,
    readings: list[dict],
) -> int:
    """Bulk-insert readings into the ``reading`` table using COPY.

    COPY FROM is 10-50x faster than inserting row by row. Used for the daily
    gateway payload (potentially 10,000+ readings at once).

    Args:
        pool: Pool to borrow a connection from.
        readings: One dict per reading with the keys
            ``meter_installation_id``, ``value``, ``unit``, ``meter_type``
            and ``timestamp`` (a string accepted by
            ``datetime.fromisoformat``). ``battery_level_pct`` and
            ``rssi_dbm`` are optional and default to ``None``.

    Returns:
        Number of rows copied, parsed from the ``"COPY N"`` status string;
        ``0`` for an empty ``readings`` list.

    Raises:
        KeyError: If a reading lacks one of the required keys.
        ValueError: If a ``timestamp`` cannot be parsed.
    """
    from datetime import timezone

    if not readings:
        # Nothing to copy: do not occupy a pooled connection for a no-op.
        return 0

    # Naive UTC -- the same value datetime.utcnow() produced, without the
    # deprecated call. Taken once so the whole batch shares one received_at.
    received_at = datetime.now(timezone.utc).replace(tzinfo=None)

    # Convert before acquiring a connection so malformed input fails without
    # holding one.
    records = [
        (
            r["meter_installation_id"],
            r["value"],
            r["unit"],
            r["meter_type"],
            datetime.fromisoformat(r["timestamp"]),
            r.get("battery_level_pct"),
            r.get("rssi_dbm"),
            received_at,
        )
        for r in readings
    ]

    async with pool.acquire() as conn:
        status = await conn.copy_records_to_table(
            "reading",
            records=records,
            columns=[
                "meter_installation_id", "value", "unit", "meter_type",
                "timestamp", "battery_level_pct", "rssi_dbm", "received_at",
            ],
        )
    return int(status.split()[-1])  # "COPY N" -> N
async def upsert_readings(
    pool: asyncpg.Pool,
    readings: list[dict],
) -> None:
    """Alternative to COPY: ``executemany`` with ON CONFLICT DO UPDATE.

    Re-sending a reading with the same ``(meter_installation_id, timestamp)``
    updates ``value``, ``battery_level_pct`` and ``rssi_dbm`` of the existing
    row instead of inserting a duplicate, which makes the call idempotent.
    ``received_at`` is filled in by the database with ``NOW()``.

    Args:
        pool: Pool to borrow a connection from.
        readings: One dict per reading with the keys
            ``meter_installation_id``, ``value``, ``unit``, ``meter_type``
            and ``timestamp`` (a string accepted by
            ``datetime.fromisoformat``). ``battery_level_pct`` and
            ``rssi_dbm`` are optional and default to ``None``.

    Raises:
        KeyError: If a reading lacks one of the required keys.
        ValueError: If a ``timestamp`` cannot be parsed.
    """
    if not readings:
        # Nothing to write: do not occupy a pooled connection for a no-op.
        return

    upsert_sql = """
INSERT INTO reading (
meter_installation_id, value, unit, meter_type,
timestamp, battery_level_pct, rssi_dbm, received_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
ON CONFLICT (meter_installation_id, timestamp) DO UPDATE
SET value = EXCLUDED.value,
battery_level_pct = EXCLUDED.battery_level_pct,
rssi_dbm = EXCLUDED.rssi_dbm
"""

    # Convert before acquiring a connection so malformed input fails without
    # holding one.
    rows = [
        (
            r["meter_installation_id"], r["value"], r["unit"],
            r["meter_type"], datetime.fromisoformat(r["timestamp"]),
            r.get("battery_level_pct"), r.get("rssi_dbm"),
        )
        for r in readings
    ]

    async with pool.acquire() as conn:
        await conn.executemany(upsert_sql, rows)
Prepared statements til hyppige queries
# server/src/readings/queries.py
import asyncpg
# Prepared statements are compiled once and reused:

# Newest non-deleted reading for one meter installation ($1): rows are
# ordered by timestamp descending and limited to a single row.
GET_LATEST_READING_STMT = """
SELECT value, unit, timestamp, battery_level_pct
FROM reading
WHERE meter_installation_id = $1
AND deleted_at IS NULL
ORDER BY timestamp DESC
LIMIT 1
"""
# Per-month consumption for one meter installation ($1) between $2 and $3
# (BETWEEN includes both bounds), computed as MAX(value) - MIN(value) per month.
# NOTE(review): presumably `value` is a cumulative meter index -- confirm.
GET_MONTHLY_CONSUMPTION_STMT = """
SELECT
DATE_TRUNC('month', timestamp) AS month,
MAX(value) - MIN(value) AS consumption
FROM reading
WHERE meter_installation_id = $1
AND timestamp BETWEEN $2 AND $3
AND deleted_at IS NULL
GROUP BY 1
ORDER BY 1
"""
async def get_latest_reading(
    conn: asyncpg.Connection,
    installation_id: str,
) -> asyncpg.Record | None:
    """Fetch the most recent reading for one meter installation.

    Args:
        conn: Connection to run the query on.
        installation_id: Bound to ``$1`` of ``GET_LATEST_READING_STMT``.

    Returns:
        The single row produced by the query, or ``None`` when it yields
        no row.
    """
    # asyncpg caches the prepared statement automatically on repeated calls.
    latest = await conn.fetchrow(GET_LATEST_READING_STMT, installation_id)
    return latest
async def get_monthly_consumption(
    conn: asyncpg.Connection,
    installation_id: str,
    start: datetime,
    end: datetime,
) -> list[asyncpg.Record]:
    """Fetch per-month consumption rows for one meter installation.

    Args:
        conn: Connection to run the query on.
        installation_id: Bound to ``$1`` of ``GET_MONTHLY_CONSUMPTION_STMT``.
        start: Bound to ``$2``, the lower bound of the period.
        end: Bound to ``$3``, the upper bound of the period.

    Returns:
        All rows produced by ``GET_MONTHLY_CONSUMPTION_STMT``.
    """
    query_args = (installation_id, start, end)
    monthly_rows = await conn.fetch(GET_MONTHLY_CONSUMPTION_STMT, *query_args)
    return monthly_rows
Cursor til store datasæt
# server/src/export/readings.py
import asyncpg
from typing import AsyncIterator
async def stream_readings_for_export(
    pool: asyncpg.Pool,
    property_id: str,
    period_start: datetime,
    period_end: datetime,
) -> AsyncIterator[dict]:
    """Stream readings of one property as dicts through a server-side cursor.

    Server-side cursor: streams large result sets without loading everything
    into RAM. Critical for CSV export of many years of readings.

    Args:
        pool: Pool to borrow a connection from; the connection and its
            transaction stay open while the generator is being consumed.
        property_id: Bound to ``$1`` (matched against ``unit.property_id``).
        period_start: Bound to ``$2``, lower bound of ``reading.timestamp``.
        period_end: Bound to ``$3``, upper bound of ``reading.timestamp``.

    Yields:
        One ``dict`` per record, ordered by ``reading.timestamp``.
    """
    export_sql = """
SELECT r.timestamp, r.value, r.unit, r.meter_type,
r.battery_level_pct, r.rssi_dbm,
mi.unit_id, u.label AS unit_label
FROM reading r
JOIN meter_installation mi ON r.meter_installation_id = mi.id
JOIN unit u ON mi.unit_id = u.id
WHERE u.property_id = $1
AND r.timestamp BETWEEN $2 AND $3
AND r.deleted_at IS NULL
ORDER BY r.timestamp
"""
    # A server-side cursor requires a transaction.
    async with pool.acquire() as conn, conn.transaction():
        rows = conn.cursor(
            export_sql,
            property_id,
            period_start,
            period_end,
            prefetch=1000,  # fetch 1000 rows at a time from the server
        )
        async for row in rows:
            yield dict(row)
LISTEN/NOTIFY til realtime events
# server/src/realtime/listener.py
import asyncpg
import asyncio
import json
import structlog
log = structlog.get_logger()
async def setup_notifications(pool: asyncpg.Pool) -> None:
    """Subscribe to PostgreSQL LISTEN/NOTIFY for realtime gateway events.

    Complements MQTT for internal events that do not originate from a
    gateway. One connection is acquired from ``pool`` and registered as
    listener on ``gateway_alarm`` and ``settlement_generated``. On success
    it is deliberately not released, so the subscription stays active (like
    an MQTT subscriber).

    Args:
        pool: Pool to take the dedicated listener connection from.

    Raises:
        Exception: Whatever ``add_listener`` raises; the connection is handed
            back to the pool before the error propagates.
    """

    async def handle_notification(
        connection: asyncpg.Connection,
        pid: int,
        channel: str,
        payload: str,
    ) -> None:
        # Best-effort boundary: a failing payload must not break the listener,
        # so every error is logged (with its channel) and swallowed.
        try:
            data = json.loads(payload)
            log.info("PostgreSQL notification", channel=channel, data=data)
            await dispatch_notification(channel, data)
        except Exception as exc:
            log.error("Notification handler failed", channel=channel, exc=str(exc))

    conn = await pool.acquire()
    try:
        for channel_name in ("gateway_alarm", "settlement_generated"):
            await conn.add_listener(channel_name, handle_notification)
    except BaseException:
        # Registration failed part-way: return the connection instead of
        # leaking it from the pool, then let the caller see the error.
        await pool.release(conn)
        raise

    # Keep the connection open (like the MQTT subscriber).
    log.info("PostgreSQL LISTEN active on: gateway_alarm, settlement_generated")
# Trigger in PostgreSQL (run during migration).
# After every INSERT on `alarm`, the trigger function publishes a JSON payload
# with gateway_id, alarm_type and severity on the 'gateway_alarm' channel.
# The trigger is dropped before it is created so the script can be re-run,
# matching the CREATE OR REPLACE used for the function.
CREATE_TRIGGER_SQL = """
CREATE OR REPLACE FUNCTION notify_gateway_alarm()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify(
'gateway_alarm',
json_build_object(
'gateway_id', NEW.gateway_id,
'alarm_type', NEW.alarm_type,
'severity', NEW.severity
)::text
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS after_alarm_insert ON alarm;
CREATE TRIGGER after_alarm_insert
AFTER INSERT ON alarm
FOR EACH ROW EXECUTE FUNCTION notify_gateway_alarm();
"""
Pool monitoring
# server/src/monitoring/db.py
async def get_pool_stats(pool: asyncpg.Pool) -> dict:
    """Collect pool statistics for monitoring/alerting.

    Args:
        pool: The pool to inspect.

    Returns:
        A dict with ``size``, ``free``, ``in_use``, ``max_size``, ``min_size``
        and ``utilization_pct``, where ``in_use`` is ``size - free`` and
        ``utilization_pct`` is ``in_use / max_size * 100`` rounded to one
        decimal.
    """
    size = pool.get_size()
    idle = pool.get_idle_size()
    max_size = pool.get_max_size()
    busy = size - idle
    return {
        "size": size,
        "free": idle,
        "in_use": busy,
        "max_size": max_size,
        "min_size": pool.get_min_size(),
        "utilization_pct": round(busy / max_size * 100, 1),
    }
# Alert: if utilization stays above 80% for more than 30 seconds, increase the pool's max_size
Konklusion
asyncpgs COPY-protokol er essentiel til daglig bulk-indsættelse af IoT-readings (10-50× hurtigere end INSERT række for række). Server-side cursors forhindrer OOM-fejl ved CSV-eksport af store datasæt. Statement-cache og optimering af connection pool sikrer lav latens ved samtidige requests. LISTEN/NOTIFY muliggør realtids-events uden polling.
Se asyncpg PostgreSQL guide eller TimescaleDB hypertable guide.