· Stripe· webhooks· SaaS· Python· FastAPI· abonnement· betalinger· idempotens· dunning
Stripe webhooks til SaaS — håndtér abonnementshændelser i Python
Stripe webhook integration til SaaS i Python: signaturvalidering, idempotent håndtering, invoice.paid, customer.subscription.deleted, trial expiry og dunning flow.
Af M-Bus Gateway
M-Bus Gateway bruger Stripe webhooks til at holde abonnementsstatus i sync. Her er den komplette Python-implementering med signaturvalidering og idempotent håndtering.
Webhook-endpoint: Signaturvalidering
# server/src/stripe/router.py
import stripe
from fastapi import APIRouter, Request, HTTPException
from server.src.core.config import settings
import structlog
# Module-level structlog logger shared by the webhook endpoint and its handlers.
logger = structlog.get_logger()
# Every route declared on this router is served under the "/stripe" prefix.
router = APIRouter(prefix="/stripe", tags=["stripe"])
@router.post("/webhook")
async def stripe_webhook(request: Request):
    """
    Receive, verify and process a single Stripe webhook event.

    IMPORTANT: the signature is checked against ``request.body()`` (the raw
    bytes), never a JSON-parsed body, because Stripe signs the exact payload
    it sent.

    Returns:
        ``{"status": "ok", "event_id": ...}`` once the event was dispatched
        and recorded, or ``{"status": "duplicate", "event_id": ...}`` when
        the event ID was already recorded as processed.

    Raises:
        HTTPException: 400 when the ``Stripe-Signature`` header is missing,
            the payload cannot be parsed, or the signature does not verify.
    """
    payload = await request.body()
    sig_header = request.headers.get("stripe-signature")
    if not sig_header:
        raise HTTPException(400, "Manglende Stripe-Signature header")

    try:
        event = stripe.Webhook.construct_event(
            payload,
            sig_header,
            settings.STRIPE_WEBHOOK_SECRET,
        )
    except ValueError as exc:
        # Stripe's documented webhook example treats ValueError as "invalid
        # payload" (body is not decodable/valid JSON). Answer 400 instead of
        # letting it bubble up as an unhandled 500.
        logger.warning("stripe_webhook_invalid_payload", error=str(exc))
        raise HTTPException(400, "Ugyldig webhook-payload") from exc
    except stripe.error.SignatureVerificationError as exc:
        logger.warning("stripe_webhook_invalid_signature", error=str(exc))
        raise HTTPException(400, "Ugyldig webhook-signatur") from exc

    event_id = event["id"]

    # Idempotency: an event ID that was already recorded is acknowledged
    # without running the handlers again.
    # NOTE(review): the check and the later mark are two separate steps, so
    # two concurrent deliveries of the same event can both pass the check.
    if await _check_event_processed(event_id):
        return {"status": "duplicate", "event_id": event_id}

    # The marker is written only after the dispatch returned without raising,
    # so an event whose handler failed is not recorded as processed.
    await _dispatch_event(event)
    await _mark_event_processed(event_id)
    return {"status": "ok", "event_id": event_id}
Event dispatcher: Alle relevante Stripe-events
async def _dispatch_event(event: dict) -> None:
    """
    Pick the handler registered for the event's type and await it.

    The handler receives ``event["data"]["object"]``. Event types without a
    registered handler are only logged at debug level.
    """
    kind = event["type"]
    stripe_object = event["data"]["object"]

    routes = {
        # Subscription created (trial start)
        "customer.subscription.created": _handle_subscription_created,
        # Subscription updated (trial -> active, upgrade, downgrade)
        "customer.subscription.updated": _handle_subscription_updated,
        # Subscription cancelled (churn)
        "customer.subscription.deleted": _handle_subscription_deleted,
        # Payment succeeded (past_due -> active)
        "invoice.paid": _handle_invoice_paid,
        # Payment failed (dunning start)
        "invoice.payment_failed": _handle_invoice_payment_failed,
        # Upcoming invoice (7-day reminder)
        "invoice.upcoming": _handle_invoice_upcoming,
        # Credit card updated
        "payment_method.attached": _handle_payment_method_attached,
    }

    callback = routes.get(kind)
    if not callback:
        logger.debug("stripe_event_unhandled", type=kind)
        return

    logger.info("stripe_event", type=kind, id=event["id"])
    await callback(stripe_object)
Subscription lifecycle handlers
async def _handle_subscription_updated(subscription: dict) -> None:
    """
    Copy a Stripe subscription's current state onto the matching tenant.

    Handles every subscription state change, for example:
      trial -> active (credit card added)
      active -> past_due (payment fails)
      past_due -> active (payment succeeds)

    Args:
        subscription: Stripe subscription object from the event payload.
            Reads ``customer``, ``status``, ``id`` and the optional
            ``trial_end`` (treated as a Unix timestamp).

    The tenant is looked up by ``stripe_customer_id``; when none matches, a
    warning is logged and nothing is written.
    """
    stripe_customer_id = subscription["customer"]
    new_status = subscription["status"]
    trial_end = subscription.get("trial_end")

    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(Tenant).where(Tenant.stripe_customer_id == stripe_customer_id)
        )
        tenant = result.scalar_one_or_none()
        if tenant is None:
            logger.warning("tenant_not_found_for_stripe_customer",
                           customer_id=stripe_customer_id)
            return

        # Read the ID before committing: with SQLAlchemy's default
        # expire_on_commit=True, attributes are expired by commit() and a
        # later access would need an implicit refresh, which an AsyncSession
        # cannot perform outside an awaited call.
        tenant_id = str(tenant.id)

        tenant.subscription_status = new_status
        tenant.stripe_subscription_id = subscription["id"]
        if trial_end:
            tenant.trial_end = datetime.fromtimestamp(trial_end, tz=timezone.utc)
        # NOTE(review): updated_at is a naive UTC datetime while trial_end is
        # timezone-aware — confirm the column types expect this mix.
        tenant.updated_at = datetime.utcnow()
        session.add(tenant)
        await session.commit()

        logger.info("subscription_status_updated",
                    tenant_id=tenant_id,
                    new_status=new_status)
async def _handle_subscription_deleted(subscription: dict) -> None:
    """
    Mark the tenant's subscription as canceled and trigger the churn notice.

    Only ``subscription_status`` and ``updated_at`` are changed on the
    tenant; no rows are deleted here. Nothing happens when no tenant matches
    the subscription's ``customer`` ID.

    Args:
        subscription: Stripe subscription object from the event payload.
    """
    stripe_customer_id = subscription["customer"]

    async with AsyncSessionLocal() as session:
        tenant = await session.scalar(
            select(Tenant).where(Tenant.stripe_customer_id == stripe_customer_id)
        )
        if not tenant:
            return

        # Read the ID before committing: with SQLAlchemy's default
        # expire_on_commit=True, attributes are expired by commit() and a
        # later access would need an implicit refresh, which an AsyncSession
        # cannot perform outside an awaited call.
        tenant_id = str(tenant.id)

        tenant.subscription_status = "canceled"
        tenant.updated_at = datetime.utcnow()
        session.add(tenant)
        await session.commit()

        # Send the churn email and the internal alert to sales (per the
        # original author's note on this task).
        # NOTE(review): `.delay(...)` is awaited, so it must return an
        # awaitable — confirm against the task queue in use.
        await send_churn_notification.delay(tenant_id)
Dunning flow: Betaling fejler
async def _handle_invoice_payment_failed(invoice: dict) -> None:
    """
    Payment failed: start the dunning flow for the invoice's tenant.

    Per the original author's note, Stripe itself sends the customer reminder
    emails (configured in the Stripe dashboard); this handler updates the
    tenant status and queues the platform's own payment-failed email.

    Args:
        invoice: Stripe invoice object from the event payload. Reads
            ``customer``, ``amount_due``, ``currency`` and the optional
            ``attempt_count`` (defaults to 1) and ``next_payment_attempt``.

    Nothing happens when no tenant matches the invoice's ``customer`` ID.
    """
    stripe_customer_id = invoice["customer"]
    attempt_count = invoice.get("attempt_count", 1)
    next_payment_attempt = invoice.get("next_payment_attempt")

    async with AsyncSessionLocal() as session:
        tenant = await session.scalar(
            select(Tenant).where(Tenant.stripe_customer_id == stripe_customer_id)
        )
        if not tenant:
            return

        # Read the ID before committing: with SQLAlchemy's default
        # expire_on_commit=True, attributes are expired by commit() and a
        # later access would need an implicit refresh, which an AsyncSession
        # cannot perform outside an awaited call.
        tenant_id = str(tenant.id)

        # Switch to past_due on the first failure only; later failures find
        # the status already set and write nothing.
        if tenant.subscription_status != "past_due":
            tenant.subscription_status = "past_due"
            tenant.updated_at = datetime.utcnow()
            session.add(tenant)
            await session.commit()

        # Queue the dunning email (sent to all landlord users, per the
        # original author's note on this task).
        # NOTE(review): `.delay(...)` is awaited, so it must return an
        # awaitable — confirm against the task queue in use.
        await send_payment_failed_email.delay(
            tenant_id=tenant_id,
            attempt=attempt_count,
            next_attempt_ts=next_payment_attempt,
            amount_due=invoice["amount_due"],
            currency=invoice["currency"].upper(),
        )

        logger.warning("payment_failed",
                       tenant_id=tenant_id,
                       attempt=attempt_count,
                       amount_due=invoice["amount_due"])
async def _handle_invoice_paid(invoice: dict) -> None:
    """
    Payment succeeded: move a past_due tenant back to active.

    Tenants in any status other than ``past_due`` are left untouched, as is
    the case where no tenant matches the invoice's ``customer`` ID.

    Args:
        invoice: Stripe invoice object from the event payload.
    """
    stripe_customer_id = invoice["customer"]

    async with AsyncSessionLocal() as session:
        tenant = await session.scalar(
            select(Tenant).where(Tenant.stripe_customer_id == stripe_customer_id)
        )
        if not tenant or tenant.subscription_status != "past_due":
            return

        # Read the ID before committing: with SQLAlchemy's default
        # expire_on_commit=True, attributes are expired by commit() and a
        # later access would need an implicit refresh, which an AsyncSession
        # cannot perform outside an awaited call.
        tenant_id = str(tenant.id)

        tenant.subscription_status = "active"
        tenant.updated_at = datetime.utcnow()
        session.add(tenant)
        await session.commit()

        logger.info("subscription_reactivated", tenant_id=tenant_id)
Idempotens: Duplikerede events
# Stripe may deliver the same event more than once (network retry),
# so the platform has to handle events idempotently.
# Table `processed_stripe_event`:
class ProcessedStripeEvent(SQLModel, table=True):
    """One row per Stripe event ID that has been recorded as processed."""
    __tablename__ = "processed_stripe_event"
    # Stripe event ID; as the primary key it also serves as the lookup key.
    event_id: str = Field(primary_key=True)
    # Filled in when the row object is created; datetime.utcnow yields a
    # naive (tzinfo-less) UTC timestamp.
    processed_at: datetime = Field(default_factory=datetime.utcnow)
async def _check_event_processed(event_id: str) -> bool:
    """Return True when a ProcessedStripeEvent row exists for ``event_id``."""
    async with AsyncSessionLocal() as db:
        row = await db.get(ProcessedStripeEvent, event_id)
        if row is None:
            return False
        return True
async def _mark_event_processed(event_id: str) -> None:
    """Insert a ProcessedStripeEvent row for ``event_id`` and commit it."""
    async with AsyncSessionLocal() as db:
        marker = ProcessedStripeEvent(event_id=event_id)
        db.add(marker)
        await db.commit()
Test med Stripe CLI
# Local webhook test without a deployment:
stripe listen --forward-to localhost:8000/api/v1/stripe/webhook
# Trigger specific events:
stripe trigger customer.subscription.updated
stripe trigger invoice.payment_failed
stripe trigger customer.subscription.deleted
# Watch events in real time:
stripe logs tail
Konklusion
Stripe webhooks kræver rå bytes til signaturvalidering — aldrig JSON-parse body først. Idempotens via processed_stripe_event-tabellen sikrer, at duplikerede events ignoreres. Dunning flow: invoice.payment_failed → past_due + email → invoice.paid → active igen. Stripe CLI giver lokal test uden deployment.