M-Bus Gateway
← Tilbage til blog
· python· redis· aioredis· cache· IoT· SaaS· pubsub· TTL· FastAPI· Celery

Python Redis og aioredis — cache-mønstre til IoT og SaaS

Redis cache-mønstre til Python IoT og SaaS: aioredis async, tenant-scopede cache-nøgler, TTL-strategier, portfolio KPI-caching, pub/sub til MQTT→SSE bro og connection pool tuning.

Af M-Bus Gateway

Redis er ikke kun Celery-backend — det er platformen til IoT-real-time alarmer, portfolio KPI-cache og tenant-isoleret pub/sub. Her er mønstrene.


aioredis — async Redis klient

# server/src/cache/redis_client.py
import redis.asyncio as aioredis
from server.src.config import get_settings
from typing import AsyncGenerator


_redis_pool: aioredis.Redis | None = None


async def get_redis() -> AsyncGenerator[aioredis.Redis, None]:
    """
    FastAPI dependency: Leverer aioredis-klient fra connection pool.
    Pool oprettes én gang ved app-start (lifespan).
    """
    global _redis_pool
    if _redis_pool is None:
        settings = get_settings()
        _redis_pool = aioredis.from_url(
            str(settings.redis_url),
            max_connections=20,
            decode_responses=True,       # Returnér str, ikke bytes
            socket_timeout=5.0,
            socket_connect_timeout=5.0,
        )
    yield _redis_pool


async def close_redis() -> None:
    """Kald ved app shutdown (lifespan cleanup)."""
    global _redis_pool
    if _redis_pool:
        await _redis_pool.aclose()
        _redis_pool = None

Tenant-scopede cache-nøgler

# server/src/cache/keys.py
# Nøgle-schema: {prefix}:{tenant_id}:{resource}:{id}
# Sikrer at tenant A aldrig kan læse tenant B's cache

def portfolio_kpi_key(tenant_id: str) -> str:
    return f"kpi:{tenant_id}:portfolio"


def gateway_health_key(tenant_id: str, gateway_id: str) -> str:
    return f"health:{tenant_id}:gateway:{gateway_id}"


def reading_monthly_key(tenant_id: str, unit_id: str, year: int, month: int) -> str:
    return f"readings:{tenant_id}:monthly:{unit_id}:{year}:{month:02d}"


def settlement_pdf_key(tenant_id: str, settlement_id: str) -> str:
    return f"pdf:{tenant_id}:settlement:{settlement_id}"


# Pattern til wildcard-sletning (ved ejendoms-ændring):
def tenant_pattern(tenant_id: str) -> str:
    return f"*:{tenant_id}:*"

Portfolio KPI-caching

# server/src/portfolio/kpi_cache.py
import json
from datetime import datetime
from fastapi import Depends
from redis.asyncio import Redis
from server.src.cache.redis_client import get_redis
from server.src.cache.keys import portfolio_kpi_key
from server.src.db.engine import get_db


KPI_TTL_SECONDS = 300    # 5 minutter — KPI er ikke real-time kritisk


async def get_portfolio_kpi(
    tenant_id: str,
    redis: Redis = Depends(get_redis),
    db=Depends(get_db),
) -> dict:
    """
    1. Tjek cache → returnér straks hvis hit
    2. Cache-miss → beregn fra DB → gem i Redis → returnér
    """
    cache_key = portfolio_kpi_key(tenant_id)

    # Cache hit
    cached = await redis.get(cache_key)
    if cached:
        data = json.loads(cached)
        data["_cached"] = True
        return data

    # Cache miss → beregn
    kpi = await _compute_portfolio_kpi(tenant_id, db)
    kpi["computed_at"] = datetime.utcnow().isoformat()

    # Gem med TTL
    await redis.setex(cache_key, KPI_TTL_SECONDS, json.dumps(kpi))

    return kpi


async def invalidate_portfolio_kpi(tenant_id: str, redis: Redis) -> None:
    """Kald ved ny aflæsning, ny afregning o.lign."""
    await redis.delete(portfolio_kpi_key(tenant_id))


async def _compute_portfolio_kpi(tenant_id: str, db) -> dict:
    """Dyr DB-query — køres kun ved cache-miss."""
    from sqlalchemy import text
    result = await db.execute(
        text("""
            SELECT
                COUNT(DISTINCT p.id) AS property_count,
                COUNT(DISTINCT u.id) AS unit_count,
                COUNT(DISTINCT mi.id) AS installation_count,
                MAX(r.timestamp) AS last_reading
            FROM properties p
            LEFT JOIN units u ON u.property_id = p.id AND u.deleted_at IS NULL
            LEFT JOIN meter_installations mi ON mi.unit_id = u.id AND mi.removed_at IS NULL
            LEFT JOIN readings r ON r.meter_installation_id = mi.id
                AND r.timestamp > NOW() - INTERVAL '48 hours'
            WHERE p.tenant_id = :tenant_id AND p.deleted_at IS NULL
        """),
        {"tenant_id": tenant_id},
    )
    row = result.fetchone()
    return dict(row._mapping) if row else {}

Pub/Sub — MQTT→SSE alarm-bro

# server/src/mqtt/alarm_bridge.py
# MQTT-subscriber → Redis pub/sub → SSE til React UI
# Tenant-isoleret: Hvert alarm-kanal pr. tenant

import asyncio
import json
import structlog
from redis.asyncio import Redis

log = structlog.get_logger()


async def publish_alarm(
    redis: Redis,
    tenant_id: str,
    alarm: dict,
) -> None:
    """
    MQTT subscriber kalder dette ved indkommende alarm.
    Publicerer til tenant-specifik kanal.
    """
    channel = f"alarms:{tenant_id}"
    message = json.dumps(alarm)
    await redis.publish(channel, message)
    log.info("alarm_published", tenant_id=tenant_id, alarm_type=alarm.get("type"))


async def subscribe_alarms(
    redis: Redis,
    tenant_id: str,
) -> asyncio.AsyncGenerator[dict, None]:
    """
    SSE endpoint abonnerer på alarm-kanal.
    Generér alarms som async generator til StreamingResponse.
    """
    channel = f"alarms:{tenant_id}"
    pubsub = redis.pubsub()

    try:
        await pubsub.subscribe(channel)
        log.info("sse_subscribed", tenant_id=tenant_id, channel=channel)

        async for message in pubsub.listen():
            if message["type"] == "message":
                try:
                    yield json.loads(message["data"])
                except json.JSONDecodeError:
                    log.warning("alarm_json_decode_error", data=message["data"])
    finally:
        await pubsub.unsubscribe(channel)
        await pubsub.aclose()
        log.info("sse_unsubscribed", tenant_id=tenant_id)


# FastAPI SSE endpoint:
# @router.get("/alarms/stream")
# async def alarm_stream(
#     current_user=Depends(get_current_user),
#     redis: Redis = Depends(get_redis),
# ):
#     async def event_generator():
#         async for alarm in subscribe_alarms(redis, current_user.tenant_id):
#             yield f"data: {json.dumps(alarm)}\n\n"
#     return StreamingResponse(event_generator(), media_type="text/event-stream")

Rate limiting med Redis

# server/src/middleware/rate_limit.py
# Sliding window rate limiter: 100 requests/minute pr. tenant

import time
from fastapi import Request, HTTPException
from redis.asyncio import Redis


async def check_rate_limit(
    request: Request,
    redis: Redis,
    limit: int = 100,
    window_seconds: int = 60,
) -> None:
    """
    Sliding window med Redis ZADD + ZREMRANGEBYSCORE.
    Virker korrekt på tværs af multiple server-instanser.
    """
    tenant_id = getattr(request.state, "tenant_id", "anonymous")
    key = f"ratelimit:{tenant_id}:{request.url.path}"
    now = time.time()
    window_start = now - window_seconds

    pipe = redis.pipeline()
    pipe.zremrangebyscore(key, 0, window_start)     # Fjern gamle requests
    pipe.zadd(key, {str(now): now})                 # Tilføj denne request
    pipe.zcard(key)                                  # Tæl requests i vinduet
    pipe.expire(key, window_seconds)
    results = await pipe.execute()

    request_count = results[2]
    if request_count > limit:
        raise HTTPException(
            status_code=429,
            detail=f"Rate limit exceeded: {limit} requests per {window_seconds}s",
            headers={"Retry-After": str(window_seconds)},
        )

Wildcard invalidering (SCAN-baseret)

# server/src/cache/invalidation.py
# Slet alle cache-poster for en tenant ved kritiske ændringer
# ALDRIG brug KEYS * i produktion — brug SCAN (non-blocking)

async def invalidate_tenant_cache(redis: Redis, tenant_id: str) -> int:
    """
    Slet alle cache-nøgler for en tenant.
    Bruges ved: ejendomsændring, ny gateway, manuel cache-clear.
    Returns: antal slettede nøgler
    """
    pattern = f"*:{tenant_id}:*"
    deleted = 0
    cursor = 0

    while True:
        cursor, keys = await redis.scan(
            cursor=cursor,
            match=pattern,
            count=100,      # Batch-størrelse — undgår Redis-blocking
        )
        if keys:
            deleted += await redis.delete(*keys)
        if cursor == 0:
            break

    return deleted

Konklusion

aioredis (nu redis.asyncio) giver non-blocking Redis-adgang i FastAPI. Tenant-scopede nøgle-skemaer ({prefix}:{tenant_id}:*) isolerer caches sikkert. Portfolio KPI-caching (5 min TTL) reducerer DB-load med 90 % ved mange samtidige dashboards. Redis pub/sub er den rette arkitektur til MQTT→SSE-alarm-bro i multi-tenant SaaS — tenant-isoleret og skalerbar. Rate limiting med sliding window ZADD virker korrekt på tværs af multiple server-instanser. SCAN frem for KEYS til wildcard-invalidering.

Se Python asyncio patterns guide eller FastAPI WebSocket guide.