· FastAPI· BackgroundTasks· Celery· Redis· Python· async· baggrundsjobs· IoT· SaaS
FastAPI BackgroundTasks og Celery — hvornår bruger du hvad?
FastAPI BackgroundTasks vs Celery: hvornår er den simple løsning nok, hvornår kræves Celery, Celery-resultat tracking, beat-schedule og fejlhåndtering.
Af M-Bus Gateway
FastAPI har en simpel BackgroundTasks mekanisme til simple opgaver — men Celery er nødvendigt til produktionskritiske baggrundsjobs. Her er forskellen og hvornår du bruger hvad.
FastAPI BackgroundTasks — hvornår er det nok
# Godt til: Hurtige, ikke-kritiske opgaver der kan miste data ved crash
from fastapi import BackgroundTasks
@router.post("/settlements/{id}/send")
async def send_settlement(
id: UUID,
background_tasks: BackgroundTasks,
user: TokenPayload = Depends(require_role("landlord")),
session: AsyncSession = Depends(get_session),
):
settlement = await get_or_404(session, Settlement, id, user.tenant_id)
# Background task: Starter EFTER response er sendt til klient
# ❌ Problem: Hvis serveren crasher efter response men inden email sendes
# → Email sendes aldrig
# ✅ OK hvis: Email-fejl er acceptable, og retransmission sker manuelt
background_tasks.add_task(
_send_settlement_email_simple,
settlement_id=str(id),
recipient=settlement.tenant_email,
)
return {"status": "queued"}
async def _send_settlement_email_simple(settlement_id: str, recipient: str):
"""Simpel email — taber ved server-crash."""
await send_email(to=recipient, template="settlement", vars={"id": settlement_id})
Celery — hvornår er det nødvendigt
# Celery kræves når:
# 1. Task IKKE må miste data ved server-crash
# 2. Task tager lang tid (PDF-generering, OCR, DMI-kald)
# 3. Task skal retries ved fejl (ekstern API nede)
# 4. Task skal schedules (Celery beat)
# 5. Task skal kunne skaleres horisontalt (multiple workers)
# 6. Task-resultat skal trackes (status, fejlbesked)
from server.src.workers.celery_app import celery_app
@router.post("/settlements/{id}/send")
async def send_settlement(
id: UUID,
user: TokenPayload = Depends(require_role("landlord")),
session: AsyncSession = Depends(get_session),
):
settlement = await get_or_404(session, Settlement, id, user.tenant_id)
# Celery task: Gemmes i Redis broker — overlever server-crash
task = send_settlement_email.apply_async(
args=[str(id)],
queue="critical", # Høj prioritet
countdown=0,
task_id=f"settlement-email-{id}", # Idempotent task ID
retry=True,
max_retries=3,
)
# Gem task_id til status-tracking
settlement.email_task_id = task.id
await session.commit()
return {"task_id": task.id, "status": "queued"}
Celery task definition
# server/src/workers/tasks/settlement.py
from server.src.workers.celery_app import celery_app
import structlog
log = structlog.get_logger()
@celery_app.task(
bind=True,
queue="critical",
max_retries=3,
default_retry_delay=60, # 60 sek mellem retries
acks_late=True, # Bekræft EFTER task er udført
reject_on_worker_lost=True, # Requeue ved worker-crash
)
def send_settlement_email(self, settlement_id: str) -> dict:
"""
Send årsafregning til lejer.
Retries ved: SMTP fejl, netværksfejl, midlertidig Brevo-fejl.
"""
try:
# Synkron DB access i Celery task (ikke async):
from server.src.db.sync import SyncSession
with SyncSession() as session:
settlement = session.get(Settlement, settlement_id)
if not settlement:
log.warning("settlement_not_found", id=settlement_id)
return {"status": "skipped", "reason": "not_found"}
if settlement.email_sent_at:
log.info("settlement_already_sent", id=settlement_id)
return {"status": "skipped", "reason": "already_sent"}
# Generer PDF
pdf_bytes = generate_settlement_pdf(settlement)
# Upload til S3
pdf_url = upload_to_s3(settlement_id, pdf_bytes)
# Send email
send_brevo_email(
to=settlement.tenant_email,
template="settlement_annual",
vars={"pdf_url": pdf_url, "period": settlement.period},
)
# Opdatér timestamp
settlement.email_sent_at = datetime.now(timezone.utc)
settlement.pdf_url = pdf_url
session.commit()
log.info("settlement_email_sent", id=settlement_id)
return {"status": "sent", "pdf_url": pdf_url}
except (SMTPException, BraveAPIException) as exc:
log.warning("settlement_email_retry", id=settlement_id, error=str(exc))
raise self.retry(exc=exc)
except Exception as exc:
log.error("settlement_email_failed", id=settlement_id, error=str(exc))
raise
Task status tracking
# server/src/billing/router.py
from celery.result import AsyncResult
@router.get("/tasks/{task_id}")
async def get_task_status(
task_id: str,
user: TokenPayload = Depends(get_current_user),
):
"""Poll task status fra Celery backend."""
result = AsyncResult(task_id)
return {
"task_id": task_id,
"status": result.status,
# PENDING = ikke startet eller ukendt
# STARTED = kører
# SUCCESS = færdig
# FAILURE = fejlet
# RETRY = afventer retry
"result": result.result if result.successful() else None,
"error": str(result.result) if result.failed() else None,
"ready": result.ready(),
}
# Frontend: Poll hvert 2. sek indtil done:
# while (!result.ready) {
# await new Promise(r => setTimeout(r, 2000));
# result = await api.tasks.status(taskId);
# }
Celery beat schedule
# server/src/workers/celery_app.py
from celery import Celery
from celery.schedules import crontab
celery_app = Celery("mbus")
celery_app.conf.beat_schedule = {
# Daglige jobs:
"sync-dmi-graddage": {
"task": "server.src.workers.tasks.dmi.sync_degree_days",
"schedule": crontab(hour=3, minute=30),
"options": {"queue": "default"},
},
"check-stale-gateways": {
"task": "server.src.workers.tasks.alerts.check_stale_gateways",
"schedule": crontab(minute="*/30"), # Hvert 30. min
"options": {"queue": "default"},
},
"settlement-reminder": {
"task": "server.src.workers.tasks.settlement_reminder.send_reminders",
"schedule": crontab(hour=10, minute=0),
"options": {"queue": "default"},
},
# Månedlige jobs:
"subscription-sync": {
"task": "server.src.workers.tasks.subscription_sync.sync_unit_counts",
"schedule": crontab(day_of_month=1, hour=5, minute=0),
"options": {"queue": "default"},
},
"monthly-digest": {
"task": "server.src.workers.tasks.monthly_digest.send_digests",
"schedule": crontab(day_of_month=2, hour=6, minute=0),
"options": {"queue": "low"},
},
}
Sammenligning: BackgroundTasks vs. Celery
BackgroundTasks Celery
─────────────────────────────────────────────────────
Kompleksitet: Lav (ingen deps) Høj (Redis + workers)
Persistens: Nej (mister ved crash) Ja (Redis broker)
Retry ved fejl: Nej Ja (max_retries)
Scheduling: Nej Ja (beat)
Skalering: Nej Ja (multiple workers)
Monitoring: Nej Flower dashboard
Task tracking: Nej AsyncResult
Timeout: Nej Ja (soft/hard_time_limit)
Brug BackgroundTasks til:
→ Analytics/logging der kan tabe data
→ Webhook fanout der ikke er kritisk
→ Cache invalidering
Brug Celery til:
→ Email afsendelse (kritisk)
→ PDF-generering (langsom)
→ Stripe-webhooks (kritisk)
→ OTA-opdateringer (langsom + kritisk)
→ DMI graddage sync (scheduled)
→ Alle Celery beat jobs
Konklusion
FastAPI BackgroundTasks er nok til enkle, tabelige opgaver som analytics-logging. Celery er påkrævet når tasks er kritiske (email, PDF), kan fejle (externe APIs), kræver retry-logik, skal schedules (beat) eller skal skaleres til multiple workers. acks_late=True + reject_on_worker_lost=True sikrer at tasks ikke tabes ved worker-crash — essensen af mindst-én-gang-levering i Celery.