Python logging best practices til IoT og SaaS produktion
Python logging i IoT og SaaS: structlog JSON output, kontekst-propagering via ContextVar, FastAPI request-ID middleware, Celery task-logging, log-levels og Grafana Loki integration.
Af M-Bus Gateway
Professionel logging i IoT og SaaS er ikke print() — det er strukturerede JSON-logs med kontekst, der kan søges og analyseres i Grafana Loki eller Elasticsearch.
structlog — struktureret JSON logging
# server/src/logging_config.py
import structlog
import logging
import sys
def configure_logging(log_level: str = "INFO", json_output: bool = True) -> None:
"""
Konfigurér structlog med JSON output til produktion.
I development: Brug PrettyPrinter for læselig output.
"""
shared_processors = [
structlog.contextvars.merge_contextvars, # Inject request-ID m.m.
structlog.processors.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.processors.TimeStamper(fmt="iso"),
]
if json_output:
# Produktion: JSON → Grafana Loki / Elasticsearch
processors = shared_processors + [
structlog.processors.dict_tracebacks,
structlog.processors.JSONRenderer(),
]
else:
# Development: Farvet tekst
processors = shared_processors + [
structlog.dev.ConsoleRenderer(colors=True),
]
structlog.configure(
processors=processors,
wrapper_class=structlog.make_filtering_bound_logger(
getattr(logging, log_level.upper())
),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(sys.stdout),
cache_logger_on_first_use=True,
)
# Brug i kode:
log = structlog.get_logger()
# Struktureret event:
log.info("reading_stored", meter_id="KAM-001", kwh=234.5, gateway_id="GW-0001")
# → {"event": "reading_stored", "meter_id": "KAM-001", "kwh": 234.5, "gateway_id": "GW-0001", "level": "info", "timestamp": "2026-05-24T07:00:00Z"}
# Fejl med traceback:
try:
do_something()
except Exception as e:
log.exception("operation_failed", error_type=type(e).__name__)
ContextVar — request-ID propagering
# server/src/middleware/logging_middleware.py
import uuid
import structlog
from contextvars import ContextVar
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
# ContextVar lever i den aktuelle async-kontekst (per-request)
request_id_var: ContextVar[str] = ContextVar("request_id", default="")
log = structlog.get_logger()
class RequestLoggingMiddleware(BaseHTTPMiddleware):
"""
Sæt request-ID i ContextVar + structlog contextvars.
Alle log-statements i downstream kode får automatisk request_id.
"""
async def dispatch(self, request: Request, call_next):
request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
request_id_var.set(request_id)
# Bind til structlog contextvars — propageres til alle log-calls
structlog.contextvars.clear_contextvars()
structlog.contextvars.bind_contextvars(
request_id=request_id,
method=request.method,
path=request.url.path,
)
log.info("request_started")
import time
start = time.monotonic()
try:
response = await call_next(request)
duration_ms = (time.monotonic() - start) * 1000
log.info(
"request_completed",
status_code=response.status_code,
duration_ms=round(duration_ms, 1),
)
response.headers["X-Request-ID"] = request_id
return response
except Exception:
log.exception("request_failed")
raise
finally:
structlog.contextvars.clear_contextvars()
Log-levels — hvornår hvad
# Log-level retningslinjer til IoT og SaaS:
log = structlog.get_logger()
# DEBUG: Intern tilstand, kun i development
log.debug("mqtt_message_received", topic="meters/GW-0001/data", size_bytes=2048)
log.debug("hca_telegram_parsed", meter_id="KAM-001", raw_kwh=234.567)
# INFO: Normale hændelser i drift
log.info("reading_stored", meter_id="KAM-001", count=42)
log.info("settlement_generated", property_id="EJD-001", units=12)
log.info("email_sent", recipient_count=12, email_type="settlement")
# WARNING: Uventet men håndterbart
log.warning("mqtt_reconnecting", attempt=3, host="178.105.90.8")
log.warning("gateway_stale", gateway_id="GW-0002", hours_since_reading=36)
log.warning("aes_decryption_failed", meter_id="ELS-001", reason="wrong_key")
# ERROR: Fejl der kræver opmærksomhed (men service kører stadig)
log.error("settlement_generation_failed", property_id="EJD-003", error="missing_annual_input")
log.error("email_delivery_failed", recipient="lejer@example.com", bounce_type="hard")
# CRITICAL: System-kritisk fejl — kræver øjeblikkelig handling
log.critical("database_connection_lost", error="connection refused")
log.critical("watchdog_not_fed", seconds_since_last=12)
Celery task logging
# server/src/workers/tasks/settlement.py
import structlog
from celery import Task
from server.src.workers.celery_app import celery_app
log = structlog.get_logger()
class LoggedTask(Task):
"""
Base Task der automatisk logger start, succes og fejl.
Arv fra denne i stedet for Task.
"""
def on_success(self, retval, task_id, args, kwargs) -> None:
log.info(
"task_succeeded",
task_name=self.name,
task_id=task_id,
result_type=type(retval).__name__,
)
def on_failure(self, exc, task_id, args, kwargs, einfo) -> None:
log.error(
"task_failed",
task_name=self.name,
task_id=task_id,
error_type=type(exc).__name__,
error=str(exc),
)
def on_retry(self, exc, task_id, args, kwargs, einfo) -> None:
log.warning(
"task_retrying",
task_name=self.name,
task_id=task_id,
attempt=self.request.retries,
)
@celery_app.task(base=LoggedTask, bind=True, max_retries=3)
def generate_settlement_task(self, property_id: str, period_end: str) -> dict:
log.info("settlement_task_started", property_id=property_id, period_end=period_end)
try:
result = generate_settlement(property_id, period_end)
return result
except Exception as exc:
log.exception("settlement_task_error", property_id=property_id)
raise self.retry(exc=exc, countdown=60 * (self.request.retries + 1))
Grafana Loki integration
# promtail/config.yml — Send Docker container logs til Loki
server:
http_listen_port: 9080
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
scrape_configs:
- job_name: mbus-server
docker_sd_configs:
- host: unix:///var/run/docker.sock
refresh_interval: 5s
relabel_configs:
- source_labels: [__meta_docker_container_name]
target_label: container
pipeline_stages:
- json:
expressions:
level: level
event: event
request_id: request_id
- labels:
level:
event:
Grafana LogQL queries til IoT operations:
Alle fejl de seneste 24 timer:
{job="mbus-server"} |= "level=error" | json | line_format "{{.event}} {{.error}}"
Gateway stale-advarsler:
{job="mbus-server"} |= "gateway_stale"
Langsom requests (>500ms):
{job="mbus-server"} | json | duration_ms > 500 | line_format "{{.path}} {{.duration_ms}}ms"
AES-fejl pr. gateway:
{job="mbus-server"} |= "aes_decryption_failed"
| json gateway_id
| rate[5m]) by (gateway_id)
Konklusion
structlog med JSON output er standarden for produktion. ContextVar + structlog bind_contextvars propagerer request-ID automatisk til alle log-statements i en request. Log-levels: DEBUG (dev-only), INFO (normale hændelser), WARNING (uventet men håndterbart), ERROR (kræver opmærksomhed), CRITICAL (system-kritisk). Grafana Loki med Promtail giver søgbar log-analyse. Celery LoggedTask base-class sikrer konsistent task-logging uden boilerplate.
Se Python structlog logging guide eller Grafana alerting IoT guide.