Python ContextVar til multi-tenant isolation i asyncio
Python ContextVar til async multi-tenant isolation: request-scoped tenant_id, structlog binding, asyncio task propagation, vs threading.local, og FastAPI middleware integration.
Af M-Bus Gateway
ContextVar fra Python 3.7+ giver request-scoped state i async kode — uden race conditions. Essentielt til multi-tenant SaaS.
ContextVar vs. threading.local
# threading.local — virker IKKE korrekt i asyncio:
import threading
_local = threading.local()
async def handler():
    """Anti-pattern demo: keep the tenant in ``threading.local`` across an await."""
    # Every coroutine on the event loop runs on the same thread, so this
    # slot is shared by all in-flight requests instead of being per-request.
    _local.tenant_id = "t-123"
    # While this coroutine is suspended, other requests run on the same thread.
    await some_async_work()
    # From here on _local.tenant_id may belong to a different request.
# ContextVar — asyncio-korrekt:
from contextvars import ContextVar
_tenant_id: ContextVar[str | None] = ContextVar("tenant_id", default=None)
_role: ContextVar[str | None] = ContextVar("role", default=None)
async def handler():
    """Bind the tenant for the duration of the handler, then restore it."""
    reset_token = _tenant_id.set("t-123")
    try:
        await some_async_work()
        # The binding lives in this task's context, so it survives the await.
        current = _tenant_id.get()
        print(current)  # "t-123"
    finally:
        # Put back whatever value was visible before set() was called.
        _tenant_id.reset(reset_token)
# Each asyncio task runs in its own copy of the context:
async def task_a():
    """Bind tenant A, yield to the event loop, and check the binding is intact."""
    expected = "tenant-A"
    _tenant_id.set(expected)
    await asyncio.sleep(0.1)
    # Holds when this coroutine runs as its own task.
    assert _tenant_id.get() == expected
async def task_b():
    """Bind tenant B, yield to the event loop, and check the binding is intact."""
    expected = "tenant-B"
    _tenant_id.set(expected)
    await asyncio.sleep(0.1)
    # Holds when this coroutine runs as its own task.
    assert _tenant_id.get() == expected
# Run both concurrently. gather() wraps each coroutine in its own task, so
# each one gets its own context copy and the two set() calls cannot collide.
async def main():
    """Run the two demo coroutines concurrently."""
    await asyncio.gather(task_a(), task_b())


if __name__ == "__main__":
    # ``await`` is a SyntaxError at module level; drive the demo from an event loop.
    asyncio.run(main())
Multi-tenant context module
# server/src/context.py
from contextvars import ContextVar
from typing import Any
import uuid
# Alle tenant-relaterede context-variabler:
_tenant_id: ContextVar[str | None] = ContextVar("tenant_id", default=None)
_user_id: ContextVar[str | None] = ContextVar("user_id", default=None)
_role: ContextVar[str | None] = ContextVar("role", default=None)
_request_id: ContextVar[str | None] = ContextVar("request_id", default=None)
_is_impersonating: ContextVar[bool] = ContextVar("is_impersonating", default=False)
def get_tenant_id() -> str | None:
    """Return the tenant id bound in the current context (the variable's default if unbound)."""
    current = _tenant_id.get()
    return current
def get_user_id() -> str | None:
    """Return the user id bound in the current context (the variable's default if unbound)."""
    current = _user_id.get()
    return current
def get_role() -> str | None:
    """Return the role bound in the current context (the variable's default if unbound)."""
    current = _role.get()
    return current
def require_tenant_id() -> str:
    """Return the tenant id from the current context, refusing to continue without one.

    Intended for service code that must never run outside a tenant scope.

    Raises:
        ValueError: If the tenant id read from the context is ``None``.
    """
    tenant = _tenant_id.get()
    if tenant is not None:
        return tenant
    raise ValueError("No tenant_id in context — missing auth middleware?")
class TenantContext:
    """Context manager for a temporary tenant switch (e.g. impersonation).

    Usable with both ``with`` and ``async with``. On entry it binds the tenant
    id, role, user id and impersonation flag; on exit it restores the values
    that were visible before that entry.

    The same instance may be entered more than once (sequentially or nested):
    each entry keeps its own reset tokens, because a ``contextvars.Token`` can
    only be used for a single ``reset()``.
    """

    def __init__(
        self,
        tenant_id: str | None,
        role: str | None = None,
        user_id: str | None = None,
        is_impersonating: bool = False,
    ):
        # Values to bind on entry, keyed by the context variable they go into.
        self._values = {
            _tenant_id: tenant_id,
            _role: role,
            _user_id: user_id,
            _is_impersonating: is_impersonating,
        }
        # Stack with one list of (variable, token) pairs per active entry.
        self._tokens: list = []

    def __enter__(self):
        """Bind all values and remember how to undo this particular entry."""
        entry = [(var, var.set(value)) for var, value in self._values.items()]
        self._tokens.append(entry)
        return self

    def __exit__(self, *_):
        """Restore the values that were visible before the matching entry."""
        if not self._tokens:
            # Exit without a matching enter: nothing to undo.
            return
        # Undo the most recent entry, last-set variable first.
        for var, token in reversed(self._tokens.pop()):
            var.reset(token)

    async def __aenter__(self):
        """Async variant of ``__enter__``; performs no awaits itself."""
        return self.__enter__()

    async def __aexit__(self, *args):
        """Async variant of ``__exit__``; performs no awaits itself."""
        return self.__exit__(*args)
FastAPI middleware integration
# server/src/middleware/tenant.py
from starlette.middleware.base import BaseHTTPMiddleware
from server.src.context import TenantContext
import structlog
class TenantContextMiddleware(BaseHTTPMiddleware):
    """Set the tenant context from the JWT for the whole request lifecycle.

    The values are bound before the downstream app is called, so code awaited
    while handling the request reads them from the context.
    """

    async def dispatch(self, request: Request, call_next) -> Response:
        """Bind tenant context and log fields, call downstream, then clean up.

        NOTE(review): assumes ``_extract_jwt_payload`` returns a mapping with
        a ``.get`` method — confirm against its definition.
        """
        payload = _extract_jwt_payload(request)
        tenant_id = payload.get("tenant_id")
        role = payload.get("role")
        async with TenantContext(
            tenant_id=tenant_id,
            role=role,
            user_id=payload.get("sub"),
            is_impersonating=payload.get("is_impersonating", False),
        ):
            # Bind to structlog so log lines for this request carry the tenant.
            log_fields = {
                "tenant_id": tenant_id,
                "role": role,
                "request_id": str(uuid.uuid4()),
            }
            structlog.contextvars.bind_contextvars(**log_fields)
            try:
                return await call_next(request)
            finally:
                # Remove only the keys bound above. clear_contextvars() would
                # also wipe fields bound by code outside this middleware.
                structlog.contextvars.unbind_contextvars(*log_fields)
ContextVar i service-lag
# server/src/readings/service.py
from server.src.context import get_tenant_id, require_tenant_id
class ReadingService:
    """Reading queries and writes scoped to the tenant in the current context.

    The methods take no ``tenant_id`` parameter; they read it through
    ``require_tenant_id()``, which raises ``ValueError`` when none is bound.
    """

    async def get_all_readings(
        self,
        session: AsyncSession,
        limit: int = 100,
    ) -> list[Reading]:
        """Return up to ``limit`` readings belonging to the current tenant.

        Raises:
            ValueError: If no tenant id is bound, or if the bound value is not
                a valid UUID string.
        """
        tenant_id = require_tenant_id()  # fails when the context is missing
        stmt = (
            select(Reading)
            .where(Reading.tenant_id == uuid.UUID(tenant_id))
            .limit(limit)
        )
        result = await session.execute(stmt)
        # Materialize as a real list so the value matches the annotation.
        return list(result.scalars().all())

    async def create_reading(
        self,
        data: ReadingCreate,
        session: AsyncSession,
    ) -> Reading:
        """Persist a new reading owned by the current tenant and return it.

        Raises:
            ValueError: If no tenant id is bound, or if the bound value is not
                a valid UUID string.
        """
        tenant_id = require_tenant_id()
        reading = Reading(
            **data.model_dump(),
            tenant_id=uuid.UUID(tenant_id),  # taken from context, not from the caller
        )
        session.add(reading)
        await session.commit()
        # commit() expires loaded attributes under the default
        # expire_on_commit=True. Reload explicitly so the caller can read the
        # instance without triggering implicit IO on an AsyncSession.
        await session.refresh(reading)
        return reading
# FastAPI endpoint — the tenant is not part of the signature:
@router.get("/readings")
async def list_readings(
    session: AsyncSession = Depends(get_session),
    # NOTE(review): the tenant context is presumably set by the middleware
    # before this dependency runs — confirm what require_role contributes.
    _: User = Depends(require_role("landlord")),
) -> list[ReadingOut]:
    """Return the readings of the tenant bound in the current context."""
    readings = await ReadingService().get_all_readings(session)
    return readings
ContextVar propagation til asyncio.create_task
# create_task() copies the caller's context into the new task (Python 3.7+):
async def process_readings_batch(readings: list) -> None:
    """Process all readings concurrently, one task per reading."""
    # Fail early when no tenant is bound; the value itself is not used here.
    require_tenant_id()
    # Each child task starts with a copy of this context, tenant included.
    pending = [asyncio.create_task(_process_single(item)) for item in readings]
    await asyncio.gather(*pending)
async def _process_single(reading: dict) -> None:
    """Log that one reading is being processed, tagged with the context's tenant."""
    # The tenant is read from the context rather than passed in as an argument.
    log.info(
        "processing",
        tenant_id=get_tenant_id(),
        reading_id=reading["id"],
    )
# NOTE: asyncio.to_thread() (Python 3.9+) already propagates the current
# context to the worker thread. loop.run_in_executor() does NOT, so take an
# explicit snapshot with copy_context() when going through an executor:
import contextvars


async def run_sync_with_context(fn, *args):
    """Run ``fn(*args)`` in the default executor under a copy of the caller's context.

    Args:
        fn: Synchronous callable to run in a worker thread.
        *args: Positional arguments passed to ``fn``.

    Returns:
        Whatever ``fn(*args)`` returns.

    Context variables set by ``fn`` stay in the copied context and are not
    visible to the caller afterwards.
    """
    ctx = contextvars.copy_context()
    loop = asyncio.get_running_loop()
    # ctx.run executes fn inside the snapshot taken above.
    return await loop.run_in_executor(None, ctx.run, fn, *args)
Test med ContextVar
# server/tests/conftest.py
import pytest
from server.src.context import TenantContext
TENANT_ID = "550e8400-e29b-41d4-a716-446655440000"
@pytest.fixture
def tenant_context():
    """Bind TENANT_ID with the "landlord" role for the duration of a test."""
    scope = TenantContext(tenant_id=TENANT_ID, role="landlord")
    with scope:
        yield
# Usage in a test:
@pytest.mark.asyncio
async def test_reading_service_isolation(tenant_context, async_session):
    """Every reading returned under the fixture's tenant carries TENANT_ID."""
    readings = await ReadingService().get_all_readings(async_session)
    # NOTE: passes vacuously when the query returns no rows.
    foreign = [r for r in readings if str(r.tenant_id) != TENANT_ID]
    assert not foreign
@pytest.mark.asyncio
async def test_no_context_raises_error(async_session):
    """Without a bound tenant the service raises ValueError."""
    reading_service = ReadingService()
    expected_message = "No tenant_id in context"
    # A missing context surfaces as a ValueError from the service call.
    with pytest.raises(ValueError, match=expected_message):
        await reading_service.get_all_readings(async_session)
Konklusion
ContextVar er det rigtige valg til multi-tenant isolation i asyncio — ikke threading.local. Hvert asyncio-task (og dermed hver FastAPI-request) får sin egen kopi af context-variablerne. TenantContext-context-manageren giver clean setup/teardown. Service-laget kalder require_tenant_id() direkte uden at modtage tenant_id som parameter — tenant-isolation er garanteret af middleware. asyncio.create_task propagerer context automatisk, og det samme gør asyncio.to_thread (Python 3.9+); loop.run_in_executor propagerer derimod ikke context og kræver explicit copy_context().
Se FastAPI middleware guide eller SQLModel multi-tenant patterns.