· Redis· FastAPI· caching· Python· SaaS· rate limiting· Pub/Sub· Docker
Redis caching med FastAPI — mønstre for SaaS-platforme
Redis caching i FastAPI: response caching, session store, rate limiting, Pub/Sub til alarmer, cache invalidering ved mutation og Docker Compose opsætning.
Af M-Bus Gateway
Redis bruges primært til tre formål i M-Bus Gateway-platformen: caching af dyre database-queries, rate limiting og Pub/Sub til realtidsalarmer. Derudover fungerer Redis som session store for refresh tokens og som distribueret lock. Her er mønstrene.
Redis-forbindelse med redis.asyncio (tidligere aioredis)
# server/src/core/redis.py
import redis.asyncio as aioredis
from contextlib import asynccontextmanager
from fastapi import FastAPI
# Process-wide Redis client: assigned on startup, reset to None on shutdown.
redis_client: aioredis.Redis | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared Redis client on startup and close it on shutdown.

    The connection URL is taken from the ``REDIS_URL`` environment variable
    and falls back to ``redis://redis:6379/0`` when the variable is unset, so
    existing deployments keep working unchanged.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).

    Yields:
        Nothing; the application serves requests while suspended here.
    """
    global redis_client
    import os  # local import keeps this snippet self-contained

    redis_client = aioredis.from_url(
        os.environ.get("REDIS_URL", "redis://redis:6379/0"),
        encoding="utf-8",
        decode_responses=True,
        max_connections=20,
    )
    try:
        yield
    finally:
        # Close the client even when the application exits through an
        # exception, and clear the global so get_redis() stops handing out
        # a closed client.
        client, redis_client = redis_client, None
        await client.aclose()
def get_redis() -> aioredis.Redis:
    """Return the shared Redis client.

    Returns:
        The module-level client stored in ``redis_client``.

    Raises:
        RuntimeError: If ``redis_client`` is still ``None``.
    """
    client = redis_client
    if client is not None:
        return client
    raise RuntimeError("Redis not initialized")
Response caching af portfolio-endpoints
# server/src/portfolio/router.py
import json
from fastapi import Depends
from redis.asyncio import Redis
CACHE_TTL = 300  # seconds (5 minutes)


@router.get("/kpi")
async def get_portfolio_kpi(
    user: TokenPayload = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
    redis: Redis = Depends(get_redis),
):
    """Return the portfolio KPIs for the caller's tenant, cached in Redis.

    The result is stored as JSON under ``portfolio:kpi:<tenant_id>`` for
    ``CACHE_TTL`` seconds; a cached value is returned without calling
    ``_calculate_kpi``.
    """
    tenant_id = user.tenant_id
    cache_key = f"portfolio:kpi:{tenant_id}"

    # Cache hit: decode the stored JSON and return it directly.
    if hit := await redis.get(cache_key):
        return json.loads(hit)

    # Cache miss: compute from the database, then store with a TTL.
    kpi = await _calculate_kpi(session, tenant_id)
    await redis.setex(cache_key, CACHE_TTL, json.dumps(kpi))
    return kpi
async def invalidate_portfolio_cache(redis: Redis, tenant_id: str) -> None:
    """Delete every cached ``portfolio:*:<tenant_id>`` entry for one tenant.

    Call this after a mutation (new property, new reading, ...).

    Matching keys are found with incremental SCAN instead of KEYS: KEYS
    walks the entire keyspace in a single command and stalls every other
    client while it runs, whereas SCAN works in small steps.

    Args:
        redis: Redis client used for the scan and the deletes.
        tenant_id: Tenant whose portfolio cache entries are removed.
    """
    batch_size = 500
    pattern = f"portfolio:*:{tenant_id}"
    batch = []
    async for key in redis.scan_iter(match=pattern, count=batch_size):
        batch.append(key)
        # Delete in bounded batches so a large tenant never produces one
        # huge DEL command.
        if len(batch) >= batch_size:
            await redis.delete(*batch)
            batch.clear()
    if batch:
        await redis.delete(*batch)
Cache invalidering ved datamutation
# Invalidate the cache automatically whenever data changes:
@router.post("/properties")
async def create_property(
    body: PropertyIn,
    user: TokenPayload = Depends(require_role("landlord", "super_admin")),
    session: AsyncSession = Depends(get_session),
    redis: Redis = Depends(get_redis),
):
    """Create a property, then drop the tenant's cached portfolio data.

    Returns:
        Whatever ``_create_property`` returned for the new property.
    """
    created = await _create_property(session, body, user)

    # The new property changes the portfolio figures, so every portfolio
    # cache entry for this tenant is removed before responding.
    tenant_key = str(user.tenant_id)
    await invalidate_portfolio_cache(redis, tenant_key)
    return created
Rate limiting med Redis sliding window
# server/src/middleware/rate_limit.py
import time
from fastapi import Request, HTTPException
from redis.asyncio import Redis
async def check_rate_limit(
    request: Request,
    redis: Redis,
    limit: int = 100,
    window_seconds: int = 60,
) -> None:
    """Sliding-window rate limiter: ``limit`` requests per IP, path and window.

    Each request is recorded in a sorted set keyed by client IP and URL
    path, scored by its timestamp. Entries older than the window are
    dropped before the remaining ones are counted.

    Args:
        request: Incoming request; supplies the client address and path.
        redis: Redis client holding the per-client sorted sets.
        limit: Maximum number of requests allowed inside one window.
        window_seconds: Length of the sliding window in seconds.

    Raises:
        HTTPException: 429 with a ``Retry-After`` header when the count,
            including the current request, exceeds ``limit``.
    """
    import uuid  # local import keeps this snippet self-contained

    # request.client is None when the server provides no peer address;
    # those requests share one "unknown" bucket instead of crashing.
    client_ip = request.client.host if request.client else "unknown"
    key = f"rate:{client_ip}:{request.url.path}"
    now = time.time()
    window_start = now - window_seconds

    pipe = redis.pipeline()
    # Drop entries that have fallen out of the window.
    pipe.zremrangebyscore(key, 0, window_start)
    # Record this request. The member carries a random suffix because sorted
    # set members are unique: two requests with the same timestamp would
    # otherwise collapse into one entry and be undercounted.
    pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
    # Count the requests currently inside the window.
    pipe.zcard(key)
    # Let the whole set expire once the client has been idle for a window.
    pipe.expire(key, window_seconds)
    _, _, count, _ = await pipe.execute()

    if count > limit:
        raise HTTPException(
            status_code=429,
            detail=f"Rate limit exceeded: {limit} requests per {window_seconds}s",
            headers={"Retry-After": str(window_seconds)},
        )
Session store (JWT refresh tokens)
# server/src/auth/session.py
REFRESH_TTL = 7 * 24 * 3600  # seconds (7 days)


async def store_refresh_token(
    redis: Redis,
    user_id: str,
    token_family: str,
    token_hash: str,
) -> None:
    """Store the token hash of a refresh-token family for rotation detection.

    The hash is written to ``refresh:<user_id>:<token_family>`` and expires
    after ``REFRESH_TTL`` seconds.
    """
    await redis.setex(
        f"refresh:{user_id}:{token_family}",
        REFRESH_TTL,
        token_hash,
    )
async def validate_and_rotate_refresh_token(
    redis: Redis,
    user_id: str,
    token_family: str,
    token_hash: str,
    new_token_hash: str | None = None,
) -> bool:
    """Validate a refresh token and optionally rotate it, with reuse detection.

    Args:
        redis: Redis client holding the refresh-token families.
        user_id: Owner of the token family.
        token_family: Identifier of the token family being validated.
        token_hash: Hash of the refresh token presented by the client.
        new_token_hash: Hash of the replacement token. When given and the
            presented token is valid, it is stored as the family's current
            hash with a fresh ``REFRESH_TTL``. When omitted, the stored hash
            is left untouched.

    Returns:
        ``True`` when the presented hash matches the stored one, ``False``
        when the family is unknown or expired, or when the hash differs (in
        which case the whole family is deleted).
    """
    import hmac  # local import keeps this snippet self-contained

    key = f"refresh:{user_id}:{token_family}"
    stored = await redis.get(key)
    if stored is None:
        # Unknown or expired family: nothing to compare or invalidate.
        return False

    # Compare in constant time so response timing does not reveal how much
    # of the hash matched. Both sides are normalised to bytes, which also
    # covers clients created without decode_responses.
    stored_bytes = stored if isinstance(stored, bytes) else stored.encode("utf-8")
    if not hmac.compare_digest(stored_bytes, token_hash.encode("utf-8")):
        # Possible token reuse: invalidate the whole family.
        await redis.delete(key)
        return False

    if new_token_hash is not None:
        # NOTE(review): the read and this write are separate commands, so two
        # concurrent requests presenting the same token can both pass; use a
        # Lua script or WATCH/MULTI if that race matters.
        await redis.setex(key, REFRESH_TTL, new_token_hash)
    return True
async def revoke_all_sessions(redis: Redis, user_id: str) -> None:
    """Delete every refresh-token family stored for one user.

    Use on logout-all or after a security incident.

    Matching keys are found with incremental SCAN instead of KEYS: KEYS
    walks the entire keyspace in a single command and stalls every other
    client while it runs, whereas SCAN works in small steps.

    Args:
        redis: Redis client holding the refresh-token families.
        user_id: User whose ``refresh:<user_id>:*`` keys are removed.
    """
    batch_size = 500
    pattern = f"refresh:{user_id}:*"
    batch = []
    async for key in redis.scan_iter(match=pattern, count=batch_size):
        batch.append(key)
        # Delete in bounded batches to keep each DEL command small.
        if len(batch) >= batch_size:
            await redis.delete(*batch)
            batch.clear()
    if batch:
        await redis.delete(*batch)
Pub/Sub til realtids-alarmer
# server/src/alarms/pubsub.py
import json
from redis.asyncio import Redis
from fastapi.responses import StreamingResponse
# Channel name template; one Pub/Sub channel per tenant.
ALARM_CHANNEL = "alarms:{tenant_id}"


async def publish_alarm(redis: Redis, tenant_id: str, alarm: dict) -> None:
    """Publish ``alarm`` as JSON on the tenant's alarm channel."""
    target = ALARM_CHANNEL.format(tenant_id=tenant_id)
    payload = json.dumps(alarm)
    await redis.publish(target, payload)
async def alarm_stream(redis: Redis, tenant_id: str):
    """SSE generator: stream a tenant's alarms to the browser via EventSource.

    Subscribes to the tenant's alarm channel and yields each published
    payload as one ``data: ...`` server-sent event.

    Args:
        redis: Redis client used to open the Pub/Sub subscription.
        tenant_id: Tenant whose alarm channel is streamed.

    Yields:
        One SSE-formatted string per message received on the channel.
    """
    channel = ALARM_CHANNEL.format(tenant_id=tenant_id)
    pubsub = redis.pubsub()
    try:
        # Subscribing inside the try block means a failed subscribe still
        # releases the pubsub object below.
        await pubsub.subscribe(channel)
        async for message in pubsub.listen():
            # Skip subscribe/unsubscribe confirmations; forward only payloads.
            if message["type"] != "message":
                continue
            data = message["data"]
            yield f"data: {data}\n\n"
    finally:
        # unsubscribe can itself fail (e.g. on a broken connection); the
        # nested finally guarantees the pubsub object is closed regardless.
        try:
            await pubsub.unsubscribe(channel)
        finally:
            await pubsub.aclose()
@router.get("/stream")
async def alarm_sse(
    user: TokenPayload = Depends(get_current_user),
    redis: Redis = Depends(get_redis),
):
    """Serve the caller's tenant alarm stream as ``text/event-stream``."""
    sse_headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",  # Nginx: disable buffering
    }
    events = alarm_stream(redis, str(user.tenant_id))
    return StreamingResponse(
        events,
        media_type="text/event-stream",
        headers=sse_headers,
    )
Distribueret lock (Celery task deduplication)
# server/src/workers/locks.py
import asyncio
from redis.asyncio import Redis
async def acquire_lock(
    redis: Redis,
    key: str,
    ttl_seconds: int = 300,
) -> bool:
    """Try to take the lock ``key`` with a single SET using NX and EX.

    Args:
        redis: Redis client that stores the lock key.
        key: Name of the lock.
        ttl_seconds: Expiry of the lock key in seconds.

    Returns:
        ``True`` only when the SET call reported ``True``, else ``False``.
    """
    outcome = await redis.set(key, "1", nx=True, ex=ttl_seconds)
    if outcome is True:
        return True
    return False
from contextlib import asynccontextmanager

# Delete the lock key only when it still holds this holder's token.
_RELEASE_LOCK_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
end
return 0
"""


class LockNotAcquired(RuntimeError):
    """Raised by ``with_lock`` when the lock is already held."""


@asynccontextmanager
async def with_lock(redis: Redis, key: str, ttl: int = 300):
    """Async context manager that holds a distributed lock around its body.

    A context manager cannot skip its body, so a lock that is already held
    is reported by raising ``LockNotAcquired``.

    Args:
        redis: Redis client that stores the lock key.
        key: Name of the lock.
        ttl: Expiry of the lock key in seconds, so a crashed holder cannot
            block others forever.

    Raises:
        LockNotAcquired: If the lock key already exists.
    """
    import uuid  # local import keeps this snippet self-contained

    # A random token identifies this holder. acquire_lock() is not used here
    # because it stores the fixed value "1", which makes ownership
    # impossible to verify on release.
    token = uuid.uuid4().hex
    if not await redis.set(key, token, ex=ttl, nx=True):
        raise LockNotAcquired(f"lock {key!r} is already held")
    try:
        yield
    finally:
        # Compare-and-delete: if the TTL expired during the body and another
        # worker took the lock, its key must not be removed.
        await redis.eval(_RELEASE_LOCK_SCRIPT, 1, key, token)


# Usage in a Celery task:
# try:
#     async with with_lock(redis, f"annual-calc:{tenant_id}:{period}"):
#         await run_distribution_calculation(...)
# except LockNotAcquired:
#     pass  # another worker is already running this calculation
Docker Compose konfiguration
# docker-compose.yml (uddrag)
services:
  redis:
    image: redis:7-alpine
    # Memory capped at 256 MB with LRU eviction over all keys; RDB snapshots
    # and the append-only file are both disabled.
    # NOTE(review): with these flags any key can be evicted under memory
    # pressure and nothing survives a restart - confirm that is acceptable
    # for refresh tokens and lock keys stored in this same instance.
    command:
      - redis-server
      - --maxmemory
      - 256mb
      - --maxmemory-policy
      - allkeys-lru
      - --save
      - ""
      - --appendonly
      - "no"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 3
    networks:
      - internal

  server:
    # Start the server only after the Redis healthcheck passes.
    depends_on:
      redis:
        condition: service_healthy
    environment:
      REDIS_URL: redis://redis:6379/0
Monitoring: Redis memory og hit-rate
# Hit rate and memory stats.
# keyspace_hits / keyspace_misses are reported in the "stats" section of INFO,
# while used_memory_human is in the "memory" section, so both are queried.
{ redis-cli info stats; redis-cli info memory; } \
  | grep -E "keyspace_hits|keyspace_misses|used_memory_human"
# Example output:
# keyspace_hits:15420
# keyspace_misses:1230
# used_memory_human:18.4M
# Hit rate = 15420 / (15420 + 1230) = 92.6%

# List all cache keys for a tenant. --scan iterates with SCAN, which does not
# block the server the way KEYS does on a large keyspace.
redis-cli --scan --pattern "portfolio:*:550e8400-e29b-41d4-a716-446655440000"
Konklusion
Redis løser tre uafhængige problemer i M-Bus Gateway platformen: 300s TTL-caching reducerer portfolio-endpoint query-tid fra ~200ms til under 1ms; sliding window rate limiting beskytter public endpoints mod misbrug; Pub/Sub driver SSE alarmstrømmen til browser uden polling. Cache-invalidering ved mutation sikrer datakonsistens.