· FastAPI· webhook· OpenAPI· Python· Svix· API· backend· integration
FastAPI webhooks og OpenAPI 3.1 — dokumentér og test udgående events
FastAPI webhook-dokumentation med OpenAPI 3.1: outgoing webhooks, callback-events, Stripe-lignende payload-specifikation, Svix webhook-dispatcher, signaturvalidering og retry-logik.
Af M-Bus Gateway
Udgående webhooks til tredjepartsintegrationer kræver klar dokumentation og pålidelig levering. Her er implementeringen med OpenAPI 3.1 og Svix.
OpenAPI 3.1 webhook-dokumentation
# server/src/main.py
# FastAPI understøtter OpenAPI 3.1 webhooks via 'webhooks' parameter
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
app = FastAPI(title="M-Bus Gateway API", version="1.0.0")
# Definer webhook-schemas:
# Documentation-only stub: registers the "reading.ingested" webhook in the
# OpenAPI schema with ReadingIngestedPayload as its request body. The function
# has no body beyond its docstring. The docstring is published as the webhook
# description in the generated API docs, so it is kept in the product's
# language (Danish); only the misspelling "berigeret" was corrected.
@app.webhooks.post("reading.ingested")
def reading_ingested_webhook(body: ReadingIngestedPayload):
    """
    **reading.ingested** — Ny aflæsning modtaget fra gateway.
    Udsendes indenfor 30 sekunder efter gateway-data er beriget
    og gemt i TimescaleDB.
    Payload indeholder måler-ID, timestamp, forbrug og gateway-ID.
    Brug til: Real-time BI-integration, SCADA-bridge, SMS-alarm.
    """
# Documentation-only stub: registers the "settlement.sent" webhook in the
# OpenAPI schema with SettlementSentPayload as its request body. The function
# has no body beyond its docstring. The docstring is published as the webhook
# description in the generated API docs, so it is kept verbatim (Danish).
# In English: sent when the annual settlement PDF has been generated and the
# e-mail has been sent via Brevo; the payload carries settlement_id, unit_id,
# e-mail and amount.
@app.webhooks.post("settlement.sent")
def settlement_sent_webhook(body: SettlementSentPayload):
    """
    **settlement.sent** — Årsafregning sendt til lejer.
    Udsendes når PDF er genereret og email afsendt via Brevo.
    Payload inkluderer settlement_id, unit_id, email og beløb.
    """
# Documentation-only stub: registers the "gateway.offline" webhook in the
# OpenAPI schema with GatewayOfflinePayload as its request body. The function
# has no body beyond its docstring. The docstring is published as the webhook
# description in the generated API docs, so it is kept verbatim (Danish).
# In English: the gateway has not sent data for 36+ hours.
@app.webhooks.post("gateway.offline")
def gateway_offline_webhook(body: GatewayOfflinePayload):
    """
    **gateway.offline** — Gateway har ikke sendt data i 36+ timer.
    """
Webhook payload schemas
# server/src/webhooks/schemas.py
from datetime import datetime, timezone
import uuid

from pydantic import BaseModel, Field
class WebhookEvent(BaseModel):
    """Base schema for alle webhook events."""

    # Common envelope inherited by every concrete webhook payload model.

    # Unique identifier of the event; a random UUID4 is generated when the
    # caller does not supply one.
    event_id: uuid.UUID = Field(default_factory=uuid.uuid4)
    # Event name; required here, the payload subclasses give it a default.
    event_type: str
    # Version string stamped on every event.
    # NOTE(review): looks like a date-based API version — confirm.
    api_version: str = "2025-01-01"
    # Creation time as a timezone-aware UTC datetime. datetime.utcnow() is
    # deprecated and returns a naive value, which serialises without an offset
    # and is therefore ambiguous for webhook consumers.
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    # Tenant the event belongs to.
    tenant_id: uuid.UUID
class ReadingIngestedPayload(WebhookEvent):
    # Envelope for the "reading.ingested" event: the shared WebhookEvent
    # fields plus the reading itself under ``data``.
    event_type: str = "reading.ingested"
    # Quoted forward reference: ReadingData is declared further down in this
    # module, so a bare annotation would raise NameError while the class body
    # is evaluated at import time.
    data: "ReadingData"
class ReadingData(BaseModel):
    # Event-specific body of a "reading.ingested" webhook (the ``data`` field
    # of ReadingIngestedPayload).

    # Identifier of the meter installation the reading belongs to.
    meter_installation_id: uuid.UUID
    # Identifier of the gateway; a plain string, unlike the UUID ids here.
    gateway_id: str
    # Time of the reading.
    # NOTE(review): timezone handling is not visible here — confirm.
    timestamp: datetime
    # Reading value (the webhook description calls this the consumption).
    value: float
    # presumably the unit of measure for ``value`` — TODO confirm
    unit: str
    # presumably the kind of meter that produced the reading — TODO confirm
    meter_type: str
class SettlementSentPayload(WebhookEvent):
    # Envelope for the "settlement.sent" event: the shared WebhookEvent
    # fields plus the settlement details under ``data``.
    event_type: str = "settlement.sent"
    # Quoted forward reference: SettlementData is declared further down in
    # this module, so a bare annotation would raise NameError while the class
    # body is evaluated at import time.
    data: "SettlementData"
class SettlementData(BaseModel):
    # Event-specific body of a "settlement.sent" webhook (the ``data`` field
    # of SettlementSentPayload).

    # Identifier of the settlement that was sent.
    settlement_id: uuid.UUID
    # Identifier of the unit the settlement concerns.
    # NOTE(review): presumably a rental unit — confirm.
    unit_id: uuid.UUID
    # E-mail address the settlement was sent to.
    tenant_email: str
    # Settlement amount, expressed in ``currency``.
    # NOTE(review): float is lossy for money and the sign convention is not
    # visible here — confirm.
    result_amount: float
    # Currency of ``result_amount``; defaults to "DKK".
    currency: str = "DKK"
    # End of the settlement period as an ISO date string (not a date object).
    period_end: str  # ISO date
class GatewayOfflinePayload(WebhookEvent):
    # Envelope for the "gateway.offline" event: the shared WebhookEvent
    # fields plus the gateway status under ``data``.
    event_type: str = "gateway.offline"
    # Quoted forward reference: GatewayData is declared further down in this
    # module, so a bare annotation would raise NameError while the class body
    # is evaluated at import time.
    data: "GatewayData"
class GatewayData(BaseModel):
    # Event-specific body of a "gateway.offline" webhook (the ``data`` field
    # of GatewayOfflinePayload).

    # Identifier of the gateway that went silent.
    gateway_id: str
    # Last time the gateway was seen.
    # NOTE(review): timezone handling is not visible here — confirm.
    last_seen_at: datetime
    # Hours the gateway has been offline.
    # NOTE(review): presumably measured from ``last_seen_at`` — confirm.
    hours_offline: float
    # presumably the property where the gateway is installed — TODO confirm
    property_id: uuid.UUID
Svix webhook-dispatcher
# server/src/webhooks/dispatcher.py
# Svix: Managed webhook delivery med retry, logs og portal
import svix
from svix import Svix, MessageIn
# Module-level Svix API client shared by the dispatcher; it is constructed
# with the API key from settings, unwrapped via get_secret_value().
svix_client = Svix(settings.svix_api_key.get_secret_value())
async def dispatch_webhook(
    event_type: str,
    payload: WebhookEvent,
    tenant_id: uuid.UUID,
) -> None:
    """Send a webhook message through Svix for the given tenant.

    The tenant id is used as the Svix application id, and the payload is
    serialised with ``model_dump(mode="json")``. Retry, signing and the
    delivery log are left to Svix.

    Args:
        event_type: Svix event type, e.g. ``"reading.ingested"``.
        payload: Event model to send as the message body.
        tenant_id: Tenant whose Svix application receives the message.

    Raises:
        svix.exceptions.HttpError: The Svix API rejected the request.
        svix.exceptions.HTTPValidationError: Svix reported a validation error.
    """
    # Local import: these are the errors the Svix API client raises.
    # WebhookVerificationError, which was caught before, is only raised when
    # *verifying* an incoming webhook, so API failures were never logged.
    from svix.exceptions import HttpError, HTTPValidationError

    message = MessageIn(
        event_type=event_type,
        payload=payload.model_dump(mode="json"),
    )
    try:
        # The client call is blocking, so run it in a worker thread.
        await asyncio.to_thread(
            svix_client.message.create,
            str(tenant_id),  # Svix app_id per tenant
            message,
        )
    except (HttpError, HTTPValidationError) as exc:
        log.error(
            "Svix dispatch fejlet",
            event_type=event_type,
            tenant_id=str(tenant_id),
            error=str(exc),
        )
        # Re-raise so the caller still sees the failure, exactly as it did
        # when the error was not caught at all.
        raise
    log.info(
        "Webhook dispatched",
        event_type=event_type,
        tenant_id=str(tenant_id),
    )
# Brug fra Celery task:
@shared_task(name="webhooks.dispatch_reading")
def dispatch_reading_webhook(reading_data: dict, tenant_id: str):
    """Celery task that builds and dispatches a "reading.ingested" webhook.

    Args:
        reading_data: Keyword arguments for ``ReadingData``.
        tenant_id: Tenant UUID in string form.
    """
    tenant_uuid = uuid.UUID(tenant_id)
    reading = ReadingData(**reading_data)
    event = ReadingIngestedPayload(tenant_id=tenant_uuid, data=reading)
    # The task is synchronous, so drive the async dispatcher to completion.
    asyncio.run(dispatch_webhook("reading.ingested", event, tenant_uuid))
Signaturvalidering i modtager
# Modtager (tredjepartsintegration) — verificér Svix-signatur:
from svix.webhooks import Webhook, WebhookVerificationError
from fastapi import Request, HTTPException
# Receiver side: verifies the Svix signature of an incoming webhook and hands
# the event's ``data`` to the handler registered for its ``event_type``.
# Unknown event types are acknowledged without further processing; a failed
# signature check is answered with HTTP 400. The docstring is kept verbatim
# because FastAPI publishes it as the endpoint description.
@external_router.post("/webhooks/mbus")
async def receive_mbus_webhook(request: Request):
    """
    Tredjeparts endpoint der modtager webhooks fra M-Bus Gateway platform.
    """
    raw_body = await request.body()
    header_map = dict(request.headers)
    verifier = Webhook(settings.webhook_signing_secret)
    try:
        message = verifier.verify(raw_body, header_map)
    except WebhookVerificationError:
        raise HTTPException(status_code=400, detail="Invalid signature")

    event_data = message.get("data", {})
    # Event type -> coroutine that processes the event body.
    dispatch_table = {
        "reading.ingested": handle_new_reading,
        "settlement.sent": handle_settlement_sent,
        "gateway.offline": handle_gateway_offline,
    }
    handler = dispatch_table.get(message.get("event_type"))
    if not handler:
        return {"received": True}
    await handler(event_data)
    return {"received": True}
Webhook-management API
# server/src/webhooks/router.py
# Lad tenants registrere egne webhook-endpoints:
# Lets a user with the "landlord" role register a webhook endpoint URL. The
# endpoint is created in Svix under the Svix application named after the
# user's tenant id, and the response echoes the request together with the id
# and creation time returned by Svix. The docstring is kept verbatim because
# FastAPI publishes it as the endpoint description.
@router.post("/webhooks/endpoints")
async def create_webhook_endpoint(
    data: WebhookEndpointCreate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(require_role("landlord")),
) -> WebhookEndpointOut:
    """Registrér ny webhook-endpoint URL til modtagelse af events."""
    svix_app_id = str(current_user.tenant_id)
    endpoint_spec = EndpointIn(
        url=str(data.url),
        # An empty or missing list becomes None, i.e. subscribe to all events.
        filter_types=data.event_types or None,
        description=data.description,
        version=1,
    )
    # The Svix client call is blocking, so run it in a worker thread.
    created = await asyncio.to_thread(
        svix_client.endpoint.create,
        svix_app_id,
        endpoint_spec,
    )
    return WebhookEndpointOut(
        endpoint_id=created.id,
        url=data.url,
        event_types=data.event_types,
        created_at=created.created_at,
    )
# Returns the delivery attempts Svix has recorded for one endpoint of the
# calling user's tenant, each converted with WebhookAttempt.from_svix.
# ``limit`` defaults to 50 and is now bounded to 1..200: without the lower
# bound, zero or negative values passed validation here and were forwarded to
# Svix. The docstring is kept verbatim because FastAPI publishes it as the
# endpoint description.
@router.get("/webhooks/logs")
async def get_webhook_logs(
    endpoint_id: str,
    limit: int = Query(50, ge=1, le=200),
    current_user: User = Depends(require_role("landlord")),
) -> list[WebhookAttempt]:
    """Vis delivery-log for webhook-endpoint (success/failure + retry)."""
    options = MessageAttemptListByEndpointOptions(limit=limit)
    # The Svix client call is blocking, so run it in a worker thread.
    attempts = await asyncio.to_thread(
        svix_client.message_attempt.list_by_endpoint,
        str(current_user.tenant_id),
        endpoint_id,
        options,
    )
    return [WebhookAttempt.from_svix(attempt) for attempt in attempts.data]
Retry-logik uden Svix (DIY)
# Alternativ: Implementér retry selv uden Svix:
@shared_task(
    name="webhooks.deliver",
    max_retries=5,
    default_retry_delay=60,
    acks_late=True,
)
def deliver_webhook(endpoint_url: str, payload: dict, attempt: int = 0):
    """Deliver one webhook over HTTP, signed the way Svix signs messages.

    The JSON body is POSTed with ``Svix-Id``, ``Svix-Timestamp`` and
    ``Svix-Signature`` headers. Any httpx error, including a non-2xx
    response, schedules a Celery retry with exponential backoff.

    Args:
        endpoint_url: URL of the subscriber endpoint.
        payload: JSON-serialisable event body; values json cannot encode are
            stringified via ``default=str``.
        attempt: Optional starting offset for the backoff exponent; the
            number of retries Celery has already made is added to it.

    Raises:
        celery.exceptions.Retry: Raised via ``retry()`` when delivery fails.
    """
    import base64
    import hashlib
    import hmac
    import time

    body = json.dumps(payload, default=str).encode()
    timestamp = str(int(time.time()))
    message_id = str(uuid.uuid4())

    # The headers below announce the Svix scheme, so the signature must follow
    # it or Svix's Webhook.verify() rejects every delivery:
    #   key     = base64-decoded secret without its "whsec_" prefix
    #   content = "{id}.{timestamp}.{body}"
    #   value   = base64(HMAC-SHA256(key, content))
    # The earlier hex digest over "{timestamp}.{body}" did not verify.
    signing_key = base64.b64decode(
        settings.webhook_signing_secret.removeprefix("whsec_")
    )
    signed_content = f"{message_id}.{timestamp}.".encode() + body
    signature = base64.b64encode(
        hmac.new(signing_key, signed_content, hashlib.sha256).digest()
    ).decode()

    try:
        resp = httpx.post(
            endpoint_url,
            content=body,
            headers={
                "Content-Type": "application/json",
                "Svix-Timestamp": timestamp,
                "Svix-Signature": f"v1,{signature}",
                "Svix-Id": message_id,
            },
            timeout=10.0,
        )
        resp.raise_for_status()
    except httpx.HTTPError as exc:  # also covers httpx.TimeoutException
        # retry() re-sends the task with its original arguments, so ``attempt``
        # never changes between runs; the retry count has to come from the
        # task request or the delay stays at 30s forever.
        retry_number = attempt + deliver_webhook.request.retries
        raise deliver_webhook.retry(
            exc=exc,
            countdown=2 ** retry_number * 30,  # 30s, 60s, 120s, 240s, 480s
        )
Konklusion
FastAPI OpenAPI 3.1 webhooks giver selvdokumenterende udgående events — app.webhooks.post() registrerer webhook-definitioner, hvis payload-schemas vises i /docs. Svix håndterer retry (exponential backoff), signering af payloads og en lejer-venlig delivery-log-UI. Tredjepartsintegrationer bruger HMAC-SHA256-signaturvalidering til at verificere, at payloaden kommer fra platformen. Webhook-endpoints administreres pr. tenant via REST API.