M-Bus Gateway
← Tilbage til blog
· Redis· FastAPI· caching· Python· SaaS· rate limiting· Pub/Sub· Docker

Redis caching med FastAPI — mønstre for SaaS-platforme

Redis caching i FastAPI: response caching, session store, rate limiting, Pub/Sub til alarmer, cache invalidering ved mutation og Docker Compose opsætning.

Af M-Bus Gateway

Redis bruges til tre formål i M-Bus Gateway platformen: caching af dyre database-queries, rate limiting og Pub/Sub til realtids-alarmer. Her er mønstrene.


Redis forbindelse med aioredis

# server/src/core/redis.py

import redis.asyncio as aioredis
from contextlib import asynccontextmanager
from fastapi import FastAPI

redis_client: aioredis.Redis | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global redis_client
    redis_client = aioredis.from_url(
        "redis://redis:6379/0",
        encoding="utf-8",
        decode_responses=True,
        max_connections=20,
    )
    yield
    await redis_client.aclose()

def get_redis() -> aioredis.Redis:
    if redis_client is None:
        raise RuntimeError("Redis not initialized")
    return redis_client

Response caching af portfolio-endpoints

# server/src/portfolio/router.py

import json
from fastapi import Depends
from redis.asyncio import Redis

CACHE_TTL = 300  # 5 minutter

@router.get("/kpi")
async def get_portfolio_kpi(
    user: TokenPayload = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
    redis: Redis = Depends(get_redis),
):
    cache_key = f"portfolio:kpi:{user.tenant_id}"

    # Forsøg cache-hit
    cached = await redis.get(cache_key)
    if cached:
        return json.loads(cached)

    # Cache-miss → beregn fra DB
    result = await _calculate_kpi(session, user.tenant_id)

    # Gem i cache med TTL
    await redis.setex(cache_key, CACHE_TTL, json.dumps(result))
    return result

async def invalidate_portfolio_cache(redis: Redis, tenant_id: str) -> None:
    """Kald ved mutation (ny ejendom, ny aflæsning osv.)."""
    pattern = f"portfolio:*:{tenant_id}"
    keys = await redis.keys(pattern)
    if keys:
        await redis.delete(*keys)

Cache invalidering ved datamutation

# Invalidér cache automatisk når data ændres:

@router.post("/properties")
async def create_property(
    body: PropertyIn,
    user: TokenPayload = Depends(require_role("landlord", "super_admin")),
    session: AsyncSession = Depends(get_session),
    redis: Redis = Depends(get_redis),
):
    prop = await _create_property(session, body, user)
    # Invalidér alle portfolio-caches for denne tenant
    await invalidate_portfolio_cache(redis, str(user.tenant_id))
    return prop

Rate limiting med Redis sliding window

# server/src/middleware/rate_limit.py

import time
from fastapi import Request, HTTPException
from redis.asyncio import Redis

async def check_rate_limit(
    request: Request,
    redis: Redis,
    limit: int = 100,
    window_seconds: int = 60,
) -> None:
    """Sliding window rate limiter: limit requests pr. IP pr. window."""
    client_ip = request.client.host
    key = f"rate:{client_ip}:{request.url.path}"
    now = time.time()
    window_start = now - window_seconds

    pipe = redis.pipeline()
    # Fjern gamle entries
    pipe.zremrangebyscore(key, 0, window_start)
    # Tilføj nuværende request
    pipe.zadd(key, {str(now): now})
    # Tæl requests i window
    pipe.zcard(key)
    # Sæt TTL
    pipe.expire(key, window_seconds)
    results = await pipe.execute()

    count = results[2]
    if count > limit:
        raise HTTPException(
            status_code=429,
            detail=f"Rate limit exceeded: {limit} requests per {window_seconds}s",
            headers={"Retry-After": str(window_seconds)},
        )

Session store (JWT refresh tokens)

# server/src/auth/session.py

REFRESH_TTL = 7 * 24 * 3600  # 7 dage

async def store_refresh_token(
    redis: Redis,
    user_id: str,
    token_family: str,
    token_hash: str,
) -> None:
    """Gem refresh token family for rotation detection."""
    key = f"refresh:{user_id}:{token_family}"
    await redis.setex(key, REFRESH_TTL, token_hash)

async def validate_and_rotate_refresh_token(
    redis: Redis,
    user_id: str,
    token_family: str,
    token_hash: str,
) -> bool:
    """Valider token og roter — reuse detection."""
    key = f"refresh:{user_id}:{token_family}"
    stored = await redis.get(key)
    if stored != token_hash:
        # Mulig token reuse — invalider hele familien
        await redis.delete(key)
        return False
    return True

async def revoke_all_sessions(redis: Redis, user_id: str) -> None:
    """Brug ved logout-all eller sikkerhedshændelse."""
    pattern = f"refresh:{user_id}:*"
    keys = await redis.keys(pattern)
    if keys:
        await redis.delete(*keys)

Pub/Sub til realtids-alarmer

# server/src/alarms/pubsub.py

import json
from redis.asyncio import Redis
from fastapi.responses import StreamingResponse

ALARM_CHANNEL = "alarms:{tenant_id}"

async def publish_alarm(redis: Redis, tenant_id: str, alarm: dict) -> None:
    channel = ALARM_CHANNEL.format(tenant_id=tenant_id)
    await redis.publish(channel, json.dumps(alarm))

async def alarm_stream(redis: Redis, tenant_id: str):
    """SSE generator — stream alarmer til browser via EventSource."""
    channel = ALARM_CHANNEL.format(tenant_id=tenant_id)
    pubsub = redis.pubsub()
    await pubsub.subscribe(channel)
    try:
        async for message in pubsub.listen():
            if message["type"] == "message":
                data = message["data"]
                yield f"data: {data}\n\n"
    finally:
        await pubsub.unsubscribe(channel)
        await pubsub.aclose()

@router.get("/stream")
async def alarm_sse(
    user: TokenPayload = Depends(get_current_user),
    redis: Redis = Depends(get_redis),
):
    return StreamingResponse(
        alarm_stream(redis, str(user.tenant_id)),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Nginx: disable buffering
        },
    )

Distribueret lock (Celery task deduplication)

# server/src/workers/locks.py

import asyncio
from redis.asyncio import Redis

async def acquire_lock(
    redis: Redis,
    key: str,
    ttl_seconds: int = 300,
) -> bool:
    """Sæt kun hvis ikke eksisterer — atomisk NX+EX."""
    result = await redis.set(key, "1", ex=ttl_seconds, nx=True)
    return result is True

async def with_lock(redis: Redis, key: str, ttl: int = 300):
    """Context manager til distribueret lock."""
    acquired = await acquire_lock(redis, key, ttl)
    if not acquired:
        return
    try:
        yield
    finally:
        await redis.delete(key)

# Brug i Celery task:
# async with with_lock(redis, f"annual-calc:{tenant_id}:{period}"):
#     await run_distribution_calculation(...)

Docker Compose konfiguration

# docker-compose.yml (uddrag)
services:
  redis:
    image: redis:7-alpine
    command: >
      redis-server
      --maxmemory 256mb
      --maxmemory-policy allkeys-lru
      --save ""
      --appendonly no
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 3
    networks:
      - internal

  server:
    depends_on:
      redis:
        condition: service_healthy
    environment:
      REDIS_URL: redis://redis:6379/0

Monitoring: Redis memory og hit-rate

# Hit-rate og memory stats:
redis-cli info stats | grep -E "keyspace_hits|keyspace_misses|used_memory_human"

# Eksempel output:
# keyspace_hits:15420
# keyspace_misses:1230
# used_memory_human:18.4M

# Hit-rate = 15420 / (15420 + 1230) = 92.6%

# Se alle cache-nøgler for en tenant:
redis-cli keys "portfolio:*:550e8400-e29b-41d4-a716-446655440000"

Konklusion

Redis løser tre uafhængige problemer i M-Bus Gateway platformen: 300s TTL-caching reducerer portfolio-endpoint query-tid fra ~200ms til under 1ms; sliding window rate limiting beskytter public endpoints mod misbrug; Pub/Sub driver SSE alarmstrømmen til browser uden polling. Cache-invalidering ved mutation sikrer datakonsistens.

Se Celery beat guide eller FastAPI multi-tenant guide.