M-Bus Gateway
← Tilbage til blog
· FastAPI· webhook· OpenAPI· Python· Svix· API· backend· integration

FastAPI webhooks og OpenAPI 3.1 — dokumentér og test udgående events

FastAPI webhook-dokumentation med OpenAPI 3.1: outgoing webhooks, callback-events, Stripe-lignende payload-specifikation, Svix webhook-dispatcher, signaturvalidering og retry-logik.

Af M-Bus Gateway

Udgående webhooks til tredjepartsintegrationer kræver klar dokumentation og pålidelig levering. Her er implementeringen med OpenAPI 3.1 og Svix.


OpenAPI 3.1 webhook-dokumentation

# server/src/main.py
# FastAPI (0.99+) understøtter OpenAPI 3.1 webhooks via 'app.webhooks' (en APIRouter)

from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi

# Application instance; title and version are handed to FastAPI as API metadata.
app = FastAPI(
    title="M-Bus Gateway API",
    version="1.0.0",
)


# Definer webhook-schemas:
# Documentation-only webhook declaration: the function has no body besides its docstring.
# NOTE(review): the docstring below is presumably what FastAPI publishes as the
# webhook description in the generated docs — confirm the desired docs language.
@app.webhooks.post("reading.ingested")
def reading_ingested_webhook(body: ReadingIngestedPayload):
    """
    **reading.ingested** — New reading received from a gateway.

    Emitted within 30 seconds after the gateway data has been enriched
    and stored in TimescaleDB.

    The payload contains the meter ID, timestamp, consumption and gateway ID.

    Use for: real-time BI integration, SCADA bridge, SMS alerts.
    """


# Documentation-only webhook declaration: the function has no body besides its docstring.
# NOTE(review): the docstring below is presumably what FastAPI publishes as the
# webhook description in the generated docs — confirm the desired docs language.
@app.webhooks.post("settlement.sent")
def settlement_sent_webhook(body: SettlementSentPayload):
    """
    **settlement.sent** — Annual settlement sent to the tenant.

    Emitted once the PDF has been generated and the email has been sent via Brevo.
    The payload includes settlement_id, unit_id, email and amount.
    """


# Documentation-only webhook declaration: the function has no body besides its docstring.
# NOTE(review): the docstring below is presumably what FastAPI publishes as the
# webhook description in the generated docs — confirm the desired docs language.
@app.webhooks.post("gateway.offline")
def gateway_offline_webhook(body: GatewayOfflinePayload):
    """
    **gateway.offline** — The gateway has not sent data for 36+ hours.
    """

Webhook payload schemas

# server/src/webhooks/schemas.py

import uuid
from datetime import datetime, timezone

from pydantic import BaseModel, Field


class WebhookEvent(BaseModel):
    """Base schema shared by all webhook events."""

    # Unique id of this event; a random UUID4 is generated when none is given.
    event_id: uuid.UUID = Field(default_factory=uuid.uuid4)
    # Event name, e.g. "reading.ingested"; subclasses pin it to a fixed default.
    event_type: str
    # Version tag of the payload schema.
    api_version: str = "2025-01-01"
    # Creation time as a timezone-aware UTC datetime. datetime.utcnow() is
    # deprecated since Python 3.12 and returns a naive value that serialises
    # without an offset, leaving receivers to guess the timezone.
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    # Tenant the event belongs to.
    tenant_id: uuid.UUID


class ReadingData(BaseModel):
    """Measurement details carried in the ``data`` field of a ``reading.ingested`` event."""

    meter_installation_id: uuid.UUID
    gateway_id: str
    timestamp: datetime
    value: float
    unit: str
    meter_type: str


class ReadingIngestedPayload(WebhookEvent):
    """Event envelope for ``reading.ingested``."""

    # ReadingData must be defined above this class: the ``data`` annotation is
    # evaluated when the class body runs, so the reverse order raises NameError
    # at import time (unless annotations are lazily evaluated).
    event_type: str = "reading.ingested"
    data: ReadingData


class SettlementData(BaseModel):
    """Settlement details carried in the ``data`` field of a ``settlement.sent`` event."""

    settlement_id: uuid.UUID
    unit_id: uuid.UUID
    tenant_email: str
    result_amount: float
    currency: str = "DKK"
    period_end: str   # ISO date


class SettlementSentPayload(WebhookEvent):
    """Event envelope for ``settlement.sent``."""

    # SettlementData must be defined above this class: the ``data`` annotation
    # is evaluated when the class body runs, so the reverse order raises
    # NameError at import time (unless annotations are lazily evaluated).
    event_type: str = "settlement.sent"
    data: SettlementData


class GatewayData(BaseModel):
    """Gateway details carried in the ``data`` field of a ``gateway.offline`` event."""

    gateway_id: str
    last_seen_at: datetime
    hours_offline: float
    property_id: uuid.UUID


class GatewayOfflinePayload(WebhookEvent):
    """Event envelope for ``gateway.offline``."""

    # GatewayData must be defined above this class: the ``data`` annotation is
    # evaluated when the class body runs, so the reverse order raises NameError
    # at import time (unless annotations are lazily evaluated).
    event_type: str = "gateway.offline"
    data: GatewayData

Svix webhook-dispatcher

# server/src/webhooks/dispatcher.py
# Svix: Managed webhook delivery med retry, logs og portal

import svix
from svix import Svix, MessageIn

# Module-level Svix API client, constructed from the secret API key held in settings.
svix_client = Svix(settings.svix_api_key.get_secret_value())


async def dispatch_webhook(
    event_type: str,
    payload: WebhookEvent,
    tenant_id: uuid.UUID,
) -> None:
    """Send a webhook message through Svix for one tenant.

    The synchronous Svix client call is run via ``asyncio.to_thread`` so the
    event loop is not blocked. Per the original author's note, Svix takes care
    of retries (exponential backoff), signing and the delivery log.

    Args:
        event_type: Event name passed to Svix, e.g. ``"reading.ingested"``.
        payload: Event model; serialised with ``model_dump(mode="json")``.
        tenant_id: Tenant id; its string form is used as the Svix app id.

    Raises:
        svix.exceptions.HttpError, svix.exceptions.HTTPValidationError:
            Re-raised after logging when Svix rejects the request, so the
            caller (e.g. a Celery task) can decide whether to retry.
    """
    # Local import keeps the block self-contained. These are the errors the
    # Svix API client raises; WebhookVerificationError is only raised by
    # Webhook.verify() on the receiving side and never by message.create.
    from svix.exceptions import HttpError, HTTPValidationError

    message = MessageIn(
        event_type=event_type,
        payload=payload.model_dump(mode="json"),
    )
    try:
        await asyncio.to_thread(
            svix_client.message.create,
            str(tenant_id),   # Svix app_id per tenant
            message,
        )
    except (HttpError, HTTPValidationError) as exc:
        log.error("Svix dispatch fejlet", event_type=event_type,
                  tenant_id=str(tenant_id), error=str(exc))
        # Propagate instead of swallowing: a dropped webhook must stay visible.
        raise
    log.info("Webhook dispatched", event_type=event_type,
             tenant_id=str(tenant_id))


# Brug fra Celery task:
@shared_task(name="webhooks.dispatch_reading")
def dispatch_reading_webhook(reading_data: dict, tenant_id: str):
    """Celery task that builds a ``reading.ingested`` event and dispatches it.

    Args:
        reading_data: Keyword arguments for ``ReadingData``.
        tenant_id: Tenant id as a string; parsed into a ``uuid.UUID``.
    """
    tenant_uuid = uuid.UUID(tenant_id)
    reading = ReadingData(**reading_data)
    event = ReadingIngestedPayload(tenant_id=tenant_uuid, data=reading)
    # The task itself is synchronous, so the coroutine is driven to completion here.
    asyncio.run(dispatch_webhook("reading.ingested", event, uuid.UUID(tenant_id)))

Signaturvalidering i modtager

# Modtager (tredjepartsintegration) — verificér Svix-signatur:

from svix.webhooks import Webhook, WebhookVerificationError
from fastapi import Request, HTTPException

@external_router.post("/webhooks/mbus")
async def receive_mbus_webhook(request: Request):
    """
    Third-party endpoint that receives webhooks from the M-Bus Gateway platform.

    The raw request body and headers are verified against the configured
    signing secret; a failed verification is answered with HTTP 400.
    Event types without a registered handler are acknowledged without action.
    """
    raw_body = await request.body()
    request_headers = dict(request.headers)

    verifier = Webhook(settings.webhook_signing_secret)
    try:
        event = verifier.verify(raw_body, request_headers)
    except WebhookVerificationError:
        raise HTTPException(status_code=400, detail="Invalid signature")

    # Route the verified event to its handler by event type.
    routes = {
        "reading.ingested": handle_new_reading,
        "settlement.sent": handle_settlement_sent,
        "gateway.offline": handle_gateway_offline,
    }
    event_data = event.get("data", {})
    callback = routes.get(event.get("event_type"))
    if callback:
        await callback(event_data)

    return {"received": True}

Webhook-management API

# server/src/webhooks/router.py
# Lad tenants registrere egne webhook-endpoints:

@router.post("/webhooks/endpoints")
async def create_webhook_endpoint(
    data: WebhookEndpointCreate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(require_role("landlord")),
) -> WebhookEndpointOut:
    """Register a new webhook endpoint URL for receiving events.

    The endpoint is created in Svix under the app id derived from the current
    user's tenant id; the blocking client call runs in a worker thread.

    Returns:
        The created endpoint's id and creation time from Svix, together with
        the URL and event types taken from the request.
    """
    svix_app_id = str(current_user.tenant_id)
    endpoint_spec = EndpointIn(
        url=str(data.url),
        filter_types=data.event_types or None,   # None = all events
        description=data.description,
        version=1,
    )
    created = await asyncio.to_thread(
        svix_client.endpoint.create,
        svix_app_id,
        endpoint_spec,
    )

    return WebhookEndpointOut(
        endpoint_id=created.id,
        url=data.url,
        event_types=data.event_types,
        created_at=created.created_at,
    )


@router.get("/webhooks/logs")
async def get_webhook_logs(
    endpoint_id: str,
    # ge=1 rejects zero/negative limits with a 422 instead of forwarding them to Svix.
    limit: int = Query(50, ge=1, le=200),
    current_user: User = Depends(require_role("landlord")),
) -> list[WebhookAttempt]:
    """Return the delivery log for a webhook endpoint (success/failure + retry).

    Args:
        endpoint_id: Svix endpoint id whose attempts are listed.
        limit: Maximum number of attempts to return (1-200, default 50).
        current_user: Authenticated landlord; the tenant id selects the Svix app.

    Returns:
        The attempts reported by Svix, converted with ``WebhookAttempt.from_svix``.
    """
    attempts = await asyncio.to_thread(
        svix_client.message_attempt.list_by_endpoint,
        str(current_user.tenant_id),
        endpoint_id,
        MessageAttemptListByEndpointOptions(limit=limit),
    )
    return [WebhookAttempt.from_svix(a) for a in attempts.data]

Retry-logik uden Svix (DIY)

# Alternativ: Implementér retry selv uden Svix:

@shared_task(
    name="webhooks.deliver",
    max_retries=5,
    default_retry_delay=60,
    acks_late=True,
)
def deliver_webhook(endpoint_url: str, payload: dict, attempt: int = 0):
    """Deliver one webhook over HTTP with a Svix-compatible signature and retries.

    Args:
        endpoint_url: Receiver URL the JSON body is POSTed to.
        payload: Event as a JSON-serialisable dict. If it carries an
            ``event_id`` that value is used as the message id, so every retry
            of the same event carries the same ``Svix-Id``.
        attempt: Optional starting offset for the backoff exponent; the number
            of retries Celery has already performed is added to it.

    Raises:
        celery.exceptions.Retry: When the request fails or the receiver answers
            with an error status and retries remain.
    """
    import base64
    import hashlib
    import hmac
    import time

    body = json.dumps(payload, default=str).encode()
    timestamp = str(int(time.time()))
    # A stable id lets receivers de-duplicate redeliveries; fall back to a
    # random one when the payload has no event_id.
    message_id = str(payload.get("event_id") or uuid.uuid4())

    # Svix signature scheme, so receivers can verify with svix.webhooks.Webhook:
    # the key is the base64-decoded secret (without the "whsec_" prefix), the
    # signed content is "<id>.<timestamp>.<body>", and the digest is base64.
    key = base64.b64decode(settings.webhook_signing_secret.removeprefix("whsec_"))
    signed_content = f"{message_id}.{timestamp}.".encode() + body
    digest = hmac.new(key, signed_content, hashlib.sha256).digest()
    signature = base64.b64encode(digest).decode()

    try:
        resp = httpx.post(
            endpoint_url,
            content=body,
            headers={
                "Content-Type": "application/json",
                "Svix-Timestamp": timestamp,
                "Svix-Signature": f"v1,{signature}",
                "Svix-Id": message_id,
            },
            timeout=10.0,
        )
        resp.raise_for_status()
    except httpx.HTTPError as exc:  # also covers httpx.TimeoutException
        # retry() re-runs the task with the same arguments, so `attempt` alone
        # never grows; the retry count comes from the task's request context.
        exponent = attempt + deliver_webhook.request.retries
        raise deliver_webhook.retry(
            exc=exc,
            countdown=2 ** exponent * 30,   # Exponential: 30s, 60s, 120s...
        )

Konklusion

FastAPI OpenAPI 3.1 webhooks giver selvdokumenterende udgående events — funktioner dekoreret med app.webhooks.post() beskriver de payload-schemas, der vises i /docs. Svix håndterer retry (exponential backoff), signering af payloads og en lejer-venlig delivery-log UI. Tredjepartsintegrationer bruger HMAC-SHA256-signaturvalidering til at verificere, at payloaden kommer fra platformen. Webhook-endpoints administreres pr. tenant via REST API.

Se OpenAPI dokumentation guide eller Stripe webhook guide.