
TimescaleDB hypertables for IoT: setup, partitioning, and retention

TimescaleDB hypertables for IoT readings: creation, chunk_time_interval, space partitioning, data retention policies, size monitoring with pg_size_pretty, and integration with SQLModel.

By M-Bus Gateway

M-Bus Gateway stores millions of wM-Bus readings in a TimescaleDB hypertable. This post walks through the complete setup.


Hypertable vs. regular PostgreSQL table

-- Regular table with 5 million rows:
-- SELECT WHERE timestamp BETWEEN ... → without a suitable index: full table scan (seconds)

-- TimescaleDB hypertable:
-- Automatically partitioned into "chunks" based on time
-- SELECT WHERE timestamp BETWEEN ... → scans only the relevant chunks (ms)

-- Chunk = automatically created child table per time interval
-- Default chunk interval: 7 days
-- Each chunk has its own, smaller indexes → less data to read per range query

-- When to use TimescaleDB?
-- ✅ > 100,000 time-series rows
-- ✅ Frequent range queries on timestamp
-- ✅ Need for automatic compression + retention
-- ✅ Real-time aggregates (continuous aggregates)
-- ✗ Small datasets < 50,000 rows: regular PostgreSQL is fine
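
To make the chunk idea concrete, here is a minimal Python sketch of how a time range maps to weekly chunks. It is a simplified model, not TimescaleDB's implementation: the helper names `chunk_start` and `chunks_for_range` are hypothetical, and the alignment of chunk boundaries to the Unix epoch is an assumption made for illustration.

```python
from datetime import datetime, timedelta, timezone

# Assumption for this sketch: chunk boundaries are multiples of the interval since the Unix epoch.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def chunk_start(ts: datetime, interval: timedelta) -> datetime:
    """Start of the chunk that would hold ts under the epoch-alignment assumption."""
    return EPOCH + ((ts - EPOCH) // interval) * interval


def chunks_for_range(start: datetime, end: datetime, interval: timedelta) -> list[datetime]:
    """Start of every chunk that overlaps the half-open range [start, end)."""
    starts = []
    current = chunk_start(start, interval)
    while current < end:
        starts.append(current)
        current += interval
    return starts


week = timedelta(days=7)
hit = chunks_for_range(
    datetime(2026, 1, 1, tzinfo=timezone.utc),
    datetime(2026, 2, 1, tzinfo=timezone.utc),
    week,
)
print(len(hit), "weekly chunks overlap January 2026")
```

Under these assumptions a query for January 2026 touches 5 weekly chunks, regardless of how many years of older data the table holds.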

Create the hypertable: migration

-- Alembic migration:
-- server/alembic/versions/0015_timescale_reading.py

-- 1. Create the reading table as usual:
CREATE TABLE reading (
    meter_installation_id UUID NOT NULL
        REFERENCES meter_installation(id),
    timestamp             TIMESTAMPTZ NOT NULL,
    value                 DOUBLE PRECISION NOT NULL,
    rssi_dbm              REAL,
    battery_level_pct     REAL,
    flags                 INTEGER NOT NULL DEFAULT 0,
    received_at           TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- Composite primary key: TimescaleDB requires every unique constraint
    -- to include the partitioning column (timestamp):
    PRIMARY KEY (meter_installation_id, timestamp)
);

-- 2. Convert to a hypertable:
SELECT create_hypertable(
    'reading',
    'timestamp',
    chunk_time_interval => INTERVAL '1 week',  -- 1 chunk per week
    -- optional space partitioning on meter_installation_id:
    -- partitioning_column => 'meter_installation_id',
    -- number_partitions => 4,
    migrate_data => true  -- only needed when the table already contains data
);

-- 3. Unique index for upsert (ON CONFLICT):
-- The primary key above already creates a unique index on
-- (meter_installation_id, timestamp), so ON CONFLICT needs no extra index.

-- 4. Index for API queries (fast lookups per installation):
-- The primary-key index can also be scanned backwards for ORDER BY timestamp DESC;
-- keep this explicit index only if query plans show it is needed.
CREATE INDEX ON reading (meter_installation_id, timestamp DESC);
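
The upsert that the unique key on (meter_installation_id, timestamp) enables can be sketched as follows. This example uses Python's standard-library sqlite3 module purely as a stand-in so it runs without a database server; SQLite accepts the same `ON CONFLICT ... DO UPDATE` form, but the column types, the `?` placeholders and the installation id are assumptions of this sketch, not the gateway's actual ingest code.

```python
import sqlite3

# Same conflict target as the composite primary key in the migration.
UPSERT = """
INSERT INTO reading (meter_installation_id, timestamp, value)
VALUES (?, ?, ?)
ON CONFLICT (meter_installation_id, timestamp)
DO UPDATE SET value = excluded.value
"""

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE reading (
        meter_installation_id TEXT NOT NULL,
        timestamp             TEXT NOT NULL,
        value                 REAL NOT NULL,
        PRIMARY KEY (meter_installation_id, timestamp)
    )
    """
)

installation = "installation-a"  # hypothetical id; the real column is a UUID
conn.execute(UPSERT, (installation, "2026-01-01T06:00:00+00:00", 120.5))
# A retransmitted telegram with the same key updates the row instead of failing:
conn.execute(UPSERT, (installation, "2026-01-01T06:00:00+00:00", 121.0))

rows = conn.execute("SELECT value FROM reading").fetchall()
print(rows)
```

After both statements the table still holds a single row for that installation and timestamp, carrying the most recently received value.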

Chunk interval: choosing the right size

-- The right chunk size depends on query pattern and ingest rate:

-- M-Bus Gateway setup:
-- 50 meters × 3 readings/day = 150 rows/day
-- 150 rows/day × 365 days = 54,750 rows/year
-- 1 week = 1,050 rows per chunk → a suitable size

-- For a larger setup (1,000 meters):
-- 1,000 × 3 × 7 = 21,000 rows per week → consider a 1-day interval

-- Applies to chunks created from now on; existing chunks keep their interval:
SELECT set_chunk_time_interval('reading', INTERVAL '7 days');

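-- List chunks and their size
-- (sizes come from chunks_detailed_size(); the chunks view has no size column):
SELECT
    c.chunk_name,
    c.range_start::date,
    c.range_end::date,
    pg_size_pretty(s.total_bytes) AS size
FROM timescaledb_information.chunks AS c
JOIN chunks_detailed_size('reading') AS s
  ON s.chunk_schema = c.chunk_schema
 AND s.chunk_name = c.chunk_name
WHERE c.hypertable_name = 'reading'
ORDER BY c.range_start DESC
LIMIT 10;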
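
The sizing arithmetic can be checked with a few lines of Python. This is only a back-of-the-envelope sketch; the function name `rows_per_chunk` is hypothetical and the rate of 3 readings per meter per day is taken from the setup described here.

```python
def rows_per_chunk(meters: int, readings_per_day: int, chunk_days: int) -> int:
    """Expected number of rows landing in one chunk of chunk_days days."""
    return meters * readings_per_day * chunk_days


for meters in (50, 1_000):
    weekly = rows_per_chunk(meters, 3, 7)
    yearly = rows_per_chunk(meters, 3, 365)
    print(f"{meters} meters: {weekly} rows per weekly chunk, {yearly} rows per year")
```

For 50 meters this gives 1,050 rows per weekly chunk and 54,750 rows per year; for 1,000 meters it gives 21,000 rows per weekly chunk.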

Data retention: automatic deletion of old chunks

-- Keep 7 years (bookkeeping retention requirement, GDPR) and drop anything older:
SELECT add_retention_policy(
    'reading',
    INTERVAL '7 years',
    if_not_exists => true
);

-- Check the retention policy:
SELECT * FROM timescaledb_information.jobs
WHERE proc_name = 'policy_retention';

-- Manual retention (one-off):
SELECT drop_chunks('reading', older_than => INTERVAL '7 years');

-- NOTE: Retention drops whole chunks, not individual rows.
-- A chunk is dropped only once its entire time range is older than 7 years,
-- so rows can be kept for up to 7 years + chunk_time_interval.
-- Example: chunk 2019-01-01 → 2019-01-08 is dropped as a whole once 2019-01-08 passes the 7-year limit.
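
The chunk-level granularity of retention can be illustrated with a small Python sketch. It assumes a chunk becomes droppable once the end of its time range is at least 7 calendar years in the past; the helper names `add_years` and `droppable_from` are hypothetical, and the policy itself runs as a scheduled background job, so the actual drop happens at the next job run after that moment.

```python
from datetime import datetime, timezone


def add_years(ts: datetime, years: int) -> datetime:
    """Shift ts by whole calendar years, mapping 29 February to 28 February when needed."""
    try:
        return ts.replace(year=ts.year + years)
    except ValueError:
        return ts.replace(year=ts.year + years, day=28)


def droppable_from(range_end: datetime, keep_years: int = 7) -> datetime:
    """Earliest moment at which a chunk ending at range_end lies fully outside the retention window."""
    return add_years(range_end, keep_years)


chunk_end = datetime(2019, 1, 8, tzinfo=timezone.utc)
print(droppable_from(chunk_end))
```

For the chunk covering 2019-01-01 → 2019-01-08 this yields 2026-01-08, which means a row written on 2019-01-01 stays queryable for 7 years plus one week.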

Compression: 93% space savings

-- Enable compression:
ALTER TABLE reading SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'meter_installation_id',
    timescaledb.compress_orderby = 'timestamp DESC'
);

-- Automatically compress chunks older than 7 days:
SELECT add_compression_policy('reading', INTERVAL '7 days');

-- Compression ratio:
SELECT
    chunk_name,
    pg_size_pretty(before_compression_total_bytes) AS before,
    pg_size_pretty(after_compression_total_bytes) AS after,
    round(
        (1 - after_compression_total_bytes::numeric
             / before_compression_total_bytes) * 100, 1
    ) AS pct_saved
FROM chunk_compression_stats('reading')
WHERE compression_status = 'Compressed'
ORDER BY chunk_name;

-- Typical output:
-- chunk_name       before   after    pct_saved
-- _hyper_1_1_chunk 12 MB    840 kB   93.0
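
The pct_saved expression in the query is simply one minus the size ratio, expressed as a percentage. A minimal Python equivalent follows, using hypothetical byte counts rather than measured values.

```python
def pct_saved(before_bytes: int, after_bytes: int) -> float:
    """Percentage of space saved by compression, rounded to one decimal like the SQL query."""
    if before_bytes <= 0:
        raise ValueError("before_bytes must be positive")
    return round((1 - after_bytes / before_bytes) * 100, 1)


# Hypothetical byte counts, chosen only to illustrate the formula:
print(pct_saved(12_000_000, 840_000))
```

The guard against a zero `before_bytes` mirrors a case worth handling in SQL as well, for example with `NULLIF(before_compression_total_bytes, 0)`.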

SQLModel integration: the Reading model

# server/src/db/models.py
from datetime import datetime, timedelta, timezone
from uuid import UUID

from sqlalchemy import DateTime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import Field, SQLModel, select

# MeterInstallation, Unit and Property are the project's other models,
# assumed to be defined elsewhere in this module.


def utc_now() -> datetime:
    """Timezone-aware current time, matching the TIMESTAMPTZ columns."""
    return datetime.now(timezone.utc)


class Reading(SQLModel, table=True):
    """
    TimescaleDB hypertable. Note: does NOT inherit from TenantBase.
    The hypertable needs a composite primary key (meter_installation_id + timestamp).
    Tenant isolation via JOIN to meter_installation → unit → property.
    """
    __tablename__ = "reading"

    meter_installation_id: UUID = Field(
        foreign_key="meter_installation.id",
        primary_key=True,
        index=True,
    )
    # sa_type keeps the SQLAlchemy column timezone-aware (TIMESTAMPTZ):
    timestamp: datetime = Field(primary_key=True, sa_type=DateTime(timezone=True))
    value: float
    rssi_dbm: float | None = None
    battery_level_pct: float | None = None
    flags: int = Field(default=0)
    received_at: datetime = Field(
        default_factory=utc_now, sa_type=DateTime(timezone=True)
    )


# Query with tenant isolation via JOIN:
async def get_readings(
    tenant_id: UUID,
    installation_id: UUID,
    days: int = 30,
    session: AsyncSession = ...,
) -> list[Reading]:
    stmt = (
        select(Reading)
        .join(MeterInstallation, MeterInstallation.id == Reading.meter_installation_id)
        .join(Unit, Unit.id == MeterInstallation.unit_id)
        .join(Property, Property.id == Unit.property_id)
        .where(
            Property.tenant_id == tenant_id,
            Reading.meter_installation_id == installation_id,
            Reading.timestamp >= utc_now() - timedelta(days=days),
        )
        .order_by(Reading.timestamp.desc())
    )
    result = await session.execute(stmt)
    return list(result.scalars().all())

Size and performance monitoring

-- Total hypertable size (schema-qualified so it also works outside the search_path):
SELECT
    hypertable_name,
    pg_size_pretty(hypertable_size(format('%I.%I', hypertable_schema, hypertable_name)::regclass)) AS size,
    num_chunks
FROM timescaledb_information.hypertables;

-- Rows per day (ingest rate):
SELECT
    time_bucket('1 day', timestamp) AS day,
    COUNT(*) AS row_count
FROM reading
WHERE timestamp >= NOW() - INTERVAL '30 days'
GROUP BY 1
ORDER BY 1 DESC;

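-- Query plan: check that only the relevant chunks are scanned:
EXPLAIN (ANALYZE, BUFFERS)
SELECT MAX(value) - MIN(value) AS consumption
FROM reading
WHERE meter_installation_id = 'uuid-here'
  AND timestamp >= '2026-01-01' AND timestamp < '2026-02-01';
-- → The plan should list only the chunks covering January 2026.
-- With non-constant bounds such as NOW(), look for "Chunks excluded during startup" > 0 instead.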

Conclusion

A TimescaleDB hypertable provides automatic time partitioning, transparent compression (93% in the example above), and continuous aggregates. The reading table uses a composite primary key (meter_installation_id + timestamp) rather than TenantBase, because TimescaleDB requires the partitioning column (timestamp) to be part of every unique constraint. Tenant isolation is enforced via the JOIN chain Property → Unit → MeterInstallation → Reading. The data retention policy automatically drops chunks once all their data is older than 7 years.

See also the TimescaleDB continuous aggregates guide or the asyncpg guide.