M-Bus Gateway
← Tilbage til blog
· structlog· logging· Python· FastAPI· Celery· JSON· Grafana· Loki· observability

Python structlog til produktion — JSON logging i FastAPI og Celery

structlog til produktions-logging i Python: JSON output, kontekst-berigende processors, FastAPI middleware, Celery worker integration og Grafana Loki setup.

Af M-Bus Gateway

M-Bus Gateway platformen bruger structlog til al logging. Her er opsætningen der giver søgbare JSON-logs i produktion og læsbare farvede logs i development.


Hvorfor structlog frem for standard logging?

# Standard Python logging — hard to parse in production:
import logging
logger = logging.getLogger(__name__)
logger.info(f"Settlement generated: id={settlement_id}, tenant={tenant_id}, lines={len(lines)}")
# Output: 2026-05-24 12:34:56,789 INFO server.settlements.router Settlement generated: ...
# Problem: hard to query on settlement_id, tenant_id etc.

# structlog — structured key/value pairs:
import structlog
logger = structlog.get_logger()
logger.info("settlement_generated",
    settlement_id=settlement_id,
    tenant_id=str(tenant_id),
    line_count=len(lines),
)
# JSON output: {"event": "settlement_generated", "settlement_id": "...", "tenant_id": "...", "line_count": 5}
# Loki/Elasticsearch: can query on settlement_id="abc" directly

Konfiguration: Dev vs. produktion

# server/src/core/logging.py
import structlog
import logging
import sys

def configure_logging(environment: str = "production") -> None:
    """Configure structlog for the given deployment environment.

    Args:
        environment: ``"development"`` selects colored, human-readable
            console output at DEBUG level. Any other value selects one JSON
            object per line on stdout at INFO level (Docker logs → Loki).
    """
    # Processors shared by every environment.
    #
    # merge_contextvars must run first: it copies values bound with
    # structlog.contextvars.bind_contextvars() (request_id, task_id, ...)
    # into each event. Without it that context never reaches the output.
    #
    # structlog.stdlib.add_logger_name is deliberately not used: it reads
    # `logger.name`, which the PrintLogger produced by PrintLoggerFactory
    # does not have, so every log call would raise AttributeError.
    shared_processors = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
    ]

    if environment == "development":
        # Colored console output. exc_info is left untouched so that
        # ConsoleRenderer formats the traceback itself.
        structlog.configure(
            processors=[
                *shared_processors,
                structlog.dev.ConsoleRenderer(colors=True),
            ],
            wrapper_class=structlog.make_filtering_bound_logger(logging.DEBUG),
            logger_factory=structlog.PrintLoggerFactory(),
        )
    else:
        # JSON to stdout. dict_tracebacks is the only exception processor in
        # the chain: a preceding ExceptionRenderer() would consume exc_info
        # and turn dict_tracebacks into a no-op.
        structlog.configure(
            processors=[
                *shared_processors,
                structlog.processors.dict_tracebacks,
                structlog.processors.JSONRenderer(),
            ],
            wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
            logger_factory=structlog.PrintLoggerFactory(file=sys.stdout),
        )

FastAPI middleware: Request context

# server/src/core/middleware.py
from fastapi import Request
import structlog
import uuid
import time

logger = structlog.get_logger()

async def logging_middleware(request: Request, call_next):
    """Attach request context to every log line emitted during a request.

    Binds ``request_id``, ``method`` and ``path`` through
    structlog.contextvars, which keeps the values local to the current
    execution context (asyncio task / thread) rather than global.

    Logs "request_started", then either "request_completed" (with
    ``status_code`` and ``duration_ms``) or "request_failed" (with the
    traceback and ``duration_ms``) before re-raising the exception.

    Args:
        request: The incoming request.
        call_next: Callable that passes the request on and returns the
            response.

    Returns:
        The response produced by ``call_next``.
    """
    # Short correlation id: the first 8 characters of a random UUID4.
    request_id = str(uuid.uuid4())[:8]

    # Drop anything left over from earlier work in this context, then bind
    # the values for this request.
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(
        request_id=request_id,
        method=request.method,
        path=request.url.path,
    )

    # perf_counter() is monotonic; time.time() can jump when the wall clock
    # is adjusted and would then yield wrong or negative durations.
    started = time.perf_counter()

    logger.info("request_started")

    # Only the downstream call sits inside the try block, so a failure while
    # logging success is not misreported as a failed request.
    try:
        response = await call_next(request)
    except Exception:
        logger.error(
            "request_failed",
            duration_ms=int((time.perf_counter() - started) * 1000),
            exc_info=True,
        )
        raise

    logger.info(
        "request_completed",
        status_code=response.status_code,
        duration_ms=int((time.perf_counter() - started) * 1000),
    )
    return response
# server/src/main.py
from fastapi import FastAPI
from server.src.core.middleware import logging_middleware
from server.src.core.logging import configure_logging

# Configure logging at import time, before the application object is created.
# NOTE(review): `settings` is not imported in this snippet — presumably the
# project's configuration object; confirm where it comes from.
configure_logging(environment=settings.ENVIRONMENT)

app = FastAPI()
# Register logging_middleware as HTTP middleware; this call form is the same
# as decorating the function with @app.middleware("http").
app.middleware("http")(logging_middleware)

Brug i routers

# server/src/settlements/router.py
import structlog

logger = structlog.get_logger()

@router.post("/{settlement_id}/generate")
async def generate_settlement(
    settlement_id: str,
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Queue PDF generation for a settlement owned by the caller's tenant.

    Raises:
        HTTPException: 404 when the settlement does not exist, 403 when it
            belongs to a different tenant than the current user.

    Returns:
        A dict with the id of the queued background task under ``task_id``.
    """
    # Bound logger carrying the identifiers relevant to this call. Per the
    # article, request-scoped values such as request_id come from the
    # logging middleware.
    request_log = logger.bind(settlement_id=settlement_id, user_id=str(user.id))
    request_log.info("settlement_generate_requested")

    record = await session.get(Settlement, settlement_id)
    if not record:
        request_log.warning("settlement_not_found")
        raise HTTPException(404, "Afregning ikke fundet")

    owner_tenant = record.tenant_id
    if owner_tenant != user.tenant_id:
        request_log.error("settlement_access_denied", settlement_tenant=str(owner_tenant))
        raise HTTPException(403, "Ingen adgang")

    # Hand the work to the background task and report its id.
    queued = generate_settlement_pdf.delay(settlement_id)
    queued_id = queued.id
    request_log.info("settlement_pdf_queued", task_id=queued_id)

    return {"task_id": queued_id}

Celery worker: Logging opsætning

# server/src/workers/celery_app.py
import structlog
from celery.signals import worker_process_init, task_prerun, task_postrun, task_failure

@worker_process_init.connect
def setup_worker_logging(**kwargs):
    """Configure structlog when a worker subprocess initializes."""
    # Imported inside the handler, as in the original, so the import happens
    # when the signal fires rather than at module import.
    from server.src.core import logging as core_logging

    core_logging.configure_logging(environment=settings.ENVIRONMENT)

@task_prerun.connect
def log_task_start(task_id, task, args, kwargs, **kw):
    """Bind task metadata to the logging context and log "task_started".

    Connected to Celery's ``task_prerun`` signal.

    Args:
        task_id: Id of the task about to run.
        task: The task object; its ``name`` is bound as ``task_name``.
        args: Positional arguments of the task (unused).
        kwargs: Keyword arguments of the task (unused).
        **kw: Remaining signal keyword arguments (unused).
    """
    # Clear before binding, mirroring the HTTP middleware: otherwise any key
    # bound while a previous task ran in this same context would still be
    # attached to this task's log lines.
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(
        task_id=task_id,
        task_name=task.name,
    )
    structlog.get_logger().info("task_started")

@task_failure.connect
def log_task_failure(task_id, exception, traceback, **kwargs):
    """Log "task_failed" with the error message and traceback of the failure.

    Connected to Celery's ``task_failure`` signal.
    """
    # Assemble the (type, value, traceback) triple passed as exc_info.
    failure_info = (type(exception), exception, traceback)
    task_log = structlog.get_logger()
    task_log.error("task_failed", error=str(exception), exc_info=failure_info)

Grafana Loki: Log-aggregering

# docker-compose.yml — Loki + Promtail:
services:
  loki:
    image: grafana/loki:2.9.0
    ports:
      - "3100:3100"
    volumes:
      # Named volume so Loki's data survives container re-creation.
      - loki_data:/loki

  promtail:
    image: grafana/promtail:2.9.0
    volumes:
      # Host log directory, mounted read-only for Promtail to tail.
      - /var/log/docker:/var/log/docker:ro
      - ./promtail-config.yml:/etc/promtail/config.yml:ro
    depends_on:
      - loki

# Named volumes used by services must be declared at the top level;
# without this, Compose rejects the file because loki_data is undefined.
volumes:
  loki_data:
# promtail-config.yml
# Where Promtail pushes log entries. "loki" is the Loki service name from
# the docker-compose file, listening on port 3100.
clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: mbus-server
    static_configs:
      - targets: [localhost]
        labels:
          job: mbus-server
          __path__: /var/log/docker/server*.log
    pipeline_stages:
      # Parse each line as JSON and extract the structlog fields.
      # NOTE(review): if these files are in Docker's json-file format, a
      # `docker: {}` stage is needed before this one — confirm the log format.
      - json:
          expressions:
            event: event
            level: level
            settlement_id: settlement_id
            tenant_id: tenant_id
      # Only level and event are promoted to labels; the other extracted
      # fields stay queryable via `| json` at query time.
      - labels:
          level:
          event:
Loki query-eksempler:
  {job="mbus-server"} |= "settlement_generated"
  {job="mbus-server"} | json | settlement_id="abc-123"
  rate({job="mbus-server"} | json | level="error" [1m])

Konklusion

structlog giver struktureret JSON-logging, der er søgbar og maskinlæsbar. Kontekst-binding via contextvars sikrer, at request_id og andre metadata automatisk følger med alle log-linjer i et request eller en Celery-task — uden at sende dem manuelt som parametre. Det forudsætter, at structlog.contextvars.merge_contextvars indgår i processor-kæden. Loki + Grafana gør loggen søgbar med millisekund-præcision.

Se FastAPI SQLModel guide eller Celery beat guide.