Historisk datamigration til varmeregnskabsplatform — fra Excel og CSV
Migration af historisk varmedata til platform: Excel-import af HCA-aflæsninger, historiske årsafregninger, AES-nøgler, graddage-rekonstruktion, validering og juridisk opbevaringskrav.
Af M-Bus Gateway
Migration af historisk varmedata er den vigtigste og mest tidskrævende del af platform-onboarding. Her er den strukturerede proces.
Hvad skal migreres
Historisk data — 5-årig opbevaringspligt (bogføringsloven):
Kategori 1: Årsafregninger (KRITISK)
→ PDF-afregninger 2020-2025 pr. lejlighed
→ Fordeling: HCA-enheder, fast del, m²
→ Grundlag: AnnualInput (fjernvarmeregning + fordeling)
→ Platform: Upload som PropertyDocument (category="settlement")
Kategori 2: HCA-aflæsningshistorik
→ Slutaflæsninger pr. regnskabsår pr. måler
→ Vigtigt for: Pro-rata beregning ved fraflytning (5-årig bagkant)
→ Format: Excel med (dato, måler-ID, aflæsningsværdi, enhed)
Kategori 3: Lejer-historik
→ Ind- og fraflytningsdatoer (pro-rata grundlag)
→ Á conto-beløb pr. lejer pr. periode
→ Vigtig for: Indsigelsesret 5 år tilbage
Kategori 4: Teknisk data
→ Installatørdata: Måler-ID, placering, installationsdato
→ AES-nøgler pr. krypteret måler
→ Kalibreringscertifikater (MID-dokumentation)
Excel-import af HCA-historik
# server/src/tools/import_historical_readings.py
import pandas as pd
from datetime import date
from decimal import Decimal
def import_hca_excel(
excel_path: str,
property_id: str,
meter_mapping: dict[str, str], # Excel-ID → meter_installation_id
) -> list[dict]:
"""
Importér historiske HCA-slutaflæsninger fra Excel.
Forventet Excel-format:
| Dato | Måler-ID | Aflæsning | Enhed | Lejer |
| 2025-05-31 | 12345678 | 4521 | units | Jensen|
"""
df = pd.read_excel(excel_path, parse_dates=["Dato"])
readings = []
errors = []
for _, row in df.iterrows():
meter_id = str(row["Måler-ID"]).strip()
installation_id = meter_mapping.get(meter_id)
if not installation_id:
errors.append(f"Ukendt måler-ID: {meter_id} (række {_})")
continue
readings.append({
"meter_installation_id": installation_id,
"timestamp": row["Dato"].to_pydatetime(),
"value": Decimal(str(row["Aflæsning"])),
"unit": str(row.get("Enhed", "units")),
"source": "historical_import",
"imported_from": excel_path,
})
if errors:
raise ValueError(f"Import-fejl:\n" + "\n".join(errors))
return readings
# Bulk-insert via asyncpg COPY:
async def bulk_insert_historical(session: AsyncSession, readings: list[dict]):
from sqlalchemy import insert
await session.execute(
insert(Reading).on_conflict_do_nothing(
index_elements=["meter_installation_id", "timestamp"]
),
readings,
)
await session.commit()
Historiske årsafregninger som documents
# Migration: Upload eksisterende PDF-afregninger
import pathlib
async def migrate_settlement_pdfs(
property_id: uuid.UUID,
pdf_dir: pathlib.Path,
session: AsyncSession,
s3_client,
) -> int:
"""
Upload historiske PDF-afregninger til S3 + opret PropertyDocument-poster.
Navngivningskonvention: {unit_number}_{year}.pdf
"""
count = 0
for pdf_path in pdf_dir.glob("*.pdf"):
# Parse filnavn: "3A_2023.pdf"
parts = pdf_path.stem.split("_")
if len(parts) != 2:
log.warning("Ukendt filnavn-format", path=str(pdf_path))
continue
unit_number, year = parts[0], int(parts[1])
unit = await get_unit_by_number(session, property_id, unit_number)
if not unit:
log.warning("Enhed ikke fundet", unit_number=unit_number)
continue
# Upload til S3:
s3_key = f"documents/{property_id}/historical/{pdf_path.name}"
async with open(pdf_path, "rb") as f:
await s3_client.upload_fileobj(f, settings.s3_bucket, s3_key)
# Gem som PropertyDocument:
await session.execute(
insert(PropertyDocument).values(
property_id=property_id,
unit_id=unit.id,
category="settlement",
filename=pdf_path.name,
s3_key=s3_key,
document_date=date(year, 9, 30), # Typisk september
notes=f"Historisk import fra {pdf_dir.name}",
)
)
count += 1
await session.commit()
return count
Graddage-rekonstruktion for historiske perioder
# Historiske graddage nødvendige for normalisering af gamle år:
async def backfill_degree_days(
zip_code: str,
years_back: int = 5,
) -> int:
"""
Hent historiske graddage fra DMI for de seneste N år.
Kræves til korrekt år-til-år sammenligning i analyse-dashboards.
"""
from datetime import date, timedelta
end = date.today()
start = date(end.year - years_back, 1, 1)
degree_days = await fetch_degree_days_range(zip_code, start, end)
async with get_async_session() as session:
for entry in degree_days:
await session.execute(
insert(DegreeDay)
.values(
date=entry["date"],
zip_code=zip_code,
country="DK",
heating_degree_days=entry["hdd"],
)
.on_conflict_do_nothing(
index_elements=["date", "zip_code", "country"]
)
)
await session.commit()
return len(degree_days)
Valideringsrapport
# Migrationens validering:
@router.get("/admin/migration/validate/{property_id}")
async def validate_migration(
property_id: uuid.UUID,
session: AsyncSession = Depends(get_session),
current_user: User = Depends(require_role("super_admin")),
) -> dict:
"""
Valider om migreret data er komplet og konsistent.
Køres af super_admin efter import.
"""
checks = {}
# 1. Alle enheder har occupancy-historik:
units = await get_all_units(session, property_id)
units_with_occ = await count_units_with_occupancy(session, property_id)
checks["occupancy_coverage"] = {
"ok": units_with_occ == len(units),
"value": f"{units_with_occ}/{len(units)}",
}
# 2. HCA-aflæsninger dækker 5 regnskabsår:
oldest_reading = await get_oldest_reading(session, property_id)
checks["reading_history"] = {
"ok": oldest_reading and oldest_reading.year <= date.today().year - 4,
"value": str(oldest_reading) if oldest_reading else "ingen",
}
# 3. Ingen meter-installation-huller:
gaps = await find_installation_gaps(session, property_id)
checks["installation_continuity"] = {
"ok": len(gaps) == 0,
"gaps": len(gaps),
}
# 4. AES-nøgler til alle krypterede målere:
encrypted = await count_encrypted_meters(session, property_id)
with_keys = await count_meters_with_keys(session, property_id)
checks["aes_key_coverage"] = {
"ok": encrypted == with_keys,
"value": f"{with_keys}/{encrypted}",
}
return {
"property_id": str(property_id),
"all_ok": all(c["ok"] for c in checks.values()),
"checks": checks,
}
Juridisk opbevaringskrav
Bogføringsloven § 10: 5 år fra regnskabsårets afslutning
BEK 563 § 9 stk. 4: Dokumentation opbevares mindst 5 år
Praktisk konsekvens:
2026 migration: Importer data fra 2021-2025 (5 år)
2020 og ældre: Juridisk ikke påkrævet men anbefalet
Hvad gemmes med immutable flag i S3:
→ PDF-afregninger: s3://bucket/documents/{property_id}/settlements/
→ AES-nøgler: Krypteret i DB (aldrig i S3 i klartekst)
→ Excel-imports: Rå filer som audit-spor
Sletning:
→ Data ældre end 5 år: Anonymiseres (GDPR) men slettes IKKE fra S3
→ Immutable backup: Sletning kræver super_admin + bestyrelsesbeslutning
→ Udgået occupancy: Soft delete med anonymiseret lejer-navn
Platform advarsel:
→ Datasæt der mangler årsafregninger i 5-årsperioden → orange advarsel
→ Målerdata der starter efter 2022 → "Historisk data mangler"
Konklusion
Historisk datamigration kræver struktureret Excel-import af HCA-aflæsninger, bulk-upload af PDF-afregninger til S3 og backfill af DMI-graddage for korrekt år-til-år normalisering. Valideringsrapporten (/admin/migration/validate/{id}) identificerer huller i occupancy-historik, aflæsningsdækning og AES-nøgler. Bogføringsloven kræver 5 års opbevaring — S3 immutable flag sikrer GDPR-kompatibel langtidsopbevaring.
Se Hetzner Object Storage guide eller ejendomsadministrator-skift guide.