· JWT· autentificering· FastAPI· python-jose· TOTP· 2FA· sikkerhed· refresh token· Python
JWT-autentificering i FastAPI — implementation og sikkerhedsmønstre
JWT auth i FastAPI: token-generering med python-jose, refresh tokens, TOTP 2FA med pyotp, impersonering med audit log, token-rotation og rate limiting.
Af M-Bus Gateway
M-Bus Gateway platformen bruger JWT tokens med 6 roller, kortlivede access tokens, refresh-rotation og TOTP 2FA. Her er implementeringen.
Token-strategi
Access token:
Levetid: 30 minutter
Indhold: user_id, tenant_id, role, exp, iat
Signatur: HS256 (HMAC-SHA256)
Refresh token:
Levetid: 30 dage
Indhold: user_id, token_family, exp
Lagring: HTTPOnly cookie (ikke localStorage)
Rotation: Nyt refresh token ved hvert kald
Magic link token (lejer-adgang):
Levetid: 48 timer
Indhold: settlement_id, tenant_id, scoped til ét formål
Brug én gang (invalideres ved brug)
Token-generering
# server/src/auth/tokens.py
from jose import jwt, JWTError
from datetime import datetime, timedelta
import secrets
SECRET_KEY = settings.SECRET_KEY # Min. 32 bytes random
ALGORITHM = "HS256"
def create_access_token(
user_id: str,
tenant_id: str | None,
role: str,
expires_minutes: int = 30,
) -> str:
now = datetime.utcnow()
payload = {
"sub": user_id,
"tenant_id": tenant_id,
"role": role,
"iat": now,
"exp": now + timedelta(minutes=expires_minutes),
"type": "access",
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(user_id: str, family: str | None = None) -> str:
"""
Token family: Bruges til rotation-validering.
Hvis en brugt refresh token genbruges → invalider hele familien.
"""
family = family or secrets.token_urlsafe(16)
payload = {
"sub": user_id,
"family": family,
"exp": datetime.utcnow() + timedelta(days=30),
"type": "refresh",
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM), family
def create_magic_link_token(
settlement_id: str,
tenant_id: str,
expires_hours: int = 48,
) -> str:
payload = {
"settlement_id": settlement_id,
"tenant_id": tenant_id,
"exp": datetime.utcnow() + timedelta(hours=expires_hours),
"type": "magic_link",
"jti": secrets.token_urlsafe(16), # Unique ID til single-use tracking
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
Login endpoint med 2FA
# server/src/auth/router.py
from fastapi import APIRouter, HTTPException, Response
from pydantic import BaseModel
import bcrypt
import pyotp
router = APIRouter(prefix="/auth", tags=["auth"])
class LoginRequest(BaseModel):
email: str
password: str
totp_code: str | None = None
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
role: str
needs_password: bool = False # Lejer-aktivering
@router.post("/login", response_model=TokenResponse)
async def login(
request: LoginRequest,
response: Response,
session: AsyncSession = Depends(get_session),
):
user = await get_user_by_email(request.email, session)
if not user or user.deleted_at:
raise HTTPException(401, "Ugyldige legitimationsoplysninger")
# Verificer adgangskode:
if not bcrypt.checkpw(
request.password.encode(), user.password_hash.encode()
):
raise HTTPException(401, "Ugyldige legitimationsoplysninger")
# TOTP 2FA (hvis aktiveret):
if user.totp_secret:
if not request.totp_code:
raise HTTPException(400, "2FA-kode påkrævet")
totp = pyotp.TOTP(user.totp_secret)
if not totp.verify(request.totp_code, valid_window=1):
raise HTTPException(401, "Ugyldig 2FA-kode")
# Generer tokens:
access_token = create_access_token(
user_id=str(user.id),
tenant_id=str(user.tenant_id) if user.tenant_id else None,
role=user.role,
)
refresh_token, family = create_refresh_token(str(user.id))
# Gem refresh token family i DB:
await store_refresh_token_family(user.id, family, session)
# Sæt refresh token som HTTPOnly cookie:
response.set_cookie(
key="refresh_token",
value=refresh_token,
httponly=True,
secure=True,
samesite="strict",
max_age=30 * 24 * 3600,
path="/auth/refresh",
)
return TokenResponse(
access_token=access_token,
role=user.role,
needs_password=not bool(user.password_hash),
)
Token refresh med rotation
@router.post("/refresh", response_model=TokenResponse)
async def refresh(
response: Response,
refresh_token: str = Cookie(None),
session: AsyncSession = Depends(get_session),
):
if not refresh_token:
raise HTTPException(401, "Ingen refresh token")
try:
payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
except JWTError:
raise HTTPException(401, "Ugyldig refresh token")
if payload.get("type") != "refresh":
raise HTTPException(401, "Forkert token-type")
user_id = payload["sub"]
family = payload["family"]
# Verificer at family stadig er gyldig:
if not await verify_refresh_family(user_id, family, session):
# Token-reuse → invalider hele familien (muligt angreb):
await invalidate_token_family(user_id, family, session)
raise HTTPException(401, "Refresh token genbrugt — log ind igen")
# Rotation: Generer nyt refresh token i SAMME family:
user = await session.get(User, user_id)
new_refresh_token, _ = create_refresh_token(user_id, family=family)
response.set_cookie(
key="refresh_token",
value=new_refresh_token,
httponly=True,
secure=True,
samesite="strict",
max_age=30 * 24 * 3600,
path="/auth/refresh",
)
return TokenResponse(
access_token=create_access_token(
user_id=user_id,
tenant_id=str(user.tenant_id) if user.tenant_id else None,
role=user.role,
),
role=user.role,
)
TOTP 2FA opsætning
@router.post("/totp/setup")
async def setup_totp(user: User = Depends(get_current_user)):
"""Generer TOTP-secret og QR-kode."""
secret = pyotp.random_base32()
totp = pyotp.TOTP(secret)
provisioning_uri = totp.provisioning_uri(
name=user.email,
issuer_name="M-Bus Gateway",
)
return {
"secret": secret,
"qr_uri": provisioning_uri,
"backup_codes": [secrets.token_hex(4) for _ in range(8)],
}
@router.post("/totp/verify")
async def verify_and_enable_totp(
code: str,
secret: str,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
"""Verificer første kode og aktivér 2FA."""
totp = pyotp.TOTP(secret)
if not totp.verify(code, valid_window=1):
raise HTTPException(400, "Forkert kode")
user.totp_secret = secret
session.add(user)
await session.commit()
return {"message": "2FA aktiveret"}
Impersonering med audit log
@router.post("/impersonate/{tenant_id}")
async def impersonate_tenant(
tenant_id: str,
session: AsyncSession = Depends(get_session),
user: User = Depends(require_role("super_admin", "external_admin")),
):
"""Opret kortlivet impersonerings-token + audit log."""
# Audit log:
await log_action(
session=session,
user=user,
action="impersonate",
entity_type="tenant",
entity_id=tenant_id,
after={
"target_tenant_id": tenant_id,
"impersonated_by_id": str(user.id),
"role": "landlord",
},
)
# Kortlivet token (2 timer):
token = create_access_token(
user_id=str(user.id),
tenant_id=tenant_id,
role="landlord",
expires_minutes=120,
)
# Marker som impersonering:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
payload["impersonated"] = True
payload["impersonated_by"] = str(user.id)
return {"access_token": jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)}
Konklusion
JWT-auth i FastAPI kræver fire elementer: kortlivede access tokens (30 min), HTTPOnly-cookie refresh tokens med rotation, TOTP 2FA via pyotp og audit log ved impersonering. Token-family-mønsteret detekterer stjålne refresh tokens ved at invalidere hele familien ved genbrugsforsøg.