M-Bus Gateway
← Tilbage til blog
JWT · autentificering · FastAPI · python-jose · TOTP · 2FA · sikkerhed · refresh token · Python

JWT-autentificering i FastAPI — implementation og sikkerhedsmønstre

JWT auth i FastAPI: token-generering med python-jose, refresh tokens, TOTP 2FA med pyotp, impersonering med audit log, token-rotation og rate limiting.

Af M-Bus Gateway

M-Bus Gateway-platformen bruger JWT-tokens med 6 roller, kortlivede access tokens, refresh-rotation og TOTP 2FA. Her er implementeringen.


Token-strategi

Access token:
  Levetid: 30 minutter
  Indhold: user_id, tenant_id, role, exp, iat
  Signatur: HS256 (HMAC-SHA256)

Refresh token:
  Levetid: 30 dage
  Indhold: user_id, token_family, exp
  Lagring: HTTPOnly cookie (ikke localStorage)
  Rotation: Nyt refresh token ved hvert kald til /auth/refresh

Magic link token (lejer-adgang):
  Levetid: 48 timer
  Indhold: settlement_id, tenant_id, scoped til ét formål
  Bruges kun én gang (invalideres ved brug)

Token-generering

# server/src/auth/tokens.py
import secrets
from datetime import datetime, timedelta, timezone

from jose import jwt, JWTError

# Symmetric key used to sign every token type created in this module.
# Must be at least 32 bytes of random data.
SECRET_KEY = settings.SECRET_KEY  # Min. 32 bytes random
# JWT signing algorithm: HMAC with SHA-256.
ALGORITHM = "HS256"

def create_access_token(
    user_id: str,
    tenant_id: str | None,
    role: str,
    expires_minutes: int = 30,
) -> str:
    """Create a signed, short-lived access token.

    Args:
        user_id: Stored in the ``sub`` claim.
        tenant_id: Stored in the ``tenant_id`` claim; may be ``None``.
        role: Stored in the ``role`` claim.
        expires_minutes: Token lifetime in minutes, counted from now.

    Returns:
        The encoded JWT, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12
    # and returns a naive value that is easily mistaken for local time.
    now = datetime.now(timezone.utc)
    payload = {
        "sub": user_id,
        "tenant_id": tenant_id,
        "role": role,
        "iat": now,
        "exp": now + timedelta(minutes=expires_minutes),
        # Lets consumers tell access tokens apart from the other token types.
        "type": "access",
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)


def create_refresh_token(
    user_id: str, family: str | None = None
) -> tuple[str, str]:
    """Create a signed refresh token valid for 30 days.

    The token carries a "family" identifier that is used for rotation
    validation: rotated tokens keep the family of the token they replace.

    Args:
        user_id: Stored in the ``sub`` claim.
        family: Existing family to continue. When omitted (or empty) a new
            random family identifier is generated.

    Returns:
        A ``(token, family)`` tuple: the encoded JWT and the family
        identifier embedded in it.
    """
    family = family or secrets.token_urlsafe(16)
    payload = {
        "sub": user_id,
        "family": family,
        # Timezone-aware expiry; datetime.utcnow() is deprecated.
        "exp": datetime.now(timezone.utc) + timedelta(days=30),
        "type": "refresh",
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM), family


def create_magic_link_token(
    settlement_id: str,
    tenant_id: str,
    expires_hours: int = 48,
) -> str:
    """Create a signed magic-link token scoped to one settlement.

    Args:
        settlement_id: Stored in the ``settlement_id`` claim.
        tenant_id: Stored in the ``tenant_id`` claim.
        expires_hours: Token lifetime in hours, counted from now.

    Returns:
        The encoded JWT, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    payload = {
        "settlement_id": settlement_id,
        "tenant_id": tenant_id,
        # Timezone-aware expiry; datetime.utcnow() is deprecated.
        "exp": datetime.now(timezone.utc) + timedelta(hours=expires_hours),
        "type": "magic_link",
        # Unique ID so the consumer can track single use. Nothing in this
        # function enforces it; the check must happen where the token is
        # redeemed.
        "jti": secrets.token_urlsafe(16),
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

Login endpoint med 2FA

# server/src/auth/router.py
from fastapi import APIRouter, HTTPException, Response
from pydantic import BaseModel
import bcrypt
import pyotp

# Router for the authentication endpoints; every route is prefixed with /auth.
router = APIRouter(prefix="/auth", tags=["auth"])

class LoginRequest(BaseModel):
    """Request body accepted by the login endpoint."""

    email: str
    password: str
    # Optional TOTP code; the login handler requires it only when the user
    # has a TOTP secret stored.
    totp_code: str | None = None

class TokenResponse(BaseModel):
    """Response body returned by the login and refresh endpoints."""

    access_token: str
    token_type: str = "bearer"
    role: str
    needs_password: bool = False  # Tenant activation

@router.post("/login", response_model=TokenResponse)
async def login(
    request: LoginRequest,
    response: Response,
    session: AsyncSession = Depends(get_session),
):
    """Authenticate with e-mail and password, plus TOTP when enabled.

    On success the access token is returned in the body and the refresh
    token is set as an HTTPOnly cookie scoped to ``/auth/refresh``.

    Raises:
        HTTPException: 401 for an unknown or deleted user, an account
            without a usable password hash, a wrong password or a wrong
            TOTP code; 400 when a TOTP code is required but missing.
    """
    user = await get_user_by_email(request.email, session)
    # An account without a stored hash cannot pass a password check. Reject it
    # with the same generic 401 instead of crashing on ``None.encode()``.
    if not user or user.deleted_at or not user.password_hash:
        raise HTTPException(401, "Ugyldige legitimationsoplysninger")

    # Verify the password. bcrypt raises ValueError when the stored hash is
    # malformed; treat that as a failed login rather than a 500.
    try:
        password_ok = bcrypt.checkpw(
            request.password.encode(), user.password_hash.encode()
        )
    except ValueError:
        password_ok = False
    if not password_ok:
        raise HTTPException(401, "Ugyldige legitimationsoplysninger")

    # TOTP 2FA (only when the user has a secret stored):
    if user.totp_secret:
        if not request.totp_code:
            raise HTTPException(400, "2FA-kode påkrævet")
        totp = pyotp.TOTP(user.totp_secret)
        # valid_window=1 also accepts the adjacent time steps (clock drift).
        if not totp.verify(request.totp_code, valid_window=1):
            raise HTTPException(401, "Ugyldig 2FA-kode")

    # Generate tokens:
    access_token = create_access_token(
        user_id=str(user.id),
        tenant_id=str(user.tenant_id) if user.tenant_id else None,
        role=user.role,
    )
    refresh_token, family = create_refresh_token(str(user.id))

    # Store the refresh token family in the DB:
    await store_refresh_token_family(user.id, family, session)

    # Set the refresh token as an HTTPOnly cookie (not readable from JS):
    response.set_cookie(
        key="refresh_token",
        value=refresh_token,
        httponly=True,
        secure=True,
        samesite="strict",
        max_age=30 * 24 * 3600,
        path="/auth/refresh",
    )

    return TokenResponse(
        access_token=access_token,
        role=user.role,
        # NOTE(review): always False on this path, because accounts without a
        # hash are rejected above. Kept so the response shape is unchanged.
        needs_password=not bool(user.password_hash),
    )

Token refresh med rotation

@router.post("/refresh", response_model=TokenResponse)
async def refresh(
    response: Response,
    refresh_token: str = Cookie(None),
    session: AsyncSession = Depends(get_session),
):
    """Exchange a refresh-token cookie for a new access token.

    The refresh token is rotated: a new one in the same family replaces the
    cookie on every successful call.

    Raises:
        HTTPException: 401 when the cookie is missing, the token is invalid,
            has the wrong type or lacks required claims, the family is no
            longer valid, or the user no longer exists / is deleted.
    """
    if not refresh_token:
        raise HTTPException(401, "Ingen refresh token")

    try:
        payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(401, "Ugyldig refresh token") from None

    if payload.get("type") != "refresh":
        raise HTTPException(401, "Forkert token-type")

    # A correctly signed token could still lack these claims; answer 401
    # instead of failing with a KeyError.
    user_id = payload.get("sub")
    family = payload.get("family")
    if not user_id or not family:
        raise HTTPException(401, "Ugyldig refresh token")

    # Verify that the family is still valid:
    if not await verify_refresh_family(user_id, family, session):
        # Token reuse -> invalidate the whole family (possible attack):
        await invalidate_token_family(user_id, family, session)
        raise HTTPException(401, "Refresh token genbrugt — log ind igen")

    # The user may have been removed or soft-deleted since the token was
    # issued; login rejects such users, so refresh must do the same.
    user = await session.get(User, user_id)
    if not user or user.deleted_at:
        raise HTTPException(401, "Ugyldig refresh token")

    # Rotation: generate a new refresh token in the SAME family.
    # NOTE(review): nothing in this handler records which token of the family
    # is the current one, so detecting reuse of a superseded token depends
    # entirely on verify_refresh_family — confirm it can tell them apart.
    new_refresh_token, _ = create_refresh_token(user_id, family=family)

    response.set_cookie(
        key="refresh_token",
        value=new_refresh_token,
        httponly=True,
        secure=True,
        samesite="strict",
        max_age=30 * 24 * 3600,
        path="/auth/refresh",
    )

    return TokenResponse(
        access_token=create_access_token(
            user_id=user_id,
            tenant_id=str(user.tenant_id) if user.tenant_id else None,
            role=user.role,
        ),
        role=user.role,
    )

TOTP 2FA opsætning

@router.post("/totp/setup")
async def setup_totp(user: User = Depends(get_current_user)):
    """Generate a fresh TOTP secret for the current user.

    Returns a dict with the base32 secret, the provisioning URI (the text a
    client renders as a QR code) and eight random backup codes.

    NOTE(review): nothing is persisted here — neither the secret nor the
    backup codes. Confirm the backup codes are stored somewhere else.
    """
    new_secret = pyotp.random_base32()
    uri = pyotp.TOTP(new_secret).provisioning_uri(
        name=user.email,
        issuer_name="M-Bus Gateway",
    )

    # Eight backup codes of 8 hex characters each.
    codes = []
    for _ in range(8):
        codes.append(secrets.token_hex(4))

    return {"secret": new_secret, "qr_uri": uri, "backup_codes": codes}

@router.post("/totp/verify")
async def verify_and_enable_totp(
    code: str,
    secret: str,
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Verify the first TOTP code and enable 2FA for the current user.

    Args:
        code: TOTP code produced by the user's authenticator app.
        secret: Base32 TOTP secret the code was generated from.

    Raises:
        HTTPException: 400 when the code does not match or the secret is
            not valid base32.

    NOTE(review): the secret is supplied by the client and overwrites any
    secret already stored on the user without asking for the current code.
    As plain ``str`` parameters, ``code`` and ``secret`` are presumably read
    from the query string — confirm that is intended for a secret.
    """
    # pyotp decodes the secret as base32; malformed input raises
    # binascii.Error (a ValueError). Report it as a bad request, not a 500.
    try:
        code_ok = pyotp.TOTP(secret).verify(code, valid_window=1)
    except ValueError:
        code_ok = False
    if not code_ok:
        raise HTTPException(400, "Forkert kode")

    user.totp_secret = secret
    session.add(user)
    await session.commit()
    return {"message": "2FA aktiveret"}

Impersonering med audit log

@router.post("/impersonate/{tenant_id}")
async def impersonate_tenant(
    tenant_id: str,
    session: AsyncSession = Depends(get_session),
    user: User = Depends(require_role("super_admin", "external_admin")),
):
    """Issue a short-lived impersonation token and write an audit log entry.

    The token keeps the calling admin as ``sub``, targets ``tenant_id`` with
    the ``landlord`` role, lasts 120 minutes and carries the extra claims
    ``impersonated`` and ``impersonated_by``.

    NOTE(review): ``tenant_id`` is used as given; this handler does not check
    that the tenant exists or that the caller may access it.
    """
    admin_id = str(user.id)
    assumed_role = "landlord"

    # Audit log entry is written before the token is created.
    audit_details = {
        "target_tenant_id": tenant_id,
        "impersonated_by_id": admin_id,
        "role": assumed_role,
    }
    await log_action(
        session=session,
        user=user,
        action="impersonate",
        entity_type="tenant",
        entity_id=tenant_id,
        after=audit_details,
    )

    # Short-lived base token (2 hours).
    base_token = create_access_token(
        user_id=admin_id,
        tenant_id=tenant_id,
        role=assumed_role,
        expires_minutes=120,
    )

    # Mark as impersonation: decode the base token, add the marker claims
    # and sign the result again.
    claims = jwt.decode(base_token, SECRET_KEY, algorithms=[ALGORITHM])
    claims.update(impersonated=True, impersonated_by=admin_id)
    signed = jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)

    return {"access_token": signed}

Konklusion

JWT-auth i FastAPI kræver fire elementer: kortlivede access tokens (30 min), HTTPOnly-cookie refresh tokens med rotation, TOTP 2FA via pyotp og audit log ved impersonering. Token-family-mønsteret detekterer stjålne refresh tokens ved at invalidere hele familien ved genbrugsforsøg.

Se RBAC og multi-tenant guide eller sikkerhedsoverblik.