M-Bus Gateway
← Tilbage til blog
· MQTT· retain· last will· LWT· IoT· gateway· Mosquitto· Python· heartbeat

MQTT retain og last will — IoT gateway patterns

MQTT retain flag og Last Will and Testament (LWT) til IoT gateways: gateway online/offline detektion, status topics, retained heartbeat og Mosquitto konfiguration.

Af M-Bus Gateway

MQTT retain og Last Will and Testament (LWT) er to kritiske mekanismer til pålidelig IoT gateway status-tracking. Her er implementeringen i M-Bus Gateway platformen.


Retain flag — persisted status

# gateway/src/mqtt/client.py

import json
from datetime import datetime, timezone

def publish_online_status(client, gateway_id: str) -> None:
    """Publish a retained "online" status message for this gateway.

    The message goes to ``meters/<gateway_id>/status`` with QoS 1 and the
    retain flag set, so the broker keeps it and hands it to subscribers
    that connect later.

    Args:
        client: MQTT client object exposing ``publish(topic=, payload=,
            qos=, retain=)``.
        gateway_id: Identifier used in the topic and in the payload.
    """
    status_topic = f"meters/{gateway_id}/status"
    published_at = datetime.now(timezone.utc).isoformat()
    body = json.dumps(
        dict(
            gateway_id=gateway_id,
            status="online",
            timestamp=published_at,
            firmware="1.4.2",
            uptime_seconds=get_uptime_seconds(),
        )
    )
    # retain=True: the broker stores this message for future subscribers.
    client.publish(topic=status_topic, payload=body, qos=1, retain=True)

Last Will and Testament (LWT)

# Sættes FØR connect() — brokerens ansvar ved uventet disconnect

def setup_last_will(client, gateway_id: str) -> None:
    """Register the gateway's Last Will and Testament (LWT) on ``client``.

    The will is a retained, QoS 1 "offline" status on
    ``meters/<gateway_id>/status``. The broker publishes it on the
    gateway's behalf when the connection ends abnormally (network failure,
    power loss, crash). Must be called before ``connect()``.

    Args:
        client: MQTT client object exposing ``will_set(topic=, payload=,
            qos=, retain=)``.
        gateway_id: Identifier used in the topic and in the payload.
    """
    offline_notice = {
        "gateway_id": gateway_id,
        "status": "offline",
        "reason": "unexpected_disconnect",
        # The will is registered up front, so the time of the actual
        # disconnect cannot be known here.
        "timestamp": None,
    }
    status_topic = f"meters/{gateway_id}/status"
    # retain=True: the offline status stays on the topic until the gateway
    # publishes a new status after reconnecting.
    client.will_set(
        topic=status_topic,
        payload=json.dumps(offline_notice),
        qos=1,
        retain=True,
    )

# Fuld forbindelses-opsætning:
def connect_with_lwt(config: GatewayConfig) -> mqtt.Client:
    """Create an MQTT v5 client with LWT and TLS configured, then connect.

    The previous version passed ``clean_session=False`` together with
    ``protocol=mqtt.MQTTv5``. paho-mqtt rejects that combination with
    ``ValueError`` because MQTT v5 has no clean-session flag. The v5
    equivalent of a persistent session is ``clean_start=False`` plus a
    non-zero Session Expiry Interval, both supplied to ``connect()``.

    Args:
        config: Gateway configuration; reads ``gateway_id``, ``ca_cert``,
            ``cert``, ``key``, ``mqtt_host`` and ``mqtt_port``.

    Returns:
        The client after ``connect()`` has been called on it.
    """
    # Function-scope imports: only this function needs the MQTT v5
    # CONNECT-property helpers.
    from paho.mqtt.packettypes import PacketTypes
    from paho.mqtt.properties import Properties

    client = mqtt.Client(
        client_id=config.gateway_id,
        protocol=mqtt.MQTTv5,
    )
    setup_last_will(client, config.gateway_id)  # The will must be set BEFORE connect()
    client.tls_set(ca_certs=config.ca_cert, certfile=config.cert, keyfile=config.key)

    # Keep the session (subscriptions and queued QoS 1 messages) across
    # reconnects. 0xFFFFFFFF is the MQTT v5 value for "session never
    # expires", matching what clean_session=False meant in MQTT 3.1.1.
    connect_properties = Properties(PacketTypes.CONNECT)
    connect_properties.SessionExpiryInterval = 0xFFFFFFFF
    client.connect(
        config.mqtt_host,
        config.mqtt_port,
        keepalive=60,
        clean_start=False,
        properties=connect_properties,
    )
    return client

Heartbeat hvert 5. minut

# gateway/src/mqtt/heartbeat.py

import asyncio
import json
from datetime import datetime, timezone

HEARTBEAT_INTERVAL = 300  # seconds between heartbeat publishes (5 minutes)

async def heartbeat_loop(client, gateway_id: str, config: dict) -> None:
    """Publish a retained "online" heartbeat forever.

    Each iteration collects the current health readings, publishes them to
    ``meters/<gateway_id>/status`` with QoS 1 and retain set, then sleeps
    for ``HEARTBEAT_INTERVAL`` seconds. The coroutine never returns.

    Args:
        client: MQTT client object exposing ``publish(topic, payload,
            qos=, retain=)``.
        gateway_id: Identifier used in the topic and in the payload.
        config: Accepted for interface compatibility; not read here.
    """
    status_topic = f"meters/{gateway_id}/status"
    while True:
        snapshot = dict(
            gateway_id=gateway_id,
            status="online",
            timestamp=datetime.now(timezone.utc).isoformat(),
            rssi_4g=get_signal_strength(),
            ram_free_mb=get_free_ram_mb(),
            temperature_c=get_cpu_temp(),
            uptime_s=get_uptime_seconds(),
            readings_today=get_reading_count_today(),
        )
        # Retained, so the latest heartbeat replaces the previous status.
        client.publish(status_topic, json.dumps(snapshot), qos=1, retain=True)
        await asyncio.sleep(HEARTBEAT_INTERVAL)

Server-side: subscriber med stale detection

# server/src/mqtt/subscriber.py

import json
from datetime import datetime, timezone, timedelta

STALE_THRESHOLD = timedelta(hours=36)  # gateways not seen for longer than this are flagged stale

async def on_status_message(msg: mqtt.MQTTMessage, session: AsyncSession) -> None:
    """Update a gateway row from a message on its MQTT status topic.

    Payloads that are not decodable JSON, are not JSON objects, or carry no
    ``gateway_id`` are ignored. This includes the empty payload used to
    clear a retained message.

    Columns written:
        online: True only when the payload's ``status`` is ``"online"``.
        last_status_payload: The decoded payload.
        last_seen_at: Set to the current UTC time, except for retained
            replays (see below).
        firmware_version: Written only when the payload contains
            ``firmware``. Heartbeat and last-will payloads omit that key
            and must not overwrite the stored version with NULL.

    Args:
        msg: Incoming MQTT message; ``payload`` and ``retain`` are read.
        session: Database session used to execute and commit the update.
    """
    try:
        data = json.loads(msg.payload)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses, so malformed text and undecodable bytes are ignored.
        return

    # Valid JSON that is not an object (list, number, null) has no fields.
    if not isinstance(data, dict):
        return

    gateway_id = data.get("gateway_id")
    if not gateway_id:
        return  # Nothing to match a row against.

    values = {
        "online": data.get("status", "unknown") == "online",
        "last_status_payload": data,
    }

    # A message with the retain flag set is the broker replaying its stored
    # copy on (re)subscribe, not a fresh publish. Counting it as "seen now"
    # would postpone stale detection on every subscriber restart.
    # NOTE(review): assumes the subscription does not use MQTT v5
    # retain-as-published, which would set the flag on fresh messages too.
    if not getattr(msg, "retain", False):
        values["last_seen_at"] = datetime.now(timezone.utc)

    firmware = data.get("firmware")
    if firmware is not None:
        values["firmware_version"] = firmware

    stmt = (
        update(Gateway)
        .where(Gateway.id == gateway_id)
        .values(**values)
        .returning(Gateway)
    )
    await session.execute(stmt)
    await session.commit()

async def check_stale_gateways(session: AsyncSession) -> list[str]:
    """Flag gateways whose last heartbeat is older than ``STALE_THRESHOLD``.

    Selects gateways that are not soft-deleted, are still marked online and
    have ``last_seen_at`` before the threshold. Each one is marked offline
    and a "stale" alarm is sent for it; the changes are then committed.
    Meant to run periodically (the original note says Celery beat, every
    30 minutes).

    Args:
        session: Database session used for the query and the commit.

    Returns:
        The ids of the gateways that were flagged, as strings.
    """
    threshold = datetime.now(timezone.utc) - STALE_THRESHOLD
    result = await session.execute(
        select(Gateway)
        .where(Gateway.last_seen_at < threshold)
        .where(Gateway.online == True)  # noqa: E712 - SQL expression, was online last
        .where(Gateway.deleted_at.is_(None))
    )
    stale = result.scalars().all()

    # Collect the ids BEFORE commit. With SQLAlchemy's default
    # expire_on_commit=True, commit() expires every loaded attribute, and
    # reading gw.id afterwards would need a lazy reload, which an
    # AsyncSession cannot do implicitly.
    stale_ids: list[str] = []
    for gw in stale:
        stale_ids.append(str(gw.id))
        gw.online = False
        await send_gateway_alarm(gw.tenant_id, gw.id, "stale")

    await session.commit()
    return stale_ids

Mosquitto: retained message cleanup

# Retained messages are stored in the Mosquitto persistence file.
# List the retained messages on a topic:
mosquitto_sub -h localhost -p 1883 \
  -t 'meters/+/status' \
  --retained-only \
  -C 100 \
  -W 5

# Clear a retained message (publish an empty payload with the retain flag):
mosquitto_pub -h localhost -p 1883 \
  -t 'meters/GW-0001/status' \
  -n \
  --retain

# The same in Python: a retained publish with no payload.
client.publish("meters/GW-0001/status", payload=None, retain=True)

Mosquitto persistence konfiguration

# /etc/mosquitto/mosquitto.conf

# Persist broker state to disk so it survives a restart. This covers
# retained messages as well as the subscriptions and queued QoS 1
# messages of persistent sessions:
persistence true
persistence_location /var/lib/mosquitto/

# Write the in-memory database to disk every 30 minutes.
# (mosquitto.conf only treats '#' as a comment at the start of a line,
# so this note must not trail the value.)
autosave_interval 1800

# Per-client limits for in-flight and queued QoS 1/2 messages.
# These do not limit the number of retained messages:
max_inflight_messages 100
max_queued_messages 1000

# Keepalive: the broker marks a client as disconnected if no PINGREQ arrives:
# Gateway keepalive=60s -> broker timeout after 60*1.5=90s
# The broker publishes the LWT after 90s without communication

Clean session vs. persistent session

# clean_session=True (standard):
# → Broker glemmer subscriptions og QoS 1 queue ved disconnect
# → Gateway starter frisk ved reconnect
# → Mister QoS 1 beskeder sendt mens offline

# clean_session=False (persistent session):
# → Broker gemmer subscriptions og buffer QoS 1 beskeder
# → Gateway modtager bufferede OTA/kommando-beskeder ved reconnect
# → Kræver persistent Mosquitto konfiguration

# M-Bus Gateway uses a persistent session so that it receives OTA commands
# that were sent while the gateway was offline.
#
# With protocol=MQTTv5, paho-mqtt raises ValueError if clean_session is
# passed to the constructor, so it is omitted here. Under MQTT v5 the
# persistent session is requested at connect time instead:
#   client.connect(host, port, clean_start=False, properties=<CONNECT
#   properties with a non-zero SessionExpiryInterval>)
client = mqtt.Client(
    client_id=config.gateway_id,
    protocol=mqtt.MQTTv5,
)

Platform UI: Gateway online-status

// ui/src/pages/portal/Gateways.tsx (uddrag)

/**
 * Status badge for a gateway.
 *
 * Shows "Offline" when the gateway is flagged offline or its last heartbeat
 * is more than 36 hours old; otherwise "Online" with the age of the last
 * heartbeat in minutes. When `last_seen_at` is missing or unparseable the
 * age is omitted instead of being computed from an invalid date.
 */
function GatewayStatusBadge({ gateway }: { gateway: Gateway }) {
  const STALE_AFTER_MS = 36 * 3600 * 1000;

  // NaN marks "no usable timestamp" (null/empty value or unparseable string).
  const lastSeenMs = gateway.last_seen_at
    ? new Date(gateway.last_seen_at).getTime()
    : NaN;
  const hasLastSeen = !Number.isNaN(lastSeenMs);
  const ageMs = Date.now() - lastSeenMs;
  const isStale = hasLastSeen && ageMs > STALE_AFTER_MS;

  if (!gateway.online || isStale) {
    return <span className="badge badge-red">Offline</span>;
  }
  if (!hasLastSeen) {
    // Online according to the server, but no heartbeat time to display.
    return <span className="badge badge-green">Online</span>;
  }
  // Clamp at 0 so a timestamp slightly ahead of the browser clock does not
  // render a negative age.
  const minutesAgo = Math.max(0, Math.floor(ageMs / 60000));
  return (
    <span className="badge badge-green">
      Online · {minutesAgo}m siden
    </span>
  );
}

Konklusion

MQTT retain kombineret med LWT giver pålidelig gateway status-tracking: retain sikrer at nye subscribers straks ser seneste status, LWT sikrer at brokerens "offline"-besked publiceres automatisk ved uventet disconnect. clean_session=False sikrer at OTA-kommandoer og andre QoS 1 beskeder bufferes, indtil gateway er online igen. Server-side stale detection (36t) udløser en alarm uanset LWT.

Se Mosquitto MQTT broker guide eller MQTT QoS guide.