M-Bus Gateway
← Tilbage til blog
· wM-Bus· telegram· parsing· Python· DIB· VIB· AES· wmbusmeters· OMS· EN 13757

wM-Bus telegram parsing i Python — fra rå bytes til aflæsning

Parse wM-Bus telegrammer i Python: telegram-struktur, Data Information Block (DIB), Value Information Block (VIB), AES-CTR dekryptering og wmbusmeters output.

Af M-Bus Gateway

wM-Bus telegrammer er binære datapakker der overholder OMS EN 13757 standarden. Her er strukturen og hvordan M-Bus Gateway parser dem.


Telegram-struktur (OMS EN 13757-4)

Eksempel rå telegram (hex):
2C 44 97 26 83 20 67 30 04 17 32 A3 B4 ...
│  │  ├──────────────┤ │  │  │  ├────────────
│  │  Meter-ID        │  │  │  Krypteret data
│  CI-field           │  Status
└─ L (payload length) Access number

Felt-overblik:
  L:  1 byte — pakkens samlede længde
  C:  1 byte — control field (SND_NR = 0x44)
  M:  2 bytes — Manufacturer ID (FAB ID → "ELS" = Engelmann)
  A:  6 bytes — Meter address (serienummer + type + version)
  CI: 1 byte — Command/Information (0x7A = short header)
  AN: 1 byte — Access number (tæller)
  ST: 1 byte — Status
  CC: 2 bytes — Configuration (krypteringsinfo)
  Data: N bytes — AES-CTR krypteret payload

Manufacturer ID dekodning

# M-felt: 2 bytes → 3-bogstavs ASCII kode

def decode_manufacturer_id(m_field: bytes) -> str:
    """
    OMS-standard: 2 bytes → 3 chars fra A-Z.
    Eksempel: 0x9726 → "ELS" (Engelmann)
    """
    word = int.from_bytes(m_field, byteorder="little")
    c3 = (word & 0x001F) + 64          # Bits 0-4
    c2 = ((word >> 5) & 0x001F) + 64   # Bits 5-9
    c1 = ((word >> 10) & 0x001F) + 64  # Bits 10-14
    return chr(c1) + chr(c2) + chr(c3)

# Kendte FAB IDs:
FAB_IDS = {
    "ELS": "Engelmann",
    "KAM": "Kamstrup",
    "SON": "Sontex",
    "TCH": "Techem",
    "BRU": "Brunata",
    "DIE": "Diehl",
    "AXI": "Axioma",
    "ZEN": "Zenner",
}

Data Information Block (DIB) og VIB

# DIB + VIB struktur (ukrypteret del af telegram):
#
# DIB:
#   DIF:  1 byte — Data type + Tariff + Storage number
#   DIFE: 0-10 bytes — Extension bytes (hvis DIF bit 7 = 1)
#
# VIB:
#   VIF:  1 byte — Value type (energi, volumen, temperatur...)
#   VIFE: 0-10 bytes — Extension bytes

VIF_UNITS = {
    0x00: ("Wh", 1e-3),    # 1 Wh (faktor × 10^-3 kWh)
    0x01: ("Wh", 1e-2),
    0x02: ("Wh", 1e-1),
    0x03: ("Wh", 1.0),     # 1 Wh = 1 Wh
    0x04: ("Wh", 10.0),
    0x05: ("Wh", 100.0),
    0x06: ("kWh", 1.0),    # 1 kWh
    0x07: ("MWh", 1e-3),   # 0.001 MWh

    0x10: ("J", 1.0),      # Joule (alternativt til Wh)
    0x16: ("GJ", 1e-3),    # GJ

    0x14: ("m3", 1e-6),    # Kubikm med 6 decimaler
    0x15: ("m3", 1e-5),
    0x16: ("m3", 1e-4),
    0x17: ("m3", 1e-3),    # Liter-præcision

    0x58: ("°C", 1e-3),    # Temperatur (flow)
    0x5A: ("°C", 1e-2),
    0x5B: ("°C", 1e-1),

    0x6D: ("datetime", 1), # Dato+tid (4 bytes BCD)
}

def parse_vif(vif_byte: int) -> tuple[str, float]:
    """Returner (enhed, skaleringsfaktor)."""
    base_vif = vif_byte & 0x7F  # Bit 7 = extension flag
    if base_vif in VIF_UNITS:
        return VIF_UNITS[base_vif]
    return ("unknown", 1.0)

wmbusmeters: Praktisk parsing

# wmbusmeters gør parsing automatisk:
wmbusmeters --format=json /dev/wmbus auto:t1 \
  "myHCA" "ELS:MANUFACTURER:97268320:04" "NOKEY"

# JSON output:
{
  "media": "heat_cost_allocator",
  "meter": "myHCA",
  "name": "myHCA",
  "id": "20836797",
  "current_hca": 127,
  "previous_hca": 0,
  "current_hca_consumption_at_set_date_hca": 0,
  "set_date": "2024-09-01",
  "timestamp": "2026-05-24T06:00:00Z",
  "rssi_dbm": -67,
  "battery_level_pct": 91
}
# gateway/src/wmbus/listener.py
import subprocess
import json
from pathlib import Path

def parse_wmbus_output(line: str) -> dict | None:
    """Parse én JSON-linje fra wmbusmeters stdout."""
    try:
        data = json.loads(line)
        return {
            "meter_id": data["id"],
            "manufacturer": data.get("meter", "unknown"),
            "media": data["media"],
            "value": data.get("current_hca") or data.get("current_water_m3") or data.get("current_kwh"),
            "unit": _determine_unit(data["media"]),
            "rssi_dbm": data.get("rssi_dbm"),
            "battery_pct": data.get("battery_level_pct"),
            "timestamp": data["timestamp"],
        }
    except (json.JSONDecodeError, KeyError):
        return None

def _determine_unit(media: str) -> str:
    return {
        "heat_cost_allocator": "HCA",
        "heat": "kWh",
        "water": "m3",
        "cold_water": "m3",
        "electricity": "kWh",
        "gas": "m3",
    }.get(media, "unknown")

AES-CTR dekryptering (kun til diagnostik)

# Normalt håndterer wmbusmeters AES automatisk.
# Denne kode bruges til diagnostik/fejlsøgning:

from Crypto.Cipher import AES
import struct

def decrypt_oms_aes_ctr(
    ciphertext: bytes,
    key: bytes,   # 16 bytes AES-128 nøgle
    meter_id: bytes,   # 6 bytes (M-felt + A-felt)
    access_number: int,
) -> bytes:
    """
    OMS AES-128-CTR IV-konstruktion:
    IV = M(2) + A(6) + CC(2) + AN(1) + 0x00×5
    """
    # Konstruér IV (16 bytes):
    iv = meter_id[:8] + bytes([access_number]) + bytes(7)
    assert len(iv) == 16, "IV skal være 16 bytes"

    cipher = AES.new(key, AES.MODE_CTR, initial_value=iv, nonce=b"")
    return cipher.decrypt(ciphertext)

Konklusion

wM-Bus telegram-parsing er komplekst i rå form — brug wmbusmeters der implementerer hele OMS EN 13757 standarden inkl. DIB/VIB parsing, AES-CTR dekryptering og alle kendte drivertyper. Python-laget wraper wmbusmeters' JSON-output og kortlægger det til platformens datamodel. Forstå telegram-strukturen for at debugge ukendte målere.

Se AES-128 CTR dekryptering guide eller wmbusmeters installationsguide.