M-Bus Gateway
← Tilbage til blog
· MessagePack· zlib· IoT· 4G· komprimering· gateway· 1NCE· SIM· Python· payload

MessagePack + zlib til IoT gateway — minimal databrug over 4G

MessagePack binær serialisering og zlib-komprimering til IoT: sammenligning med JSON, Python implementering, gateway payload-struktur og 10-årig SIM-budget beregning.

Af M-Bus Gateway

M-Bus Gateway sender præcis 1 payload pr. dag til Hetzner via 4G IoT-SIM. MessagePack + zlib reducerer payloaden fra ~180KB (JSON) til ~3KB. Her er implementeringen.


Problemet: 500MB / 10 år SIM-budget

1NCE IoT SIM: 500MB i 10 år (inkluderet i hardware-pris)
Ingen genopfyldning — 10-årsplan er kritisk

Krav:
  → 50 målere × 365 dage = 18.250 daglige sends
  → Budget: 500MB / 18.250 = 27KB pr. dag max
  → Med 25% buffer: Max 20KB pr. dag

JSON payload (ukomprimeret):
  50 målere × 3.500 bytes pr. måler = 175.000 bytes = 170KB
  → Overstiger dagbudget 8,5× !

MessagePack + zlib:
  MessagePack: 50 målere × 1.100 bytes = 55.000 bytes (binær)
  zlib compress (level 6): 55.000 → ~3.200 bytes
  → 94% komprimering vs. JSON
  → 18% af dagbudget → 300MB buffer over 10 år

MessagePack vs. JSON

import json
import msgpack
import zlib

# Eksempel-payload (én målers data):
meter_data = {
    "id": "12345678",
    "fab": "ELS",
    "type": "hca",
    "ts": 1748044800,       # Unix timestamp (int, ikke ISO string)
    "v": 312.0,             # Value (float)
    "prev": 285.0,
    "rssi": -72,
    "bat": 98,
    "flags": 0,
}

# JSON:
json_bytes = json.dumps(meter_data, separators=(",", ":")).encode()
print(f"JSON:        {len(json_bytes):>6} bytes")  # 113 bytes

# MessagePack:
msgpack_bytes = msgpack.packb(meter_data, use_bin_type=True)
print(f"MessagePack: {len(msgpack_bytes):>6} bytes")  # 71 bytes (37% smaller)

# MessagePack + zlib (level 6):
compressed = zlib.compress(msgpack_bytes, level=6)
print(f"MP + zlib:   {len(compressed):>6} bytes")  # 73 bytes (lille payload = overhead)

# For fuld payload (50 målere):
full_payload = msgpack.packb(
    {"gw": "GW-0001", "ts": 1748044800, "meters": [meter_data] * 50},
    use_bin_type=True,
)
compressed_full = zlib.compress(full_payload, level=6)
print(f"Full 50m JSON:   {len(json.dumps({'gw': 'GW-0001', 'ts': 1748044800, 'meters': [meter_data] * 50}).encode()):>6} bytes")
print(f"Full 50m MP+zlib:{len(compressed_full):>6} bytes")
# Full 50m JSON:   5.890 bytes
# Full 50m MP+zlib: 1.184 bytes  ← 80% reduction for repeated structure

Gateway payload-struktur

# gateway/src/mqtt/payload.py
import msgpack
import zlib
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class MeterReading:
    meter_id: str          # Serienummer (hex, stripped)
    fab: str               # Fabrikant-ID (3 bogstaver)
    media: str             # "hca" | "heat" | "water" | "electricity"
    timestamp: int         # Unix timestamp (int, ikke ISO)
    value: float           # Primær aflæsning
    previous: Optional[float] = None  # Forrige periode
    rssi: Optional[int] = None        # dBm (int, ikke float)
    battery: Optional[int] = None     # Procent (0-100)
    flags: int = 0         # Bitflag: 0=OK, 1=error, 2=sabotage, 4=low_battery


def build_payload(gateway_id: str, readings: list[MeterReading]) -> bytes:
    """
    Byg daglig MQTT-payload.
    Komprimeret MessagePack — target < 5KB ved 50 målere.
    """
    data = {
        "v": 3,              # Payload version (skemaversionen)
        "gw": gateway_id,
        "ts": int(time.time()),
        "m": [               # "m" i stedet for "meters" (kortere nøgle)
            {
                "id": r.meter_id,
                "f": r.fab,
                "t": r.media,
                "ts": r.timestamp,
                "v": r.value,
                "p": r.previous,
                "r": r.rssi,
                "b": r.battery,
                "fl": r.flags,
            }
            for r in readings
        ],
    }

    packed = msgpack.packb(data, use_bin_type=True)
    compressed = zlib.compress(packed, level=6)

    # Logging:
    ratio = len(packed) / len(compressed)
    print(f"Payload: {len(packed)} bytes → {len(compressed)} bytes (ratio {ratio:.1f}x)")

    return compressed


def parse_payload(raw: bytes) -> dict:
    """Parse indkommende payload fra gateway."""
    decompressed = zlib.decompress(raw)
    return msgpack.unpackb(decompressed, raw=False)

Server-side parsing og validering

# server/src/mqtt/subscriber.py

async def process_gateway_payload(gateway_id: str, raw_payload: bytes) -> None:
    """
    MQTT subscriber-callback for meters/{gateway_id}/data topic.
    Decompresses og parser MessagePack payload.
    """
    try:
        data = zlib.decompress(raw_payload)
        payload = msgpack.unpackb(data, raw=False)
    except (zlib.error, msgpack.UnpackException) as exc:
        logger.error("payload_parse_error", gateway_id=gateway_id, error=str(exc))
        return

    # Valider payload version:
    if payload.get("v") not in (2, 3):
        logger.warning("unknown_payload_version", v=payload.get("v"))

    readings = payload.get("m", [])
    gateway_ts = payload.get("ts")

    # Konvertér til interne datastructurer:
    for meter in readings:
        await upsert_reading(
            gateway_id=gateway_id,
            meter_id=meter["id"],
            fab=meter.get("f"),
            media=meter.get("t"),
            timestamp=datetime.fromtimestamp(meter["ts"], tz=timezone.utc),
            value=meter["v"],
            rssi_dbm=meter.get("r"),
            battery_level_pct=meter.get("b"),
            flags=meter.get("fl", 0),
        )

    logger.info(
        "payload_processed",
        gateway_id=gateway_id,
        meter_count=len(readings),
        payload_bytes=len(raw_payload),
    )

10-årig SIM-budget beregning

def calculate_sim_budget(
    meters_per_gateway: int = 50,
    gateways: int = 1,
    days: int = 3650,  # 10 år
    bytes_per_day: int = 3200,
    sim_mb: float = 500.0,
) -> dict:
    """Beregn SIM-forbrugsprognose."""
    daily_total = bytes_per_day * gateways
    total_bytes = daily_total * days
    total_mb = total_bytes / (1024 * 1024)
    remaining_mb = sim_mb - total_mb
    usage_pct = (total_mb / sim_mb) * 100

    return {
        "total_mb": round(total_mb, 1),
        "remaining_mb": round(remaining_mb, 1),
        "usage_pct": round(usage_pct, 1),
        "daily_kb": round(daily_total / 1024, 1),
        "ok": remaining_mb > 0,
    }

# calculate_sim_budget(meters_per_gateway=50, days=3650, bytes_per_day=3200)
# → {"total_mb": 11.2, "remaining_mb": 488.8, "usage_pct": 2.2, "daily_kb": 3.1, "ok": True}
# → 488MB buffer efter 10 år — meget god margen

Konklusion

MessagePack reducerer payload fra JSON-størrelse med 35-40%. zlib level 6 giver yderligere 60-80% komprimering ved repetitive strukturer som 50-måler-payloads. Samlet resultat: ~3KB pr. dag mod ~170KB ukomprimeret JSON — en reduktion på 98%. Med 1NCE 500MB/10-år SIM giver dette 488MB buffer over gateway-livstiden.

Se 4G IoT SIM guide eller MQTT QoS guide.