M-Bus Gateway
← Tilbage til blog
· MQTT· Sparkplug B· IoT· Protobuf· gateway· protokol· industriel IoT

MQTT Sparkplug B — struktureret IoT-datamodel over MQTT

MQTT Sparkplug B protokollen: Protobuf-payload, topic namespace, NBIRTH/DBIRTH/DDATA/DDEATH sekvenser, state management og sammenligning med raw MQTT til industriel IoT og gateway-arkitektur.

Af M-Bus Gateway

Sparkplug B tilføjer struktureret datamodel, state management og Protobuf-komprimering til vanilla MQTT. Her er hvornår det er relevant og hvornår raw MQTT er bedre.


Sparkplug B — hvad det løser

Vanilla MQTT — problemer i stor skala:
  → Ingen standard for payload-format (JSON? MessagePack? hex?)
  → Ingen mekanisme til at opdage om node er online/offline
  → Ingen selvbeskrivende data (hvad er 1234.5? kWh? kPa?)
  → Broker har ingen state om hvilke enheder der er aktive

Sparkplug B løser dette med:
  → Standardiseret Protobuf payload (Sparkplug.proto)
  → Definereret topic namespace: spBv1.0/{group}/{type}/{edge_node}/{device}
  → NBIRTH/DBIRTH: Node/Device annoncerer sig med metadata
  → NDEATH/DDEATH: Automatisk offline-detektion via LWT
  → Primær Application (SCADA): Supervisory control med state sync

Hvornår Sparkplug B:
  ✓ Industriel SCADA-integration (Inductive Automation Ignition, Eclipse Tahu)
  ✓ Stor flåde med 1000+ enheder og kompleks state
  ✓ Multi-vendor (gateway + PLC + sensor i same namespace)
  ✗ Simple consumer IoT (overhead er ikke det værd)
  ✗ Eksisterende MQTT-infrastruktur (breaking change)

Topic namespace

Sparkplug B topic format:

spBv1.0/{group_id}/{message_type}/{edge_node_id}/{device_id}

Eksempel for M-Bus Gateway:
  spBv1.0/mbus-platform/NBIRTH/GW-0001          ← Node birth
  spBv1.0/mbus-platform/NBIRTH/GW-0001/Meter-12345678  ← Device birth
  spBv1.0/mbus-platform/DDATA/GW-0001/Meter-12345678   ← Data
  spBv1.0/mbus-platform/DDEATH/GW-0001/Meter-12345678  ← Device offline
  spBv1.0/mbus-platform/NDEATH/GW-0001          ← Node offline (LWT)

Message types:
  NBIRTH  → Edge node online + metadata
  NDEATH  → Edge node offline (konfigureret som LWT)
  DBIRTH  → Device (meter) online + metric definitions
  DDATA   → Data update (delta, kun ændrede metrics)
  DDEATH  → Device offline
  NCMD    → Server → Node kommando
  DCMD    → Server → Device kommando
  STATE   → Primary Application state

Protobuf payload

# gateway/src/mqtt/sparkplug_payload.py
# Kræver: pip install sparkplug-b

from sparkplug_b import sparkplug_b_pb2 as sparkplug
from sparkplug_b.sparkplug_b_pb2 import Payload
import time


def create_dbirth_payload(meter_id: str, meter_type: str) -> bytes:
    """
    DBIRTH — Device Birth: Annoncér måler med metric-definitioner.
    Kaldés én gang når gateway starter eller måler opdages.
    """
    payload = Payload()
    payload.timestamp = int(time.time() * 1000)
    payload.seq = 0   # Sekvensnummer (0 for BIRTH)

    # Definer metrics der vil komme i DDATA:
    _add_metric(payload, "heat_energy_kwh", sparkplug.MetaData.FLOAT, 0.0)
    _add_metric(payload, "battery_level_pct", sparkplug.MetaData.INT32, 100)
    _add_metric(payload, "rssi_dbm", sparkplug.MetaData.INT32, -87)
    _add_metric(payload, "timestamp_utc", sparkplug.MetaData.STRING, "")

    # Properties:
    payload.metrics[0].properties.keys.append("meter_type")
    payload.metrics[0].properties.values.append(
        sparkplug.PropertyValue(type=sparkplug.MetaData.STRING, string_value=meter_type)
    )

    return payload.SerializeToString()


def create_ddata_payload(seq: int, readings: dict) -> bytes:
    """
    DDATA — Delta update med kun ændrede metrics.
    seq øges for hvert DDATA (0-255, derefter wrap til 0).
    """
    payload = Payload()
    payload.timestamp = int(time.time() * 1000)
    payload.seq = seq % 256

    for name, value in readings.items():
        metric = payload.metrics.add()
        metric.name = name
        metric.timestamp = payload.timestamp
        if isinstance(value, float):
            metric.float_value = value
        elif isinstance(value, int):
            metric.int_value = value
        elif isinstance(value, str):
            metric.string_value = value

    return payload.SerializeToString()


def _add_metric(payload, name, datatype, initial_value):
    metric = payload.metrics.add()
    metric.name = name
    metric.timestamp = payload.timestamp
    metric.datatype = datatype
    # Sæt initial value:
    if datatype == sparkplug.MetaData.FLOAT:
        metric.float_value = float(initial_value)
    elif datatype == sparkplug.MetaData.INT32:
        metric.int_value = int(initial_value)
    elif datatype == sparkplug.MetaData.STRING:
        metric.string_value = str(initial_value)

State management — NDEATH via LWT

# LWT (Last Will and Testament) konfigureret til NDEATH:

import paho.mqtt.client as mqtt
from sparkplug_b import sparkplug_b_pb2 as sparkplug


def create_ndeath_payload() -> bytes:
    """NDEATH payload — minimal (kun bd_seq)."""
    payload = sparkplug.Payload()
    payload.timestamp = int(time.time() * 1000)
    metric = payload.metrics.add()
    metric.name = "bdSeq"
    metric.int_value = 0
    return payload.SerializeToString()


def connect_sparkplug(client: mqtt.Client, group_id: str, node_id: str):
    """Konfigurér Sparkplug B LWT og opret forbindelse."""
    ndeath_topic = f"spBv1.0/{group_id}/NDEATH/{node_id}"
    ndeath_payload = create_ndeath_payload()

    # LWT = automatisk NDEATH ved uventet disconnect:
    client.will_set(
        ndeath_topic,
        payload=ndeath_payload,
        qos=1,
        retain=False,
    )

    client.connect(MQTT_HOST, MQTT_PORT, keepalive=60)

    # Efter connect: Send NBIRTH for at annoncere node:
    nbirth_topic = f"spBv1.0/{group_id}/NBIRTH/{node_id}"
    nbirth_payload = create_nbirth_payload(node_id)
    client.publish(nbirth_topic, nbirth_payload, qos=1)

M-Bus Gateway: Raw MQTT vs. Sparkplug B

Nuværende platform: Raw MQTT med MessagePack+zlib

Pro raw MQTT (nuværende):
  → Simpel implementation (paho-mqtt, ingen Sparkplug library)
  → MessagePack+zlib: 3KB/dag → optimal for 1NCE 500MB SIM
  → Fuld kontrol over topic-struktur og payload
  → Ingen SCADA-integration krævet

Pro Sparkplug B (fremtid):
  → Automatisk state tracking (gateway offline/online)
  → Self-describing data (ingen ekstern mapping)
  → Ignition/Kepware integration til industrikunder
  → Standardiseret → nemmere third-party integration

Beslutning for M-Bus Gateway:
  → Raw MQTT: Behold til alle gateways (simplicitet + SIM-budget)
  → Sparkplug B bridge: Mulig fremtidig Fase 6 feature
    Gateway sender raw → Server publisher bridge til Sparkplug
    → Industrielle BMS-integrationer understøttes uden Pi-ændring

Konklusion

Sparkplug B tilføjer selvbeskrivende Protobuf payloads, standardiseret topic namespace og automatisk state management til MQTT — ideelt til industriel IoT med SCADA-integration. For forbruger-IoT som wM-Bus gateways med SIM-budget er raw MQTT + MessagePack+zlib mere hensigtsmæssigt. En fremtidig server-side bridge fra raw MQTT til Sparkplug B giver industriel kompatibilitet uden at ændre gateway-koden.

Se MQTT broker opsætning guide eller MQTT QoS guide.