M-Bus Gateway
← Tilbage til blog
· FastAPI· rate limiting· Redis· Python· sikkerhed· API· backend

FastAPI rate limiting — sliding window, token bucket og Redis

Rate limiting i FastAPI: sliding window counter i Redis, token bucket-algoritme, IP-baseret og API-nøgle-baseret begrænsning, 429-respons med Retry-After og middleware-implementering.

Af M-Bus Gateway

Rate limiting beskytter API'et mod misbrug og sikrer fair brug. Her er implementeringen med Redis sliding window og token bucket.


Sliding window counter (Redis)

# server/src/middleware/rate_limit.py

import time
import redis.asyncio as aioredis
from fastapi import Request, HTTPException

# Shared async Redis client used by the rate limiters defined in this module.
# NOTE(review): the URL is hard-coded to a local Redis instance — presumably
# this should come from application configuration in deployment; confirm.
redis_client = aioredis.from_url("redis://localhost:6379")


async def sliding_window_rate_limit(
    key: str,
    max_requests: int,
    window_seconds: int,
) -> tuple[bool, int]:
    """Record one request under ``key`` and check it against a sliding window.

    The window is a Redis sorted set whose scores are request timestamps.
    In one pipeline the function drops entries older than the window, counts
    what is left, adds the current request and refreshes the key's TTL.

    Args:
        key: Redis key identifying the client/bucket being limited.
        max_requests: Number of requests allowed inside one window.
        window_seconds: Length of the window in seconds.

    Returns:
        ``(allowed, remaining)`` where ``allowed`` is True while fewer than
        ``max_requests`` earlier requests are in the window, and
        ``remaining`` is how many more requests fit after this one
        (never negative).

    Note:
        The request is added to the set before the verdict is known, so
        rejected requests also occupy the window: a client that keeps
        retrying while limited stays limited.
    """
    # Local import keeps this block self-contained; uuid is stdlib.
    import uuid

    now = time.time()
    window_start = now - window_seconds

    # Sorted-set members are unique: re-adding an existing member only updates
    # its score. Using the bare timestamp as the member would therefore merge
    # two requests that observe the same clock value into a single entry and
    # under-count them, so every request gets its own random suffix.
    member = f"{now}:{uuid.uuid4().hex}"

    pipe = redis_client.pipeline()
    # Drop requests that have fallen out of the window.
    pipe.zremrangebyscore(key, "-inf", window_start)
    # Count the requests still inside the window (excluding this one).
    pipe.zcard(key)
    # Record the current request.
    pipe.zadd(key, {member: now})
    # Let idle keys expire shortly after their window has passed.
    pipe.expire(key, window_seconds + 1)
    _, count, _, _ = await pipe.execute()

    remaining = max(0, max_requests - count - 1)
    allowed = count < max_requests
    return allowed, remaining


# FastAPI dependency:
async def rate_limit_dependency(
    request: Request,
    max_requests: int = 100,
    window_seconds: int = 60,
):
    """Apply a sliding-window limit keyed on client IP and request path.

    The counter key is ``ratelimit:<ip>:<path>``; ``"unknown"`` stands in for
    the IP when the request carries no client information.

    Args:
        request: Incoming request; supplies the client address and path.
        max_requests: Requests allowed per window.
        window_seconds: Window length in seconds.

    Returns:
        The number of requests still available in the current window.

    Raises:
        HTTPException: 429 with ``Retry-After`` and ``X-RateLimit-*`` headers
            when the limit is exhausted.

    NOTE(review): when this function (or a ``functools.partial`` of it) is
    handed to ``Depends``, FastAPI appears to resolve ``max_requests`` and
    ``window_seconds`` as query parameters, which would let a caller override
    the limit — confirm, and prefer wrapping it in a function that only takes
    ``request``.
    """
    peer = request.client
    ip_address = "unknown" if not peer else peer.host
    counter_key = f"ratelimit:{ip_address}:{request.url.path}"

    allowed, remaining = await sliding_window_rate_limit(
        counter_key, max_requests, window_seconds
    )
    if allowed:
        return remaining

    # Limit exhausted: tell the client how long a full window lasts.
    reset_at = int(time.time() + window_seconds)
    limit_headers = {
        "Retry-After": str(window_seconds),
        "X-RateLimit-Limit": str(max_requests),
        "X-RateLimit-Remaining": "0",
        "X-RateLimit-Reset": str(reset_at),
    }
    raise HTTPException(
        status_code=429,
        detail="For mange requests. Prøv igen om lidt.",
        headers=limit_headers,
    )

Route-specifik rate limiting

# server/src/auth/router.py

from functools import partial

from fastapi import Request

# Stricter limits for auth endpoints (brute-force protection).
#
# The limits are bound inside closures that accept only ``request``. Passing
# ``functools.partial(rate_limit_dependency, max_requests=..., ...)`` to
# ``Depends`` keeps ``max_requests`` and ``window_seconds`` in the callable's
# signature, where FastAPI resolves them as query parameters — a client could
# then lift its own limit with ``?max_requests=...``.
def _ip_rate_limit(max_requests: int, window_seconds: int):
    """Return a dependency enforcing a fixed limit via rate_limit_dependency."""

    async def dependency(request: Request) -> int:
        return await rate_limit_dependency(
            request,
            max_requests=max_requests,
            window_seconds=window_seconds,
        )

    return dependency


login_rate_limit = _ip_rate_limit(max_requests=5, window_seconds=60)
register_rate_limit = _ip_rate_limit(max_requests=3, window_seconds=300)

@router.post("/login")
async def login(
    credentials: LoginRequest,
    _: int = Depends(login_rate_limit),
) -> TokenResponse:
    """Login endpoint, guarded by ``login_rate_limit``.

    The dependency allows 5 attempts per minute per client IP; its return
    value is not used here. The handler body is elided in this excerpt.
    """
    ...


@router.post("/register")
async def register(
    data: RegisterRequest,
    _: int = Depends(register_rate_limit),
) -> TokenResponse:
    """Registration endpoint, guarded by ``register_rate_limit``.

    The dependency allows 3 registrations per 5 minutes per client IP; its
    return value is not used here. The handler body is elided in this excerpt.
    """
    ...


# More liberal limit for regular API calls.
#
# The limit is bound in a dependency that accepts only ``request``. An inline
# ``partial(rate_limit_dependency, max_requests=..., window_seconds=...)``
# keeps both values in the dependency signature, where FastAPI resolves them
# as query parameters that the caller can override.
async def _readings_rate_limit(request: Request) -> int:
    """Allow 300 requests per 60 seconds per client IP on this path."""
    return await rate_limit_dependency(
        request, max_requests=300, window_seconds=60
    )


@router.get("/readings")
async def list_readings(
    remaining: int = Depends(_readings_rate_limit),
    current_user: User = Depends(get_current_user),
) -> Page[ReadingOut]:
    """List readings; limited to 300 calls per minute per client IP.

    ``remaining`` receives the number of requests left in the current window.
    The handler body is elided in this excerpt.
    """
    ...

API-nøgle-baseret rate limiting

# Per-API-nøgle limit (tredjepartsintegrationer):

async def api_key_rate_limit(
    request: Request,
    api_key: str = Depends(get_api_key),
    max_requests: int = 1000,
    window_seconds: int = 60,
):
    """Apply a sliding-window limit per API key instead of per IP.

    Args:
        request: Incoming request (not read by this function).
        api_key: Key resolved by the ``get_api_key`` dependency.
        max_requests: Requests allowed per window.
        window_seconds: Window length in seconds.

    Returns:
        The number of requests still available in the current window.

    Raises:
        HTTPException: 429 with a structured ``detail`` body plus
            ``Retry-After`` and ``X-RateLimit-*`` headers when the limit is
            exhausted.

    NOTE(review): used directly with ``Depends``, FastAPI appears to resolve
    ``max_requests`` and ``window_seconds`` as query parameters, which would
    let a caller override the quota — confirm, and wrap this in a dependency
    that fixes both values.
    """
    # Local import keeps this block self-contained; hashlib is stdlib.
    import hashlib

    # The API key is a credential: store only a digest in the Redis key name
    # so the secret does not show up in the keyspace, MONITOR output or dumps.
    key_digest = hashlib.sha256(api_key.encode("utf-8")).hexdigest()
    key = f"ratelimit:apikey:{key_digest}:{window_seconds}s"
    allowed, remaining = await sliding_window_rate_limit(
        key, max_requests, window_seconds
    )

    if not allowed:
        raise HTTPException(
            status_code=429,
            detail={
                "error": "rate_limit_exceeded",
                "message": f"Max {max_requests} requests per {window_seconds}s",
                "retry_after": window_seconds,
            },
            # Same informational headers as the IP-based limiter.
            headers={
                "Retry-After": str(window_seconds),
                "X-RateLimit-Limit": str(max_requests),
                "X-RateLimit-Remaining": "0",
            },
        )

    return remaining

Token bucket — burst-tolerant rate limiting

# Token bucket: Tillader korte bursts, begrænser langtidsforbrug

async def token_bucket_limit(
    key: str,
    capacity: int,       # Max tokens (burst size)
    refill_rate: float,  # Tokens per second
) -> bool:
    """Try to take one token from the bucket stored under ``key``.

    The bucket starts full, refills continuously at ``refill_rate`` tokens
    per second up to ``capacity``, and each allowed request consumes one
    token — e.g. a burst of 20 requests but at most 2 per second on average.

    Args:
        key: Identifier of the bucket (client, API key, ...).
        capacity: Maximum number of tokens, i.e. the burst size.
        refill_rate: Tokens added per second; must be positive.

    Returns:
        True if a token was consumed, False if the bucket is empty.

    Raises:
        ValueError: If ``refill_rate`` is not positive.

    Note:
        The state is read with MGET and written back in a separate pipeline,
        so two concurrent calls for the same key can both read the same token
        count. Move the read-modify-write into a single Lua script if strict
        enforcement under concurrency is required.
    """
    # A non-positive rate would divide by zero (or yield a negative TTL) when
    # the expiry is computed below; reject it up front on every path.
    if refill_rate <= 0:
        raise ValueError("refill_rate must be positive")

    now = time.time()
    bucket_key = f"tokenbucket:{key}"
    last_refill_key = f"tokenbucket:last:{key}"

    # Fetch the current token count and the time of the last write.
    stored_tokens, stored_last = await redis_client.mget(
        bucket_key, last_refill_key
    )
    tokens = float(capacity) if stored_tokens is None else float(stored_tokens)
    last_refill = now if stored_last is None else float(stored_last)

    # Refill for the time that has passed. Clamp at zero so a clock that
    # steps backwards cannot drain tokens.
    elapsed = max(0.0, now - last_refill)
    tokens = min(float(capacity), tokens + elapsed * refill_rate)

    if tokens < 1.0:
        return False   # Bucket empty: request not allowed

    # Consume one token and persist the new state. The TTL covers the time a
    # bucket needs to refill completely, plus a small margin.
    ttl = int(capacity / refill_rate) + 10
    pipe = redis_client.pipeline()
    pipe.set(bucket_key, tokens - 1.0, ex=ttl)
    pipe.set(last_refill_key, now, ex=ttl)
    await pipe.execute()
    return True

Middleware: Global rate limiting

# server/src/middleware/rate_limit_middleware.py

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse


class GlobalRateLimitMiddleware(BaseHTTPMiddleware):
    """Middleware applying one sliding-window limit per client IP to all requests.

    Requests over the limit are answered with 429 before they reach routing,
    so this limit is an upper bound for every non-exempt endpoint: route-level
    limits can only be stricter than it, never more generous.

    Args:
        app: The ASGI application to wrap.
        dispatch: Optional dispatch override, forwarded to the base class.
        max_requests: Requests allowed per window and client IP.
        window_seconds: Window length in seconds; also sent as ``Retry-After``.
    """

    # Paths that bypass the limiter (exact match on the request path).
    EXEMPT_PATHS = {"/api/health", "/metrics"}

    def __init__(
        self,
        app,
        dispatch=None,
        *,
        max_requests: int = 500,
        window_seconds: int = 60,
    ) -> None:
        super().__init__(app, dispatch)
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    async def dispatch(self, request: Request, call_next):
        """Forward the request unless the client has exhausted the limit."""
        if request.url.path in self.EXEMPT_PATHS:
            return await call_next(request)

        client_ip = request.client.host if request.client else "unknown"
        key = f"global:{client_ip}"

        allowed, _ = await sliding_window_rate_limit(
            key,
            max_requests=self.max_requests,
            window_seconds=self.window_seconds,
        )

        if not allowed:
            return JSONResponse(
                status_code=429,
                content={"detail": "For mange requests"},
                # Derived from the configured window so the two cannot drift.
                headers={"Retry-After": str(self.window_seconds)},
            )

        return await call_next(request)


# main.py: install the global rate limiter on the application.
app.add_middleware(GlobalRateLimitMiddleware)

Test af rate limiting

# server/tests/test_rate_limiting.py

import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_login_rate_limit(client):
    """With 5 attempts already in the window, the next login gets 429."""
    with patch("server.src.middleware.rate_limit.redis_client") as mock_redis:
        # The limiter calls ``redis_client.pipeline()`` synchronously, queues
        # commands on the returned object and awaits ``execute()``. It never
        # enters the pipeline with ``async with``, so ``execute`` itself must
        # be awaitable; configuring ``__aenter__`` has no effect and leaves
        # ``await pipe.execute()`` failing on a plain MagicMock.
        pipeline = mock_redis.pipeline.return_value
        # Results in command order: zremrangebyscore, zcard, zadd, expire.
        # zcard == 5 simulates five attempts already used.
        pipeline.execute = AsyncMock(return_value=[None, 5, None, None])

        resp = await client.post("/api/v1/auth/login",
                                  json={"email": "x@y.dk", "password": "wrong"})
        assert resp.status_code == 429
        assert "Retry-After" in resp.headers
        assert resp.headers["Retry-After"] == "60"

Konklusion

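Redis sliding window med sorted set giver præcis rate limiting; fordi pipelinen køres som én MULTI/EXEC-transaktion, opstår der ikke race conditions mellem optælling og indsættelse. Token bucket-algoritmen er bedst til API'er med burst-tolerante klienter, men eksemplet ovenfor læser og skriver i to separate trin og bør pakkes i et Lua-script, hvis det skal være atomart under samtidige requests. Auth-endpoints (login/register) kræver strengere limits end normale API-kald. Retry-After-headeren informerer klienter om, hvornår de kan prøve igen. Global limiting på middleware-niveau supplerer route-specifikke limits som forsvar i dybden.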

Se Redis Celery guide eller IoT netværkssikkerhed guide.