· Redis· Streams· IoT· events· Python· FastAPI· Celery· backend
Redis Streams til IoT event-processing — komplet guide
Redis Streams til IoT event-processing: XADD, XREAD, consumer groups, acknowledgement, dead letter pattern og integration med FastAPI og Celery til gateway-events.
Af M-Bus Gateway
Redis Streams er ideelle til IoT event-processing: de er persistente, understøtter consumer groups og har eksplicit acknowledgement (XACK), så ubehandlede beskeder ikke går tabt. Her er mønstrene til gateway-data.
Streams vs. Pub/Sub vs. Lists
# Redis Pub/Sub: fire-and-forget — no persistence.
# If the subscriber is offline, the messages are lost.
r.publish("gateway:events", json.dumps(event)) # Lost when the consumer is offline
# Redis Lists (LPUSH/BRPOP): simple queue — no consumer groups.
r.lpush("gateway:queue", json.dumps(event)) # Only one consumer per message
# Redis Streams (XADD): persistent + consumer groups.
# - Messages are stored under an auto-generated (timestamp-based) ID.
# - Several consumer groups can read the same stream independently.
# - ACK mechanism: a message stays "pending" until it is acknowledged.
r.xadd("gateway:readings", {"payload": json.dumps(event)})
Producer: Gateway-events til stream
# server/src/mqtt/handlers.py
import redis.asyncio as aioredis
import msgpack
import json
from datetime import datetime
# Module-wide client, created lazily by get_redis().
redis_client: aioredis.Redis | None = None


async def get_redis() -> aioredis.Redis:
    """Return the shared async Redis client, creating it on the first call.

    The client is stored in the module-level ``redis_client`` so later calls
    reuse the same instance.
    """
    global redis_client
    if redis_client is not None:
        return redis_client

    # decode_responses=False: replies are returned as raw bytes.
    redis_client = aioredis.from_url(
        "redis://localhost:6379",
        decode_responses=False,
    )
    return redis_client
async def publish_gateway_reading(gateway_id: str, payload: bytes) -> str:
    """Append one gateway reading to the ``gateway:readings`` stream.

    Args:
        gateway_id: Stored in the entry's ``gateway_id`` field.
        payload: msgpack-encoded reading; it is unpacked and stored as JSON
            text in the entry's ``payload`` field.

    Returns:
        The ID of the new stream entry, decoded to ``str``.
    """
    client = await get_redis()

    # Unpack the msgpack body so it can be re-serialised as JSON.
    reading = msgpack.unpackb(payload, raw=False)
    entry = {
        "gateway_id": gateway_id,
        "received_at": datetime.utcnow().isoformat(),
        "payload": json.dumps(reading),
    }

    # No explicit ID is passed, so Redis assigns one. The stream is capped at
    # roughly 100,000 entries (approximate trimming is cheaper than exact).
    # 100,000 / 100 gateways is about 1,000 entries each, i.e. ~2.7 years
    # only if every gateway reports once a day; verify against the real rate.
    raw_id = await client.xadd(
        "gateway:readings",
        entry,
        maxlen=100_000,
        approximate=True,
    )
    return raw_id.decode()
async def publish_gateway_alarm(gateway_id: str, alarm: dict) -> str:
    """Append one gateway alarm to the ``gateway:alarms`` stream.

    Alarms go to their own stream, separate from readings.

    Args:
        gateway_id: Stored in the entry's ``gateway_id`` field.
        alarm: Alarm data. ``type`` and ``severity`` are copied into their own
            fields (defaulting to ``"unknown"`` and ``"warning"``); the whole
            dict is stored as JSON text in ``data``.

    Returns:
        The ID of the new stream entry as ``str``.
    """
    r = await get_redis()
    stream_id = await r.xadd(
        "gateway:alarms",
        {
            "gateway_id": gateway_id,
            "alarm_type": alarm.get("type", "unknown"),
            "severity": alarm.get("severity", "warning"),
            "data": json.dumps(alarm),
            "timestamp": datetime.utcnow().isoformat(),
        },
        maxlen=10_000,
    )
    # The shared client is created with decode_responses=False, so XADD
    # returns bytes; decode to honour the declared ``str`` return type, and
    # pass an already-decoded ID through unchanged.
    return stream_id.decode() if isinstance(stream_id, bytes) else stream_id
Consumer group: Parallel processing
# server/src/workers/stream_consumer.py
import asyncio
import json
import os

import redis.asyncio as aioredis
import structlog
log = structlog.get_logger()

# Stream and consumer-group names shared by the functions in this module.
READING_STREAM = "gateway:readings"
CONSUMER_GROUP = "reading-processors"
# One consumer name per worker process, derived from the process ID.
CONSUMER_NAME = f"worker-{os.getpid()}"
async def setup_consumer_group(r: aioredis.Redis) -> None:
    """Create the consumer group on the readings stream if it is missing.

    Safe to call repeatedly: a ``BUSYGROUP`` error from Redis (the group
    already exists) is ignored, while any other ``ResponseError`` is re-raised.
    """
    try:
        # id="0": the group starts from the beginning of the stream.
        # mkstream=True: create the stream itself when it does not exist yet.
        await r.xgroup_create(READING_STREAM, CONSUMER_GROUP, id="0", mkstream=True)
    except aioredis.ResponseError as err:
        if "BUSYGROUP" in str(err):
            return
        raise
async def consume_readings(r: aioredis.Redis) -> None:
    """Run the read loop for this worker's consumer in the consumer group.

    Ensures the group exists, then repeatedly reads new entries in batches and
    hands each one to ``process_reading``. The loop ends when the task is
    cancelled; any other error is logged and followed by a one-second pause.
    """
    await setup_consumer_group(r)

    while True:
        try:
            # ">" asks for entries not yet delivered to this group. At most
            # 10 entries per call; block up to 5000 ms when there are none.
            batches = await r.xreadgroup(
                groupname=CONSUMER_GROUP,
                consumername=CONSUMER_NAME,
                streams={READING_STREAM: ">"},
                count=10,
                block=5000,
            )
            # An empty reply (timeout) simply skips to the next read.
            for _stream, entries in batches or ():
                for entry_id, entry_fields in entries:
                    await process_reading(r, entry_id, entry_fields)
        except asyncio.CancelledError:
            break
        except Exception as exc:
            log.error("Stream consumer error", exc=str(exc))
            await asyncio.sleep(1)
async def process_reading(
    r: aioredis.Redis,
    msg_id: bytes,
    fields: dict[bytes, bytes],
) -> None:
    """Ingest a single stream entry and acknowledge it on success.

    Args:
        r: Redis client used to send the acknowledgement.
        msg_id: ID of the stream entry.
        fields: Entry fields; ``b"gateway_id"`` and ``b"payload"`` (JSON) are read.

    Any exception is logged and swallowed. The entry is then not acknowledged,
    so it stays pending for ``claim_idle_messages`` to pick up later.
    """
    try:
        gw = fields[b"gateway_id"].decode()
        reading = json.loads(fields[b"payload"])

        # Persist the reading (imported here, at call time).
        from server.src.mqtt.ingest import ingest_gateway_data

        await ingest_gateway_data(gw, reading)

        # Acknowledge only after ingestion has succeeded.
        await r.xack(READING_STREAM, CONSUMER_GROUP, msg_id)
        log.info("Reading processed", gateway_id=gw, msg_id=msg_id.decode())
    except Exception as exc:
        log.error(
            "Failed to process reading",
            msg_id=msg_id.decode(),
            exc=str(exc),
        )
Dead letter pattern: Håndtér fejlede beskeder
# server/src/workers/stream_consumer.py (fortsat)
# Entries that keep failing are copied here and removed from the pending list.
DEAD_LETTER_STREAM = "gateway:readings:dead-letter"
# Delivery count at which an entry is dead-lettered instead of retried.
MAX_DELIVERY_COUNT = 3


async def claim_idle_messages(r: aioredis.Redis) -> None:
    """Claim readings that have been pending for over 60 seconds and retry them.

    Intended to run periodically (e.g. once a minute). Each claimed entry is
    either retried through ``process_reading`` or, once its delivery count has
    reached ``MAX_DELIVERY_COUNT``, copied to ``DEAD_LETTER_STREAM`` and
    acknowledged on the source stream.

    Args:
        r: Async Redis client; field keys are expected as bytes.

    Any exception is logged and swallowed, so one failed run does not stop
    the periodic schedule. At most 100 entries are handled per call.
    """
    try:
        # XAUTOCLAIM is available from Redis 6.2. It transfers ownership of
        # idle pending entries to this consumer and counts as a delivery.
        result = await r.xautoclaim(
            READING_STREAM,
            CONSUMER_GROUP,
            CONSUMER_NAME,
            min_idle_time=60_000,  # 60 seconds, in milliseconds
            start_id="0-0",
            count=100,
        )
        claimed_messages = result[1]  # [(msg_id, fields), ...]

        for msg_id, fields in claimed_messages:
            if msg_id is None or fields is None:
                # Servers older than Redis 7.0 report pending entries that
                # were already trimmed/deleted from the stream as empty
                # placeholders. There is nothing to retry or dead-letter, and
                # touching them would abort the rest of the batch.
                continue

            # Look up the delivery count for exactly this entry.
            pending_info = await r.xpending_range(
                READING_STREAM, CONSUMER_GROUP,
                min=msg_id, max=msg_id, count=1
            )
            if pending_info and pending_info[0]["times_delivered"] >= MAX_DELIVERY_COUNT:
                # Retries exhausted: keep a copy in the dead-letter stream,
                # then acknowledge so the entry leaves the pending list.
                await r.xadd(DEAD_LETTER_STREAM, {
                    **fields,
                    b"original_id": msg_id,
                    b"failure_reason": b"max_retries_exceeded",
                })
                await r.xack(READING_STREAM, CONSUMER_GROUP, msg_id)
                log.warning("Message moved to dead letter", msg_id=msg_id.decode())
            else:
                # Still within the retry budget: process it again.
                await process_reading(r, msg_id, fields)
    except Exception as exc:
        log.error("Claim idle messages failed", exc=str(exc))
Celery integration
# server/src/workers/tasks/stream.py
from server.src.celery_app import celery_app
from celery.schedules import crontab
@celery_app.task(name="streams.process_pending")
def process_pending_stream_messages():
    """Celery task: claim and retry pending stream messages.

    Scheduled once a minute by the beat schedule. ``claim_idle_messages`` is a
    coroutine that awaits every Redis call, so it needs an *async* client; a
    synchronous client would make each ``await`` fail.
    """
    import asyncio

    import redis.asyncio as aioredis

    from server.src.workers.stream_consumer import claim_idle_messages

    async def _run() -> None:
        # Create the client inside the event loop started by asyncio.run():
        # async connections are tied to the loop they were opened on, and each
        # task invocation gets a fresh loop. decode_responses=False because
        # the consumer code reads fields through bytes keys.
        async with aioredis.from_url(
            "redis://localhost:6379",
            decode_responses=False,
        ) as r:
            await claim_idle_messages(r)

    asyncio.run(_run())
# Celery beat schedule: run the pending-message task every minute.
CELERYBEAT_SCHEDULE = {
    "process-pending-stream-messages": {
        "task": "streams.process_pending",
        # "*" in the minute field matches every minute.
        "schedule": crontab(minute="*"),
    },
}
Stream monitoring endpoint
# server/src/monitoring/router.py
@router.get("/streams/status")
async def stream_status():
    """Return a stream-health overview for the operations dashboard.

    Returns:
        A dict with the stream length, the ID of the oldest entry (``None``
        when the stream is empty), per-group pending and consumer counts, the
        total pending count, and ``health``: ``"ok"`` below 1000 pending
        entries, otherwise ``"degraded"``.
    """
    # Close the per-request client when done; otherwise every call to this
    # endpoint leaves its connections open.
    async with aioredis.from_url(
        "redis://localhost:6379",
        decode_responses=True,
    ) as r:
        info = await r.xinfo_stream("gateway:readings")
        groups = await r.xinfo_groups("gateway:readings")

    pending_total = sum(g["pending"] for g in groups)

    # For an empty stream "first-entry" is present but None, so a .get()
    # default would not help; check the value itself before indexing.
    first_entry = info.get("first-entry")

    return {
        "stream_length": info["length"],
        # Element 0 of the entry is its stream ID.
        "oldest_entry_ms": first_entry[0] if first_entry else None,
        "consumer_groups": [
            {
                "name": g["name"],
                "pending": g["pending"],
                "consumers": g["consumers"],
            }
            for g in groups
        ],
        "total_pending": pending_total,
        "health": "ok" if pending_total < 1000 else "degraded",
    }
Konklusion
Redis Streams er overlegne til IoT event-processing sammenlignet med Pub/Sub (ingen persistence) og Lists (ingen consumer groups). Consumer groups muliggør parallel processing med at-least-once delivery: en besked forbliver pending, indtil den kvitteres med XACK. Dead letter pattern håndterer persistente fejl uden at blokere streamen. Brug XAUTOCLAIM til automatisk at kræve idle messages og maxlen til at begrænse stream-størrelsen.