M-Bus Gateway
TimescaleDB · continuous aggregates · IoT · PostgreSQL · heat metering · aggregation · performance · SQL

TimescaleDB continuous aggregates: real-time aggregation of IoT data

TimescaleDB continuous aggregates for IoT: daily and monthly aggregates of heat meter readings, refresh policies, real-time aggregation, and query performance.

By M-Bus Gateway

The M-Bus Gateway platform uses TimescaleDB continuous aggregates to pre-aggregate daily and monthly heat meter readings. Here is the setup.


The problem with raw readings

-- Without aggregates: compute monthly summaries from roughly 55,000 raw rows:
SELECT
  date_trunc('month', timestamp) AS month,
  meter_installation_id,
  MAX(value) - MIN(value) AS consumption
FROM reading
WHERE timestamp >= '2025-06-01' AND timestamp < '2026-06-01'
GROUP BY 1, 2
ORDER BY 1;

-- 50 meters × 365 days × 3 readings/day = 54,750 rows
-- Scanning all raw rows: ~500 ms

-- With a continuous aggregate: <5 ms (precomputed)

Create the continuous aggregate: daily

-- TimescaleDB continuous aggregate: daily readings per installation:
CREATE MATERIALIZED VIEW daily_reading_summary
WITH (timescaledb.continuous) AS
SELECT
  time_bucket('1 day', timestamp) AS day,
  meter_installation_id,
  tenant_id,
  MAX(value) AS max_value,
  MIN(value) AS min_value,
  MAX(value) - MIN(value) AS daily_delta,
  AVG(rssi_dbm) AS avg_rssi,
  AVG(battery_level_pct) AS avg_battery,
  COUNT(*) AS reading_count
FROM reading
GROUP BY 1, 2, 3
WITH NO DATA;

-- Refresh policy: automatically refresh the last 7 days, once an hour:
SELECT add_continuous_aggregate_policy(
  'daily_reading_summary',
  start_offset => INTERVAL '7 days',
  end_offset   => INTERVAL '1 hour',
  schedule_interval => INTERVAL '1 hour'
);
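
The policy parameters describe a sliding window relative to the moment the job runs. The following is a rough sketch in Python of which time range one run covers. The function name `refresh_window` and the sample timestamp are made up for illustration, and the sketch ignores bucket alignment and other internals of the real job.

```python
from datetime import datetime, timedelta, timezone


def refresh_window(now, start_offset, end_offset):
    """Return the (start, end) range one policy run would look at.

    Simplified model: the real job also aligns the range to bucket
    boundaries and only recomputes buckets whose data changed.
    """
    return now - start_offset, now - end_offset


# Assumed run time; offsets taken from the daily policy above.
now = datetime(2026, 1, 15, 12, 0, tzinfo=timezone.utc)
start, end = refresh_window(now, timedelta(days=7), timedelta(hours=1))
print(start.isoformat(), "->", end.isoformat())
```

With `end_offset` set to one hour, readings from the last hour are left to the next run (or to real-time aggregation, if it is enabled).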

Monthly aggregate (on top of the daily one)

-- Monthly aggregate built on the daily view (a hierarchical continuous
-- aggregate; this needs TimescaleDB 2.9 or later):
CREATE MATERIALIZED VIEW monthly_reading_summary
WITH (timescaledb.continuous) AS
SELECT
  time_bucket('1 month', day) AS month,
  meter_installation_id,
  tenant_id,
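  -- Same definition as the raw query: highest minus lowest reading in the month.
  -- SUM(daily_delta) would omit consumption between one day's last reading
  -- and the next day's first reading.
  MAX(max_value) - MIN(min_value) AS monthly_consumption,
  -- Unweighted averages of the daily averages:
  AVG(avg_rssi) AS avg_rssi,
  AVG(avg_battery) AS avg_battery,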
  SUM(reading_count) AS total_readings
FROM daily_reading_summary
GROUP BY 1, 2, 3
WITH NO DATA;
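
How a monthly figure is derived from daily rows matters when the meter value is a cumulative counter. The sketch below uses made-up readings (three per day, as in the row estimate earlier) to compare the sum of per-day deltas with highest minus lowest reading over the whole period. It is a plain-Python illustration, not code from the platform.

```python
from collections import defaultdict
from datetime import datetime

# Made-up cumulative meter values, three readings per day.
readings = [
    (datetime(2026, 1, 1, 0), 100.0),
    (datetime(2026, 1, 1, 8), 102.0),
    (datetime(2026, 1, 1, 16), 104.0),
    (datetime(2026, 1, 2, 0), 106.0),
    (datetime(2026, 1, 2, 8), 108.0),
    (datetime(2026, 1, 2, 16), 110.0),
]

# Group values by calendar day, like time_bucket('1 day', ...).
by_day = defaultdict(list)
for ts, value in readings:
    by_day[ts.date()].append(value)

# MAX(value) - MIN(value) per day, then summed over the period.
daily_delta = {day: max(values) - min(values) for day, values in by_day.items()}
sum_of_daily_deltas = sum(daily_delta.values())

# MAX(value) - MIN(value) over the whole period.
all_values = [value for _, value in readings]
period_delta = max(all_values) - min(all_values)

print(sum_of_daily_deltas, period_delta)
```

The two numbers differ because the per-day delta never sees the step from 16:00 on one day to 00:00 on the next.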

-- Refresh policy: refresh the last 3 months, once a day:
SELECT add_continuous_aggregate_policy(
  'monthly_reading_summary',
  start_offset => INTERVAL '3 months',
  end_offset   => INTERVAL '1 day',
  schedule_interval => INTERVAL '1 day'
);
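
One more detail of rolling daily rows up into months: the monthly view averages the daily averages (`avg_rssi`, `avg_battery`), which weights every day equally regardless of how many readings it had. A small sketch with invented numbers shows the difference from a reading-weighted average. Whether that difference matters depends on how evenly readings arrive.

```python
# Invented daily rows: (avg_rssi for the day, reading_count for the day).
daily_rows = [
    (-60.0, 3),  # a normal day with three readings
    (-80.0, 1),  # a day where only one reading arrived
]

# What AVG(avg_rssi) over the daily rows computes.
unweighted = sum(avg for avg, _ in daily_rows) / len(daily_rows)

# Weighting each day by its reading count reproduces the average
# over the individual readings.
total_readings = sum(count for _, count in daily_rows)
weighted = sum(avg * count for avg, count in daily_rows) / total_readings

print(unweighted, weighted)
```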

Querying the continuous aggregates

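# server/src/readings/router.py
from fastapi import Depends
from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession

# router, TokenPayload, get_current_user, get_session, MonthlyReadingSummary
# and MeterInstallation are project-local and defined or imported elsewhere.


@router.get("/monthly")
async def get_monthly_readings(
    unit_id: str | None = None,
    installation_id: str | None = None,
    user: TokenPayload = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """
    Monthly consumption, read from monthly_reading_summary (pre-aggregated).
    Responds in <5 ms versus ~500 ms on raw readings.
    """
    stmt = (
        select(
            func.date_trunc("month", MonthlyReadingSummary.month).label("month"),
            MonthlyReadingSummary.meter_installation_id,
            func.sum(MonthlyReadingSummary.monthly_consumption).label("consumption"),
        )
        # Always scope the query to the caller's tenant.
        .where(MonthlyReadingSummary.tenant_id == user.tenant_id)
        .group_by(text("1"), MonthlyReadingSummary.meter_installation_id)
        .order_by(text("1"), MonthlyReadingSummary.meter_installation_id)
    )

    if installation_id:
        # Restrict to a single installation:
        stmt = stmt.where(
            MonthlyReadingSummary.meter_installation_id == installation_id
        )

    if unit_id:
        # Join to meter_installations to filter by unit_id:
        stmt = stmt.join(
            MeterInstallation,
            MeterInstallation.id == MonthlyReadingSummary.meter_installation_id,
        ).where(MeterInstallation.unit_id == unit_id)

    result = await session.execute(stmt)
    # Include the installation id so rows for different meters can be told apart.
    return [
        {
            "month": row.month.isoformat(),
            "meter_installation_id": row.meter_installation_id,
            "consumption": row.consumption,
        }
        for row in result
    ]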

Compression: about 93% space savings

-- Enable TimescaleDB compression on the reading hypertable:
ALTER TABLE reading SET (
  timescaledb.compress,
  timescaledb.compress_segmentby = 'meter_installation_id',
  timescaledb.compress_orderby = 'timestamp DESC'
);

-- Automatically compress chunks older than 7 days:
SELECT add_compression_policy('reading', INTERVAL '7 days');

-- Check the space savings per chunk (ratio_pct is the percentage saved):
SELECT
  chunk_name,
  pg_size_pretty(before_compression_total_bytes) AS before,
  pg_size_pretty(after_compression_total_bytes) AS after,
  round((1 - after_compression_total_bytes::numeric /
    before_compression_total_bytes) * 100, 1) AS ratio_pct
FROM chunk_compression_stats('reading')
-- Uncompressed chunks have NULL sizes, so leave them out:
WHERE compression_status = 'Compressed'
ORDER BY chunk_name;

-- Example output:
-- chunk_name       before    after     ratio_pct
-- _hyper_1_1_chunk 12 MB     840 kB    93.0
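
The savings column is plain arithmetic on the two byte counts. The sketch below repeats the same formula in Python; the helper name `savings_pct` is hypothetical. It feeds in the rounded sizes from the example output, so the result can differ slightly from a percentage computed on exact byte counts, because `pg_size_pretty` rounds what it displays.

```python
def savings_pct(before_bytes, after_bytes):
    """Percentage of space saved, same formula as ratio_pct in the query."""
    return round((1 - after_bytes / before_bytes) * 100, 1)


# Sizes as displayed in the example output (12 MB and 840 kB), which
# are rounded values, not exact byte counts.
before = 12 * 1024 * 1024
after = 840 * 1024
pct = savings_pct(before, after)
print(pct)
```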

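Real-time: include non-materialized data

-- TimescaleDB continuous aggregates support real-time aggregation:
-- one query returns the pre-aggregated data plus live (not yet refreshed) data.

-- The default depends on the version: real-time aggregation was enabled by
-- default before TimescaleDB 2.13 and is disabled by default from 2.13 on.
-- To enable it explicitly:
ALTER MATERIALIZED VIEW daily_reading_summary
  SET (timescaledb.materialized_only = false);

-- The query itself stays the same. With real-time aggregation enabled it also
-- covers the most recent readings that have not been refreshed yet, so no
-- change is needed in the application code:
SELECT * FROM daily_reading_summary
WHERE day >= now() - INTERVAL '7 days';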
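
Conceptually, a real-time aggregate combines two sources: rows already materialized up to a watermark, and raw rows at or after that watermark aggregated on the fly. The following is a toy model of that idea in Python, with invented data and a hypothetical function name. It is only meant to show the merge logic and does not reflect how TimescaleDB implements it internally.

```python
from datetime import date, datetime

# Already materialized daily rows: day -> reading_count.
materialized = {date(2026, 1, 13): 3, date(2026, 1, 14): 3}

# Everything before the watermark is served from the materialization.
watermark = datetime(2026, 1, 15)

# Raw reading timestamps, including some not yet materialized.
raw = [
    datetime(2026, 1, 14, 16),
    datetime(2026, 1, 15, 0),
    datetime(2026, 1, 15, 8),
]


def real_time_counts(materialized, raw, watermark):
    """Merge materialized counts with counts computed live from raw rows."""
    result = dict(materialized)
    for ts in raw:
        if ts >= watermark:  # older rows are already in the materialization
            day = ts.date()
            result[day] = result.get(day, 0) + 1
    return result


combined = real_time_counts(materialized, raw, watermark)
print(combined)
```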

Maintenance and monitoring

-- Monitor the refresh jobs of the continuous aggregates (TimescaleDB 2.x views):
SELECT
  ca.view_name,
  ca.materialization_hypertable_name,
  js.last_run_status,
  js.job_status,
  js.last_successful_finish,
  js.next_start
FROM timescaledb_information.continuous_aggregates ca
JOIN timescaledb_information.jobs j
  ON j.hypertable_schema = ca.materialization_hypertable_schema
 AND j.hypertable_name = ca.materialization_hypertable_name
 AND j.proc_name = 'policy_refresh_continuous_aggregate'
JOIN timescaledb_information.job_stats js ON js.job_id = j.job_id;

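-- Manual refresh (migration/recovery, or the initial backfill of a view
-- created WITH NO DATA). Refresh the daily view first, because the monthly
-- view reads from it:
CALL refresh_continuous_aggregate('daily_reading_summary',
  '2025-01-01', '2026-01-01');
CALL refresh_continuous_aggregate('monthly_reading_summary',
  '2025-01-01', '2026-01-01');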
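
For a long history it can be more manageable to backfill in smaller windows than in one large call. The sketch below generates one `CALL` statement per calendar month. The helper `month_windows` is hypothetical and the date range is taken from the manual refresh example; running the statements is left to whatever database client is in use.

```python
from datetime import date


def month_windows(start, end):
    """Yield (window_start, window_end) pairs covering [start, end) by month."""
    current = start
    while current < end:
        if current.month == 12:
            following = date(current.year + 1, 1, 1)
        else:
            following = date(current.year, current.month + 1, 1)
        following = min(following, end)  # never run past the requested end
        yield current, following
        current = following


statements = [
    f"CALL refresh_continuous_aggregate('daily_reading_summary', '{a}', '{b}');"
    for a, b in month_windows(date(2025, 1, 1), date(2026, 1, 1))
]

for statement in statements:
    print(statement)
```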

Conclusion

TimescaleDB continuous aggregates cut query time for monthly summaries from roughly 500 ms to under 5 ms; the application only has to query the aggregate view instead of the raw table. Daily aggregates are kept up to date automatically by the refresh policy, and the monthly aggregates are built on top of the daily ones. Compression saved about 93% of disk space on historical data in the example above. Real-time aggregation, when enabled, makes sure the most recent readings are included.

See also the TimescaleDB hypertable guide or the asyncpg guide.