M-Bus Gateway
← Tilbage til blog
· Stripe· SaaS· abonnement· webhooks· FastAPI· Python· fakturering· betalinger

Stripe SaaS-abonnement med webhooks — implementering i FastAPI

Komplet Stripe-integration til SaaS-platform: abonnementsoprettelse, invoice webhooks, dunning, trial-periode, unit-baseret fakturering og sikker webhook-verifikation med FastAPI.

Af M-Bus Gateway

M-Bus Gateway platformen bruger Stripe til abonnementsbaseret fakturering — 49 kr. pr. aktiv lejeenhed pr. måned. Her er den komplette implementering med webhooks, dunning og trial-håndtering.


Abonnementsmodel

Prismodel:
  49 kr./aktiv lejeenhed/måned
  "Aktiv enhed" = lejlighed med aktiv occupancy

Stripe-setup:
  Product: "M-Bus Gateway Platform"
  Price: unit_amount=4900 (øre), currency=dkk,
         billing_scheme=per_unit, recurring.interval=month

Forbrugssynkronisering:
  Celery-task kl. 05:00 UTC 1. i måneden
  → Tæl aktive Occupancy pr. tenant
  → Opdatér Stripe SubscriptionItem.quantity

Oprettelse af abonnement

# server/src/stripe/service.py
import stripe
from uuid import UUID

# Set the API key on the stripe module so the Stripe calls below use the
# secret key taken from settings.
stripe.api_key = settings.STRIPE_SECRET_KEY

async def create_subscription(
    tenant_id: UUID,
    email: str,
    company_name: str,
    unit_count: int,
    trial_days: int = 14,
) -> dict:
    """Create a Stripe Customer and a Subscription with a trial for a tenant.

    Args:
        tenant_id: Internal tenant identifier; stored as ``tenant_id``
            metadata on both the Stripe Customer and the Subscription.
        email: E-mail address set on the Stripe Customer.
        company_name: Name set on the Stripe Customer.
        unit_count: Number of billable units; values below 1 are sent as 1.
        trial_days: Length of the trial period in days.

    Returns:
        A dict with ``subscription_id``, ``trial_end``, ``status`` and
        ``client_secret``. ``client_secret`` is ``None`` when the latest
        invoice is missing or has no PaymentIntent attached.
    """
    tenant_ref = str(tenant_id)

    # 1. Create the Customer.
    customer = stripe.Customer.create(
        email=email,
        name=company_name,
        metadata={"tenant_id": tenant_ref},
    )

    # 2. Create the Subscription with a trial period.
    subscription = stripe.Subscription.create(
        customer=customer.id,
        items=[{
            "price": settings.STRIPE_PRICE_ID,
            "quantity": max(unit_count, 1),
        }],
        trial_period_days=trial_days,
        payment_settings={
            "payment_method_types": ["card"],
            "save_default_payment_method": "on_subscription",
        },
        expand=["latest_invoice.payment_intent"],
        metadata={"tenant_id": tenant_ref},
    )

    # 3. Persist the Stripe ids on the tenant.
    await update_tenant_stripe_ids(
        tenant_id=tenant_id,
        stripe_customer_id=customer.id,
        stripe_subscription_id=subscription.id,
    )

    # The first invoice of a trialing subscription is typically a zero-amount
    # invoice without a PaymentIntent, so ``payment_intent`` can be None even
    # though ``latest_invoice`` is present. Walk the chain defensively instead
    # of raising AttributeError after the subscription was already created.
    latest_invoice = subscription.latest_invoice
    payment_intent = (
        getattr(latest_invoice, "payment_intent", None)
        if latest_invoice else None
    )
    client_secret = (
        getattr(payment_intent, "client_secret", None)
        if payment_intent else None
    )

    return {
        "subscription_id": subscription.id,
        "trial_end": subscription.trial_end,
        "status": subscription.status,
        "client_secret": client_secret,
    }

Webhook endpoint

# server/src/stripe/router.py
from fastapi import APIRouter, Request, HTTPException
import stripe

# Router for the routes registered in this module; every path is prefixed
# with /webhooks/stripe.
router = APIRouter(prefix="/webhooks/stripe", tags=["stripe"])

# Signing secret handed to stripe.Webhook.construct_event when verifying
# incoming webhook requests.
WEBHOOK_SECRET = settings.STRIPE_WEBHOOK_SECRET

@router.post("/")
async def stripe_webhook(
    request: Request,
    session: AsyncSession = Depends(get_session),
):
    """Verify an incoming Stripe webhook and dispatch it to its handler.

    The raw request body is verified against the ``stripe-signature`` header
    using ``WEBHOOK_SECRET``. Event types without a registered handler are
    acknowledged without further processing.

    Raises:
        HTTPException: 400 when the signature header is missing, the
            signature does not verify, or the payload cannot be decoded or
            parsed.
    """
    payload = await request.body()
    sig_header = request.headers.get("stripe-signature")
    if not sig_header:
        # Nothing to verify against; reject before touching the payload.
        raise HTTPException(400, "Ugyldig webhook signatur")

    try:
        event = stripe.Webhook.construct_event(
            payload, sig_header, WEBHOOK_SECRET
        )
    except ValueError as exc:
        # construct_event decodes and JSON-parses the body; a malformed
        # payload (including invalid UTF-8) surfaces as ValueError and must
        # not turn into a 500.
        raise HTTPException(400, "Ugyldig webhook payload") from exc
    except stripe.error.SignatureVerificationError as exc:
        raise HTTPException(400, "Ugyldig webhook signatur") from exc

    handlers = {
        "invoice.paid": _handle_invoice_paid,
        "invoice.payment_failed": _handle_invoice_payment_failed,
        "invoice.upcoming": _handle_invoice_upcoming,
        "customer.subscription.deleted": _handle_subscription_deleted,
        "customer.subscription.trial_will_end": _handle_trial_will_end,
    }

    handler = handlers.get(event.type)
    if handler:
        await handler(event.data.object, session)

    return {"received": True}

Webhook handlers

async def _handle_invoice_paid(invoice, session: AsyncSession):
    """Mark the tenant as active and clear its past-due marker after payment.

    Does nothing when no tenant matches ``invoice.customer``.
    """
    tenant = await _get_tenant_by_stripe_customer(invoice.customer, session)
    if tenant:
        tenant.subscription_status = "active"
        tenant.past_due_since = None
        session.add(tenant)
        await session.commit()

        # Stripe reports amounts in the smallest currency unit; log whole
        # currency units instead.
        tenant_ref = str(tenant.id)
        paid_amount = invoice.amount_paid / 100
        currency_code = invoice.currency.upper()
        logger.info(
            "Betaling modtaget",
            tenant_id=tenant_ref,
            amount=paid_amount,
            currency=currency_code,
        )


async def _handle_invoice_payment_failed(invoice, session: AsyncSession):
    """Mark the tenant as past due and queue a dunning e-mail per landlord user.

    Does nothing when no tenant matches ``invoice.customer``.
    """
    from datetime import datetime, timezone

    from server.src.workers.tasks.email import send_payment_failed_email

    tenant = await _get_tenant_by_stripe_customer(
        invoice.customer, session
    )
    if not tenant:
        return

    # Read the id before committing so it does not depend on how the session
    # treats loaded attributes after commit.
    tenant_id = tenant.id

    tenant.subscription_status = "past_due"
    # _handle_invoice_paid clears past_due_since, so it has to be recorded
    # here. Keep the timestamp of the FIRST failure: later payment retries
    # and duplicate deliveries must not move it forward.
    # NOTE(review): stored as naive UTC to match the datetime.utcnow() value
    # used for subscription_canceled_at; confirm the column type.
    if tenant.past_due_since is None:
        tenant.past_due_since = datetime.now(timezone.utc).replace(tzinfo=None)
    session.add(tenant)
    await session.commit()

    # Queue one e-mail task per landlord user of the tenant.
    users = await _get_landlord_users(tenant_id, session)
    for user in users:
        send_payment_failed_email.delay(
            user_id=str(user.id),
            amount=invoice.amount_due / 100,
            currency=invoice.currency.upper(),
            next_attempt=invoice.next_payment_attempt,
            stripe_customer_id=invoice.customer,
        )


async def _handle_invoice_upcoming(invoice, session: AsyncSession):
    """Queue a renewal reminder e-mail for every landlord user of the tenant.

    Does nothing when no tenant matches ``invoice.customer``. How long before
    the renewal this event arrives is decided outside this function.
    """
    from server.src.workers.tasks.email import send_renewal_reminder_email

    tenant = await _get_tenant_by_stripe_customer(invoice.customer, session)
    if not tenant:
        return

    for recipient in await _get_landlord_users(tenant.id, session):
        recipient_id = str(recipient.id)
        amount_due = invoice.amount_due / 100
        currency_code = invoice.currency.upper()
        renewal_ts = invoice.period_end
        # The unit count is taken from the first invoice line, 0 if none.
        billed_units = (
            invoice.lines.data[0].quantity if invoice.lines.data else 0
        )
        send_renewal_reminder_email.delay(
            user_id=recipient_id,
            amount=amount_due,
            currency=currency_code,
            renewal_date=renewal_ts,
            unit_count=billed_units,
        )


async def _handle_subscription_deleted(subscription, session: AsyncSession):
    """Mark the tenant as canceled when its Stripe subscription is deleted.

    Does nothing when no tenant matches ``subscription.id``.
    """
    tenant = await _get_tenant_by_stripe_subscription(subscription.id, session)
    if tenant:
        # Record the cancellation together with the time it was processed.
        tenant.subscription_status = "canceled"
        tenant.subscription_canceled_at = datetime.utcnow()
        session.add(tenant)
        await session.commit()

        tenant_ref = str(tenant.id)
        logger.warning("Abonnement annulleret", tenant_id=tenant_ref)


async def _handle_trial_will_end(subscription, session: AsyncSession):
    """Queue a trial-ending e-mail for every landlord user of the tenant.

    Does nothing when no tenant matches ``subscription.id``.
    """
    from server.src.workers.tasks.email import send_trial_ending_email

    tenant = await _get_tenant_by_stripe_subscription(subscription.id, session)
    if not tenant:
        return

    for recipient in await _get_landlord_users(tenant.id, session):
        recipient_id = str(recipient.id)
        trial_end_ts = subscription.trial_end
        send_trial_ending_email.delay(
            user_id=recipient_id,
            trial_end=trial_end_ts,
        )

Unit-baseret synkronisering

# server/src/workers/tasks/subscription_sync.py
from celery import shared_task
from sqlmodel import select, func

@shared_task(
    name="subscription_sync",
    bind=True,
    max_retries=3,
    default_retry_delay=300,
)
def sync_subscription_quantities(self):
    """Celery entry point that syncs Stripe quantities for all tenants.

    Runs the async sync in a fresh event loop. If the run fails as a whole,
    the task is retried using the ``max_retries`` / ``default_retry_delay``
    configured on the decorator; once retries are exhausted the original
    exception is re-raised by Celery.
    """
    import asyncio

    try:
        asyncio.run(_sync_all_tenants())
    except Exception as exc:
        # Without an explicit self.retry() the retry settings on the
        # decorator are never used.
        raise self.retry(exc=exc)


async def _sync_all_tenants():
    """Sync the Stripe quantity of every tenant with a live subscription.

    Selects tenants that have a Stripe subscription id, whose status is
    ``active`` or ``trialing`` and that are not soft-deleted, then syncs them
    one at a time. A failure for one tenant is logged and does not stop the
    remaining tenants.
    """
    async with async_session_factory() as session:
        # Select only the ids: commits and rollbacks inside the loop expire
        # loaded ORM objects, so each tenant is fetched right before use.
        result = await session.execute(
            select(Tenant.id).where(
                Tenant.stripe_subscription_id.is_not(None),
                Tenant.subscription_status.in_(["active", "trialing"]),
                Tenant.deleted_at.is_(None),
            )
        )
        tenant_ids = result.scalars().all()

        for tenant_id in tenant_ids:
            try:
                tenant = await session.get(Tenant, tenant_id)
                if tenant is None:
                    # Row disappeared between the query and the fetch.
                    continue
                await _sync_tenant(tenant, session)
            except Exception as e:
                # Reset the transaction so a database error for this tenant
                # does not make every following tenant fail as well.
                await session.rollback()
                logger.error(
                    "Synk fejlede",
                    tenant_id=str(tenant_id),
                    error=str(e),
                )


async def _sync_tenant(tenant: Tenant, session: AsyncSession):
    """Push the tenant's current active-unit count to Stripe and store it.

    Counts occupancies of the tenant that have no move-out date and are not
    soft-deleted. When the count equals ``tenant.active_units`` nothing is
    done. Otherwise the first item of the Stripe subscription is updated
    (with a minimum quantity of 1) and the new count is saved on the tenant.

    Raises:
        RuntimeError: If the Stripe subscription has no items to update.
    """
    # Read the id once up front so the log line after the commit does not
    # depend on how the session treats loaded attributes after commit.
    tenant_id = tenant.id

    # Count active occupancies.
    result = await session.execute(
        select(func.count(Occupancy.id)).where(
            Occupancy.tenant_id == tenant_id,
            Occupancy.move_out_date.is_(None),
            Occupancy.deleted_at.is_(None),
        )
    )
    active_units = result.scalar() or 0

    if active_units == tenant.active_units:
        return  # No change.

    # Update Stripe. Stripe objects are dict subclasses, so attribute access
    # ``subscription.items`` resolves to dict.items() rather than the "items"
    # field; the field must be read with subscript access.
    subscription = stripe.Subscription.retrieve(
        tenant.stripe_subscription_id
    )
    subscription_items = subscription["items"]["data"]
    if not subscription_items:
        raise RuntimeError(
            f"Stripe-abonnement {tenant.stripe_subscription_id} har ingen items"
        )
    stripe.SubscriptionItem.modify(
        subscription_items[0]["id"],
        quantity=max(active_units, 1),
    )

    # Update the local record.
    tenant.active_units = active_units
    session.add(tenant)
    await session.commit()

    logger.info(
        "Abonnement synkroniseret",
        tenant_id=str(tenant_id),
        units=active_units,
    )

Stripe Customer Portal

# Lad kunder håndtere betalingsmetode selv:
@router.post("/billing/portal")
async def create_billing_portal(
    session: AsyncSession = Depends(get_session),
    user: User = Depends(require_role("landlord")),
):
    """Create a Stripe billing portal session for the current user's tenant.

    Returns:
        A dict with the portal ``url``.

    Raises:
        HTTPException: 404 when the tenant does not exist or has no Stripe
            customer id.
    """
    tenant = await session.get(Tenant, user.tenant_id)
    # session.get() returns None for an unknown primary key; treat that the
    # same as a tenant without a Stripe customer instead of failing with an
    # AttributeError.
    if tenant is None or not tenant.stripe_customer_id:
        raise HTTPException(404, "Ingen Stripe-konto fundet")

    portal_session = stripe.billing_portal.Session.create(
        customer=tenant.stripe_customer_id,
        return_url=f"{settings.FRONTEND_URL}/portal/profile",
    )
    return {"url": portal_session.url}

Idempotency: Beskyt mod duplikerede events

# Webhook events kan leveres mere end én gang — altid idempotent:
async def _handle_invoice_paid(invoice, session: AsyncSession):
    """Set the tenant to active after payment, skipping tenants already active.

    Does nothing when no tenant matches ``invoice.customer``. A tenant whose
    status is already ``active`` is left untouched, so a repeated delivery of
    the same event causes no further write.
    """
    tenant = await _get_tenant_by_stripe_customer(invoice.customer, session)
    if not tenant:
        return

    if tenant.subscription_status == "active":
        logger.debug("Allerede active — intet at gøre", invoice_id=invoice.id)
        return

    # First time this payment is seen: activate and clear the past-due marker.
    tenant.subscription_status = "active"
    tenant.past_due_since = None
    session.add(tenant)
    await session.commit()
    logger.info("Status opdateret til active", invoice_id=invoice.id)

Konklusion

Stripe-integration i FastAPI kræver fire elementer: Customer- og Subscription-oprettelse, webhook endpoint med signaturverifikation, idempotente handlers for hver event-type og månedlig unit-synkronisering. Trial-perioder og dunning håndteres automatisk via webhooks uden manuel intervention.

Se Stripe abonnement guide eller prøv platformen gratis i 14 dage.