M-Bus Gateway
← Tilbage til blog
· OpenTelemetry· Python· observability· tracing· Prometheus· Grafana· FastAPI· IoT

OpenTelemetry i Python — tracing, metrics og logs til IoT SaaS

OpenTelemetry Python SDK: automatisk instrumentering af FastAPI og SQLAlchemy, custom spans, Prometheus metrics, Grafana Tempo og distributed tracing til gateway-events.

Af M-Bus Gateway

OpenTelemetry giver unified observability: tracing, metrics og logs i ét framework. Her er produktions-opsætningen til en FastAPI + Celery IoT-platform.


Installation og grundkonfiguration

# uv add opentelemetry-sdk opentelemetry-exporter-otlp-proto-grpc
# uv add opentelemetry-exporter-prometheus
# uv add opentelemetry-instrumentation-fastapi
# uv add opentelemetry-instrumentation-sqlalchemy
# uv add opentelemetry-instrumentation-redis
# uv add opentelemetry-instrumentation-httpx
# uv add structlog

# server/src/telemetry.py

from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes
import structlog

log = structlog.get_logger()


def setup_telemetry(
    service_name: str = "mbus-server",
    *,
    service_version: str = "1.0.0",
    environment: str = "production",
    otlp_endpoint: str = "http://tempo:4317",
    otlp_insecure: bool = True,
) -> None:
    """Install the global OpenTelemetry tracer and meter providers.

    Spans are batched and exported over OTLP/gRPC (by default to Grafana
    Tempo at ``http://tempo:4317`` without TLS). Metrics are collected by a
    ``PrometheusMetricReader``.

    Args:
        service_name: Value of the ``service.name`` resource attribute.
        service_version: Value of the ``service.version`` resource attribute.
        environment: Value of the ``deployment.environment`` resource attribute.
        otlp_endpoint: OTLP/gRPC endpoint that receives spans.
        otlp_insecure: Use a plaintext (non-TLS) gRPC channel to the endpoint.
    """
    resource = Resource(attributes={
        ResourceAttributes.SERVICE_NAME: service_name,
        ResourceAttributes.SERVICE_VERSION: service_version,
        ResourceAttributes.DEPLOYMENT_ENVIRONMENT: environment,
    })

    # Tracing: batch spans and ship them to Tempo via OTLP/gRPC.
    tracer_provider = TracerProvider(resource=resource)
    otlp_exporter = OTLPSpanExporter(
        endpoint=otlp_endpoint,
        insecure=otlp_insecure,
    )
    tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
    trace.set_tracer_provider(tracer_provider)

    # Metrics: PrometheusMetricReader takes no port argument (port=... raises
    # TypeError). It registers a collector with prometheus_client's default
    # registry. That registry must be served separately, e.g. by calling
    # prometheus_client.start_http_server(9464) at startup or by mounting
    # prometheus_client.make_asgi_app() at /metrics.
    # Avoid port 9090: it is the Prometheus server's own default port.
    prometheus_reader = PrometheusMetricReader()
    meter_provider = MeterProvider(
        resource=resource,
        metric_readers=[prometheus_reader],
    )
    metrics.set_meter_provider(meter_provider)

    log.info("OpenTelemetry initialized", service=service_name)

FastAPI auto-instrumentering

# server/src/main.py

from contextlib import asynccontextmanager

from fastapi import FastAPI
from opentelemetry import metrics, trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from sqlalchemy.ext.asyncio import create_async_engine

from server.src.telemetry import setup_telemetry

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start telemetry on startup and flush/undo it on shutdown.

    FastAPI instrumentation itself is attached at module level, right after
    the app is created. It installs middleware, and recent Starlette versions
    refuse to add middleware once the app has started serving, which is
    already the case by the time the lifespan runs.
    """
    setup_telemetry()
    RedisInstrumentor().instrument()

    yield

    RedisInstrumentor().uninstrument()
    # instrument_app() is undone by uninstrument_app(app). The instance method
    # uninstrument() only reverts a global FastAPIInstrumentor().instrument().
    FastAPIInstrumentor.uninstrument_app(app)
    # Shut down the SDK providers. This flushes spans still queued in the
    # BatchSpanProcessor; otherwise they are dropped at process exit.
    # getattr guards against the API's no-op proxy providers, which have no
    # shutdown().
    for provider in (trace.get_tracer_provider(), metrics.get_meter_provider()):
        shutdown = getattr(provider, "shutdown", None)
        if shutdown is not None:
            shutdown()


app = FastAPI(lifespan=lifespan)

# Auto-instrument every route; no changes are needed in the routers.
FastAPIInstrumentor.instrument_app(
    app,
    excluded_urls="api/health",  # don't trace health checks
)

# SQLAlchemy instrumentering (ved engine-oprettelse):
async def init_db():
    """Create the async database engine with SQLAlchemy query tracing enabled."""
    async_engine = create_async_engine(...)
    # The instrumentor attaches to a synchronous Engine; an AsyncEngine exposes
    # the one it wraps as .sync_engine.
    SQLAlchemyInstrumentor().instrument(engine=async_engine.sync_engine)
    return async_engine

Custom spans til gateway-events

# server/src/mqtt/ingest.py

from opentelemetry import trace

tracer = trace.get_tracer(__name__)


async def ingest_gateway_data(gateway_id: str, payload: dict) -> None:
    """Ingest one gateway payload inside a ``gateway.ingest`` trace.

    Validation, bulk insert and the status update each get a child span, so
    the slow step is visible in Tempo.

    Exceptions are deliberately not caught here. ``start_as_current_span``
    defaults to ``record_exception=True`` and ``set_status_on_exception=True``.
    An exception leaving a ``with`` block is therefore recorded on that span
    and marks it ERROR before propagating to the caller. Recording it by hand
    as well would add a duplicate exception event to the ingest span.
    """
    with tracer.start_as_current_span("gateway.ingest") as span:
        span.set_attribute("gateway.id", gateway_id)
        span.set_attribute("reading.count", len(payload.get("readings", [])))

        # Validate the payload.
        with tracer.start_as_current_span("gateway.validate"):
            validated = validate_payload(payload)
            span.set_attribute("validation.ok", True)

        # Store the readings.
        with tracer.start_as_current_span("database.bulk_insert") as db_span:
            count = await bulk_insert_readings(validated["readings"])
            db_span.set_attribute("db.rows_inserted", count)

        # Update the gateway's last-seen status.
        with tracer.start_as_current_span("gateway.update_status"):
            await update_gateway_last_seen(gateway_id)

Custom metrics

# server/src/metrics.py

from opentelemetry import metrics

meter = metrics.get_meter(__name__)

# Counters:
readings_ingested = meter.create_counter(
    "mbus.readings.ingested.total",
    description="Total antal readings modtaget fra gateways",
    unit="1",
)

settlements_generated = meter.create_counter(
    "mbus.settlements.generated.total",
    description="Antal genererede afregninger",
    unit="1",
)

# Observable gauge (current value):
# An observable gauge reports only what its callbacks return at collection
# time. Without a callback it exports nothing. The count is supplied by
# whatever code knows it, e.g. a periodic query for gateways seen within the
# last 36 hours, via set_active_gateway_count(). None means "not known yet",
# and in that case nothing is reported.
_active_gateway_count = None


def set_active_gateway_count(count: int) -> None:
    """Store the latest number of active gateways for the gauge callback."""
    global _active_gateway_count
    _active_gateway_count = count


def _observe_active_gateways(options: metrics.CallbackOptions):
    """Gauge callback: report the most recently stored active-gateway count."""
    if _active_gateway_count is None:
        return []
    return [metrics.Observation(_active_gateway_count)]


active_gateways = meter.create_observable_gauge(
    "mbus.gateways.active",
    callbacks=[_observe_active_gateways],
    description="Antal gateways set inden for 36 timer",
    unit="1",
)

# Histograms:
reading_processing_duration = meter.create_histogram(
    "mbus.reading.processing.duration",
    description="Behandlingstid for gateway-payload",
    unit="ms",
)


# Brug i kode:
async def ingest_gateway_data(gateway_id: str, payload: dict) -> None:
    """Ingest a gateway payload and record ingest metrics.

    The processing duration is recorded whether or not ingest succeeds, so
    failed ingests also show up in the latency histogram. The readings
    counter is incremented only after a successful ingest. Any exception from
    ``_do_ingest`` propagates unchanged.
    """
    import time

    # NOTE(review): gateway_id as a metric attribute creates one time series
    # per gateway (times the number of histogram buckets). Consider dropping
    # it for large fleets.
    attributes = {"gateway_id": gateway_id}
    count = len(payload.get("readings", []))
    start = time.monotonic()

    try:
        await _do_ingest(gateway_id, payload)
    finally:
        duration_ms = (time.monotonic() - start) * 1000
        reading_processing_duration.record(duration_ms, attributes=attributes)

    readings_ingested.add(count, attributes=attributes)

Grafana dashboard: Nøgle-metrics

# Grafana-dashboard (forenklet pseudo-YAML, ikke det egentlige dashboard-JSON-format)
# Nøgle-paneler til IoT SaaS monitoring:

panels:
  - title: "Gateway heartbeat rate"
    query: "rate(mbus_readings_ingested_total[5m])"

  - title: "API P99 latency"
    # Aggregate the buckets by `le` first. Without sum by (le),
    # histogram_quantile returns a separate P99 per route/method/status
    # series instead of one overall P99. The metric name depends on the
    # instrumentation's semconv version; verify it in Prometheus.
    query: "histogram_quantile(0.99, sum by (le) (rate(http_server_duration_milliseconds_bucket[5m])))"

  - title: "Active gateways (< 36h)"
    query: "mbus_gateways_active"

  - title: "Database connection pool utilization"
    # NOTE: mbus_db_pool_in_use and mbus_db_pool_max_size are not created by
    # the metrics code in this post; they must be registered separately.
    query: "mbus_db_pool_in_use / mbus_db_pool_max_size * 100"

  - title: "Settlement generation rate"
    query: "rate(mbus_settlements_generated_total[1h])"

  - title: "Failed readings (AES decrypt errors)"
    # NOTE: mbus_readings_decrypt_errors_total is likewise not defined in the
    # metrics code in this post.
    query: "rate(mbus_readings_decrypt_errors_total[5m])"

Struktureret logging med trace context

# server/src/logging_setup.py

import structlog
from opentelemetry import trace


def add_trace_context(logger, method, event_dict):
    """structlog processor that stamps the active OpenTelemetry span ids.

    If the current span is recording, ``trace_id`` (32 lowercase hex chars)
    and ``span_id`` (16 lowercase hex chars) are added to *event_dict*.
    Otherwise the event passes through untouched. Returns *event_dict*.
    """
    span = trace.get_current_span()
    if not span.is_recording():
        return event_dict

    span_ctx = span.get_span_context()
    event_dict.update(
        trace_id=format(span_ctx.trace_id, "032x"),
        span_id=format(span_ctx.span_id, "016x"),
    )
    return event_dict


structlog.configure(
    processors=[
        # An explicit processor list replaces structlog's defaults. Any of
        # the defaults we still want must therefore be listed again here.
        # Merge values bound via structlog.contextvars.bind_contextvars().
        structlog.contextvars.merge_contextvars,
        # Without this the log level is missing from the JSON output.
        structlog.processors.add_log_level,
        add_trace_context,
        structlog.processors.TimeStamper(fmt="iso"),
        # Render exc_info (e.g. from log.exception(...)) as a traceback string.
        # JSONRenderer alone does not include the traceback.
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ]
)

# Logs med trace context kan nu korreleres med traces i Grafana:
# trace_id i log → klik i Loki → åbn trace i Tempo
# (kræver et "derived field" på Loki-datakilden, der matcher trace_id og linker til Tempo)

Konklusion

OpenTelemetry giver end-to-end observability på tværs af FastAPI, SQLAlchemy og Redis med minimal kodeændring; Celery instrumenteres tilsvarende med opentelemetry-instrumentation-celery (ikke vist her). Auto-instrumentering fanger HTTP-requests og database-queries uden ændringer i routerne. Custom spans giver indsigt i gateway-ingest, og custom metrics dækker bl.a. readings og settlement-generering. Metrics i Prometheus + traces i Grafana Tempo + logs i Loki (med trace_id-korrelation) udgør en komplet observability-stak.

Se også guiden til IoT-gateway-monitorering med Grafana eller guiden til struktureret logging med structlog i Python.