TimescaleDB · continuous aggregates · IoT · PostgreSQL · heat measurements · aggregation · performance · SQL
TimescaleDB continuous aggregates: real-time aggregation of IoT data
TimescaleDB continuous aggregates for IoT: daily and monthly aggregates of heat measurements, refresh policies, real-time aggregation and query performance.
By M-Bus Gateway
The M-Bus Gateway platform uses TimescaleDB continuous aggregates to pre-aggregate daily and monthly heat measurements. Here is the setup.
The problem with raw readings
-- Without aggregates: compute monthly summaries from roughly 55,000 rows.
-- value is treated as a cumulative meter reading, so MAX - MIN is the consumption.
SELECT
    date_trunc('month', timestamp) AS month,
    meter_installation_id,
    MAX(value) - MIN(value) AS consumption
FROM reading
WHERE timestamp >= '2025-06-01' AND timestamp < '2026-06-01'
GROUP BY 1, 2
ORDER BY 1;
-- 50 meters × 365 days × 3 readings/day = 54,750 rows
-- Scanning the raw rows: ~500 ms
-- With a continuous aggregate: <5 ms (pre-computed)
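The query above treats `value` as a cumulative meter reading, so consumption in a period is the highest reading minus the lowest. A minimal Python sketch of the same grouping, using made-up readings and a hypothetical helper `monthly_consumption` (neither is part of the platform):

```python
from collections import defaultdict
from datetime import datetime


def monthly_consumption(readings):
    """Group (timestamp, meter_id, value) tuples by month and meter and
    return max(value) - min(value) per group, mirroring the SQL query."""
    groups = defaultdict(list)
    for ts, meter_id, value in readings:
        # Truncate the timestamp to the first instant of its month.
        month = ts.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        groups[(month, meter_id)].append(value)
    return {key: max(vals) - min(vals) for key, vals in sorted(groups.items())}


# Made-up cumulative readings (kWh) for one hypothetical meter.
readings = [
    (datetime(2025, 6, 1, 8), "meter-1", 1000.0),
    (datetime(2025, 6, 15, 8), "meter-1", 1040.0),
    (datetime(2025, 6, 30, 20), "meter-1", 1090.0),
    (datetime(2025, 7, 1, 8), "meter-1", 1092.0),
    (datetime(2025, 7, 31, 20), "meter-1", 1150.0),
]

result = monthly_consumption(readings)
```

Note that this formula ignores whatever was consumed between the last reading of one month and the first reading of the next.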
Creating the continuous aggregate: daily
-- TimescaleDB continuous aggregate: daily readings per installation.
CREATE MATERIALIZED VIEW daily_reading_summary
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 day', timestamp) AS day,
    meter_installation_id,
    tenant_id,
    MAX(value) AS max_value,
    MIN(value) AS min_value,
    MAX(value) - MIN(value) AS daily_delta,
    AVG(rssi_dbm) AS avg_rssi,
    AVG(battery_level_pct) AS avg_battery,
    COUNT(*) AS reading_count
FROM reading
GROUP BY 1, 2, 3
WITH NO DATA;
-- Refresh policy: every hour, refresh the most recent 7 days
-- (up to 1 hour before the current time).
SELECT add_continuous_aggregate_policy(
    'daily_reading_summary',
    start_offset => INTERVAL '7 days',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour'
);
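To make the two offsets concrete: each policy run looks at the range from `now - start_offset` to `now - end_offset`. The sketch below computes that range and lists the daily buckets that lie entirely inside it, on the assumption that only complete buckets within the window get materialized. The helper names are hypothetical and the timestamps are made up.

```python
from datetime import datetime, timedelta, timezone


def refresh_window(now, start_offset, end_offset):
    """Return the (start, end) range a policy run considers."""
    return now - start_offset, now - end_offset


def complete_daily_buckets(start, end):
    """List the start of every 1-day bucket lying entirely inside [start, end)."""
    # Round the window start up to the next midnight unless it already is one.
    first = start.replace(hour=0, minute=0, second=0, microsecond=0)
    if first < start:
        first += timedelta(days=1)
    buckets = []
    while first + timedelta(days=1) <= end:
        buckets.append(first)
        first += timedelta(days=1)
    return buckets


# Made-up "current time" for the illustration.
now = datetime(2026, 3, 10, 14, 30, tzinfo=timezone.utc)
start, end = refresh_window(now, timedelta(days=7), timedelta(hours=1))
buckets = complete_daily_buckets(start, end)
```

With these inputs the window runs from 3 March 14:30 to 10 March 13:30, so the complete buckets are 4 March through 9 March; the current day is left to a later run.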
Monthly aggregate (on top of the daily one)
-- Monthly aggregate: built on the daily view (hierarchical continuous aggregate).
CREATE MATERIALIZED VIEW monthly_reading_summary
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 month', day) AS month,
    meter_installation_id,
    tenant_id,
    -- Highest minus lowest reading in the month, matching the raw query.
    -- SUM(daily_delta) would miss consumption between the last reading of
    -- one day and the first reading of the next.
    MAX(max_value) - MIN(min_value) AS monthly_consumption,
    -- Unweighted means of the daily averages.
    AVG(avg_rssi) AS avg_rssi,
    AVG(avg_battery) AS avg_battery,
    SUM(reading_count) AS total_readings
FROM daily_reading_summary
GROUP BY 1, 2, 3
WITH NO DATA;
-- Refresh policy: once a day, refresh the most recent 3 months
-- (up to 1 day before the current time).
SELECT add_continuous_aggregate_policy(
    'monthly_reading_summary',
    start_offset => INTERVAL '3 months',
    end_offset => INTERVAL '1 day',
    schedule_interval => INTERVAL '1 day'
);
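When daily rows are rolled up into months, there are two candidate formulas for a cumulative meter: summing the per-day deltas, or taking the highest daily maximum minus the lowest daily minimum. They differ by whatever was consumed between the last reading of one day and the first reading of the next. A small sketch with made-up numbers (the function names are hypothetical):

```python
# Made-up daily rows for one hypothetical meter: (day, min_value, max_value).
daily_rows = [
    ("2025-06-01", 1000.0, 1002.0),
    ("2025-06-02", 1003.0, 1005.0),
    ("2025-06-03", 1006.0, 1009.0),
]


def sum_of_daily_deltas(rows):
    """Equivalent of SUM(daily_delta): adds up each day's max - min."""
    return sum(max_v - min_v for _, min_v, max_v in rows)


def month_span(rows):
    """Equivalent of MAX(max_value) - MIN(min_value) over the month."""
    highest = max(max_v for _, _, max_v in rows)
    lowest = min(min_v for _, min_v, _ in rows)
    return highest - lowest


summed = sum_of_daily_deltas(daily_rows)  # 2 + 2 + 3
span = month_span(daily_rows)             # 1009 - 1000
```

Here the summed deltas give 7 while the span gives 9; the missing 2 units were consumed overnight, between the daily buckets.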
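Querying the continuous aggregates
# server/src/readings/router.py
from fastapi import Depends
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

# router, TokenPayload, get_current_user, get_session, MonthlyReadingSummary
# and MeterInstallation are defined elsewhere in the project (not shown here).


@router.get("/monthly")
async def get_monthly_readings(
    unit_id: str | None = None,
    installation_id: str | None = None,
    user: TokenPayload = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """
    Monthly consumption, read from monthly_reading_summary (pre-aggregated).
    Responds in under ~5 ms, versus ~500 ms against the raw readings.
    """
    stmt = (
        select(
            # month is already a one-month bucket, so no date_trunc is needed.
            MonthlyReadingSummary.month.label("month"),
            MonthlyReadingSummary.meter_installation_id,
            func.sum(MonthlyReadingSummary.monthly_consumption).label("consumption"),
        )
        # Always scope the query to the caller's tenant.
        .where(MonthlyReadingSummary.tenant_id == user.tenant_id)
        .group_by(MonthlyReadingSummary.month, MonthlyReadingSummary.meter_installation_id)
        .order_by(MonthlyReadingSummary.month)
    )
    if installation_id:
        # Optional filter on a single installation.
        stmt = stmt.where(MonthlyReadingSummary.meter_installation_id == installation_id)
    if unit_id:
        # Join to meter installations to filter on unit_id.
        stmt = stmt.join(
            MeterInstallation,
            MeterInstallation.id == MonthlyReadingSummary.meter_installation_id,
        ).where(MeterInstallation.unit_id == unit_id)
    result = await session.execute(stmt)
    return [
        {
            "month": row.month.isoformat(),
            "meter_installation_id": row.meter_installation_id,
            "consumption": row.consumption,
        }
        for row in result
    ]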
Compression: 93% space savings
-- Enable TimescaleDB compression on the reading hypertable:
ALTER TABLE reading SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'meter_installation_id',
    timescaledb.compress_orderby = 'timestamp DESC'
);
-- Automatically compress chunks older than 7 days:
SELECT add_compression_policy('reading', INTERVAL '7 days');
-- Check the compression ratio (chunks that are not compressed yet have NULL sizes):
SELECT
    chunk_name,
    pg_size_pretty(before_compression_total_bytes) AS before,
    pg_size_pretty(after_compression_total_bytes) AS after,
    round((1 - after_compression_total_bytes::numeric /
        before_compression_total_bytes) * 100, 1) AS ratio_pct
FROM chunk_compression_stats('reading')
WHERE after_compression_total_bytes IS NOT NULL
ORDER BY chunk_name;
-- Typical output:
-- chunk_name          before   after    ratio_pct
-- _hyper_1_1_chunk    12 MB    840 kB   93.0
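The `ratio_pct` column is plain arithmetic: one minus the compressed size divided by the original size. A sketch of the same formula in Python, with a hypothetical helper `saved_pct` and the printed sizes taken at face value (`pg_size_pretty` rounds what it prints, so a percentage computed from exact byte counts can differ by a few tenths):

```python
def saved_pct(before_bytes, after_bytes):
    """Space saved as a percentage, rounded to one decimal like the SQL query."""
    return round((1 - after_bytes / before_bytes) * 100, 1)


before = 12 * 1024 * 1024  # 12 MB, as printed by pg_size_pretty
after = 840 * 1024         # 840 kB
pct = saved_pct(before, after)
```

With exactly 12 MB and 840 kB the formula gives 93.2.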
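Real-time aggregation: including data that is not yet materialized
-- Continuous aggregates support real-time aggregation: a single query returns
-- the materialized buckets plus rows that have not been refreshed yet.
-- Whether it is on by default depends on the version: TimescaleDB 2.13 and
-- later create continuous aggregates as materialized-only, earlier versions
-- enable real-time aggregation by default. To turn it on explicitly:
ALTER MATERIALIZED VIEW daily_reading_summary
    SET (timescaledb.materialized_only = false);
-- The query is the same in both modes:
SELECT * FROM daily_reading_summary
WHERE day >= now() - INTERVAL '7 days';
-- With materialized_only = true it returns only materialized buckets.
-- With materialized_only = false it also includes the latest, not yet refreshed data.
-- No application code changes are needed.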
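Conceptually, a real-time query combines two sources: stored buckets up to a watermark, and an on-the-fly aggregation of raw rows from the watermark onwards. The Python sketch below imitates that idea with made-up data; it is an illustration of the concept, not how TimescaleDB implements it, and all names are hypothetical.

```python
from datetime import date

# Already materialized daily buckets: day -> reading_count (made-up data).
materialized = {date(2026, 3, 8): 3, date(2026, 3, 9): 3}

# Buckets before the watermark are served from the materialization.
watermark = date(2026, 3, 10)

# Raw readings as (day, value) pairs.
raw = [
    (date(2026, 3, 9), 1.0),   # before the watermark: already counted above
    (date(2026, 3, 10), 2.0),  # not refreshed yet
    (date(2026, 3, 10), 3.0),  # not refreshed yet
]


def real_time_counts(materialized, raw, watermark):
    """Combine stored buckets with a live count of rows at or after the watermark."""
    live = {}
    for day, _value in raw:
        if day >= watermark:
            live[day] = live.get(day, 0) + 1
    return {**materialized, **live}


def materialized_only_counts(materialized):
    """Return just the stored buckets, ignoring newer raw rows."""
    return dict(materialized)


combined = real_time_counts(materialized, raw, watermark)
stored = materialized_only_counts(materialized)
```

`combined` contains a bucket for 10 March with two readings, while `stored` ends at 9 March.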
Maintenance and monitoring
-- Monitor the refresh job behind each continuous aggregate
-- (view and column names as in the TimescaleDB 2.x informational views):
SELECT
    ca.view_name,
    ca.materialization_hypertable_name,
    js.last_run_status,
    js.job_status,
    js.next_start
FROM timescaledb_information.continuous_aggregates ca
JOIN timescaledb_information.jobs j
    ON j.hypertable_schema = ca.materialization_hypertable_schema
   AND j.hypertable_name = ca.materialization_hypertable_name
   AND j.proc_name = 'policy_refresh_continuous_aggregate'
JOIN timescaledb_information.job_stats js
    ON js.job_id = j.job_id;
-- Manual refresh (for migration or recovery, and to backfill views created WITH NO DATA):
CALL refresh_continuous_aggregate('daily_reading_summary',
    '2025-01-01', '2026-01-01');
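The manual refresh above covers a whole year in one call. For a large backfill it may be gentler to refresh in smaller windows. The sketch below only generates one `CALL` statement per calendar month; the helpers `month_windows` and `refresh_statements` are hypothetical, and executing the statements is left to whatever database client the project uses.

```python
from datetime import date


def month_windows(start, end):
    """Split [start, end) into consecutive windows that break at each first of the month."""
    windows = []
    cursor = start
    while cursor < end:
        if cursor.month == 12:
            nxt = date(cursor.year + 1, 1, 1)
        else:
            nxt = date(cursor.year, cursor.month + 1, 1)
        nxt = min(nxt, end)  # never run past the requested end
        windows.append((cursor, nxt))
        cursor = nxt
    return windows


def refresh_statements(view, start, end):
    """Build one refresh call per window. The view name is interpolated
    directly, so only pass trusted, hard-coded names."""
    return [
        f"CALL refresh_continuous_aggregate('{view}', '{s}', '{e}');"
        for s, e in month_windows(start, end)
    ]


statements = refresh_statements(
    "daily_reading_summary", date(2025, 1, 1), date(2026, 1, 1)
)
```

For the range used above this yields twelve statements, the first covering 2025-01-01 to 2025-02-01.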
Conclusion
In this setup, TimescaleDB continuous aggregates cut query time for monthly summaries from about 500 ms to under 5 ms; the application only has to query the aggregate view instead of the raw table. Daily aggregates are refreshed automatically by a policy, and the monthly aggregates are built on top of the daily ones. Compression saves about 93% of the space taken by historical data. Real-time aggregation, when enabled, ensures that the most recent data is always included.