M-Bus Gateway
← Tilbage til blog
· MQTT· QoS· IoT· paho-mqtt· Python· Mosquitto· retain· last will· gateway

MQTT QoS niveauer — guide til IoT gateway implementering

MQTT QoS 0/1/2 forklaret til IoT: hvornår bruge hvert niveau, retain flag, last will, clean session og implementering med paho-mqtt i Python.

Af M-Bus Gateway

M-Bus Gateway bruger MQTT QoS 1 til aflæsningsdata og QoS 0 til heartbeats. Her er rationalet og implementeringen.


QoS niveauer: Hvad garanterer de?

QoS 0 — At most once (fire and forget):
  → Ingen kvittering fra broker
  → Kan tabe beskeder ved netværksproblemer
  → Mindst overhead
  → Brug til: Heartbeats, status-opdateringer (OK at tabe)

QoS 1 — At least once (guaranteed delivery):
  → Broker sender PUBACK-kvittering
  → Publisher gemmer besked til PUBACK modtaget
  → Kan levere dubletter (ved PUBACK-tab)
  → Modtager skal idempotent håndtere dubletter
  → Brug til: Aflæsningsdata (skal frem, dubletter OK)

QoS 2 — Exactly once (4-way handshake):
  → PUBLISH → PUBREC → PUBREL → PUBCOMP
  → Garanterer præcis én levering
  → Dobbelt overhead vs QoS 1
  → Brug til: Betalingstransaktioner, OTA-kommandoer

Tommelfingerregel for IoT:
  QoS 0: Hyppige, tabbare beskeder (>1/min)
  QoS 1: Vigtige beskeder med idempotent modtager
  QoS 2: Kritiske, ikke-idempotente operationer

Gateway MQTT-klient med paho-mqtt

# gateway/src/mqtt/client.py
import paho.mqtt.client as mqtt
import ssl
import json
import msgpack
from pathlib import Path

class MBusGatewayMQTT:
    def __init__(self, config: dict):
        self.gateway_id = config["GATEWAY_ID"]
        self.host = config["MQTT_HOST"]
        self.port = int(config.get("MQTT_PORT", 8883))

        self.client = mqtt.Client(
            client_id=self.gateway_id,
            clean_session=False,  # Persist subscriptions og QoS1-queue ved reconnect
            protocol=mqtt.MQTTv311,
        )

        self._configure_tls(config)
        self._configure_will()
        self._configure_callbacks()

    def _configure_tls(self, config: dict):
        """Mutual TLS: server-cert + client-cert/key."""
        cert_dir = Path("/etc/mbus-gateway/certs")
        self.client.tls_set(
            ca_certs=str(cert_dir / "ca.crt"),
            certfile=str(cert_dir / f"{self.gateway_id}.crt"),
            keyfile=str(cert_dir / f"{self.gateway_id}.key"),
            tls_version=ssl.PROTOCOL_TLS_CLIENT,
        )
        self.client.tls_insecure_set(False)

    def _configure_will(self):
        """
        Last Will: Broker sender denne besked automatisk
        hvis gateway-forbindelsen mistes uventet.
        """
        will_payload = json.dumps({
            "gateway_id": self.gateway_id,
            "online": False,
            "reason": "unexpected_disconnect",
        })
        self.client.will_set(
            topic=f"meters/{self.gateway_id}/status",
            payload=will_payload.encode(),
            qos=1,
            retain=True,  # Broker husker status til næste subscriber
        )

    def _configure_callbacks(self):
        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect
        self.client.on_publish = self._on_publish

    def _on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            # Subscribe til kommandoer fra server:
            self.client.subscribe(f"meters/{self.gateway_id}/cmd", qos=1)
            self.client.subscribe(f"meters/{self.gateway_id}/ota", qos=2)
            # Send online-status med retain:
            self._publish_status(online=True)
        else:
            # rc != 0 → auth-fejl eller forbindelsesproblem
            pass

    def _on_disconnect(self, client, userdata, rc):
        if rc != 0:
            # Uventet disconnect — paho-mqtt re-connecter automatisk
            pass

    def _on_publish(self, client, userdata, mid):
        # mid er message-ID — bruges til at tracke QoS1-kvitteringer
        pass

    def publish_readings(self, payload: bytes):
        """
        Daglig aflæsningspayload — QoS 1 (skal frem).
        payload: MessagePack+zlib komprimeret bytes.
        """
        result = self.client.publish(
            topic=f"meters/{self.gateway_id}/data",
            payload=payload,
            qos=1,
            retain=False,  # Historisk data — ingen grund til retain
        )
        result.wait_for_publish(timeout=30.0)  # Blokér til PUBACK modtaget
        return result.is_published()

    def publish_alarm(self, alarm: dict):
        """Alarm — QoS 1, retain=True (server ser seneste alarm ved reconnect)."""
        payload = json.dumps(alarm).encode()
        self.client.publish(
            topic=f"meters/{self.gateway_id}/alarm",
            payload=payload,
            qos=1,
            retain=True,
        )

    def _publish_status(self, online: bool, signal_dbm: int = 0):
        """Heartbeat — QoS 1, retain=True."""
        payload = json.dumps({
            "gateway_id": self.gateway_id,
            "online": online,
            "signal_dbm": signal_dbm,
        }).encode()
        self.client.publish(
            topic=f"meters/{self.gateway_id}/status",
            payload=payload,
            qos=1,
            retain=True,
        )

Retain flag: Seneste kendte tilstand

Retain = True på et topic:
  → Broker gemmer seneste besked på topic
  → Ny subscriber modtager straks seneste besked (ikke venter)
  → Bruges til: Status, alarmer, konfiguration

Eksempel:
  Gateway sender: status online=True (retain=True)
  Server genstarter og subscriber på ny
  → Modtager straks "online=True" fra retained message
  → Ingen race condition — server ved altid aktuel status

Retain = False:
  → Besked leveres kun til aktive subscribers
  → Historiske beskeder modtages ikke
  → Bruges til: Aflæsningsdata, OTA-kommandoer

Mosquitto: QoS og persistent sessions

# /etc/mosquitto/mosquitto.conf

# Persistent sessions (clean_session=False):
persistence true
persistence_location /var/lib/mosquitto/

# QoS 1/2 beskeder gemmes til offline klient:
max_queued_messages 1000
queue_qos0_messages false  # QoS 0 gemmes IKKE (fire-and-forget)

# Slet sessions efter 7 dage inaktivitet:
persistent_client_expiration 7d

# Maks beskedstørrelse (25KB er nok til komprimeret payload):
message_size_limit 65536

Server-side QoS-validering

# server/src/mqtt/subscriber.py
import paho.mqtt.client as mqtt
import structlog

logger = structlog.get_logger()

def on_message(client, userdata, message: mqtt.MQTTMessage):
    """
    Server modtager beskeder fra gateways.
    QoS 1: Kan modtage dubletter — brug upsert (idempotent).
    """
    topic = message.topic
    qos = message.qos
    retain = message.retain

    if "/data" in topic:
        # QoS 1 aflæsningsdata — upsert er idempotent:
        handle_readings(message.payload)

    elif "/status" in topic:
        # QoS 1 retain — kan være gensendt ved reconnect:
        handle_status(message.payload)

    elif "/alarm" in topic:
        handle_alarm(message.payload)

    logger.debug("mqtt_message", topic=topic, qos=qos, retain=retain, size=len(message.payload))

OTA via QoS 2

# server/src/ota/router.py
# OTA-kommandoer bruger QoS 2 — præcis én levering:

def trigger_ota(gateway_id: str, firmware_url: str, sha256: str, version: str):
    payload = json.dumps({
        "url": firmware_url,
        "sha256": sha256,
        "version": version,
    }).encode()

    mqtt_client.publish(
        topic=f"meters/{gateway_id}/ota",
        payload=payload,
        qos=2,   # Exactlly once — OTA må ikke trigges to gange
        retain=False,
    )

Konklusion

MQTT QoS-niveauet skal matche beskedens kritikalitet: QoS 0 til heartbeats, QoS 1 til aflæsningsdata (idempotent upsert håndterer dubletter), QoS 2 til OTA-kommandoer. Retain-flaget sikrer at server altid kender gateway-status — selv efter genstart. clean_session=False bevarer QoS 1-queue mens gateway er offline.

Se Mosquitto TLS guide eller gateway fejlsøgning.