OpenTelemetry i Python — tracing, metrics og logs til IoT SaaS
OpenTelemetry Python SDK: automatisk instrumentering af FastAPI og SQLAlchemy, custom spans, Prometheus metrics, Grafana Tempo og distributed tracing til gateway-events.
Af M-Bus Gateway
OpenTelemetry giver unified observability: tracing, metrics og logs i ét framework. Her er produktions-opsætningen til en FastAPI + Celery IoT-platform.
Installation og grundkonfiguration
# uv add opentelemetry-sdk opentelemetry-exporter-otlp-proto-grpc opentelemetry-exporter-prometheus
# uv add opentelemetry-instrumentation-fastapi
# uv add opentelemetry-instrumentation-sqlalchemy
# uv add opentelemetry-instrumentation-redis
# uv add opentelemetry-instrumentation-httpx
# server/src/telemetry.py
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes
import structlog
log = structlog.get_logger()
def setup_telemetry(
    service_name: str = "mbus-server",
    *,
    service_version: str = "1.0.0",
    environment: str = "production",
    otlp_endpoint: str = "http://tempo:4317",
    prometheus_port: int = 9090,
) -> None:
    """Register the global OpenTelemetry tracer and meter providers.

    Traces are batched and exported over OTLP/gRPC to ``otlp_endpoint``;
    metrics are exposed for Prometheus scraping on ``prometheus_port``.

    Args:
        service_name: Value of the ``service.name`` resource attribute.
        service_version: Value of the ``service.version`` resource attribute.
        environment: Value of the ``deployment.environment`` resource attribute.
        otlp_endpoint: OTLP/gRPC collector endpoint (Grafana Tempo by default).
        prometheus_port: Port on which the Prometheus scrape endpoint listens.
    """
    # Local import: prometheus_client is a hard dependency of
    # opentelemetry-exporter-prometheus, so it is available whenever the
    # PrometheusMetricReader import used by this module succeeds.
    from prometheus_client import start_http_server

    resource = Resource(attributes={
        ResourceAttributes.SERVICE_NAME: service_name,
        ResourceAttributes.SERVICE_VERSION: service_version,
        ResourceAttributes.DEPLOYMENT_ENVIRONMENT: environment,
    })

    # Tracing: export to Grafana Tempo via OTLP.
    tracer_provider = TracerProvider(resource=resource)
    otlp_exporter = OTLPSpanExporter(
        endpoint=otlp_endpoint,
        insecure=True,  # plaintext gRPC, no TLS
    )
    tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
    trace.set_tracer_provider(tracer_provider)

    # Metrics: PrometheusMetricReader takes no port argument and does not
    # serve HTTP itself; it only feeds the prometheus_client registry. The
    # scrape endpoint has to be started explicitly.
    start_http_server(port=prometheus_port)
    prometheus_reader = PrometheusMetricReader()
    meter_provider = MeterProvider(
        resource=resource,
        metric_readers=[prometheus_reader],
    )
    metrics.set_meter_provider(meter_provider)

    log.info("OpenTelemetry initialized", service=service_name)
FastAPI auto-instrumentering
# server/src/main.py
from contextlib import asynccontextmanager

from fastapi import FastAPI
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from sqlalchemy.ext.asyncio import create_async_engine

from server.src.telemetry import setup_telemetry
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialise telemetry on startup and remove instrumentation on shutdown.

    Startup registers the OpenTelemetry providers and instruments the Redis
    client library. Shutdown undoes the Redis instrumentation and the
    per-app FastAPI instrumentation, even if the application exits with an
    error.
    """
    setup_telemetry()
    RedisInstrumentor().instrument()
    try:
        yield
    finally:
        RedisInstrumentor().uninstrument()
        # The app is instrumented with instrument_app(), so it must be
        # undone with the matching uninstrument_app(); the global
        # FastAPIInstrumentor().uninstrument() does not touch it.
        FastAPIInstrumentor.uninstrument_app(app)


app = FastAPI(lifespan=lifespan)

# Auto-instrument with no changes to the routers. This has to happen before
# the application starts: lifespan runs after Starlette has built its
# middleware stack, and instrumenting at that point can fail or have no
# effect depending on the Starlette/instrumentation version. The tracer and
# meter obtained here are API proxies that bind to the real providers once
# setup_telemetry() registers them during startup.
FastAPIInstrumentor.instrument_app(
    app,
    excluded_urls="api/health",  # keep health checks out of the traces
)
# SQLAlchemy instrumentering (ved engine-oprettelse):
async def init_db():
    """Create the async SQLAlchemy engine and instrument it for tracing.

    Returns:
        The engine returned by ``create_async_engine``.
    """
    async_engine = create_async_engine(...)
    # The instrumentor is handed the engine's ``sync_engine`` attribute
    # rather than the async engine object itself.
    instrumentor = SQLAlchemyInstrumentor()
    instrumentor.instrument(engine=async_engine.sync_engine)
    return async_engine
Custom spans til gateway-events
# server/src/mqtt/ingest.py
from opentelemetry import trace
tracer = trace.get_tracer(__name__)


async def ingest_gateway_data(gateway_id: str, payload: dict) -> None:
    """Validate and store one gateway payload inside a ``gateway.ingest`` span.

    Child spans cover validation, the bulk insert and the gateway status
    update. Any exception propagates to the caller.

    Args:
        gateway_id: Identifier of the sending gateway; stored as ``gateway.id``.
        payload: Raw gateway message; its ``"readings"`` entry is counted
            and, after validation, bulk-inserted.
    """
    # No manual try/except here: start_as_current_span() records the
    # exception and sets the span status to ERROR by default when an
    # exception leaves the block. Doing it by hand as well produced a
    # duplicate exception event on the span.
    with tracer.start_as_current_span("gateway.ingest") as span:
        span.set_attribute("gateway.id", gateway_id)
        span.set_attribute("reading.count", len(payload.get("readings", [])))

        # Validate the data.
        with tracer.start_as_current_span("gateway.validate"):
            validated = validate_payload(payload)
        span.set_attribute("validation.ok", True)

        # Store in the database.
        with tracer.start_as_current_span("database.bulk_insert") as db_span:
            count = await bulk_insert_readings(validated["readings"])
            db_span.set_attribute("db.rows_inserted", count)

        # Update the gateway status.
        with tracer.start_as_current_span("gateway.update_status"):
            await update_gateway_last_seen(gateway_id)
Custom metrics
# server/src/metrics.py
from opentelemetry import metrics
meter = metrics.get_meter(__name__)

# Counters:
readings_ingested = meter.create_counter(
    "mbus.readings.ingested.total",
    description="Total antal readings modtaget fra gateways",
    unit="1",
)
settlements_generated = meter.create_counter(
    "mbus.settlements.generated.total",
    description="Antal genererede afregninger",
    unit="1",
)

# Gauges (current values):
# An observable gauge only reports what its callbacks return, so creating it
# without a callback never produces a data point. The latest count is cached
# here and handed to the SDK on every collection.
_active_gateway_count = None


def set_active_gateway_count(count: int) -> None:
    """Store the most recent active-gateway count for the next collection.

    Intended to be called by whatever computes the count (for example a
    periodic database query); nothing is reported until it is called once.
    """
    global _active_gateway_count
    _active_gateway_count = count


def _observe_active_gateways(options: metrics.CallbackOptions) -> list:
    """Gauge callback: report the cached count, or nothing if none is set."""
    if _active_gateway_count is None:
        return []
    return [metrics.Observation(_active_gateway_count)]


active_gateways = meter.create_observable_gauge(
    "mbus.gateways.active",
    callbacks=[_observe_active_gateways],
    description="Antal gateways set inden for 36 timer",
    unit="1",
)

# Histograms:
reading_processing_duration = meter.create_histogram(
    "mbus.reading.processing.duration",
    description="Behandlingstid for gateway-payload",
    unit="ms",
)
# Brug i kode:
async def ingest_gateway_data(gateway_id: str, payload: dict) -> None:
    """Ingest one gateway payload and record ingest metrics.

    The processing duration is recorded whether or not the ingest succeeds;
    the readings counter is only incremented after a successful ingest.

    Args:
        gateway_id: Identifier of the sending gateway; attached to both
            metrics as the ``gateway_id`` attribute.
        payload: Gateway message whose ``"readings"`` entries are counted.
    """
    # NOTE(review): gateway_id as a metric attribute creates one time series
    # per gateway — confirm the cardinality is acceptable for the fleet size.
    attributes = {"gateway_id": gateway_id}
    count = len(payload.get("readings", []))

    start = time.monotonic()
    try:
        await _do_ingest(gateway_id, payload)
    finally:
        # Record in ``finally`` so slow, failing ingests are not missing
        # from the latency histogram.
        duration_ms = (time.monotonic() - start) * 1000
        reading_processing_duration.record(duration_ms, attributes=attributes)

    readings_ingested.add(count, attributes=attributes)
Grafana dashboard: Nøgle-metrics
# grafana-dashboard.json (forenklet)
# Nøgle-paneler til IoT SaaS monitoring:
panels:
  - title: "Gateway heartbeat rate"
    query: "rate(mbus_readings_ingested_total[5m])"
  # Aggregate the buckets by `le` before taking the quantile; without it the
  # panel shows one P99 per label combination instead of one for the API.
  - title: "API P99 latency"
    query: "histogram_quantile(0.99, sum by (le) (rate(http_server_duration_milliseconds_bucket[5m])))"
  - title: "Active gateways (< 36h)"
    query: "mbus_gateways_active"
  - title: "Database connection pool utilization"
    query: "mbus_db_pool_in_use / mbus_db_pool_max_size * 100"
  - title: "Settlement generation rate"
    query: "rate(mbus_settlements_generated_total[1h])"
  - title: "Failed readings (AES decrypt errors)"
    query: "rate(mbus_readings_decrypt_errors_total[5m])"
Struktureret logging med trace context
# server/src/logging_setup.py
import structlog
from opentelemetry import trace
def add_trace_context(logger, method, event_dict):
    """Attach the current span's trace_id and span_id to a structlog event.

    The IDs are added as zero-padded lowercase hex strings (32 and 16
    characters). When the current span is not recording, the event is
    returned unchanged.
    """
    span = trace.get_current_span()
    if not span.is_recording():
        return event_dict

    span_context = span.get_span_context()
    event_dict.update(
        trace_id=f"{span_context.trace_id:032x}",
        span_id=f"{span_context.span_id:016x}",
    )
    return event_dict
# Configure structlog: trace context first, then an ISO timestamp, and the
# JSON renderer as the final processor.
structlog.configure(
    processors=[
        # Adds trace_id / span_id from the current OpenTelemetry span.
        add_trace_context,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]
)
# Logs med trace context kan nu korreleres med traces i Grafana:
# trace_id i log → klik i Loki → åbn trace i Tempo
Konklusion
OpenTelemetry giver end-to-end observability på tværs af FastAPI, SQLAlchemy og Redis med minimale kodeændringer; Celery-workers kan dækkes på samme måde med opentelemetry-instrumentation-celery. Auto-instrumentering fanger HTTP-requests og database-queries automatisk. Custom spans giver indsigt i gateway-ingest, og samme mønster kan bruges til settlement-processing. Metrics i Prometheus + traces i Grafana Tempo + logs i Loki (med trace_id-korrelation) udgør en komplet observability-stak.
Se IoT gateway monitoring Grafana guide eller python structlog logging guide.