M-Bus Gateway

PostgreSQL JSONB for IoT gateway configuration: flexible metadata storage

PostgreSQL JSONB for IoT: gateway configuration, GIN indexes, jsonb_set updates, configuration drift detection, and a hybrid SQLModel approach with Pydantic validation.

By M-Bus Gateway

M-Bus Gateway uses PostgreSQL JSONB to store gateway configuration and device metadata. These are the patterns that balance flexibility with type safety.


When to use JSONB instead of regular columns?

Regular columns:
  ✅ Known fields with a fixed schema (tenant_id, name, address)
  ✅ Fields used in WHERE clauses (indexable)
  ✅ Fields that are aggregated (SUM, AVG)
  ✗ Structure that varies per record
  ✗ Fields that change over time (schema evolution)

JSONB:
  ✅ Semi-structured data (varies per gateway model)
  ✅ Configuration with many optional fields
  ✅ Metadata that changes without a migration
  ✅ Fast GIN index for JSON queries
  ✗ Not suited to fields with strict type requirements (amounts, timestamps)
  ✗ Harder to validate at the database layer

Gateway config example:
  Fixed: gateway_id, tenant_id, created_at (→ regular columns)
  Variable: MQTT host, send cron, WMBUS mode, firmware version (→ JSONB)
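
A minimal sketch of that split, assuming a flat incoming payload. The function name `split_gateway_record`, the `FIXED_COLUMNS` set and the tenant value are illustrative and not part of the platform code. Keys with a fixed schema go to regular columns; everything else lands in the JSONB `config` document.

```python
from typing import Any

# Assumption: these are the fixed-schema fields from the example above.
FIXED_COLUMNS = {"gateway_id", "tenant_id", "created_at"}


def split_gateway_record(payload: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    """Split a flat payload into (column values, JSONB config)."""
    columns = {k: v for k, v in payload.items() if k in FIXED_COLUMNS}
    config = {k: v for k, v in payload.items() if k not in FIXED_COLUMNS}
    return columns, config


columns, config = split_gateway_record({
    "gateway_id": "GW-0001",
    "tenant_id": "tenant-a",  # hypothetical tenant identifier
    "MQTT_HOST": "178.105.90.8",
    "WMBUS_MODE": "c1",
})
```

A new gateway model that reports an extra setting needs no schema change with this layout: the unknown key simply ends up in `config`.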

Gateway model with JSONB config

# server/src/db/models.py
from datetime import datetime
from typing import Any
from uuid import UUID

from pydantic import BaseModel, field_validator
from sqlalchemy.dialects.postgresql import JSONB
from sqlmodel import SQLModel, Field, Column

class GatewayConfig(BaseModel):
    """Pydantic validation of the gateway configuration."""
    MQTT_HOST: str = "178.105.90.8"
    MQTT_PORT: int = 8883
    SEND_CRON: str = "0 6 * * *"
    WMBUS_MODE: str = "c1"
    LOG_LEVEL: str = "INFO"
    CLOUDFLARE_TOKEN: str | None = None
    PLATFORM_API_URL: str | None = None

    @field_validator("WMBUS_MODE")
    @classmethod
    def validate_wmbus_mode(cls, v: str) -> str:
        if v not in ("c1", "t1", "s1", "c1t1"):
            raise ValueError(f"Invalid wM-Bus mode: {v}")
        return v

    @field_validator("MQTT_PORT")
    @classmethod
    def validate_port(cls, v: int) -> int:
        if not (1 <= v <= 65535):
            raise ValueError("Port must be in the range 1-65535")
        return v


class Gateway(SQLModel, table=True):
    __tablename__ = "gateway"

    id: str = Field(primary_key=True)  # GW-0001
    tenant_id: UUID = Field(index=True)
    # JSONB column: flexible, no migration when new fields are added.
    # The PostgreSQL JSONB type is required here; the generic JSON type
    # does not support the @> operator or jsonb_set used later in this post.
    config: dict[str, Any] = Field(
        default_factory=dict,
        sa_column=Column(JSONB),
    )
    firmware_version: str | None = None
    last_seen_at: datetime | None = None
    # Referenced by the queries and updates later in this post:
    updated_at: datetime | None = None
    deleted_at: datetime | None = None

    @property
    def typed_config(self) -> GatewayConfig:
        """Return the configuration validated by Pydantic."""
        return GatewayConfig(**self.config)

JSONB queries: operations monitoring

# Find all gateways whose config deviates from the expected values:
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

async def find_config_drift(
    expected_config: dict,
    session: AsyncSession,
) -> list[dict]:
    """
    Find gateways whose configuration deviates from the platform baseline.
    Uses the PostgreSQL JSONB ->> operator for key access.
    """
    conditions = []
    params = {}

    for i, (key, expected_value) in enumerate(expected_config.items()):
        # config->>'KEY' returns the JSONB value as text. The key is bound as
        # a parameter instead of being interpolated into the SQL string, and
        # IS DISTINCT FROM also flags gateways where the key is missing (NULL).
        conditions.append(
            f"config ->> CAST(:key_{i} AS text) IS DISTINCT FROM :expected_{i}"
        )
        params[f"key_{i}"] = key
        params[f"expected_{i}"] = str(expected_value)

    if not conditions:
        return []

    where_clause = " OR ".join(conditions)
    sql = text(f"""
        SELECT id, config, firmware_version, last_seen_at
        FROM gateway
        WHERE deleted_at IS NULL
          AND ({where_clause})
    """)

    result = await session.execute(sql, params)
    return [dict(row._mapping) for row in result]


# Example (run inside an async function). Values are given as strings
# because ->> compares as text:
baseline = {
    "MQTT_HOST": "178.105.90.8",
    "MQTT_PORT": "8883",
    "SEND_CRON": "0 6 * * *",
    "WMBUS_MODE": "c1",
}
drifted = await find_config_drift(baseline, session)
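
Because `->>` returns text, the baseline has to be written the way PostgreSQL renders each JSON scalar. Python's `str()` is not quite that: `str(True)` gives `"True"`, while a JSON boolean comes back as `true`. The helper below is a small sketch of a safer conversion. `to_jsonb_text` and the `TLS_ENABLED` key are hypothetical names used only for illustration, and the helper covers strings, integers, booleans and null only.

```python
import json
from typing import Any, Optional


def to_jsonb_text(value: Any) -> Optional[str]:
    """Render a JSON scalar the way `config->>'KEY'` would return it."""
    if value is None:
        return None  # JSON null comes back as SQL NULL
    if isinstance(value, str):
        return value  # strings come back without surrounding quotes
    if isinstance(value, (bool, int)):
        return json.dumps(value)  # true / false / 8883
    raise TypeError(f"Unsupported baseline value type: {type(value).__name__}")


# TLS_ENABLED is a made-up key that shows the boolean case.
typed_baseline = {"MQTT_PORT": 8883, "WMBUS_MODE": "c1", "TLS_ENABLED": True}
text_baseline = {key: to_jsonb_text(value) for key, value in typed_baseline.items()}
```

With a conversion like this, the same typed baseline could feed both the SQL query and a Python-side comparison.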

GIN index: fast JSONB lookups

-- GIN index for fast JSONB containment queries (@>):
CREATE INDEX idx_gateway_config_gin ON gateway USING GIN (config);

-- Containment query (all gateways with WMBUS_MODE=t1):
SELECT id FROM gateway
WHERE config @> '{"WMBUS_MODE": "t1"}';
-- → Can use the GIN index instead of scanning every row

-- Existence check with a jsonpath (@?):
SELECT id FROM gateway
WHERE config @? '$.CLOUDFLARE_TOKEN';
-- → All gateways where the CLOUDFLARE_TOKEN key is present
--   (a key stored as JSON null also matches)

-- Specific fields (a B-tree expression index is usually the better
-- choice for equality on a single key):
CREATE INDEX idx_gateway_config_wmbus
ON gateway ((config->>'WMBUS_MODE'));
-- → Fast WHERE config->>'WMBUS_MODE' = 'c1'
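
Containment with `@>` is type-sensitive: `'{"MQTT_PORT": "8883"}'` does not match a document that stores the port as the number 8883. The sketch below mimics that rule in plain Python for objects and scalars only (arrays are left out for brevity). `jsonb_contains` is an illustrative helper, not a PostgreSQL or SQLAlchemy API.

```python
from typing import Any


def _is_number(value: Any) -> bool:
    # bool is a subclass of int in Python, but a separate type in JSON.
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def jsonb_contains(doc: Any, pattern: Any) -> bool:
    """Approximate `doc @> pattern` for JSON objects and scalars."""
    if isinstance(pattern, dict):
        if not isinstance(doc, dict):
            return False
        return all(
            key in doc and jsonb_contains(doc[key], value)
            for key, value in pattern.items()
        )
    if _is_number(doc) and _is_number(pattern):
        return doc == pattern  # numbers compare by value
    # Other scalars must agree in both type and value.
    return type(doc) is type(pattern) and doc == pattern


config = {"WMBUS_MODE": "t1", "MQTT_PORT": 8883}
matches_mode = jsonb_contains(config, {"WMBUS_MODE": "t1"})
matches_text_port = jsonb_contains(config, {"MQTT_PORT": "8883"})
```

Here `matches_mode` is true and `matches_text_port` is false, which is why containment patterns should use the same JSON types that were written to the column.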

jsonb_set: update a single field

# Update SEND_CRON without overwriting the rest of the config:
from datetime import datetime

from sqlalchemy import cast, func, update
from sqlalchemy.dialects.postgresql import JSONB, array
from sqlalchemy.ext.asyncio import AsyncSession

async def update_gateway_config_field(
    gateway_id: str,
    field: str,
    value: str | int | bool,
    session: AsyncSession,
) -> None:
    """
    Update a single field in the gateway config JSONB.
    jsonb_set runs inside one UPDATE statement, so there is no
    read-modify-write race in application code.
    """
    await session.execute(
        update(Gateway)
        .where(Gateway.id == gateway_id)
        .values(
            config=func.jsonb_set(
                Gateway.config,
                array([field]),      # path as text[]: ARRAY['KEY']
                cast(value, JSONB),  # bound and JSON-encoded by the JSONB type
                True,                # create_if_missing
            ),
            updated_at=datetime.utcnow(),
        )
    )
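
Two details matter when calling `jsonb_set` from application code: the path is an array of keys, and the new value must be valid JSON. Wrapping a string in quotes by hand breaks as soon as the value itself contains a quote or a backslash. Below is a hedged sketch of the argument preparation for a raw SQL call, using a hypothetical helper `jsonb_set_args` and an assumed allow-list of updatable keys.

```python
import json
from typing import Any

# Assumption: only these top-level keys may be changed through this helper.
UPDATABLE_KEYS = {"MQTT_HOST", "MQTT_PORT", "SEND_CRON", "WMBUS_MODE", "LOG_LEVEL"}


def jsonb_set_args(field: str, value: Any) -> tuple[list[str], str]:
    """Return (path, json_text) for jsonb_set(config, path, json_text::jsonb)."""
    if field not in UPDATABLE_KEYS:
        raise ValueError(f"Field not allowed: {field}")
    # json.dumps handles quoting and escaping; numbers stay numbers.
    return [field], json.dumps(value)


path, encoded = jsonb_set_args("SEND_CRON", "0 6 * * *")
```

The allow-list also keeps arbitrary caller input out of the JSON path, so a typo cannot silently create a new key.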

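Configuration drift endpoint

# server/src/gateway/router.py
from fastapi import Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# router, get_session, require_role, TokenPayload and Gateway are
# defined elsewhere in the project and are not shown here.
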
PLATFORM_CONFIG_BASELINE = {
    "MQTT_HOST": "178.105.90.8",
    "MQTT_PORT": 8883,
    "SEND_CRON": "0 6 * * *",
    "WMBUS_MODE": "c1",
    "LOG_LEVEL": "INFO",
}

@router.get("/config-drift")
async def get_config_drift(
    session: AsyncSession = Depends(get_session),
    user: TokenPayload = Depends(require_role("super_admin", "technician")),
):
    """Find gateways whose configuration deviates from the baseline."""
    gateways = await session.execute(
        select(Gateway).where(
            Gateway.tenant_id == user.tenant_id,
            Gateway.deleted_at.is_(None),
        )
    )

    drift_results = []
    for gw in gateways.scalars():
        diffs = {}
        for key, expected in PLATFORM_CONFIG_BASELINE.items():
            actual = gw.config.get(key)
            if actual != expected:
                diffs[key] = {"expected": expected, "actual": actual}
        if diffs:
            drift_results.append({
                "gateway_id": gw.id,
                "diffs": diffs,
                "firmware_version": gw.firmware_version,
            })

    return {"total_drifted": len(drift_results), "gateways": drift_results}
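
The comparison inside the loop can be pulled out into a pure function, which makes it easy to unit test without a database. This is a sketch; `diff_config` is a name introduced here for illustration.

```python
from typing import Any


def diff_config(actual: dict[str, Any], baseline: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Return {key: {"expected": ..., "actual": ...}} for every deviating key."""
    diffs: dict[str, dict[str, Any]] = {}
    for key, expected in baseline.items():
        found = actual.get(key)  # a missing key is reported as None
        if found != expected:
            diffs[key] = {"expected": expected, "actual": found}
    return diffs


baseline = {"MQTT_PORT": 8883, "WMBUS_MODE": "c1"}
drift = diff_config({"MQTT_PORT": "8883", "WMBUS_MODE": "c1"}, baseline)
```

Note that a port stored as the string "8883" counts as drift against the integer baseline here, because Python compares typed values, whereas a query built on `->>` compares text.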

Conclusion

PostgreSQL JSONB is a good fit for semi-structured IoT configuration that changes over time: new fields need no migration. GIN indexes speed up containment queries. jsonb_set() updates a single field in one atomic statement. Pydantic validation through the typed_config property provides type safety in the application layer without giving up the flexibility of JSONB.

See also the asyncpg PostgreSQL guide and the gateway config drift guide.