M-Bus Gateway
← Tilbage til blog
· Stripe· webhooks· SaaS· Python· FastAPI· abonnement· betalinger· idempotens· dunning

Stripe webhooks til SaaS — håndtér abonnementshændelser i Python

Stripe webhook integration til SaaS i Python: signaturvalidering, idempotent håndtering, invoice.paid, customer.subscription.deleted, trial expiry og dunning flow.

Af M-Bus Gateway

M-Bus Gateway bruger Stripe webhooks til at holde abonnementsstatus i sync. Her er den komplette Python-implementering med signaturvalidering og idempotent håndtering.


Webhook-endpoint: Signaturvalidering

# server/src/stripe/router.py
import stripe
from fastapi import APIRouter, Request, HTTPException
from server.src.core.config import settings
import structlog

# Module-level structlog logger used by the webhook endpoint and its handlers.
logger = structlog.get_logger()
# Router for the Stripe endpoints; every route declared on it is prefixed with /stripe.
router = APIRouter(prefix="/stripe", tags=["stripe"])


@router.post("/webhook")
async def stripe_webhook(request: Request):
    """
    Receive, verify and dispatch a single Stripe webhook event.

    IMPORTANT: the signature is verified against the raw request bytes, so
    the body is read with ``request.body()`` and never taken from a
    JSON-parsed representation.

    Returns:
        ``{"status": "ok", "event_id": ...}`` after the event has been
        dispatched and recorded, or ``{"status": "duplicate", ...}`` when the
        event id has already been recorded as processed.

    Raises:
        HTTPException: 400 when the ``stripe-signature`` header is missing,
            the signature does not verify, or the payload cannot be decoded.
    """
    payload = await request.body()
    sig_header = request.headers.get("stripe-signature")

    if not sig_header:
        raise HTTPException(400, "Manglende Stripe-Signature header")

    try:
        event = stripe.Webhook.construct_event(
            payload,
            sig_header,
            settings.STRIPE_WEBHOOK_SECRET,
        )
    except stripe.error.SignatureVerificationError as exc:
        logger.warning("stripe_webhook_invalid_signature", error=str(exc))
        raise HTTPException(400, "Ugyldig webhook-signatur") from exc
    except ValueError as exc:
        # A body that is not valid UTF-8 / JSON surfaces as ValueError; treat
        # it as a client error instead of letting it become a 500.
        logger.warning("stripe_webhook_invalid_payload", error=str(exc))
        raise HTTPException(400, "Ugyldig webhook-payload") from exc

    event_id = event["id"]

    # Idempotency: events whose id is already recorded are skipped.
    # NOTE(review): the check and the later mark are two separate steps, so
    # two concurrent deliveries of the same event can both pass this check.
    if await _check_event_processed(event_id):
        return {"status": "duplicate", "event_id": event_id}

    # The event is only marked after the handler returned; if dispatch raises,
    # nothing is recorded and a redelivery will be processed again.
    await _dispatch_event(event)
    await _mark_event_processed(event_id)

    return {"status": "ok", "event_id": event_id}

Event dispatcher: Alle relevante Stripe-events

async def _dispatch_event(event: dict) -> None:
    """
    Look up the handler registered for the event's type and await it.

    The handler receives ``event["data"]["object"]``. Event types without a
    registered handler are only logged at debug level.
    """
    kind = event["type"]
    payload = event["data"]["object"]

    routes = {
        # Subscription lifecycle: created (trial start), updated
        # (trial -> active, upgrade, downgrade), deleted (churn).
        "customer.subscription.created": _handle_subscription_created,
        "customer.subscription.updated": _handle_subscription_updated,
        "customer.subscription.deleted": _handle_subscription_deleted,
        # Invoices: paid (past_due -> active), payment failed (dunning
        # start), upcoming (7-day reminder).
        "invoice.paid": _handle_invoice_paid,
        "invoice.payment_failed": _handle_invoice_payment_failed,
        "invoice.upcoming": _handle_invoice_upcoming,
        # Credit card updated.
        "payment_method.attached": _handle_payment_method_attached,
    }

    target = routes.get(kind)
    if not target:
        logger.debug("stripe_event_unhandled", type=kind)
        return

    logger.info("stripe_event", type=kind, id=event["id"])
    await target(payload)

Subscription lifecycle handlers

async def _handle_subscription_updated(subscription: dict) -> None:
    """
    Copy the state of a Stripe subscription onto the matching tenant.

    Handles all subscription state changes, for example:
    trial -> active (credit card added),
    active -> past_due (payment fails),
    past_due -> active (payment succeeds).

    Args:
        subscription: The subscription object from the event. The keys
            ``customer``, ``status`` and ``id`` are required; ``trial_end``
            (epoch seconds) is optional.

    If no tenant has the given Stripe customer id, a warning is logged and
    nothing is written.
    """
    stripe_customer_id = subscription["customer"]
    new_status = subscription["status"]
    trial_end = subscription.get("trial_end")

    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(Tenant).where(Tenant.stripe_customer_id == stripe_customer_id)
        )
        tenant = result.scalar_one_or_none()

        if not tenant:
            logger.warning("tenant_not_found_for_stripe_customer",
                           customer_id=stripe_customer_id)
            return

        # Read the id while the row is loaded and attached: depending on the
        # session configuration, commit() expires attributes, and reading
        # them afterwards (or after the session closed) can fail.
        tenant_id = str(tenant.id)

        tenant.subscription_status = new_status
        tenant.stripe_subscription_id = subscription["id"]
        if trial_end:
            tenant.trial_end = datetime.fromtimestamp(trial_end, tz=timezone.utc)
        # NOTE(review): updated_at is naive UTC while trial_end is
        # timezone-aware — confirm the column types before unifying them.
        tenant.updated_at = datetime.utcnow()
        session.add(tenant)
        await session.commit()

    logger.info("subscription_status_updated",
                tenant_id=tenant_id,
                new_status=new_status)


async def _handle_subscription_deleted(subscription: dict) -> None:
    """
    Mark the tenant's subscription as canceled and queue a churn notification.

    Only the status is changed (access is blocked, data is kept).

    Args:
        subscription: The subscription object from the event; only the
            ``customer`` key is read.

    If no tenant has the given Stripe customer id, a warning is logged and no
    notification is queued.
    """
    stripe_customer_id = subscription["customer"]

    async with AsyncSessionLocal() as session:
        tenant = await session.scalar(
            select(Tenant).where(Tenant.stripe_customer_id == stripe_customer_id)
        )
        if not tenant:
            # Without this guard the notification call below would
            # dereference None and fail the whole webhook request.
            logger.warning("tenant_not_found_for_stripe_customer",
                           customer_id=stripe_customer_id)
            return

        # Read the id before commit(): afterwards the instance may be expired
        # or detached, depending on the session configuration.
        tenant_id = str(tenant.id)

        tenant.subscription_status = "canceled"
        tenant.updated_at = datetime.utcnow()
        session.add(tenant)
        await session.commit()

    # Send the churn email and the internal alert to sales.
    # NOTE(review): if send_churn_notification is a Celery task, .delay()
    # returns an AsyncResult that is not awaitable — confirm the task
    # framework before keeping this await.
    await send_churn_notification.delay(tenant_id)

Dunning flow: Betaling fejler

async def _handle_invoice_payment_failed(invoice: dict) -> None:
    """
    Payment failed: start the dunning flow for the tenant.

    Stripe itself sends the customer reminder emails (configured in the
    Stripe dashboard). This handler sets the tenant to ``past_due`` if it is
    not already, queues the payment-failed email and logs a warning.

    Args:
        invoice: The invoice object from the event. ``customer``,
            ``amount_due`` and ``currency`` are required; ``attempt_count``
            (defaults to 1) and ``next_payment_attempt`` are optional.

    If no tenant has the given Stripe customer id, a warning is logged and
    nothing else happens.
    """
    stripe_customer_id = invoice["customer"]
    attempt_count = invoice.get("attempt_count", 1)
    next_payment_attempt = invoice.get("next_payment_attempt")

    async with AsyncSessionLocal() as session:
        tenant = await session.scalar(
            select(Tenant).where(Tenant.stripe_customer_id == stripe_customer_id)
        )
        if not tenant:
            logger.warning("tenant_not_found_for_stripe_customer",
                           customer_id=stripe_customer_id)
            return

        # Read the id before a possible commit(): afterwards the instance may
        # be expired or detached, depending on the session configuration.
        tenant_id = str(tenant.id)

        # Only the first failure flips the status; retries leave it untouched.
        if tenant.subscription_status != "past_due":
            tenant.subscription_status = "past_due"
            tenant.updated_at = datetime.utcnow()
            session.add(tenant)
            await session.commit()

    # Queue the dunning email.
    # NOTE(review): if send_payment_failed_email is a Celery task, .delay()
    # returns an AsyncResult that is not awaitable — confirm the task
    # framework before keeping this await.
    await send_payment_failed_email.delay(
        tenant_id=tenant_id,
        attempt=attempt_count,
        next_attempt_ts=next_payment_attempt,
        amount_due=invoice["amount_due"],
        currency=invoice["currency"].upper(),
    )

    logger.warning("payment_failed",
                   tenant_id=tenant_id,
                   attempt=attempt_count,
                   amount_due=invoice["amount_due"])


async def _handle_invoice_paid(invoice: dict) -> None:
    """
    Payment succeeded: move a ``past_due`` tenant back to ``active``.

    Tenants in any other status, and customers without a matching tenant,
    are left unchanged.

    Args:
        invoice: The invoice object from the event; only the ``customer``
            key is read.
    """
    stripe_customer_id = invoice["customer"]

    async with AsyncSessionLocal() as session:
        tenant = await session.scalar(
            select(Tenant).where(Tenant.stripe_customer_id == stripe_customer_id)
        )
        if not tenant or tenant.subscription_status != "past_due":
            return

        # Read the id before commit(): depending on the session
        # configuration the instance is expired afterwards, and reading an
        # attribute would need an implicit reload.
        tenant_id = str(tenant.id)

        tenant.subscription_status = "active"
        tenant.updated_at = datetime.utcnow()
        session.add(tenant)
        await session.commit()
        logger.info("subscription_reactivated", tenant_id=tenant_id)

Idempotens: Duplikerede events

# Stripe kan sende samme event flere gange (network retry)
# Platform skal håndtere dette idempotent

# processed_stripe_event tabel:
class ProcessedStripeEvent(SQLModel, table=True):
    """Table model holding one row per Stripe event id that was marked as processed."""
    __tablename__ = "processed_stripe_event"
    event_id: str = Field(primary_key=True)  # Stripe event ID; primary key of the table
    # Filled by datetime.utcnow (naive UTC) when no value is supplied.
    processed_at: datetime = Field(default_factory=datetime.utcnow)


async def _check_event_processed(event_id: str) -> bool:
    """Return True when a ProcessedStripeEvent row exists for ``event_id``."""
    async with AsyncSessionLocal() as db:
        row = await db.get(ProcessedStripeEvent, event_id)
    return row is not None


async def _mark_event_processed(event_id: str) -> None:
    """Insert and commit a ProcessedStripeEvent row for ``event_id``."""
    record = ProcessedStripeEvent(event_id=event_id)
    async with AsyncSessionLocal() as db:
        db.add(record)
        await db.commit()

Test med Stripe CLI

# Local webhook test without a deployment:
stripe listen --forward-to localhost:8000/api/v1/stripe/webhook

# Trigger specific events:
stripe trigger customer.subscription.updated
stripe trigger invoice.payment_failed
stripe trigger customer.subscription.deleted

# Watch events in real time:
stripe logs tail

Konklusion

Stripe webhooks kræver rå bytes til signaturvalidering — JSON-parse aldrig body først. Idempotens via processed_stripe_event-tabellen sikrer, at duplikerede events ignoreres. Dunning flow: invoice.payment_failed → past_due + email → invoice.paid → active igen. Stripe CLI giver lokal test uden deployment.

Se Stripe Checkout guide eller Celery beat guide.