· FastAPI· rate limiting· Redis· Python· sikkerhed· API· backend
FastAPI rate limiting — sliding window, token bucket og Redis
Rate limiting i FastAPI: sliding window counter i Redis, token bucket algoritme, IP-baseret og API-nøgle-baseret begrænsning, 429-respons med Retry-After og Middleware-implementation.
Af M-Bus Gateway
Rate limiting beskytter API'et mod misbrug og sikrer fair brug. Her er implementeringen med Redis sliding window og token bucket.
Sliding window counter (Redis)
# server/src/middleware/rate_limit.py
import time
import redis.asyncio as aioredis
from fastapi import Request, HTTPException
redis_client = aioredis.from_url("redis://localhost:6379")
async def sliding_window_rate_limit(
key: str,
max_requests: int,
window_seconds: int,
) -> tuple[bool, int]:
"""
Sliding window rate limiting via Redis sorted set.
Returnerer (allowed, requests_remaining).
"""
now = time.time()
window_start = now - window_seconds
pipe = redis_client.pipeline()
# Fjern udgåede requests fra vinduet:
pipe.zremrangebyscore(key, "-inf", window_start)
# Tæl aktuelle requests i vinduet:
pipe.zcard(key)
# Tilføj aktuel request:
pipe.zadd(key, {str(now): now})
# Sæt TTL:
pipe.expire(key, window_seconds + 1)
_, count, _, _ = await pipe.execute()
remaining = max(0, max_requests - count - 1)
allowed = count < max_requests
return allowed, remaining
# FastAPI dependency:
async def rate_limit_dependency(
request: Request,
max_requests: int = 100,
window_seconds: int = 60,
):
"""
Generisk rate limit dependency.
Key: IP + path for offentlige endpoints,
API-nøgle for autentificerede endpoints.
"""
client_ip = request.client.host if request.client else "unknown"
key = f"ratelimit:{client_ip}:{request.url.path}"
allowed, remaining = await sliding_window_rate_limit(
key, max_requests, window_seconds
)
if not allowed:
raise HTTPException(
status_code=429,
detail="For mange requests. Prøv igen om lidt.",
headers={
"Retry-After": str(window_seconds),
"X-RateLimit-Limit": str(max_requests),
"X-RateLimit-Remaining": "0",
"X-RateLimit-Reset": str(int(time.time() + window_seconds)),
},
)
return remaining
Route-specifik rate limiting
# server/src/auth/router.py
from functools import partial
# Strengere limit for auth-endpoints (brute force beskyttelse):
login_rate_limit = partial(rate_limit_dependency, max_requests=5, window_seconds=60)
register_rate_limit = partial(rate_limit_dependency, max_requests=3, window_seconds=300)
@router.post("/login")
async def login(
credentials: LoginRequest,
_: int = Depends(login_rate_limit),
) -> TokenResponse:
"""5 loginforsøg pr. minut pr. IP."""
...
@router.post("/register")
async def register(
data: RegisterRequest,
_: int = Depends(register_rate_limit),
) -> TokenResponse:
"""3 registreringer pr. 5 minutter pr. IP."""
...
# Mere liberal limit for normale API-kald:
@router.get("/readings")
async def list_readings(
remaining: int = Depends(
partial(rate_limit_dependency, max_requests=300, window_seconds=60)
),
current_user: User = Depends(get_current_user),
) -> Page[ReadingOut]:
"""300 kald pr. minut pr. IP."""
...
API-nøgle-baseret rate limiting
# Per-API-nøgle limit (tredjepartsintegrationer):
async def api_key_rate_limit(
request: Request,
api_key: str = Depends(get_api_key),
max_requests: int = 1000,
window_seconds: int = 60,
):
"""
Rate limit baseret på API-nøgle (ikke IP).
Tillader højere kvote for autentificerede klienter.
"""
key = f"ratelimit:apikey:{api_key}:{window_seconds}s"
allowed, remaining = await sliding_window_rate_limit(
key, max_requests, window_seconds
)
if not allowed:
raise HTTPException(
status_code=429,
detail={
"error": "rate_limit_exceeded",
"message": f"Max {max_requests} requests per {window_seconds}s",
"retry_after": window_seconds,
},
headers={"Retry-After": str(window_seconds)},
)
return remaining
Token bucket — burst-tolerant rate limiting
# Token bucket: Tillader korte bursts, begrænser langtidsforbrug
async def token_bucket_limit(
key: str,
capacity: int, # Max tokens (burst size)
refill_rate: float, # Tokens pr. sekund
) -> bool:
"""
Token bucket algoritme.
Tillader f.eks. 20 requests i burst men max 2/sek gennemsnitligt.
"""
now = time.time()
bucket_key = f"tokenbucket:{key}"
last_refill_key = f"tokenbucket:last:{key}"
# Hent nuværende tokens og sidst genopfyldning:
tokens_str, last_str = await redis_client.mget(bucket_key, last_refill_key)
tokens = float(tokens_str) if tokens_str else float(capacity)
last_refill = float(last_str) if last_str else now
# Genopfyld tokens baseret på forløbet tid:
elapsed = now - last_refill
tokens = min(capacity, tokens + elapsed * refill_rate)
if tokens < 1.0:
return False # Ikke tilladt
# Forbrug én token:
tokens -= 1.0
pipe = redis_client.pipeline()
pipe.set(bucket_key, tokens, ex=int(capacity / refill_rate) + 10)
pipe.set(last_refill_key, now, ex=int(capacity / refill_rate) + 10)
await pipe.execute()
return True
Middleware: Global rate limiting
# server/src/middleware/rate_limit_middleware.py
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
class GlobalRateLimitMiddleware(BaseHTTPMiddleware):
"""
Middleware der rate-limiter alle requests globalt.
Konfigurér restriktivt her, giver specifikke endpoints mere.
"""
EXEMPT_PATHS = {"/api/health", "/metrics"}
async def dispatch(self, request: Request, call_next):
if request.url.path in self.EXEMPT_PATHS:
return await call_next(request)
client_ip = request.client.host if request.client else "unknown"
key = f"global:{client_ip}"
allowed, _ = await sliding_window_rate_limit(
key, max_requests=500, window_seconds=60
)
if not allowed:
return JSONResponse(
status_code=429,
content={"detail": "For mange requests"},
headers={"Retry-After": "60"},
)
return await call_next(request)
# main.py:
app.add_middleware(GlobalRateLimitMiddleware)
Test af rate limiting
# server/tests/test_rate_limiting.py
import pytest
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_login_rate_limit(client):
"""5 forsøg tillades, 6. giver 429."""
with patch("server.src.middleware.rate_limit.redis_client") as mock_redis:
# Simulér at 5 forsøg allerede er brugt:
mock_redis.pipeline.return_value.__aenter__ = AsyncMock(
return_value=AsyncMock(
execute=AsyncMock(return_value=[None, 5, None, None])
)
)
resp = await client.post("/api/v1/auth/login",
json={"email": "x@y.dk", "password": "wrong"})
assert resp.status_code == 429
assert "Retry-After" in resp.headers
assert resp.headers["Retry-After"] == "60"
Konklusion
Redis sliding window med sorted set giver præcis rate limiting uden race conditions. Token bucket algoritmen er bedst til API'er med burst-tolerante klienter. Auth-endpoints (login/register) kræver strengere limits end normale API-kald. Retry-After-headeren informerer klienter om hvornår de kan prøve igen. Middleware-niveau global limiting supplerer route-specifikke limits som forsvar i dybden.