Python OpenTelemetry og distributed tracing — FastAPI IoT-platform
OpenTelemetry distributed tracing for FastAPI og IoT: OTLP-eksport til Jaeger, custom spans til MQTT og wM-Bus, context propagation, Celery-tracing, metrics og sampling-strategier.
Af M-Bus Gateway
OpenTelemetry giver end-to-end tracing fra MQTT-modtagelse til PDF-generering. Her er implementeringen til en IoT-platform med Jaeger og custom spans.
Setup og instrumentering
# server/src/telemetry.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.sdk.resources import Resource
def setup_telemetry(app, engine, *, sampler=None) -> None:
    """Initialise OpenTelemetry tracing for the whole server application.

    Builds a TracerProvider tagged with service name, version and deployment
    environment, exports spans in batches over OTLP gRPC to
    ``settings.otel_endpoint``, installs it as the global tracer provider and
    auto-instruments FastAPI, SQLAlchemy and Redis.

    Args:
        app: The FastAPI application to instrument.
        engine: SQLAlchemy engine whose ``sync_engine`` is instrumented
            (presumably an ``AsyncEngine`` — confirm).
        sampler: Optional sampler for the provider, e.g.
            ``ParentBased(root=MbusAdaptiveSampler())``. ``None`` keeps the
            SDK default, which honours ``OTEL_TRACES_SAMPLER`` and otherwise
            samples every root span and follows the parent's decision for
            child spans.
    """
    resource = Resource.create({
        "service.name": "mbus-gateway-server",
        "service.version": "1.0.0",
        "deployment.environment": settings.environment,
    })

    # The sampler must be given here: a TracerProvider cannot change its
    # sampler after construction, and `resource` only exists in this scope.
    provider = TracerProvider(resource=resource, sampler=sampler)
    provider.add_span_processor(
        BatchSpanProcessor(
            OTLPSpanExporter(endpoint=settings.otel_endpoint)
        )
    )
    trace.set_tracer_provider(provider)

    # Auto-instrumentation; all three use the global provider set above.
    FastAPIInstrumentor.instrument_app(app)
    SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)
    RedisInstrumentor().instrument()


# Module-wide tracer. It is obtained through the global API, so it resolves to
# the provider installed by setup_telemetry() even when created before the call.
tracer = trace.get_tracer("mbus-gateway")
Custom spans til MQTT og wM-Bus
# server/src/mqtt/subscriber.py
from opentelemetry import trace, context
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
# MQTT-side tracer and W3C trace-context propagator for this module.
tracer = trace.get_tracer("mbus.mqtt")
propagator = TraceContextTextMapPropagator()


async def handle_gateway_payload(gateway_id: str, payload: dict) -> None:
    """Process one gateway MQTT payload inside a CONSUMER span.

    If the gateway embedded a W3C trace-context carrier under
    ``payload["_trace"]``, the span joins that trace; otherwise a new trace is
    started. ``process_readings`` runs while this span is current, so any
    context it propagates onward (presumably the Celery dispatch — confirm)
    continues the same trace.

    Args:
        gateway_id: Gateway identifier; also used for the MQTT topic attribute.
        payload: Decoded MQTT payload. Optional keys read here: ``"_trace"``
            (trace-context carrier dict) and ``"readings"`` (counted for the
            ``readings.count`` attribute). Both may be missing or ``None``.

    Raises:
        Any exception from ``process_readings`` propagates unchanged. The
        span context manager records it on the span and sets ERROR status.
    """
    # `or {}` also covers an explicit `"_trace": null`; extract() needs a mapping.
    carrier = payload.get("_trace") or {}
    ctx = propagator.extract(carrier)

    with tracer.start_as_current_span(
        "mqtt.payload.process",
        context=ctx,
        kind=trace.SpanKind.CONSUMER,
        attributes={
            "messaging.system": "mqtt",
            "messaging.destination": f"meters/{gateway_id}/data",
            "gateway.id": gateway_id,
            "readings.count": len(payload.get("readings") or []),
        },
    ) as span:
        # No manual try/except here: start_as_current_span already records the
        # exception and sets ERROR status (record_exception and
        # set_status_on_exception default to True). Doing it by hand as well
        # recorded every failure twice on the span.
        readings = await process_readings(gateway_id, payload)
        span.set_attribute("readings.stored", len(readings))
        span.set_status(trace.StatusCode.OK)
@tracer.start_as_current_span("wmbus.telegram.parse")
def parse_wmbus_telegram(raw_bytes: bytes, meter_id: str) -> dict:
    """Parse a wM-Bus telegram inside an automatically created span.

    The decorator opens a ``wmbus.telegram.parse`` span for each call. The
    span is tagged with the meter id, the raw telegram length and, after
    parsing, the manufacturer (when known) and whether the telegram was AES
    encrypted.

    Args:
        raw_bytes: Raw telegram bytes, passed unchanged to ``_do_parse``.
        meter_id: Meter identifier, recorded as ``meter.id``.

    Returns:
        The dict produced by ``_do_parse``.
    """
    span = trace.get_current_span()
    span.set_attribute("meter.id", meter_id)
    span.set_attribute("telegram.length_bytes", len(raw_bytes))

    result = _do_parse(raw_bytes)

    manufacturer = result.get("manufacturer")
    # OTel rejects None attribute values (it logs a warning and drops them),
    # so only tag the manufacturer when the parser found one.
    if manufacturer is not None:
        span.set_attribute("telegram.manufacturer", manufacturer)
    span.set_attribute("telegram.encrypted", result.get("aes_encrypted", False))
    return result
Context propagation til Celery
# server/src/workers/tasks/readings.py
from opentelemetry import trace, context
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from celery import shared_task
# Celery-side W3C trace-context propagator and tracer for this module.
propagator = TraceContextTextMapPropagator()
tracer = trace.get_tracer("mbus.celery")


@shared_task(name="readings.process_batch", acks_late=True)
def process_readings_batch(readings: list[dict], traceparent: dict | None = None):
    """Store a batch of readings inside a span linked to the caller's trace.

    Args:
        readings: Reading dicts handed to ``_store_readings``.
        traceparent: Trace-context carrier dict, as filled by
            ``propagator.inject`` in ``dispatch_batch``. When it is ``None`` or
            empty, the span starts a new trace.

    Returns:
        Whatever ``_store_readings`` returns.
    """
    # The carrier arrives as a task argument (not as HTTP headers).
    carrier = traceparent if traceparent else {}
    parent_ctx = propagator.extract(carrier)

    span_attributes = {
        "celery.task": "readings.process_batch",
        "batch.size": len(readings),
    }
    with tracer.start_as_current_span(
        "celery.readings.batch",
        context=parent_ctx,
        kind=trace.SpanKind.CONSUMER,
        attributes=span_attributes,
    ):
        stored = _store_readings(readings)
    return stored
# Call site (e.g. from a FastAPI endpoint), carrying the active trace context:
def dispatch_batch(readings: list, session):
    """Enqueue readings for background storage without breaking the trace.

    The currently active trace context is serialised into a plain dict and
    sent alongside the readings, so the Celery task can continue this trace.

    Args:
        readings: Items exposing ``model_dump()`` (presumably Pydantic
            models — confirm); each is converted to a dict before enqueueing.
        session: Not used by this function.
            NOTE(review): presumably kept for caller compatibility.
    """
    trace_carrier = {}
    propagator.inject(trace_carrier)  # fills traceparent/tracestate for the active span
    serialised = [item.model_dump() for item in readings]
    process_readings_batch.delay(readings=serialised, traceparent=trace_carrier)
Metrics med OpenTelemetry
# server/src/telemetry.py
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
def setup_metrics(resource: Resource | None = None) -> None:
    """Install the global MeterProvider, exporting metrics over OTLP gRPC.

    A PeriodicExportingMetricReader pushes metrics to
    ``settings.otel_endpoint`` every 30 seconds.

    Args:
        resource: Resource describing this service. Pass the same Resource
            used for tracing so traces and metrics carry identical attributes.
            When omitted, one with the same service name, version and
            environment attributes is built here.
    """
    if resource is None:
        # The `resource` built in setup_telemetry() is a local variable there,
        # so it is not reachable from this function; referencing it raised
        # NameError. Build an equivalent one instead.
        resource = Resource.create({
            "service.name": "mbus-gateway-server",
            "service.version": "1.0.0",
            "deployment.environment": settings.environment,
        })

    reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint=settings.otel_endpoint),
        export_interval_millis=30_000,
    )
    provider = MeterProvider(resource=resource, metric_readers=[reader])
    metrics.set_meter_provider(provider)
# Module-level meter. Created through the global API, so it delegates to the
# MeterProvider installed by setup_metrics() even when created before that call.
meter = metrics.get_meter("mbus-gateway")

# Counters and histograms:
readings_ingested = meter.create_counter(
    "readings.ingested.total",
    description="Antal aflæsninger modtaget fra gateways",
)
mqtt_payload_size = meter.create_histogram(
    "mqtt.payload.bytes",
    description="MQTT payload størrelse i bytes",
    unit="bytes",
)
settlement_duration = meter.create_histogram(
    "settlement.generation.duration",
    description="PDF-afregning genereringstid",
    unit="ms",
)


def _observe_active_gateways(_options):
    """Observable-gauge callback reporting the number of active gateways.

    OTel callbacks must yield ``Observation`` objects. Plain ``(value, attrs)``
    tuples make the SDK fail the callback, and the gauge is then never reported.
    NOTE(review): presumably invoked from the periodic reader's export thread,
    so count_active_gateways() must be synchronous and cheap — confirm.
    """
    yield metrics.Observation(count_active_gateways())


active_gateways = meter.create_observable_gauge(
    "gateways.active",
    callbacks=[_observe_active_gateways],
    description="Antal gateways der har sendt data inden for 36t",
)
# Usage in application code:
async def ingest_reading(reading: Reading, gateway_id: str):
    """Count one ingested reading, tagged by gateway, meter type and tenant.

    NOTE(review): each distinct gateway/tenant value creates a separate time
    series; confirm the cardinality stays bounded.
    """
    labels = {
        "gateway_id": gateway_id,
        "meter_type": reading.meter_type,
        "tenant_id": str(reading.tenant_id),
    }
    readings_ingested.add(1, labels)
Sampling-strategi
# server/src/telemetry.py
# Adaptive sampling — undgå at drukne i heartbeat-spans
from opentelemetry.sdk.trace.sampling import (
    ALWAYS_OFF,
    ALWAYS_ON,
    ParentBased,
    Sampler,
    TraceIdRatioBased,
)
class MbusAdaptiveSampler(Sampler):
    """Name-based head sampler intended as the root of ``ParentBased``.

    Decision order (first match wins; substring match on the span name):

    1. Critical operations — names containing ``settlement``, ``error``,
       ``mqtt.payload``, ``ota.`` or ``alert`` — are always sampled.
    2. Heartbeat/health spans — names containing ``heartbeat`` or ``health``
       — are never sampled.
    3. Everything else is sampled by trace ID at ``ratio`` (default 10 %).

    ``ParentBased`` forwards its arguments to the root sampler by keyword
    (``parent_context=...``), so ``should_sample`` must use the exact
    parameter names of the ``Sampler`` interface. ``ParentBased`` also calls
    ``get_description()`` on its root.

    Note: the decision is made when a span *starts*. The ``error`` keyword
    therefore only matches spans whose name says so up front; it cannot catch
    spans that fail later.
    """

    _CRITICAL_KEYWORDS = ("settlement", "error", "mqtt.payload", "ota.", "alert")
    _SUPPRESSED_KEYWORDS = ("heartbeat", "health")

    def __init__(self, ratio: float = 0.1):
        """Create the sampler.

        Args:
            ratio: Fraction (0.0–1.0) of routine root traces to keep.
        """
        self._always = ALWAYS_ON
        self._never = ALWAYS_OFF
        self._ratio = TraceIdRatioBased(ratio)

    def should_sample(
        self,
        parent_context,
        trace_id,
        name,
        kind=None,
        attributes=None,
        links=None,
        trace_state=None,
    ):
        """Delegate to ALWAYS_ON, ALWAYS_OFF or the ratio sampler based on *name*."""
        if any(keyword in name for keyword in self._CRITICAL_KEYWORDS):
            delegate = self._always
        elif any(keyword in name for keyword in self._SUPPRESSED_KEYWORDS):
            delegate = self._never
        else:
            delegate = self._ratio
        return delegate.should_sample(
            parent_context, trace_id, name, kind, attributes, links, trace_state
        )

    def get_description(self) -> str:
        """Return the sampler name (required by the Sampler interface)."""
        return "MbusAdaptiveSampler"

    @property
    def description(self):
        """Backward-compatible alias for ``get_description()``."""
        return self.get_description()
# Tracer provider using the adaptive sampler. ParentBased applies
# MbusAdaptiveSampler only to root spans; child spans (local or remote parent)
# follow their parent's sampling decision.
# NOTE(review): `resource` is not defined at module scope in the visible code;
# it is a local variable inside setup_telemetry(). This provider is also never
# passed to trace.set_tracer_provider() and has no span processor attached, so
# as written it would not export anything. The intent appears to be building
# the TracerProvider inside setup_telemetry() with this sampler — confirm.
provider = TracerProvider(
resource=resource,
sampler=ParentBased(root=MbusAdaptiveSampler()),
)
Jaeger Docker Compose
# docker-compose.yml
services:
  jaeger:
    image: jaegertracing/all-in-one:1.57
    ports:
      - "16686:16686" # Jaeger UI
      - "4317:4317"   # OTLP gRPC
      - "4318:4318"   # OTLP HTTP
    environment:
      COLLECTOR_OTLP_ENABLED: "true"
      # Without these settings all-in-one keeps spans in memory, and the
      # /badger volume below is never written to (traces lost on restart).
      SPAN_STORAGE_TYPE: badger
      BADGER_EPHEMERAL: "false"
      BADGER_DIRECTORY_VALUE: /badger/data
      BADGER_DIRECTORY_KEY: /badger/key
    volumes:
      - jaeger_data:/badger
    networks:
      - internal

  server:
    environment:
      OTEL_ENDPOINT: "http://jaeger:4317"
      OTEL_EXPORTER_OTLP_ENDPOINT: "http://jaeger:4317"
    depends_on:
      - jaeger
    # The server must share a network with jaeger, otherwise the "jaeger"
    # hostname does not resolve. `default` keeps it reachable by services
    # that are not on `internal`.
    networks:
      - default
      - internal

volumes:
  jaeger_data:

networks:
  internal:
Konklusion
OpenTelemetry giver end-to-end tracing fra MQTT-modtagelse til Celery-tasks og PDF-generering. FastAPIInstrumentor auto-instrumenterer alle HTTP-endpoints. Custom spans til parse_wmbus_telegram og handle_gateway_payload giver granulær synlighed. Context propagation via TraceContextTextMapPropagator og traceparent i Celery-kald sikrer sammenhængende traces på tværs af service-grænser. Adaptive sampling prioriterer kritiske operationer (settlements, OTA, alerts) og dropper routine-heartbeats. Fordi sampleren er pakket ind i ParentBased, gælder reglerne kun for root-spans – child-spans arver forælderens sampling-beslutning.