MQTT Sparkplug B — struktureret IoT-datamodel over MQTT
MQTT Sparkplug B protokollen: Protobuf-payload, topic namespace, NBIRTH/DBIRTH/DDATA/DDEATH sekvenser, state management og sammenligning med raw MQTT til industriel IoT og gateway-arkitektur.
Af M-Bus Gateway
Sparkplug B tilføjer struktureret datamodel, state management og Protobuf-komprimering til vanilla MQTT. Her er hvornår det er relevant og hvornår raw MQTT er bedre.
Sparkplug B — hvad det løser
Vanilla MQTT — problemer i stor skala:
→ Ingen standard for payload-format (JSON? MessagePack? hex?)
→ Ingen mekanisme til at opdage om node er online/offline
→ Ingen selvbeskrivende data (hvad er 1234.5? kWh? kPa?)
→ Broker har ingen state om hvilke enheder der er aktive
Sparkplug B løser dette med:
→ Standardiseret Protobuf payload (Sparkplug.proto)
→ Definereret topic namespace: spBv1.0/{group}/{type}/{edge_node}/{device}
→ NBIRTH/DBIRTH: Node/Device annoncerer sig med metadata
→ NDEATH/DDEATH: Automatisk offline-detektion via LWT
→ Primær Application (SCADA): Supervisory control med state sync
Hvornår Sparkplug B:
✓ Industriel SCADA-integration (Inductive Automation Ignition, Eclipse Tahu)
✓ Stor flåde med 1000+ enheder og kompleks state
✓ Multi-vendor (gateway + PLC + sensor i same namespace)
✗ Simple consumer IoT (overhead er ikke det værd)
✗ Eksisterende MQTT-infrastruktur (breaking change)
Topic namespace
Sparkplug B topic format:
spBv1.0/{group_id}/{message_type}/{edge_node_id}/{device_id}
Eksempel for M-Bus Gateway:
spBv1.0/mbus-platform/NBIRTH/GW-0001 ← Node birth
spBv1.0/mbus-platform/NBIRTH/GW-0001/Meter-12345678 ← Device birth
spBv1.0/mbus-platform/DDATA/GW-0001/Meter-12345678 ← Data
spBv1.0/mbus-platform/DDEATH/GW-0001/Meter-12345678 ← Device offline
spBv1.0/mbus-platform/NDEATH/GW-0001 ← Node offline (LWT)
Message types:
NBIRTH → Edge node online + metadata
NDEATH → Edge node offline (konfigureret som LWT)
DBIRTH → Device (meter) online + metric definitions
DDATA → Data update (delta, kun ændrede metrics)
DDEATH → Device offline
NCMD → Server → Node kommando
DCMD → Server → Device kommando
STATE → Primary Application state
Protobuf payload
# gateway/src/mqtt/sparkplug_payload.py
# Kræver: pip install sparkplug-b
from sparkplug_b import sparkplug_b_pb2 as sparkplug
from sparkplug_b.sparkplug_b_pb2 import Payload
import time
def create_dbirth_payload(meter_id: str, meter_type: str) -> bytes:
"""
DBIRTH — Device Birth: Annoncér måler med metric-definitioner.
Kaldés én gang når gateway starter eller måler opdages.
"""
payload = Payload()
payload.timestamp = int(time.time() * 1000)
payload.seq = 0 # Sekvensnummer (0 for BIRTH)
# Definer metrics der vil komme i DDATA:
_add_metric(payload, "heat_energy_kwh", sparkplug.MetaData.FLOAT, 0.0)
_add_metric(payload, "battery_level_pct", sparkplug.MetaData.INT32, 100)
_add_metric(payload, "rssi_dbm", sparkplug.MetaData.INT32, -87)
_add_metric(payload, "timestamp_utc", sparkplug.MetaData.STRING, "")
# Properties:
payload.metrics[0].properties.keys.append("meter_type")
payload.metrics[0].properties.values.append(
sparkplug.PropertyValue(type=sparkplug.MetaData.STRING, string_value=meter_type)
)
return payload.SerializeToString()
def create_ddata_payload(seq: int, readings: dict) -> bytes:
"""
DDATA — Delta update med kun ændrede metrics.
seq øges for hvert DDATA (0-255, derefter wrap til 0).
"""
payload = Payload()
payload.timestamp = int(time.time() * 1000)
payload.seq = seq % 256
for name, value in readings.items():
metric = payload.metrics.add()
metric.name = name
metric.timestamp = payload.timestamp
if isinstance(value, float):
metric.float_value = value
elif isinstance(value, int):
metric.int_value = value
elif isinstance(value, str):
metric.string_value = value
return payload.SerializeToString()
def _add_metric(payload, name, datatype, initial_value):
metric = payload.metrics.add()
metric.name = name
metric.timestamp = payload.timestamp
metric.datatype = datatype
# Sæt initial value:
if datatype == sparkplug.MetaData.FLOAT:
metric.float_value = float(initial_value)
elif datatype == sparkplug.MetaData.INT32:
metric.int_value = int(initial_value)
elif datatype == sparkplug.MetaData.STRING:
metric.string_value = str(initial_value)
State management — NDEATH via LWT
# LWT (Last Will and Testament) konfigureret til NDEATH:
import paho.mqtt.client as mqtt
from sparkplug_b import sparkplug_b_pb2 as sparkplug
def create_ndeath_payload() -> bytes:
"""NDEATH payload — minimal (kun bd_seq)."""
payload = sparkplug.Payload()
payload.timestamp = int(time.time() * 1000)
metric = payload.metrics.add()
metric.name = "bdSeq"
metric.int_value = 0
return payload.SerializeToString()
def connect_sparkplug(client: mqtt.Client, group_id: str, node_id: str):
"""Konfigurér Sparkplug B LWT og opret forbindelse."""
ndeath_topic = f"spBv1.0/{group_id}/NDEATH/{node_id}"
ndeath_payload = create_ndeath_payload()
# LWT = automatisk NDEATH ved uventet disconnect:
client.will_set(
ndeath_topic,
payload=ndeath_payload,
qos=1,
retain=False,
)
client.connect(MQTT_HOST, MQTT_PORT, keepalive=60)
# Efter connect: Send NBIRTH for at annoncere node:
nbirth_topic = f"spBv1.0/{group_id}/NBIRTH/{node_id}"
nbirth_payload = create_nbirth_payload(node_id)
client.publish(nbirth_topic, nbirth_payload, qos=1)
M-Bus Gateway: Raw MQTT vs. Sparkplug B
Nuværende platform: Raw MQTT med MessagePack+zlib
Pro raw MQTT (nuværende):
→ Simpel implementation (paho-mqtt, ingen Sparkplug library)
→ MessagePack+zlib: 3KB/dag → optimal for 1NCE 500MB SIM
→ Fuld kontrol over topic-struktur og payload
→ Ingen SCADA-integration krævet
Pro Sparkplug B (fremtid):
→ Automatisk state tracking (gateway offline/online)
→ Self-describing data (ingen ekstern mapping)
→ Ignition/Kepware integration til industrikunder
→ Standardiseret → nemmere third-party integration
Beslutning for M-Bus Gateway:
→ Raw MQTT: Behold til alle gateways (simplicitet + SIM-budget)
→ Sparkplug B bridge: Mulig fremtidig Fase 6 feature
Gateway sender raw → Server publisher bridge til Sparkplug
→ Industrielle BMS-integrationer understøttes uden Pi-ændring
Konklusion
Sparkplug B tilføjer selvbeskrivende Protobuf payloads, standardiseret topic namespace og automatisk state management til MQTT — ideelt til industriel IoT med SCADA-integration. For forbruger-IoT som wM-Bus gateways med SIM-budget er raw MQTT + MessagePack+zlib mere hensigtsmæssigt. En fremtidig server-side bridge fra raw MQTT til Sparkplug B giver industriel kompatibilitet uden at ændre gateway-koden.