M-Bus Gateway
← Tilbage til blog
· Python· ContextVar· asyncio· multi-tenant· FastAPI· middleware· backend

Python ContextVar til multi-tenant isolation i asyncio

Python ContextVar til async multi-tenant isolation: request-scoped tenant_id, structlog binding, asyncio task propagation, vs threading.local, og FastAPI middleware integration.

Af M-Bus Gateway

ContextVar fra Python 3.7+ giver request-scoped state i async kode — uden race conditions. Essentielt til multi-tenant SaaS.


ContextVar vs. threading.local

# threading.local — virker IKKE korrekt i asyncio:
import threading
_local = threading.local()

async def handler():
    _local.tenant_id = "t-123"
    await some_async_work()    # Andre requests kører her!
    # _local.tenant_id kan nu tilhøre en anden request


# ContextVar — asyncio-korrekt:
from contextvars import ContextVar

_tenant_id: ContextVar[str | None] = ContextVar("tenant_id", default=None)
_role: ContextVar[str | None] = ContextVar("role", default=None)

async def handler():
    token = _tenant_id.set("t-123")
    try:
        await some_async_work()    # Stadig "t-123" i denne task
        print(_tenant_id.get())    # "t-123" — garanteret
    finally:
        _tenant_id.reset(token)    # Genopret forrige værdi


# Hvert asyncio-task har sin eget ContextVar-kopi:
async def task_a():
    _tenant_id.set("tenant-A")
    await asyncio.sleep(0.1)
    assert _tenant_id.get() == "tenant-A"    # Altid sandt

async def task_b():
    _tenant_id.set("tenant-B")
    await asyncio.sleep(0.1)
    assert _tenant_id.get() == "tenant-B"    # Altid sandt

# Kører parallelt — ingen konflikt:
await asyncio.gather(task_a(), task_b())

Multi-tenant context module

# server/src/context.py

from contextvars import ContextVar
from typing import Any
import uuid

# Alle tenant-relaterede context-variabler:
_tenant_id: ContextVar[str | None] = ContextVar("tenant_id", default=None)
_user_id: ContextVar[str | None] = ContextVar("user_id", default=None)
_role: ContextVar[str | None] = ContextVar("role", default=None)
_request_id: ContextVar[str | None] = ContextVar("request_id", default=None)
_is_impersonating: ContextVar[bool] = ContextVar("is_impersonating", default=False)


def get_tenant_id() -> str | None:
    return _tenant_id.get()


def get_user_id() -> str | None:
    return _user_id.get()


def get_role() -> str | None:
    return _role.get()


def require_tenant_id() -> str:
    """Returnér tenant_id eller kast ValueError — bruges i services."""
    tid = _tenant_id.get()
    if tid is None:
        raise ValueError("No tenant_id in context — missing auth middleware?")
    return tid


class TenantContext:
    """Context manager til temporær tenant-skift (fx impersonering)."""
    def __init__(
        self,
        tenant_id: str | None,
        role: str | None = None,
        user_id: str | None = None,
        is_impersonating: bool = False,
    ):
        self._values = {
            _tenant_id: tenant_id,
            _role: role,
            _user_id: user_id,
            _is_impersonating: is_impersonating,
        }
        self._tokens: list = []

    def __enter__(self):
        for var, value in self._values.items():
            self._tokens.append(var.set(value))
        return self

    def __exit__(self, *_):
        for var, token in zip(self._values.keys(), self._tokens):
            var.reset(token)

    async def __aenter__(self):
        return self.__enter__()

    async def __aexit__(self, *args):
        return self.__exit__(*args)

FastAPI middleware integration

# server/src/middleware/tenant.py

from starlette.middleware.base import BaseHTTPMiddleware
from server.src.context import TenantContext
import structlog


class TenantContextMiddleware(BaseHTTPMiddleware):
    """
    Sæt tenant-context fra JWT for HELE request-lifecycle.
    ContextVar propageres automatisk til alle awaits inden for request.
    """
    async def dispatch(self, request: Request, call_next) -> Response:
        payload = _extract_jwt_payload(request)

        async with TenantContext(
            tenant_id=payload.get("tenant_id"),
            role=payload.get("role"),
            user_id=payload.get("sub"),
            is_impersonating=payload.get("is_impersonating", False),
        ):
            # Bind til structlog — alle log-linjer i denne request får tenant_id:
            structlog.contextvars.bind_contextvars(
                tenant_id=payload.get("tenant_id"),
                role=payload.get("role"),
                request_id=str(uuid.uuid4()),
            )

            try:
                return await call_next(request)
            finally:
                structlog.contextvars.clear_contextvars()

ContextVar i service-lag

# server/src/readings/service.py

from server.src.context import get_tenant_id, require_tenant_id


class ReadingService:
    """
    Service-laget bruger ContextVar direkte — ingen tenant_id parameter.
    Tenant-isolation garanteres af middleware.
    """
    async def get_all_readings(
        self,
        session: AsyncSession,
        limit: int = 100,
    ) -> list[Reading]:
        tenant_id = require_tenant_id()    # Fejler ved manglende context

        stmt = (
            select(Reading)
            .where(Reading.tenant_id == uuid.UUID(tenant_id))
            .limit(limit)
        )
        return (await session.execute(stmt)).scalars().all()

    async def create_reading(
        self,
        data: ReadingCreate,
        session: AsyncSession,
    ) -> Reading:
        tenant_id = require_tenant_id()

        reading = Reading(
            **data.model_dump(),
            tenant_id=uuid.UUID(tenant_id),    # Auto-sat fra context
        )
        session.add(reading)
        await session.commit()
        return reading


# FastAPI endpoint — ingen tenant_id i signatur:
@router.get("/readings")
async def list_readings(
    session: AsyncSession = Depends(get_session),
    _: User = Depends(require_role("landlord")),    # Sætter context via middleware
) -> list[ReadingOut]:
    service = ReadingService()
    return await service.get_all_readings(session)

ContextVar propagation til asyncio.create_task

# ContextVar kopieres automatisk til child tasks i Python 3.7+:

async def process_readings_batch(readings: list) -> None:
    tenant_id = require_tenant_id()    # Henter fra context

    # create_task kopierer context til child task:
    tasks = [
        asyncio.create_task(
            _process_single(r),    # Arver tenant_id fra parent
        )
        for r in readings
    ]
    await asyncio.gather(*tasks)


async def _process_single(reading: dict) -> None:
    tenant_id = get_tenant_id()    # Stadig korrekt i child task
    log.info("processing", tenant_id=tenant_id, reading_id=reading["id"])


# OBS: asyncio.to_thread kopierer IKKE context i alle Python-versioner.
# Brug explicit copy_context() ved behov:
import contextvars

async def run_sync_with_context(fn, *args):
    ctx = contextvars.copy_context()
    return await asyncio.to_thread(ctx.run, fn, *args)

Test med ContextVar

# server/tests/conftest.py

import pytest
from server.src.context import TenantContext

TENANT_ID = "550e8400-e29b-41d4-a716-446655440000"


@pytest.fixture
def tenant_context():
    """Sæt tenant-context for test."""
    with TenantContext(tenant_id=TENANT_ID, role="landlord"):
        yield


# Brug i test:
@pytest.mark.asyncio
async def test_reading_service_isolation(tenant_context, async_session):
    service = ReadingService()
    readings = await service.get_all_readings(async_session)

    # Alle readings tilhører TENANT_ID — garanteret af ContextVar:
    assert all(str(r.tenant_id) == TENANT_ID for r in readings)


@pytest.mark.asyncio
async def test_no_context_raises_error(async_session):
    service = ReadingService()

    # Ingen context = ValueError, ikke 500:
    with pytest.raises(ValueError, match="No tenant_id in context"):
        await service.get_all_readings(async_session)

Konklusion

ContextVar er det rigtige valg til multi-tenant isolation i asyncio — ikke threading.local. Hvert asyncio-task (og dermed hver FastAPI-request) får sin egen kopi af context-variablerne. TenantContext-context-manageren giver clean setup/teardown. Service-laget kalder require_tenant_id() direkte uden at modtage tenant_id som parameter — tenant-isolation er garanteret af middleware. asyncio.create_task propagerer context automatisk; asyncio.to_thread kræver explicit copy_context().

Se FastAPI middleware guide eller SQLModel multi-tenant patterns.