M-Bus Gateway
← Tilbage til blog
· Redis· Streams· IoT· events· Python· FastAPI· Celery· backend

Redis Streams til IoT event-processing — komplet guide

Redis Streams til IoT event-processing: XADD, XREAD, consumer groups, acknowledgement, dead letter pattern og integration med FastAPI og Celery til gateway-events.

Af M-Bus Gateway

Redis Streams er ideelle til IoT event-processing: de er persistente, understøtter consumer groups og har eksplicit acknowledgement (XACK). Her er mønstrene til gateway-data.


Streams vs. Pub/Sub vs. Lists

# Redis Pub/Sub: fire-and-forget — no persistence
# If a subscriber is offline, the messages are lost
r.publish("gateway:events", json.dumps(event))  # Lost if the consumer is offline

# Redis Lists (LPUSH/BRPOP): simple queue — no consumer groups
r.lpush("gateway:queue", json.dumps(event))      # Only one consumer per message

# Redis Streams (XADD): persistent + consumer groups
# - Messages are stored with an auto-generated (timestamp-based) ID
# - Multiple consumer groups can read the same stream independently
# - ACK mechanism: a message stays "pending" until it is acknowledged
r.xadd("gateway:readings", {"payload": json.dumps(event)})

Producer: Gateway-events til stream

# server/src/mqtt/handlers.py

import redis.asyncio as aioredis
import msgpack
import json
from datetime import datetime

# Module-level cache for the shared client; filled in by the first get_redis() call.
redis_client: aioredis.Redis | None = None


async def get_redis() -> aioredis.Redis:
    """Return the shared async Redis client, creating it on first use."""
    global redis_client
    if redis_client is not None:
        return redis_client

    # Responses are left undecoded (bytes) instead of being converted to str.
    redis_client = aioredis.from_url(
        "redis://localhost:6379",
        decode_responses=False,
    )
    return redis_client


async def publish_gateway_reading(gateway_id: str, payload: bytes) -> str:
    """Publish one gateway reading to the ``gateway:readings`` Redis Stream.

    Args:
        gateway_id: Identifier of the gateway that sent the reading.
        payload: msgpack-encoded reading as received from MQTT.

    Returns:
        The stream entry ID assigned by Redis, decoded to ``str``.
    """
    from datetime import timezone

    r = await get_redis()

    # Decode the msgpack payload from MQTT; it is re-encoded as JSON below.
    data = msgpack.unpackb(payload, raw=False)

    # XADD: append to the stream with an auto-generated (timestamp-based) ID.
    stream_id = await r.xadd(
        "gateway:readings",
        {
            "gateway_id": gateway_id,
            # Timezone-aware UTC: datetime.utcnow() is deprecated and returns
            # a naive value whose ISO string carries no offset.
            "received_at": datetime.now(timezone.utc).isoformat(),
            "payload": json.dumps(data),
        },
        # Keep at most ~100,000 entries (original estimate: about 2.7 years at
        # 100 gateways, i.e. roughly one reading per gateway per day).
        maxlen=100_000,
        approximate=True,  # "~" trimming: cheaper than an exact cap
    )
    return stream_id.decode()


async def publish_gateway_alarm(gateway_id: str, alarm: dict) -> str:
    """Publish one gateway alarm to the ``gateway:alarms`` stream.

    Alarms go to their own stream, separate from readings.

    Args:
        gateway_id: Identifier of the gateway that raised the alarm.
        alarm: Alarm data; ``type`` and ``severity`` are copied into their own
            fields (defaulting to ``"unknown"`` / ``"warning"``) and the whole
            dict is stored as JSON under ``data``.

    Returns:
        The stream entry ID assigned by Redis, decoded to ``str``.
    """
    from datetime import timezone

    r = await get_redis()
    stream_id = await r.xadd(
        "gateway:alarms",
        {
            "gateway_id": gateway_id,
            "alarm_type": alarm.get("type", "unknown"),
            "severity": alarm.get("severity", "warning"),
            "data": json.dumps(alarm),
            # Timezone-aware UTC: datetime.utcnow() is deprecated and returns
            # a naive value whose ISO string carries no offset.
            "timestamp": datetime.now(timezone.utc).isoformat(),
        },
        maxlen=10_000,
        approximate=True,  # redis-py's default, spelled out to match readings
    )
    # The shared client uses decode_responses=False, so XADD replies with bytes;
    # decode so the value matches the declared ``str`` return type.
    return stream_id.decode()

Consumer group: Parallel processing

# server/src/workers/stream_consumer.py

import asyncio
import json
import os

import redis.asyncio as aioredis
import structlog

log = structlog.get_logger()

# Stream, consumer-group and consumer names used by the reading workers.
READING_STREAM = "gateway:readings"
CONSUMER_GROUP = "reading-processors"
CONSUMER_NAME = f"worker-{os.getpid()}"  # Unique per worker process


async def setup_consumer_group(r: aioredis.Redis) -> None:
    """Create the consumer group for the reading stream if it is missing.

    Safe to call repeatedly: a BUSYGROUP reply (the group already exists) is
    ignored, while any other Redis response error is re-raised.
    """
    try:
        # id="0": the group starts from the beginning of the stream.
        # mkstream=True: create the stream if it does not exist yet.
        await r.xgroup_create(READING_STREAM, CONSUMER_GROUP, id="0", mkstream=True)
    except aioredis.ResponseError as err:
        group_exists = "BUSYGROUP" in str(err)
        if group_exists:
            return
        raise


async def consume_readings(r: aioredis.Redis) -> None:
    """Read readings from the stream through the consumer group, forever.

    Ensures the consumer group exists, then repeatedly fetches batches of
    new entries and hands each one to ``process_reading``. Cancellation ends
    the loop and the coroutine returns normally; any other exception is
    logged and the loop resumes after a one-second pause.
    """
    await setup_consumer_group(r)

    while True:
        try:
            # XREADGROUP: up to 10 never-delivered entries (">"), blocking for
            # at most 5000 ms when the stream has nothing new.
            batch = await r.xreadgroup(
                groupname=CONSUMER_GROUP,
                consumername=CONSUMER_NAME,
                streams={READING_STREAM: ">"},
                count=10,
                block=5000,
            )

            # An empty reply (block timeout) leaves nothing to iterate.
            for _stream, entries in batch or ():
                for entry_id, entry_fields in entries:
                    await process_reading(r, entry_id, entry_fields)

        except asyncio.CancelledError:
            break
        except Exception as exc:
            log.error("Stream consumer error", exc=str(exc))
            await asyncio.sleep(1)


async def process_reading(
    r: aioredis.Redis,
    msg_id: bytes,
    fields: dict[bytes, bytes],
) -> None:
    """Process a single reading and ACK it on success.

    Args:
        r: Async Redis client used to acknowledge the entry.
        msg_id: Stream entry ID of the reading.
        fields: Entry fields; ``b"gateway_id"`` and ``b"payload"`` (a JSON
            document) are read.

    Any exception raised along the way is logged and swallowed, and the entry
    is left unacknowledged.
    """
    try:
        gateway_id = fields[b"gateway_id"].decode()
        payload = json.loads(fields[b"payload"])

        # Store in the database (ingest_gateway_data is defined elsewhere)
        from server.src.mqtt.ingest import ingest_gateway_data
        await ingest_gateway_data(gateway_id, payload)

        # ACK: mark as processed — only reached when ingest did not raise
        await r.xack(READING_STREAM, CONSUMER_GROUP, msg_id)
        log.info("Reading processed", gateway_id=gateway_id, msg_id=msg_id.decode())

    except Exception as exc:
        log.error("Failed to process reading",
                  msg_id=msg_id.decode(), exc=str(exc))
        # The entry stays "pending" → handled by claim_idle_messages

Dead letter pattern: Håndtér fejlede beskeder

# server/src/workers/stream_consumer.py (fortsat)

# Stream that receives readings whose retries are exhausted.
DEAD_LETTER_STREAM = "gateway:readings:dead-letter"
# A claimed entry whose delivery count has reached this value is dead-lettered
# instead of being retried.
MAX_DELIVERY_COUNT = 3


async def claim_idle_messages(r: aioredis.Redis) -> None:
    """Claim entries that have been pending for more than 60 seconds.

    Meant to run periodically (e.g. once a minute). Each claimed entry is
    either retried through ``process_reading`` or, once its delivery count
    has reached ``MAX_DELIVERY_COUNT``, copied to the dead-letter stream and
    acknowledged. Any exception is logged and swallowed.
    """
    try:
        # XAUTOCLAIM (available since Redis 6.2): take over up to 100 entries
        # idle for at least 60,000 ms, scanning from the start of the
        # pending list.
        reply = await r.xautoclaim(
            READING_STREAM,
            CONSUMER_GROUP,
            CONSUMER_NAME,
            min_idle_time=60_000,
            start_id="0-0",
            count=100,
        )

        # reply[1] holds the claimed entries as (msg_id, fields) pairs.
        for msg_id, fields in reply[1]:
            # Look up this entry's delivery count via XPENDING.
            pending_entry = await r.xpending_range(
                READING_STREAM, CONSUMER_GROUP,
                min=msg_id, max=msg_id, count=1
            )
            retries_exhausted = (
                bool(pending_entry)
                and pending_entry[0]["times_delivered"] >= MAX_DELIVERY_COUNT
            )

            if not retries_exhausted:
                # Try again.
                await process_reading(r, msg_id, fields)
                continue

            # Copy to the dead-letter stream first, then ACK the original.
            dead_letter = dict(fields)
            dead_letter[b"original_id"] = msg_id
            dead_letter[b"failure_reason"] = b"max_retries_exceeded"
            await r.xadd(DEAD_LETTER_STREAM, dead_letter)
            await r.xack(READING_STREAM, CONSUMER_GROUP, msg_id)
            log.warning("Message moved to dead letter", msg_id=msg_id.decode())

    except Exception as exc:
        log.error("Claim idle messages failed", exc=str(exc))

Celery integration

# server/src/workers/tasks/stream.py

from server.src.celery_app import celery_app
from celery.schedules import crontab

@celery_app.task(name="streams.process_pending")
def process_pending_stream_messages():
    """Celery task: claim and retry pending stream messages (scheduled every minute).

    Runs ``claim_idle_messages`` to completion on a fresh event loop, using an
    asyncio Redis client that lives only for the duration of the task.
    """
    import asyncio

    import redis.asyncio as aioredis

    from server.src.workers.stream_consumer import claim_idle_messages

    async def _claim_with_async_client() -> None:
        # claim_idle_messages awaits every Redis call, so it needs an asyncio
        # client; a synchronous client's replies cannot be awaited. The client
        # is created inside the coroutine so its connections belong to the
        # loop started by asyncio.run(), and "async with" closes it on exit.
        async with aioredis.from_url(
            "redis://localhost:6379",
            decode_responses=False,  # process_reading indexes fields by bytes keys
        ) as r:
            await claim_idle_messages(r)

    asyncio.run(_claim_with_async_client())


# Celery beat schedule: runs the "streams.process_pending" task on a cron trigger.
CELERYBEAT_SCHEDULE = {
    "process-pending-stream-messages": {
        "task": "streams.process_pending",
        "schedule": crontab(minute="*"),  # Every minute
    },
}

Stream monitoring endpoint

# server/src/monitoring/router.py

@router.get("/streams/status")
async def stream_status():
    """Stream health overview for the operations dashboard.

    Returns:
        A dict with the stream length, the ID of the oldest entry (``None``
        when the stream is empty), per-group pending/consumer counts, the
        total pending count, and ``health`` set to ``"ok"`` while fewer than
        1000 entries are pending, otherwise ``"degraded"``.
    """
    # "async with" closes the client after the two queries, so a request no
    # longer leaves an open connection behind.
    async with aioredis.from_url(
        "redis://localhost:6379", decode_responses=True
    ) as r:
        info = await r.xinfo_stream("gateway:readings")
        groups = await r.xinfo_groups("gateway:readings")

    # "first-entry" can be present with value None (empty stream), in which
    # case a .get() default is never used and indexing would raise TypeError.
    first_entry = info.get("first-entry")
    oldest_entry_id = first_entry[0] if first_entry else None

    pending_total = sum(g["pending"] for g in groups)

    return {
        "stream_length": info["length"],
        # Full entry ID ("<ms>-<seq>"); the millisecond timestamp is its prefix.
        "oldest_entry_ms": oldest_entry_id,
        "consumer_groups": [
            {
                "name": g["name"],
                "pending": g["pending"],
                "consumers": g["consumers"],
            }
            for g in groups
        ],
        "total_pending": pending_total,
        "health": "ok" if pending_total < 1000 else "degraded",
    }

Konklusion

Redis Streams er overlegne til IoT event-processing sammenlignet med Pub/Sub (ingen persistence) og Lists (ingen consumer groups). Consumer groups muliggør parallel processing med at-least-once delivery: en besked forbliver pending, indtil den kvitteres med XACK. Dead letter pattern håndterer persistente fejl uden at blokere streamen. Brug XAUTOCLAIM til automatisk at kræve idle messages og maxlen til at begrænse stream-størrelsen.

Se Redis caching FastAPI guide eller Celery beat guide.