· MQTT· retain· last will· LWT· IoT· gateway· Mosquitto· Python· heartbeat
MQTT retain og last will — IoT gateway patterns
MQTT retain flag og Last Will and Testament (LWT) til IoT gateways: gateway online/offline detektion, status topics, retained heartbeat og Mosquitto konfiguration.
Af M-Bus Gateway
MQTT retain og Last Will and Testament (LWT) er to kritiske mekanismer til pålidelig IoT gateway status-tracking. Her er implementeringen i M-Bus Gateway platformen.
Retain flag — persisted status
# gateway/src/mqtt/client.py
import json
from datetime import datetime, timezone
def publish_online_status(
    client, gateway_id: str, *, firmware_version: str = "1.4.2"
) -> None:
    """Publish a retained "online" status message for a gateway.

    The message is published to ``meters/{gateway_id}/status`` with QoS 1 and
    the retain flag set, so the broker stores it and new subscribers receive
    the latest status immediately.

    Args:
        client: MQTT client exposing
            ``publish(topic=..., payload=..., qos=..., retain=...)``.
        gateway_id: Gateway identifier, used in the topic and the payload.
        firmware_version: Firmware version reported in the payload. It used
            to be hard-coded; the default keeps the previous value so
            existing callers publish the same message.
    """
    status = {
        "gateway_id": gateway_id,
        "status": "online",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "firmware": firmware_version,
        "uptime_seconds": get_uptime_seconds(),
    }
    client.publish(
        topic=f"meters/{gateway_id}/status",
        payload=json.dumps(status),
        qos=1,
        retain=True,  # Broker keeps the message for new subscribers
    )
Last Will and Testament (LWT)
# Must be set BEFORE connect() — the broker acts on it after an unexpected disconnect
def setup_last_will(client, gateway_id: str) -> None:
    """Register the gateway's Last Will and Testament (LWT) on the client.

    The broker publishes this message on the gateway's behalf when the
    gateway disconnects abnormally (network failure, power loss, crash).
    """
    offline_status = {
        "gateway_id": gateway_id,
        "status": "offline",
        "reason": "unexpected_disconnect",
        # The broker does not know the exact time of the disconnect
        "timestamp": None,
    }
    status_topic = f"meters/{gateway_id}/status"
    client.will_set(
        topic=status_topic,
        payload=json.dumps(offline_status),
        qos=1,
        # Offline status stays retained until the gateway reconnects
        retain=True,
    )
# Full connection setup:
def connect_with_lwt(config: GatewayConfig) -> mqtt.Client:
    """Create an MQTT v5 client with LWT and TLS, connect it, and return it.

    A persistent session is requested so QoS 1 messages queued by the broker
    survive a reconnect. With MQTT v5 this is done at connect time
    (``clean_start=False`` plus a non-zero Session Expiry Interval); passing
    ``clean_session`` to the constructor together with ``protocol=MQTTv5``
    raises ``ValueError`` in paho-mqtt.

    NOTE(review): assumes ``mqtt`` is ``paho.mqtt.client`` — confirm against
    the module's imports.

    Args:
        config: Gateway configuration providing ``gateway_id``, ``ca_cert``,
            ``cert``, ``key``, ``mqtt_host`` and ``mqtt_port``.

    Returns:
        The client after ``connect()`` has been called.
    """
    # Local imports keep this block self-contained; both modules ship with
    # the paho-mqtt package the file already depends on.
    from paho.mqtt.packettypes import PacketTypes
    from paho.mqtt.properties import Properties

    client = mqtt.Client(
        client_id=config.gateway_id,
        protocol=mqtt.MQTTv5,
    )
    setup_last_will(client, config.gateway_id)  # LWT must be set BEFORE connect
    client.tls_set(ca_certs=config.ca_cert, certfile=config.cert, keyfile=config.key)

    # 0xFFFFFFFF means "session never expires", the closest v5 match for the
    # MQTT 3.1.1 clean_session=False behaviour the original code asked for.
    connect_properties = Properties(PacketTypes.CONNECT)
    connect_properties.SessionExpiryInterval = 0xFFFFFFFF

    client.connect(
        config.mqtt_host,
        config.mqtt_port,
        keepalive=60,
        clean_start=False,  # Keep the QoS 1 queue across reconnects
        properties=connect_properties,
    )
    return client
Heartbeat hvert 5. minut
# gateway/src/mqtt/heartbeat.py
import asyncio
import json
from datetime import datetime, timezone
HEARTBEAT_INTERVAL = 300 # 5 minutter
async def heartbeat_loop(client, gateway_id: str, config: dict) -> None:
    """Continuously refresh the gateway's retained status message.

    Runs forever: publishes an "online" status with current health metrics
    to ``meters/{gateway_id}/status`` (QoS 1, retained), then sleeps for
    ``HEARTBEAT_INTERVAL`` seconds. ``config`` is accepted but not used here.
    """
    status_topic = f"meters/{gateway_id}/status"

    def snapshot() -> dict:
        # Collected fresh on every heartbeat
        return {
            "gateway_id": gateway_id,
            "status": "online",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "rssi_4g": get_signal_strength(),
            "ram_free_mb": get_free_ram_mb(),
            "temperature_c": get_cpu_temp(),
            "uptime_s": get_uptime_seconds(),
            "readings_today": get_reading_count_today(),
        }

    while True:
        message = json.dumps(snapshot())
        client.publish(status_topic, message, qos=1, retain=True)
        await asyncio.sleep(HEARTBEAT_INTERVAL)
Server-side: subscriber med stale detection
# server/src/mqtt/subscriber.py
import json
from datetime import datetime, timezone, timedelta
STALE_THRESHOLD = timedelta(hours=36)
def _parse_status_timestamp(raw: object) -> "datetime | None":
    """Parse an ISO-8601 timestamp from a status payload; None if unusable."""
    if not isinstance(raw, str):
        return None
    try:
        parsed = datetime.fromisoformat(raw)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # Gateways publish UTC timestamps; treat a naive value as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


async def on_status_message(msg: mqtt.MQTTMessage, session: AsyncSession) -> None:
    """Update a gateway's online flag and last_seen_at from a status message.

    Payloads that are not valid JSON, are not a JSON object, or have no
    ``gateway_id`` are ignored.

    Args:
        msg: Incoming MQTT message; ``msg.payload`` is expected to be a JSON
            object with at least ``gateway_id`` and ``status``.
        session: Database session used to run and commit the UPDATE.
    """
    try:
        data = json.loads(msg.payload)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return
    if not isinstance(data, dict):
        return

    gateway_id = data.get("gateway_id")
    if not gateway_id:
        return

    status = data.get("status", "unknown")
    now = datetime.now(timezone.utc)
    values = {
        "online": status == "online",
        "last_status_payload": data,
    }

    if getattr(msg, "retain", False):
        # A retained replay (delivered when the server subscribes) is not a
        # fresh sign of life. Stamping it with "now" would hide stale
        # gateways after every server restart, so use the time the gateway
        # reported, when there is one (the LWT payload has none).
        reported_at = _parse_status_timestamp(data.get("timestamp"))
        if reported_at is not None:
            values["last_seen_at"] = min(reported_at, now)
    else:
        values["last_seen_at"] = now

    # Heartbeat and LWT payloads carry no "firmware" key; only overwrite the
    # stored version when the message actually reports one.
    firmware = data.get("firmware")
    if firmware is not None:
        values["firmware_version"] = firmware

    # The original RETURNING clause was dropped: its result was never read.
    stmt = update(Gateway).where(Gateway.id == gateway_id).values(**values)
    await session.execute(stmt)
    await session.commit()
async def check_stale_gateways(session: AsyncSession) -> list[str]:
    """Flag gateways whose last heartbeat is older than STALE_THRESHOLD.

    Selects non-deleted gateways that are still marked online but whose
    ``last_seen_at`` is before the cutoff, marks each one offline, sends a
    "stale" alarm for it, and commits. Per the original note, this is run
    by Celery beat every 30 minutes.

    Returns:
        The ids of the gateways that were flagged, as strings.
    """
    cutoff = datetime.now(timezone.utc) - STALE_THRESHOLD
    query = (
        select(Gateway)
        .where(Gateway.last_seen_at < cutoff)
        .where(Gateway.online == True)  # noqa: E712 — was online last time
        .where(Gateway.deleted_at.is_(None))
    )
    rows = await session.execute(query)
    stale_gateways = rows.scalars().all()

    # Clear the online flag and raise an alarm for each stale gateway
    for gateway in stale_gateways:
        gateway.online = False
        await send_gateway_alarm(gateway.tenant_id, gateway.id, "stale")

    await session.commit()
    return [str(gateway.id) for gateway in stale_gateways]
Mosquitto: retained message cleanup
# Retained messages are persisted in the Mosquitto persistence file
# Show all retained messages on a topic:
mosquitto_sub -h localhost -p 1883 \
-t 'meters/+/status' \
--retained-only \
-C 100 \
-W 5
# Clear a retained message (publish an empty payload):
mosquitto_pub -h localhost -p 1883 \
-t 'meters/GW-0001/status' \
-n \
--retain
# In Python (same idea: empty payload published with the retain flag):
client.publish("meters/GW-0001/status", payload=None, retain=True)
Mosquitto persistence konfiguration
# /etc/mosquitto/mosquitto.conf
# Persist retained messages to disk so they survive a broker restart:
persistence true
persistence_location /var/lib/mosquitto/
# The same persistence database also stores subscriptions and queued /
# in-flight QoS 1 messages for persistent sessions:
persistence_file mosquitto.db
# Write the in-memory database to disk every 30 minutes.
# (mosquitto.conf only supports comments on their own line.)
autosave_interval 1800
# Per-client limits for QoS 1/2 messages in flight and queued
# (these do not limit the number of retained messages):
max_inflight_messages 100
max_queued_messages 1000
# Keepalive: the broker marks a client disconnected if no PINGREQ arrives:
# Gateway keepalive=60s → broker timeout after 60*1.5=90s
# The broker sends the LWT after 90s without communication
Clean session vs. persistent session
# clean_session=True (default, MQTT 3.1.1):
#   → Broker forgets subscriptions and the QoS 1 queue on disconnect
#   → Gateway starts fresh on reconnect
#   → Loses QoS 1 messages sent while it was offline
# clean_session=False (persistent session, MQTT 3.1.1):
#   → Broker keeps subscriptions and buffers QoS 1 messages
#   → Gateway receives buffered OTA/command messages on reconnect
#   → Requires persistence to be configured in Mosquitto
# M-Bus Gateway uses a persistent session in order to receive
# OTA commands that were sent while the gateway was offline.
#
# With protocol=MQTTv5 the clean_session argument must NOT be passed to
# Client() (paho-mqtt raises ValueError). Request the persistent session at
# connect time instead: connect(..., clean_start=False, properties=...) with
# a non-zero SessionExpiryInterval in the CONNECT properties.
# NOTE(review): assumes `mqtt` is paho.mqtt.client — confirm.
client = mqtt.Client(
    client_id=config.gateway_id,
    protocol=mqtt.MQTTv5,
)
Platform UI: Gateway online-status
// ui/src/pages/portal/Gateways.tsx (uddrag)
/**
 * Badge showing whether a gateway is online.
 *
 * Red "Offline" when the gateway is flagged offline or its last heartbeat is
 * more than 36 hours old; otherwise green "Online" with the heartbeat age in
 * minutes. When `last_seen_at` is missing or unparsable the age is omitted
 * instead of being computed from an invalid date.
 */
function GatewayStatusBadge({ gateway }: { gateway: Gateway }) {
  // Same 36-hour window as the server-side stale check.
  const STALE_AFTER_MS = 36 * 3600 * 1000;
  const now = Date.now();

  const lastSeenMs = gateway.last_seen_at
    ? new Date(gateway.last_seen_at).getTime()
    : NaN;
  const hasLastSeen = Number.isFinite(lastSeenMs);
  const isStale = hasLastSeen && lastSeenMs < now - STALE_AFTER_MS;

  if (!gateway.online || isStale) {
    return <span className="badge badge-red">Offline</span>;
  }

  if (!hasLastSeen) {
    // Online flag set but no usable timestamp: show status without an age.
    return <span className="badge badge-green">Online</span>;
  }

  // Clamp at 0 so client/server clock skew never renders a negative age.
  const minutesAgo = Math.max(0, Math.floor((now - lastSeenMs) / 60000));
  return (
    <span className="badge badge-green">
      Online · {minutesAgo}m siden
    </span>
  );
}
Konklusion
MQTT retain kombineret med LWT giver pålidelig gateway status-tracking: retain sikrer, at nye subscribers straks ser seneste status, og LWT sikrer, at brokerens "offline"-besked publiceres automatisk ved uventet disconnect. clean_session=False sikrer, at OTA-kommandoer og andre QoS 1 beskeder bufferes, indtil gateway er online igen. Server-side stale detection (36 timer) udløser en alarm uanset LWT.