M-Bus Gateway
← Tilbage til blog
· OpenTelemetry· tracing· FastAPI· Python· observability· MQTT· IoT· backend

Python OpenTelemetry og distributed tracing — FastAPI IoT platform

OpenTelemetry distributed tracing for FastAPI og IoT: OTLP-eksport til Jaeger, custom spans til MQTT og wM-Bus, context propagation, Celery-tracing, metrics og sampling-strategier.

Af M-Bus Gateway

OpenTelemetry giver end-to-end tracing fra MQTT-modtagelse til PDF-generering. Her er implementeringen til en IoT-platform med Jaeger og custom spans.


Setup og instrumentering

# server/src/telemetry.py

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.sdk.resources import Resource


def setup_telemetry(app, engine, *, sampler=None) -> None:
    """Initialise OpenTelemetry tracing for the whole server application.

    Builds a TracerProvider with a service Resource and a batching OTLP/gRPC
    span exporter, installs it as the global tracer provider, and enables
    auto-instrumentation for FastAPI, SQLAlchemy and Redis.

    Args:
        app: The FastAPI application to instrument.
        engine: SQLAlchemy engine whose ``sync_engine`` attribute is
            instrumented (presumably an ``AsyncEngine`` — confirm).
        sampler: Optional sampler for the provider, e.g.
            ``ParentBased(root=MbusAdaptiveSampler())``. When ``None`` the
            provider is created without an explicit sampler, i.e. with the
            SDK's default.
    """
    # NOTE(review): `settings` is not imported in this snippet; presumably the
    # application's config object (environment, otel_endpoint) — confirm.
    resource = Resource.create({
        "service.name": "mbus-gateway-server",
        "service.version": "1.0.0",
        "deployment.environment": settings.environment,
    })

    # Only pass `sampler` when given so the SDK default stays in effect otherwise.
    provider_kwargs = {"resource": resource}
    if sampler is not None:
        provider_kwargs["sampler"] = sampler
    provider = TracerProvider(**provider_kwargs)

    # Spans are buffered and exported in batches to the OTLP endpoint.
    provider.add_span_processor(
        BatchSpanProcessor(
            OTLPSpanExporter(endpoint=settings.otel_endpoint)
        )
    )
    trace.set_tracer_provider(provider)

    # Auto-instrumentation:
    FastAPIInstrumentor.instrument_app(app)
    SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)
    RedisInstrumentor().instrument()


# Module-wide tracer:
tracer = trace.get_tracer("mbus-gateway")

Custom spans til MQTT og wM-Bus

# server/src/mqtt/subscriber.py

from opentelemetry import trace, context
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

tracer = trace.get_tracer("mbus.mqtt")
propagator = TraceContextTextMapPropagator()


async def handle_gateway_payload(gateway_id: str, payload: dict) -> None:
    """
    Process one MQTT payload from a gateway inside a CONSUMER span.

    If the gateway embedded W3C trace context under ``payload["_trace"]``, the
    span continues that trace; otherwise it starts a new one. The span is the
    current span while ``process_readings`` runs, so spans started there (and
    context injected for Celery) belong to the same trace.

    Args:
        gateway_id: Gateway identifier; also used for the topic attribute.
        payload: Decoded MQTT payload. ``"readings"`` (if present) is counted
            for the ``readings.count`` span attribute.

    Raises:
        Any exception from ``process_readings``; it is recorded on the span,
        the span status is set to ERROR, and the exception is re-raised.
    """
    # A gateway may omit "_trace" or send it as null; extract() needs a mapping.
    carrier = payload.get("_trace") or {}
    ctx = propagator.extract(carrier)

    # Exceptions are recorded and the status set explicitly below, so the
    # tracer's automatic handling is turned off to avoid a duplicate
    # "exception" event on the span.
    with tracer.start_as_current_span(
        "mqtt.payload.process",
        context=ctx,
        kind=trace.SpanKind.CONSUMER,
        attributes={
            "messaging.system": "mqtt",
            "messaging.destination": f"meters/{gateway_id}/data",
            "gateway.id": gateway_id,
            # Treat a null "readings" value like an empty list.
            "readings.count": len(payload.get("readings") or []),
        },
        record_exception=False,
        set_status_on_exception=False,
    ) as span:
        try:
            readings = await process_readings(gateway_id, payload)
            span.set_attribute("readings.stored", len(readings))
            span.set_status(trace.StatusCode.OK)
        except Exception as e:
            span.record_exception(e)
            span.set_status(trace.StatusCode.ERROR, str(e))
            raise


@tracer.start_as_current_span("wmbus.telegram.parse")
def parse_wmbus_telegram(raw_bytes: bytes, meter_id: str) -> dict:
    """Parse a wM-Bus telegram inside its own ``wmbus.telegram.parse`` span.

    Args:
        raw_bytes: The raw telegram bytes; their length is recorded on the span.
        meter_id: Meter identifier, recorded as ``meter.id``.

    Returns:
        The dict produced by ``_do_parse``. Its ``"manufacturer"`` and
        ``"aes_encrypted"`` entries are copied onto the span when available.
    """
    # The decorator makes the new span current, so this returns it.
    span = trace.get_current_span()
    span.set_attribute("meter.id", meter_id)
    span.set_attribute("telegram.length_bytes", len(raw_bytes))

    result = _do_parse(raw_bytes)

    # OTel attribute values must not be None (the SDK drops them with a
    # warning), so only record the manufacturer when the parser found one.
    manufacturer = result.get("manufacturer")
    if manufacturer is not None:
        span.set_attribute("telegram.manufacturer", manufacturer)
    span.set_attribute("telegram.encrypted", result.get("aes_encrypted", False))

    return result

Context propagation til Celery

# server/src/workers/tasks/readings.py

from opentelemetry import trace, context
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from celery import shared_task

tracer = trace.get_tracer("mbus.celery")
propagator = TraceContextTextMapPropagator()


@shared_task(name="readings.process_batch", acks_late=True)
def process_readings_batch(readings: list[dict], traceparent: dict | None = None):
    """
    Store a batch of readings inside a span that continues the caller's trace.

    Args:
        readings: Serialized readings, handed to ``_store_readings`` unchanged.
        traceparent: Despite its name, the whole W3C trace-context carrier dict
            filled by ``propagator.inject`` on the producer side (it is passed as
            a task kwarg, not as HTTP headers). ``None`` or empty starts a new
            trace.

    Returns:
        Whatever ``_store_readings`` returns.
    """
    # Rebuild the producer's context from the task kwargs.
    carrier = traceparent if traceparent else {}
    parent_ctx = propagator.extract(carrier)

    span_attributes = {
        "celery.task": "readings.process_batch",
        "batch.size": len(readings),
    }
    with tracer.start_as_current_span(
        "celery.readings.batch",
        context=parent_ctx,
        kind=trace.SpanKind.CONSUMER,
        attributes=span_attributes,
    ):
        return _store_readings(readings)


# Called from a FastAPI endpoint while a request span is active:
def dispatch_batch(readings: list, session):
    """Enqueue ``process_readings_batch`` carrying the current trace context.

    Args:
        readings: Items exposing ``model_dump()`` (presumably Pydantic models —
            confirm); each is serialized before being sent to Celery.
        session: Not used by this function.
    """
    # Capture the active traceparent/tracestate into a plain dict that can
    # travel as a task kwarg.
    trace_carrier = {}
    propagator.inject(trace_carrier)

    serialized = [reading.model_dump() for reading in readings]
    process_readings_batch.delay(readings=serialized, traceparent=trace_carrier)

Metrics med OpenTelemetry

# server/src/telemetry.py

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter


def setup_metrics(resource=None) -> None:
    """Install a global MeterProvider that pushes metrics over OTLP/gRPC.

    Metrics are exported to ``settings.otel_endpoint`` every 30 seconds.

    Args:
        resource: OTel ``Resource`` attached to all metrics. If omitted, one is
            built with the same service attributes ``setup_telemetry`` uses, so
            traces and metrics share a single service identity.
    """
    if resource is None:
        # `resource` in setup_telemetry() is a local there, not a module global,
        # so build an equivalent one when the caller does not supply it.
        from opentelemetry.sdk.resources import Resource

        resource = Resource.create({
            "service.name": "mbus-gateway-server",
            "service.version": "1.0.0",
            "deployment.environment": settings.environment,
        })

    reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint=settings.otel_endpoint),
        export_interval_millis=30_000,
    )
    provider = MeterProvider(resource=resource, metric_readers=[reader])
    metrics.set_meter_provider(provider)


meter = metrics.get_meter("mbus-gateway")

# Counters and histograms:
readings_ingested = meter.create_counter(
    "readings.ingested.total",
    description="Antal aflæsninger modtaget fra gateways",
)

mqtt_payload_size = meter.create_histogram(
    "mqtt.payload.bytes",
    description="MQTT payload størrelse i bytes",
    unit="bytes",
)

settlement_duration = meter.create_histogram(
    "settlement.generation.duration",
    description="PDF-afregning genereringstid",
    unit="ms",
)

# Observable-gauge callbacks receive CallbackOptions and must return an
# iterable of Observation objects: the SDK reads `.value` / `.attributes`
# from each item, so plain (value, attrs) tuples make the callback fail and
# the gauge never reports. count_active_gateways() is defined elsewhere.
active_gateways = meter.create_observable_gauge(
    "gateways.active",
    callbacks=[
        lambda _options: [metrics.Observation(count_active_gateways(), {})]
    ],
    description="Antal gateways der har sendt data inden for 36t",
)


# Usage in application code:
async def ingest_reading(reading: Reading, gateway_id: str):
    """Count one ingested reading, tagged by gateway, meter type and tenant."""
    # NOTE(review): gateway_id and tenant_id as metric attributes can create
    # many time series — confirm the cardinality is acceptable.
    attributes = dict(
        gateway_id=gateway_id,
        meter_type=reading.meter_type,
        tenant_id=str(reading.tenant_id),
    )
    readings_ingested.add(1, attributes)

Sampling-strategi

# server/src/telemetry.py
# Adaptive sampling — undgå at drukne i heartbeat-spans

from opentelemetry.sdk.trace.sampling import (
    ParentBased,
    TraceIdRatioBased,
    ALWAYS_ON,
    ALWAYS_OFF,
)


class MbusAdaptiveSampler:
    """
    Root sampler that chooses a delegate sampler from the span name.

    - Names containing a critical marker (settlement, error, mqtt.payload,
      ota., alert) are always sampled.
    - Otherwise, names containing "heartbeat" or "health" are never sampled.
    - Everything else is sampled by trace-id ratio (10 % by default).

    The decision is made when the span starts and looks only at the *name*,
    so "error" matches span names, not exceptions raised later. Intended as
    ``ParentBased(root=MbusAdaptiveSampler())``. There the parent's sampling
    decision (including a remote parent from a gateway) overrides this
    sampler for child spans.
    """

    # Checked first: any of these substrings forces sampling.
    _CRITICAL_MARKERS = ("settlement", "error", "mqtt.payload", "ota.", "alert")
    # Checked second: any of these substrings drops the span.
    _SILENT_MARKERS = ("heartbeat", "health")

    def __init__(self, ratio: float = 0.1):
        self._always = ALWAYS_ON
        self._never = ALWAYS_OFF
        self._ratio = TraceIdRatioBased(ratio)

    def should_sample(
        self,
        parent_context,
        trace_id,
        name,
        kind=None,
        attributes=None,
        links=None,
        trace_state=None,
    ):
        """Return the SamplingResult of the delegate chosen for ``name``.

        Parameter names and defaults match the OTel ``Sampler`` interface:
        ``ParentBased`` calls its root sampler with keyword arguments
        (``parent_context=...``), and may omit ``trace_state``.
        """
        if any(marker in name for marker in self._CRITICAL_MARKERS):
            delegate = self._always
        elif any(marker in name for marker in self._SILENT_MARKERS):
            delegate = self._never
        else:
            delegate = self._ratio
        return delegate.should_sample(
            parent_context, trace_id, name, kind, attributes, links, trace_state
        )

    def get_description(self) -> str:
        """Sampler name as required by the OTel ``Sampler`` interface."""
        return "MbusAdaptiveSampler"

    @property
    def description(self):
        return self.get_description()


# `resource` inside setup_telemetry() is a local there, so it is not visible
# at module level; build an equivalent Resource here.
resource = Resource.create({
    "service.name": "mbus-gateway-server",
    "service.version": "1.0.0",
    "deployment.environment": settings.environment,
})

# NOTE(review): this provider has no span processor/exporter and is not
# registered with trace.set_tracer_provider(); presumably it is meant to
# replace the plain TracerProvider(...) built in setup_telemetry() — confirm.
provider = TracerProvider(
    resource=resource,
    sampler=ParentBased(root=MbusAdaptiveSampler()),
)

Jaeger Docker Compose

# docker-compose.yml

services:
  jaeger:
    image: jaegertracing/all-in-one:1.57
    ports:
      - "16686:16686"    # Jaeger UI
      - "4317:4317"      # OTLP gRPC
      - "4318:4318"      # OTLP HTTP
    environment:
      COLLECTOR_OTLP_ENABLED: "true"
      # all-in-one keeps spans in memory by default; these settings make it
      # use Badger on the mounted volume so traces survive restarts.
      SPAN_STORAGE_TYPE: badger
      BADGER_EPHEMERAL: "false"
      BADGER_DIRECTORY_VALUE: /badger/data
      BADGER_DIRECTORY_KEY: /badger/key
    volumes:
      - jaeger_data:/badger
    networks:
      - internal

  server:
    # image/build and other settings omitted in this excerpt.
    environment:
      OTEL_ENDPOINT: "http://jaeger:4317"
      OTEL_EXPORTER_OTLP_ENDPOINT: "http://jaeger:4317"
    # Must share a network with jaeger, otherwise the hostname "jaeger" in
    # the endpoints above does not resolve.
    networks:
      - internal
    depends_on:
      - jaeger

volumes:
  jaeger_data:

networks:
  internal:

Konklusion

OpenTelemetry giver end-to-end tracing fra MQTT-modtagelse til Celery-tasks og PDF-generering. FastAPIInstrumentor auto-instrumenterer alle HTTP-endpoints. Custom spans til parse_wmbus_telegram og handle_gateway_payload giver granulær synlighed. Context propagation via TraceContextTextMapPropagator og traceparent i Celery-kald sikrer sammenhængende traces på tværs af service-grænser. Adaptive sampling prioriterer kritiske operationer (settlements, OTA, alerts), dropper heartbeat- og health-spans helt og sampler øvrige rutine-spans med 10 %. Bemærk, at sampling-beslutningen træffes ved span-start ud fra span-navnet, så en fejl, der opstår undervejs, ikke i sig selv sikrer, at tracet bliver gemt.

Se også vores guide til Grafana-alerting eller guiden til logging med structlog.