M-Bus Gateway

FastAPI BackgroundTasks and Celery: when to use which?

FastAPI BackgroundTasks vs. Celery: when the simple solution is enough, when Celery is required, plus Celery result tracking, the beat schedule, and error handling.

By M-Bus Gateway

FastAPI ships with a lightweight BackgroundTasks mechanism for simple jobs, but production-critical background jobs call for Celery. Here is the difference and when to use each.


FastAPI BackgroundTasks: when it is enough

# Good for: fast, non-critical tasks that may be lost if the server crashes

from uuid import UUID

from fastapi import BackgroundTasks, Depends

@router.post("/settlements/{id}/send")
async def send_settlement(
    id: UUID,
    background_tasks: BackgroundTasks,
    user: TokenPayload = Depends(require_role("landlord")),
    session: AsyncSession = Depends(get_session),
):
    settlement = await get_or_404(session, Settlement, id, user.tenant_id)

    # Background task: starts AFTER the response has been sent to the client
    # ❌ Problem: if the server crashes after the response but before the email is sent
    #            → the email is never sent
    # ✅ OK if: email failures are acceptable and re-sending is done manually
    background_tasks.add_task(
        _send_settlement_email_simple,
        settlement_id=str(id),
        recipient=settlement.tenant_email,
    )

    return {"status": "queued"}

async def _send_settlement_email_simple(settlement_id: str, recipient: str):
    """Simple email: lost if the server crashes before it runs."""
    await send_email(to=recipient, template="settlement", vars={"id": settlement_id})

Celery: when it is necessary

# Celery is required when:
# 1. The task must NOT be lost if the server crashes
# 2. The task takes a long time (PDF generation, OCR, DMI calls)
# 3. The task must be retried on failure (external API down)
# 4. The task must run on a schedule (Celery beat)
# 5. The task must scale horizontally (multiple workers)
# 6. The task result must be tracked (status, error message)

from server.src.workers.tasks.settlement import send_settlement_email

@router.post("/settlements/{id}/send")
async def send_settlement(
    id: UUID,
    user: TokenPayload = Depends(require_role("landlord")),
    session: AsyncSession = Depends(get_session),
):
    settlement = await get_or_404(session, Settlement, id, user.tenant_id)

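    # Celery task: stored in the Redis broker, so it survives an API server crash
    task = send_settlement_email.apply_async(
        args=[str(id)],
        queue="critical",           # Dedicated queue for high-priority work
        countdown=0,
        # Deterministic task ID. Celery does not deduplicate on it; the task's
        # own email_sent_at check is what makes a repeated delivery harmless.
        task_id=f"settlement-email-{id}",
        retry=True,                 # Retry PUBLISHING if the broker is unreachable
        retry_policy={"max_retries": 3},
        # Task-level retries (max_retries) are configured on the task decorator.
    )

    # Store task_id for status tracking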
    settlement.email_task_id = task.id
    await session.commit()

    return {"task_id": task.id, "status": "queued"}

Celery task definition

# server/src/workers/tasks/settlement.py

from datetime import datetime, timezone
from smtplib import SMTPException

from server.src.workers.celery_app import celery_app
import structlog

log = structlog.get_logger()

@celery_app.task(
    bind=True,
    queue="critical",
    max_retries=3,
    default_retry_delay=60,        # 60 s between retries
    acks_late=True,                # Acknowledge AFTER the task has run
    reject_on_worker_lost=True,    # Requeue if the worker process dies
)
def send_settlement_email(self, settlement_id: str) -> dict:
    """
    Send the annual settlement to the tenant.
    Retried on: SMTP errors, network errors, temporary Brevo errors.
    """
    try:
        # Synchronous DB access in the Celery task (not async):
        from server.src.db.sync import SyncSession
        with SyncSession() as session:
            settlement = session.get(Settlement, settlement_id)
            if not settlement:
                log.warning("settlement_not_found", id=settlement_id)
                return {"status": "skipped", "reason": "not_found"}

            if settlement.email_sent_at:
                log.info("settlement_already_sent", id=settlement_id)
                return {"status": "skipped", "reason": "already_sent"}

            # Generate the PDF
            pdf_bytes = generate_settlement_pdf(settlement)

            # Upload to S3
            pdf_url = upload_to_s3(settlement_id, pdf_bytes)

            # Send the email
            send_brevo_email(
                to=settlement.tenant_email,
                template="settlement_annual",
                vars={"pdf_url": pdf_url, "period": settlement.period},
            )

            # Update the timestamp
            settlement.email_sent_at = datetime.now(timezone.utc)
            settlement.pdf_url = pdf_url
            session.commit()

            log.info("settlement_email_sent", id=settlement_id)
            return {"status": "sent", "pdf_url": pdf_url}

    except (SMTPException, BrevoAPIException) as exc:
        log.warning("settlement_email_retry", id=settlement_id, error=str(exc))
        raise self.retry(exc=exc)

    except Exception as exc:
        log.error("settlement_email_failed", id=settlement_id, error=str(exc))
        raise
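
The task above waits a fixed 60 seconds between attempts. If the mail provider stays down for longer, a growing delay spreads the retries out. A small sketch of that calculation, assuming a hypothetical helper `backoff_delay` whose result would be passed as `countdown` to `self.retry`; the base and cap values are illustrative only:

```python
def backoff_delay(retries: int, base: int = 60, cap: int = 900) -> int:
    """Seconds to wait before the next attempt.

    `retries` is the number of retries already made (0 before the first
    retry), which is what `self.request.retries` holds inside a bound
    Celery task. The delay doubles each time and is capped at `cap`.
    """
    if retries < 0:
        raise ValueError("retries must be >= 0")
    return min(cap, base * (2 ** retries))


if __name__ == "__main__":
    for attempt in range(5):
        print(attempt, backoff_delay(attempt))
```

Inside the task's retry branch this would read `raise self.retry(exc=exc, countdown=backoff_delay(self.request.retries))`. Celery's `autoretry_for` together with `retry_backoff` offers a built-in alternative.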

Task status tracking

# server/src/billing/router.py

from celery.result import AsyncResult

from server.src.workers.celery_app import celery_app

@router.get("/tasks/{task_id}")
async def get_task_status(
    task_id: str,
    user: TokenPayload = Depends(get_current_user),
):
    """Poll task status from the Celery result backend."""
    # Bind to the project's app so its configured result backend is used.
    result = AsyncResult(task_id, app=celery_app)

    return {
        "task_id": task_id,
        "status": result.status,
        # PENDING = not started yet, or unknown task ID
        # STARTED = running (only reported when task_track_started is enabled)
        # SUCCESS = finished
        # FAILURE = failed
        # RETRY   = waiting for a retry
        "result": result.result if result.successful() else None,
        "error": str(result.result) if result.failed() else None,
        "ready": result.ready(),
    }

# Frontend: poll every 2 s until done:
# while (!result.ready) {
#   await new Promise(r => setTimeout(r, 2000));
#   result = await api.tasks.status(taskId);
# }
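
The polling loop in the comment above never gives up, and PENDING is also what Celery reports for an unknown task ID, so a client can end up waiting forever. A sketch of the same loop with a timeout, written in Python for consistency with the rest of the article; `poll_until_ready` and `fetch_status` are hypothetical names, and `fetch_status` stands in for whatever HTTP call retrieves the status payload:

```python
import time
from typing import Any, Callable, Dict


def poll_until_ready(
    fetch_status: Callable[[], Dict[str, Any]],
    interval: float = 2.0,
    timeout: float = 120.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Call `fetch_status` until its payload has ready=True.

    Raises TimeoutError once `timeout` seconds of waiting have
    accumulated without the task becoming ready. `sleep` is injectable
    so the loop can be exercised without real delays.
    """
    waited = 0.0
    while True:
        payload = fetch_status()
        if payload.get("ready"):
            return payload
        if waited >= timeout:
            raise TimeoutError(f"task not ready after {waited:.0f}s")
        sleep(interval)
        waited += interval


if __name__ == "__main__":
    # Canned responses in place of a real GET /tasks/{task_id} call.
    canned = iter(
        [
            {"ready": False, "status": "PENDING"},
            {"ready": True, "status": "SUCCESS"},
        ]
    )
    print(poll_until_ready(lambda: next(canned), sleep=lambda s: None))
```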

Celery beat schedule

# server/src/workers/celery_app.py

from celery import Celery
from celery.schedules import crontab

celery_app = Celery("mbus")
celery_app.conf.beat_schedule = {
    # Daily and recurring jobs:
    "sync-dmi-graddage": {
        "task": "server.src.workers.tasks.dmi.sync_degree_days",
        "schedule": crontab(hour=3, minute=30),
        "options": {"queue": "default"},
    },
    "check-stale-gateways": {
        "task": "server.src.workers.tasks.alerts.check_stale_gateways",
        "schedule": crontab(minute="*/30"),  # Every 30 minutes
        "options": {"queue": "default"},
    },
    "settlement-reminder": {
        "task": "server.src.workers.tasks.settlement_reminder.send_reminders",
        "schedule": crontab(hour=10, minute=0),
        "options": {"queue": "default"},
    },
    # Monthly jobs:
    "subscription-sync": {
        "task": "server.src.workers.tasks.subscription_sync.sync_unit_counts",
        "schedule": crontab(day_of_month=1, hour=5, minute=0),
        "options": {"queue": "default"},
    },
    "monthly-digest": {
        "task": "server.src.workers.tasks.monthly_digest.send_digests",
        "schedule": crontab(day_of_month=2, hour=6, minute=0),
        "options": {"queue": "low"},
    },
}
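
A beat entry that names a queue no worker consumes will typically still be enqueued on schedule and then sit unprocessed. A minimal sketch of a startup check that catches such typos, assuming the three queue names used in this article (`critical`, `default`, `low`) are the complete set; `find_schedule_problems` is a hypothetical helper, not a Celery API:

```python
from datetime import timedelta
from typing import Any, Dict, Iterable, List


def find_schedule_problems(
    schedule: Dict[str, Dict[str, Any]], known_queues: Iterable[str]
) -> List[str]:
    """Return human-readable problems found in a beat_schedule-style dict."""
    queues = set(known_queues)
    problems: List[str] = []
    for name, entry in schedule.items():
        # Every entry needs a task path and a schedule.
        for key in ("task", "schedule"):
            if key not in entry:
                problems.append(f"{name}: missing '{key}'")
        # A queue is optional, but if given it must be one we know about.
        queue = entry.get("options", {}).get("queue")
        if queue is not None and queue not in queues:
            problems.append(f"{name}: unknown queue '{queue}'")
    return problems


if __name__ == "__main__":
    # timedelta stands in for crontab(...) so the sketch needs no Celery
    # import; beat accepts either as a schedule.
    demo = {
        "check-stale-gateways": {
            "task": "server.src.workers.tasks.alerts.check_stale_gateways",
            "schedule": timedelta(minutes=30),
            "options": {"queue": "default"},
        },
        "monthly-digest": {
            "task": "server.src.workers.tasks.monthly_digest.send_digests",
            "schedule": timedelta(days=30),
            "options": {"queue": "lwo"},  # deliberate typo
        },
    }
    print(find_schedule_problems(demo, {"critical", "default", "low"}))
```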

Comparison: BackgroundTasks vs. Celery

                    BackgroundTasks        Celery
──────────────────────────────────────────────────────────────────────────
Complexity:         Low (no extra deps)    High (Redis + workers)
Persistence:        No (lost on crash)     Yes (queued in the Redis broker)
Retry on failure:   No                     Yes (max_retries)
Scheduling:         No                     Yes (beat)
Scaling:            No                     Yes (multiple workers)
Monitoring:         No                     Flower dashboard
Task tracking:      No                     AsyncResult
Timeout:            No                     Yes (soft_time_limit / time_limit)

Use BackgroundTasks for:
  → Analytics/logging where losing data is acceptable
  → Non-critical webhook fan-out
  → Cache invalidation

Use Celery for:
  → Sending email (critical)
  → PDF generation (slow)
  → Stripe webhooks (critical)
  → OTA updates (slow + critical)
  → DMI degree-day sync (scheduled)
  → All Celery beat jobs
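
The six criteria from the Celery section can be folded into a small decision helper. This is only a sketch of the article's rule of thumb; `JobProfile` and `choose_runner` are hypothetical names, and real decisions may weigh more than these flags:

```python
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class JobProfile:
    """Properties of a background job, mirroring the six Celery criteria."""

    must_survive_crash: bool = False
    long_running: bool = False
    needs_retry: bool = False
    scheduled: bool = False
    needs_horizontal_scaling: bool = False
    needs_status_tracking: bool = False


def choose_runner(job: JobProfile) -> str:
    """Return "celery" if any criterion applies, else "background_tasks"."""
    needs_celery = any(getattr(job, f.name) for f in fields(job))
    return "celery" if needs_celery else "background_tasks"


if __name__ == "__main__":
    analytics_ping = JobProfile()
    settlement_email = JobProfile(must_survive_crash=True, needs_retry=True)
    print(choose_runner(analytics_ping), choose_runner(settlement_email))
```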

Conclusion

FastAPI BackgroundTasks is enough for simple tasks whose loss is acceptable, such as analytics logging. Celery is required when tasks are critical (email, PDF), depend on something that can fail (external APIs), need retry logic, must run on a schedule (beat), or must scale across multiple workers. Setting acks_late=True together with reject_on_worker_lost=True keeps a task from being lost when a worker crashes, which is the essence of at-least-once delivery in Celery.
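
At-least-once delivery means the same task can run twice, for example when a worker dies after sending the email but before acknowledging the message. The `email_sent_at` check in `send_settlement_email` is what keeps that safe. A self-contained simulation of the idea, with `FakeSettlement`, `FakeMailer`, and `deliver` as hypothetical in-memory stand-ins for the real model, mail client, and task:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional


@dataclass
class FakeSettlement:
    """In-memory stand-in for the Settlement row."""

    tenant_email: str
    email_sent_at: Optional[datetime] = None


@dataclass
class FakeMailer:
    """Records sends instead of contacting a mail provider."""

    sent_to: List[str] = field(default_factory=list)

    def send(self, to: str) -> None:
        self.sent_to.append(to)


def deliver(settlement: FakeSettlement, mailer: FakeMailer) -> str:
    """Send once; later deliveries of the same task become no-ops."""
    if settlement.email_sent_at is not None:
        return "skipped"
    mailer.send(settlement.tenant_email)
    settlement.email_sent_at = datetime.now(timezone.utc)
    return "sent"


if __name__ == "__main__":
    row, mailer = FakeSettlement("tenant@example.com"), FakeMailer()
    # Simulate the broker redelivering the same task after a worker crash.
    outcomes = [deliver(row, mailer) for _ in range(2)]
    print(outcomes, len(mailer.sent_to))
```

The guard narrows the window but does not close it: a crash between the send and the database commit can still produce a second email.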

See the Celery beat guide or the Redis caching guide.