· MQTT· QoS· IoT· paho-mqtt· Python· Mosquitto· retain· last will· gateway
MQTT QoS niveauer — guide til IoT gateway implementering
MQTT QoS 0/1/2 forklaret til IoT: hvornår bruge hvert niveau, retain flag, last will, clean session og implementering med paho-mqtt i Python.
Af M-Bus Gateway
M-Bus Gateway bruger MQTT QoS 1 til aflæsningsdata og QoS 0 til heartbeats. Her er rationalet og implementeringen.
QoS niveauer: Hvad garanterer de?
QoS 0 — At most once (fire and forget):
→ Ingen kvittering fra broker
→ Kan tabe beskeder ved netværksproblemer
→ Mindst overhead
→ Brug til: Heartbeats, status-opdateringer (OK at tabe)
QoS 1 — At least once (guaranteed delivery):
→ Broker sender PUBACK-kvittering
→ Publisher gemmer besked til PUBACK modtaget
→ Kan levere dubletter (ved PUBACK-tab)
→ Modtager skal idempotent håndtere dubletter
→ Brug til: Aflæsningsdata (skal frem, dubletter OK)
QoS 2 — Exactly once (4-way handshake):
→ PUBLISH → PUBREC → PUBREL → PUBCOMP
→ Garanterer præcis én levering
→ Dobbelt overhead vs QoS 1
→ Brug til: Betalingstransaktioner, OTA-kommandoer
Tommelfingerregel for IoT:
QoS 0: Hyppige, tabbare beskeder (>1/min)
QoS 1: Vigtige beskeder med idempotent modtager
QoS 2: Kritiske, ikke-idempotente operationer
Gateway MQTT-klient med paho-mqtt
# gateway/src/mqtt/client.py
import paho.mqtt.client as mqtt
import ssl
import json
import msgpack
from pathlib import Path
class MBusGatewayMQTT:
def __init__(self, config: dict):
self.gateway_id = config["GATEWAY_ID"]
self.host = config["MQTT_HOST"]
self.port = int(config.get("MQTT_PORT", 8883))
self.client = mqtt.Client(
client_id=self.gateway_id,
clean_session=False, # Persist subscriptions og QoS1-queue ved reconnect
protocol=mqtt.MQTTv311,
)
self._configure_tls(config)
self._configure_will()
self._configure_callbacks()
def _configure_tls(self, config: dict):
"""Mutual TLS: server-cert + client-cert/key."""
cert_dir = Path("/etc/mbus-gateway/certs")
self.client.tls_set(
ca_certs=str(cert_dir / "ca.crt"),
certfile=str(cert_dir / f"{self.gateway_id}.crt"),
keyfile=str(cert_dir / f"{self.gateway_id}.key"),
tls_version=ssl.PROTOCOL_TLS_CLIENT,
)
self.client.tls_insecure_set(False)
def _configure_will(self):
"""
Last Will: Broker sender denne besked automatisk
hvis gateway-forbindelsen mistes uventet.
"""
will_payload = json.dumps({
"gateway_id": self.gateway_id,
"online": False,
"reason": "unexpected_disconnect",
})
self.client.will_set(
topic=f"meters/{self.gateway_id}/status",
payload=will_payload.encode(),
qos=1,
retain=True, # Broker husker status til næste subscriber
)
def _configure_callbacks(self):
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_publish = self._on_publish
def _on_connect(self, client, userdata, flags, rc):
if rc == 0:
# Subscribe til kommandoer fra server:
self.client.subscribe(f"meters/{self.gateway_id}/cmd", qos=1)
self.client.subscribe(f"meters/{self.gateway_id}/ota", qos=2)
# Send online-status med retain:
self._publish_status(online=True)
else:
# rc != 0 → auth-fejl eller forbindelsesproblem
pass
def _on_disconnect(self, client, userdata, rc):
if rc != 0:
# Uventet disconnect — paho-mqtt re-connecter automatisk
pass
def _on_publish(self, client, userdata, mid):
# mid er message-ID — bruges til at tracke QoS1-kvitteringer
pass
def publish_readings(self, payload: bytes):
"""
Daglig aflæsningspayload — QoS 1 (skal frem).
payload: MessagePack+zlib komprimeret bytes.
"""
result = self.client.publish(
topic=f"meters/{self.gateway_id}/data",
payload=payload,
qos=1,
retain=False, # Historisk data — ingen grund til retain
)
result.wait_for_publish(timeout=30.0) # Blokér til PUBACK modtaget
return result.is_published()
def publish_alarm(self, alarm: dict):
"""Alarm — QoS 1, retain=True (server ser seneste alarm ved reconnect)."""
payload = json.dumps(alarm).encode()
self.client.publish(
topic=f"meters/{self.gateway_id}/alarm",
payload=payload,
qos=1,
retain=True,
)
def _publish_status(self, online: bool, signal_dbm: int = 0):
"""Heartbeat — QoS 1, retain=True."""
payload = json.dumps({
"gateway_id": self.gateway_id,
"online": online,
"signal_dbm": signal_dbm,
}).encode()
self.client.publish(
topic=f"meters/{self.gateway_id}/status",
payload=payload,
qos=1,
retain=True,
)
Retain flag: Seneste kendte tilstand
Retain = True på et topic:
→ Broker gemmer seneste besked på topic
→ Ny subscriber modtager straks seneste besked (ikke venter)
→ Bruges til: Status, alarmer, konfiguration
Eksempel:
Gateway sender: status online=True (retain=True)
Server genstarter og subscriber på ny
→ Modtager straks "online=True" fra retained message
→ Ingen race condition — server ved altid aktuel status
Retain = False:
→ Besked leveres kun til aktive subscribers
→ Historiske beskeder modtages ikke
→ Bruges til: Aflæsningsdata, OTA-kommandoer
Mosquitto: QoS og persistent sessions
# /etc/mosquitto/mosquitto.conf
# Persistent sessions (clean_session=False):
persistence true
persistence_location /var/lib/mosquitto/
# QoS 1/2 beskeder gemmes til offline klient:
max_queued_messages 1000
queue_qos0_messages false # QoS 0 gemmes IKKE (fire-and-forget)
# Slet sessions efter 7 dage inaktivitet:
persistent_client_expiration 7d
# Maks beskedstørrelse (25KB er nok til komprimeret payload):
message_size_limit 65536
Server-side QoS-validering
# server/src/mqtt/subscriber.py
import paho.mqtt.client as mqtt
import structlog
logger = structlog.get_logger()
def on_message(client, userdata, message: mqtt.MQTTMessage):
"""
Server modtager beskeder fra gateways.
QoS 1: Kan modtage dubletter — brug upsert (idempotent).
"""
topic = message.topic
qos = message.qos
retain = message.retain
if "/data" in topic:
# QoS 1 aflæsningsdata — upsert er idempotent:
handle_readings(message.payload)
elif "/status" in topic:
# QoS 1 retain — kan være gensendt ved reconnect:
handle_status(message.payload)
elif "/alarm" in topic:
handle_alarm(message.payload)
logger.debug("mqtt_message", topic=topic, qos=qos, retain=retain, size=len(message.payload))
OTA via QoS 2
# server/src/ota/router.py
# OTA-kommandoer bruger QoS 2 — præcis én levering:
def trigger_ota(gateway_id: str, firmware_url: str, sha256: str, version: str):
payload = json.dumps({
"url": firmware_url,
"sha256": sha256,
"version": version,
}).encode()
mqtt_client.publish(
topic=f"meters/{gateway_id}/ota",
payload=payload,
qos=2, # Exactlly once — OTA må ikke trigges to gange
retain=False,
)
Konklusion
MQTT QoS-niveauet skal matche beskedens kritikalitet: QoS 0 til heartbeats, QoS 1 til aflæsningsdata (idempotent upsert håndterer dubletter), QoS 2 til OTA-kommandoer. Retain-flaget sikrer at server altid kender gateway-status — selv efter genstart. clean_session=False bevarer QoS 1-queue mens gateway er offline.