Python Redis og aioredis — cache-mønstre til IoT og SaaS
Redis cache-mønstre til Python IoT og SaaS: aioredis async, tenant-scopede cache-nøgler, TTL-strategier, portfolio KPI-caching, pub/sub til MQTT→SSE bro og connection pool tuning.
Af M-Bus Gateway
Redis er ikke kun Celery-backend — det er platformen til IoT-real-time alarmer, portfolio KPI-cache og tenant-isoleret pub/sub. Her er mønstrene.
aioredis (nu redis.asyncio) — async Redis-klient
# server/src/cache/redis_client.py
import redis.asyncio as aioredis
from server.src.config import get_settings
from typing import AsyncGenerator
# Module-level holder for the shared Redis client; None until get_redis()
# creates the client, and None again after close_redis() has run.
_redis_pool: aioredis.Redis | None = None
async def get_redis() -> AsyncGenerator[aioredis.Redis, None]:
    """Yield the shared Redis client, creating it on first use.

    Written as an async generator so it can be used as a FastAPI dependency.
    The client is stored in the module-level ``_redis_pool`` and handed out
    again on every later call until ``close_redis()`` resets it.

    Yields:
        The process-wide ``redis.asyncio`` client.
    """
    global _redis_pool

    client = _redis_pool
    if client is None:
        redis_url = str(get_settings().redis_url)
        client = aioredis.from_url(
            redis_url,
            max_connections=20,
            decode_responses=True,  # replies come back as str, not bytes
            socket_timeout=5.0,
            socket_connect_timeout=5.0,
        )
        _redis_pool = client

    yield client
async def close_redis() -> None:
    """Close the shared Redis client and forget it.

    Intended to be called during application shutdown (lifespan cleanup).
    Does nothing when no client has been created. The module-level
    reference is cleared even if closing raises, so a later ``get_redis()``
    call builds a fresh client instead of handing out a half-closed one.
    """
    global _redis_pool

    client = _redis_pool
    if client is None:
        return
    try:
        await client.aclose()
    finally:
        # Always drop the reference: a failed close must not leave a stale
        # client behind for the next caller.
        _redis_pool = None
Tenant-scopede cache-nøgler
# server/src/cache/keys.py
# Nøgle-schema: {prefix}:{tenant_id}:{resource}:{id}
# Sikrer at tenant A aldrig kan læse tenant B's cache
def portfolio_kpi_key(tenant_id: str) -> str:
    """Build the cache key for a tenant's portfolio KPI entry.

    Args:
        tenant_id: Identifier of the tenant that owns the entry.

    Returns:
        The key ``kpi:<tenant_id>:portfolio``.
    """
    key = f"kpi:{tenant_id}:portfolio"
    return key
def gateway_health_key(tenant_id: str, gateway_id: str) -> str:
    """Build the cache key for the health entry of one gateway.

    Args:
        tenant_id: Identifier of the tenant that owns the gateway.
        gateway_id: Identifier of the gateway.

    Returns:
        The key ``health:<tenant_id>:gateway:<gateway_id>``.
    """
    key = f"health:{tenant_id}:gateway:{gateway_id}"
    return key
def reading_monthly_key(tenant_id: str, unit_id: str, year: int, month: int) -> str:
    """Build the cache key for one unit's readings in a given month.

    Args:
        tenant_id: Identifier of the tenant that owns the unit.
        unit_id: Identifier of the unit.
        year: Calendar year of the readings.
        month: Calendar month; rendered zero-padded to two digits.

    Returns:
        The key ``readings:<tenant_id>:monthly:<unit_id>:<year>:<MM>``.
    """
    key = f"readings:{tenant_id}:monthly:{unit_id}:{year}:{month:02d}"
    return key
def settlement_pdf_key(tenant_id: str, settlement_id: str) -> str:
    """Build the cache key for the PDF entry of one settlement.

    Args:
        tenant_id: Identifier of the tenant that owns the settlement.
        settlement_id: Identifier of the settlement.

    Returns:
        The key ``pdf:<tenant_id>:settlement:<settlement_id>``.
    """
    key = f"pdf:{tenant_id}:settlement:{settlement_id}"
    return key
# Pattern til wildcard-sletning (ved ejendoms-ændring):
def tenant_pattern(tenant_id: str) -> str:
    """Build the glob pattern that matches keys scoped to one tenant.

    Redis MATCH patterns treat ``*``, ``?``, ``[``, ``]`` and ``\\`` as
    metacharacters. They are backslash-escaped in ``tenant_id`` so that an
    identifier containing one of them is matched literally and cannot widen
    the pattern to other tenants' keys. Identifiers without such characters
    produce the same pattern as before.

    Args:
        tenant_id: Identifier of the tenant whose keys should match.

    Returns:
        The pattern ``*:<escaped tenant_id>:*``.
    """
    escaped = "".join(
        "\\" + char if char in "*?[]\\" else char for char in tenant_id
    )
    return f"*:{escaped}:*"
Portfolio KPI-caching
# server/src/portfolio/kpi_cache.py
import json
from datetime import datetime
from fastapi import Depends
from redis.asyncio import Redis
from server.src.cache.redis_client import get_redis
from server.src.cache.keys import portfolio_kpi_key
from server.src.db.engine import get_db
# Lifetime of a cached portfolio KPI entry, in seconds.
KPI_TTL_SECONDS = 300  # 5 minutes — KPIs are not real-time critical
def _kpi_json_default(value):
    """Encode date-like values that ``json`` cannot serialise natively."""
    if hasattr(value, "isoformat"):
        return value.isoformat()
    raise TypeError(
        f"Object of type {type(value).__name__} is not JSON serializable"
    )


async def get_portfolio_kpi(
    tenant_id: str,
    redis: Redis = Depends(get_redis),
    db=Depends(get_db),
) -> dict:
    """Return the portfolio KPIs for a tenant, served from Redis when possible.

    1. Cache hit: decode the stored JSON, flag it with ``_cached`` and return.
    2. Cache miss: compute from the database, store in Redis with a TTL of
       ``KPI_TTL_SECONDS`` and return.

    Args:
        tenant_id: Tenant whose KPIs are requested.
        redis: Redis client, injected through ``get_redis``.
        db: Database session, injected through ``get_db``.

    Returns:
        A JSON-compatible dict with the KPI values plus ``computed_at``;
        cache hits additionally carry ``"_cached": True``.
    """
    from datetime import timezone

    cache_key = portfolio_kpi_key(tenant_id)

    cached = await redis.get(cache_key)
    if cached:
        data = json.loads(cached)
        data["_cached"] = True
        return data

    kpi = await _compute_portfolio_kpi(tenant_id, db)
    # Timezone-aware UTC: a naive timestamp is ambiguous to consumers that
    # parse the ISO string, and ``datetime.utcnow()`` is deprecated.
    kpi["computed_at"] = datetime.now(timezone.utc).isoformat()

    # The KPI row can hold datetime values (``last_reading``), which plain
    # ``json.dumps`` rejects; encode them as ISO-8601 strings.
    payload = json.dumps(kpi, default=_kpi_json_default)
    await redis.setex(cache_key, KPI_TTL_SECONDS, payload)

    # Decode the stored payload so a miss returns the same value types that
    # a later cache hit will return.
    return json.loads(payload)
async def invalidate_portfolio_kpi(tenant_id: str, redis: Redis) -> None:
    """Delete the cached portfolio KPI entry for one tenant.

    Meant to be called when the underlying data changes, e.g. after a new
    reading or a new settlement.

    Args:
        tenant_id: Tenant whose cached KPIs should be dropped.
        redis: Redis client used for the deletion.
    """
    stale_key = portfolio_kpi_key(tenant_id)
    await redis.delete(stale_key)
async def _compute_portfolio_kpi(tenant_id: str, db) -> dict:
    """Run the KPI aggregate query for one tenant.

    Counts the tenant's distinct properties, units and meter installations
    (skipping soft-deleted properties/units and removed installations) and
    picks the newest reading timestamp from the last 48 hours.

    Args:
        tenant_id: Tenant to aggregate for; bound as a query parameter.
        db: Async database session exposing ``execute``.

    Returns:
        The result row as a dict keyed by column alias, or an empty dict
        when no row comes back.
    """
    from sqlalchemy import text

    # The SQL text is kept flush-left so the statement string is unchanged.
    statement = text("""
SELECT
COUNT(DISTINCT p.id) AS property_count,
COUNT(DISTINCT u.id) AS unit_count,
COUNT(DISTINCT mi.id) AS installation_count,
MAX(r.timestamp) AS last_reading
FROM properties p
LEFT JOIN units u ON u.property_id = p.id AND u.deleted_at IS NULL
LEFT JOIN meter_installations mi ON mi.unit_id = u.id AND mi.removed_at IS NULL
LEFT JOIN readings r ON r.meter_installation_id = mi.id
AND r.timestamp > NOW() - INTERVAL '48 hours'
WHERE p.tenant_id = :tenant_id AND p.deleted_at IS NULL
""")
    outcome = await db.execute(statement, {"tenant_id": tenant_id})
    record = outcome.fetchone()
    if not record:
        return {}
    return dict(record._mapping)
Pub/Sub — MQTT→SSE alarm-bro
# server/src/mqtt/alarm_bridge.py
# MQTT-subscriber → Redis pub/sub → SSE til React UI
# Tenant-isoleret: én alarm-kanal pr. tenant
import asyncio
import json
from typing import AsyncGenerator

import structlog
from redis.asyncio import Redis
# Module-level structlog logger used by the alarm publish/subscribe functions.
log = structlog.get_logger()
async def publish_alarm(
    redis: Redis,
    tenant_id: str,
    alarm: dict,
) -> None:
    """Publish one alarm on the tenant's Redis pub/sub channel.

    Called by the MQTT subscriber for each incoming alarm. The alarm is
    JSON-encoded and sent to the channel ``alarms:<tenant_id>``.

    Args:
        redis: Redis client used for publishing.
        tenant_id: Tenant the alarm belongs to; selects the channel.
        alarm: Alarm payload; must be JSON-serialisable.
    """
    target = f"alarms:{tenant_id}"
    await redis.publish(target, json.dumps(alarm))
    log.info(
        "alarm_published",
        tenant_id=tenant_id,
        alarm_type=alarm.get("type"),
    )
async def subscribe_alarms(
    redis: Redis,
    tenant_id: str,
) -> AsyncGenerator[dict, None]:
    """Stream decoded alarms from the tenant's Redis pub/sub channel.

    Subscribes to ``alarms:<tenant_id>`` and yields every message payload
    decoded from JSON, for use by an SSE endpoint. Payloads that are not
    valid JSON are logged and skipped. The subscription is always released
    when the generator is closed or fails.

    Args:
        redis: Redis client used to create the pub/sub object.
        tenant_id: Tenant whose alarm channel is subscribed to.

    Yields:
        One decoded alarm per message received on the channel.
    """
    channel = f"alarms:{tenant_id}"
    pubsub = redis.pubsub()
    try:
        await pubsub.subscribe(channel)
        log.info("sse_subscribed", tenant_id=tenant_id, channel=channel)
        # NOTE(review): if the client was created with a socket_timeout,
        # confirm that listen() does not time out on an idle channel.
        async for message in pubsub.listen():
            # Subscribe/unsubscribe confirmations also arrive here; only
            # real payload messages are forwarded.
            if message["type"] != "message":
                continue
            try:
                alarm = json.loads(message["data"])
            except json.JSONDecodeError:
                log.warning("alarm_json_decode_error", data=message["data"])
                continue
            yield alarm
    finally:
        # Close the pub/sub connection even when unsubscribe fails (e.g. on
        # a broken connection); otherwise the connection would leak.
        try:
            await pubsub.unsubscribe(channel)
        finally:
            await pubsub.aclose()
            log.info("sse_unsubscribed", tenant_id=tenant_id)
# FastAPI SSE endpoint:
# @router.get("/alarms/stream")
# async def alarm_stream(
# current_user=Depends(get_current_user),
# redis: Redis = Depends(get_redis),
# ):
# async def event_generator():
# async for alarm in subscribe_alarms(redis, current_user.tenant_id):
# yield f"data: {json.dumps(alarm)}\n\n"
# return StreamingResponse(event_generator(), media_type="text/event-stream")
Rate limiting med Redis
# server/src/middleware/rate_limit.py
# Sliding window rate limiter: 100 requests/minut pr. tenant og endpoint-sti
import time
from fastapi import Request, HTTPException
from redis.asyncio import Redis
async def check_rate_limit(
    request: Request,
    redis: Redis,
    limit: int = 100,
    window_seconds: int = 60,
) -> None:
    """Enforce a sliding-window request limit per tenant and URL path.

    Each request is recorded in a Redis sorted set scored by its timestamp.
    Entries older than the window are pruned, the remaining entries are
    counted, and the key's expiry is refreshed. Rejected requests are
    recorded too, so they count against the following window.

    Args:
        request: Incoming request; ``request.state.tenant_id`` selects the
            bucket, falling back to ``"anonymous"`` when it is not set.
        redis: Redis client holding the counters.
        limit: Maximum number of requests allowed inside the window.
        window_seconds: Length of the sliding window in seconds.

    Raises:
        HTTPException: Status 429 with a ``Retry-After`` header when the
            request count inside the window exceeds ``limit``.
    """
    from uuid import uuid4

    tenant_id = getattr(request.state, "tenant_id", "anonymous")
    key = f"ratelimit:{tenant_id}:{request.url.path}"
    now = time.time()
    window_start = now - window_seconds

    # Sorted-set members must be unique. Using the bare timestamp would make
    # two requests with the same timestamp overwrite each other and be
    # counted once, so a random suffix is appended.
    member = f"{now}:{uuid4().hex}"

    pipe = redis.pipeline()
    pipe.zremrangebyscore(key, 0, window_start)  # drop requests outside the window
    pipe.zadd(key, {member: now})  # record this request
    pipe.zcard(key)  # count requests inside the window
    pipe.expire(key, window_seconds)  # let idle buckets disappear
    _, _, request_count, _ = await pipe.execute()

    if request_count > limit:
        raise HTTPException(
            status_code=429,
            detail=f"Rate limit exceeded: {limit} requests per {window_seconds}s",
            headers={"Retry-After": str(window_seconds)},
        )
Wildcard invalidering (SCAN-baseret)
# server/src/cache/invalidation.py
# Slet alle cache-poster for en tenant ved kritiske ændringer
# Brug ALDRIG KEYS * i produktion — brug SCAN (inkrementel, blokerer ikke serveren)
async def invalidate_tenant_cache(redis: Redis, tenant_id: str) -> int:
    """Delete every Redis key that matches ``*:<tenant_id>:*``.

    Used for coarse invalidation, e.g. after a property change, a new
    gateway or a manual cache clear. Keys are discovered with cursor-based
    SCAN in batches and deleted batch by batch.

    Args:
        redis: Redis client to scan and delete with.
        tenant_id: Tenant whose keys are removed.

    Returns:
        The number of keys deleted.
    """
    # Escape glob metacharacters so an identifier containing ``*``, ``?``,
    # ``[``, ``]`` or a backslash is matched literally and cannot widen the
    # pattern to other tenants' keys.
    escaped_id = "".join(
        "\\" + char if char in "*?[]\\" else char for char in tenant_id
    )
    pattern = f"*:{escaped_id}:*"
    # NOTE(review): this pattern also matches non-cache keys that embed the
    # tenant id as a middle segment, such as ``ratelimit:<tenant_id>:<path>``;
    # confirm that resetting those is intended.

    deleted = 0
    cursor = 0
    while True:
        cursor, keys = await redis.scan(
            cursor=cursor,
            match=pattern,
            count=100,  # batch-size hint per SCAN call
        )
        if keys:
            deleted += await redis.delete(*keys)
        if cursor == 0:  # SCAN reports a zero cursor once iteration is complete
            break
    return deleted
Konklusion
aioredis (nu redis.asyncio) giver non-blocking Redis-adgang i FastAPI. Tenant-scopede nøgle-skemaer ({prefix}:{tenant_id}:*) isolerer caches sikkert. Portfolio KPI-caching (5 min TTL) reducerer DB-load med 90 % ved mange samtidige dashboards. Redis pub/sub er den rette arkitektur til MQTT→SSE-alarm-bro i multi-tenant SaaS — tenant-isoleret og skalerbar. Rate limiting med sliding window ZADD virker korrekt på tværs af multiple server-instanser. SCAN frem for KEYS til wildcard-invalidering.
Se Python asyncio patterns guide eller FastAPI WebSocket guide.