· wM-Bus· telegram· parsing· Python· DIB· VIB· AES· wmbusmeters· OMS· EN 13757
wM-Bus telegram parsing i Python — fra rå bytes til aflæsning
Parse wM-Bus telegrammer i Python: telegram-struktur, Data Information Block (DIB), Value Information Block (VIB), AES-CTR dekryptering og wmbusmeters output.
Af M-Bus Gateway
wM-Bus telegrammer er binære datapakker der overholder OMS EN 13757 standarden. Her er strukturen og hvordan M-Bus Gateway parser dem.
Telegram-struktur (OMS EN 13757-4)
Eksempel rå telegram (hex):
2C 44 97 26 83 20 67 30 04 17 32 A3 B4 ...
│ │ ├──────────────┤ │ │ │ ├────────────
│ │ Meter-ID │ │ │ Krypteret data
│ CI-field │ Status
└─ L (payload length) Access number
Felt-overblik:
L: 1 byte — pakkens samlede længde
C: 1 byte — control field (SND_NR = 0x44)
M: 2 bytes — Manufacturer ID (FAB ID → "ELS" = Engelmann)
A: 6 bytes — Meter address (serienummer + type + version)
CI: 1 byte — Command/Information (0x7A = short header)
AN: 1 byte — Access number (tæller)
ST: 1 byte — Status
CC: 2 bytes — Configuration (krypteringsinfo)
Data: N bytes — AES-CTR krypteret payload
Manufacturer ID dekodning
# M-felt: 2 bytes → 3-bogstavs ASCII kode
def decode_manufacturer_id(m_field: bytes) -> str:
"""
OMS-standard: 2 bytes → 3 chars fra A-Z.
Eksempel: 0x9726 → "ELS" (Engelmann)
"""
word = int.from_bytes(m_field, byteorder="little")
c3 = (word & 0x001F) + 64 # Bits 0-4
c2 = ((word >> 5) & 0x001F) + 64 # Bits 5-9
c1 = ((word >> 10) & 0x001F) + 64 # Bits 10-14
return chr(c1) + chr(c2) + chr(c3)
# Kendte FAB IDs:
FAB_IDS = {
"ELS": "Engelmann",
"KAM": "Kamstrup",
"SON": "Sontex",
"TCH": "Techem",
"BRU": "Brunata",
"DIE": "Diehl",
"AXI": "Axioma",
"ZEN": "Zenner",
}
Data Information Block (DIB) og VIB
# DIB + VIB struktur (ukrypteret del af telegram):
#
# DIB:
# DIF: 1 byte — Data type + Tariff + Storage number
# DIFE: 0-10 bytes — Extension bytes (hvis DIF bit 7 = 1)
#
# VIB:
# VIF: 1 byte — Value type (energi, volumen, temperatur...)
# VIFE: 0-10 bytes — Extension bytes
VIF_UNITS = {
0x00: ("Wh", 1e-3), # 1 Wh (faktor × 10^-3 kWh)
0x01: ("Wh", 1e-2),
0x02: ("Wh", 1e-1),
0x03: ("Wh", 1.0), # 1 Wh = 1 Wh
0x04: ("Wh", 10.0),
0x05: ("Wh", 100.0),
0x06: ("kWh", 1.0), # 1 kWh
0x07: ("MWh", 1e-3), # 0.001 MWh
0x10: ("J", 1.0), # Joule (alternativt til Wh)
0x16: ("GJ", 1e-3), # GJ
0x14: ("m3", 1e-6), # Kubikm med 6 decimaler
0x15: ("m3", 1e-5),
0x16: ("m3", 1e-4),
0x17: ("m3", 1e-3), # Liter-præcision
0x58: ("°C", 1e-3), # Temperatur (flow)
0x5A: ("°C", 1e-2),
0x5B: ("°C", 1e-1),
0x6D: ("datetime", 1), # Dato+tid (4 bytes BCD)
}
def parse_vif(vif_byte: int) -> tuple[str, float]:
"""Returner (enhed, skaleringsfaktor)."""
base_vif = vif_byte & 0x7F # Bit 7 = extension flag
if base_vif in VIF_UNITS:
return VIF_UNITS[base_vif]
return ("unknown", 1.0)
wmbusmeters: Praktisk parsing
# wmbusmeters gør parsing automatisk:
wmbusmeters --format=json /dev/wmbus auto:t1 \
"myHCA" "ELS:MANUFACTURER:97268320:04" "NOKEY"
# JSON output:
{
"media": "heat_cost_allocator",
"meter": "myHCA",
"name": "myHCA",
"id": "20836797",
"current_hca": 127,
"previous_hca": 0,
"current_hca_consumption_at_set_date_hca": 0,
"set_date": "2024-09-01",
"timestamp": "2026-05-24T06:00:00Z",
"rssi_dbm": -67,
"battery_level_pct": 91
}
# gateway/src/wmbus/listener.py
import subprocess
import json
from pathlib import Path
def parse_wmbus_output(line: str) -> dict | None:
"""Parse én JSON-linje fra wmbusmeters stdout."""
try:
data = json.loads(line)
return {
"meter_id": data["id"],
"manufacturer": data.get("meter", "unknown"),
"media": data["media"],
"value": data.get("current_hca") or data.get("current_water_m3") or data.get("current_kwh"),
"unit": _determine_unit(data["media"]),
"rssi_dbm": data.get("rssi_dbm"),
"battery_pct": data.get("battery_level_pct"),
"timestamp": data["timestamp"],
}
except (json.JSONDecodeError, KeyError):
return None
def _determine_unit(media: str) -> str:
return {
"heat_cost_allocator": "HCA",
"heat": "kWh",
"water": "m3",
"cold_water": "m3",
"electricity": "kWh",
"gas": "m3",
}.get(media, "unknown")
AES-CTR dekryptering (kun til diagnostik)
# Normalt håndterer wmbusmeters AES automatisk.
# Denne kode bruges til diagnostik/fejlsøgning:
from Crypto.Cipher import AES
import struct
def decrypt_oms_aes_ctr(
ciphertext: bytes,
key: bytes, # 16 bytes AES-128 nøgle
meter_id: bytes, # 6 bytes (M-felt + A-felt)
access_number: int,
) -> bytes:
"""
OMS AES-128-CTR IV-konstruktion:
IV = M(2) + A(6) + CC(2) + AN(1) + 0x00×5
"""
# Konstruér IV (16 bytes):
iv = meter_id[:8] + bytes([access_number]) + bytes(7)
assert len(iv) == 16, "IV skal være 16 bytes"
cipher = AES.new(key, AES.MODE_CTR, initial_value=iv, nonce=b"")
return cipher.decrypt(ciphertext)
Konklusion
wM-Bus telegram-parsing er komplekst i rå form — brug wmbusmeters der implementerer hele OMS EN 13757 standarden inkl. DIB/VIB parsing, AES-CTR dekryptering og alle kendte drivertyper. Python-laget wraper wmbusmeters' JSON-output og kortlægger det til platformens datamodel. Forstå telegram-strukturen for at debugge ukendte målere.
Se AES-128 CTR dekryptering guide eller wmbusmeters installationsguide.