· Celery· Redis· Python· SaaS· baggrundsjobs· opgavekø· Flower· FastAPI· async
Redis og Celery opgavekø til SaaS — opsætning og mønstre
Celery + Redis til SaaS baggrundsjobs: task routing til prioritetskøer, beat-scheduler, Flower monitoring, retry med exponential backoff og idempotens.
Af M-Bus Gateway
M-Bus Gateway platformen kører alle tunge operationer (PDF, email, OCR, DMI-data) som Celery-tasks med Redis som broker. Her er arkitekturen.
Opgavekø-arkitektur
FastAPI endpoint
↓
task.delay() / task.apply_async()
↓
Redis broker (liste pr. kø)
↓
Celery worker (subprocess)
↓
[PDF generering | Email | OCR | DMI-sync | Stripe]
↓
TimescaleDB / Hetzner Object Storage / Brevo / Stripe
Køer og prioriteter:
critical → Betalingskritiske (Stripe webhooks, afregning-emails)
default → Normal drift (PDF, OCR)
low → Ikke-urgent (DMI-sync, monthly digest)
beat → Scheduler-interne tasks (ingen worker)
Celery konfiguration
# server/src/workers/celery_app.py
from celery import Celery
from celery.schedules import crontab
celery_app = Celery(
"mbus",
broker="redis://redis:6379/0",
backend="redis://redis:6379/1", # Task-resultater
include=[
"server.src.workers.tasks.pdf",
"server.src.workers.tasks.email",
"server.src.workers.tasks.ocr",
"server.src.workers.tasks.dmi",
"server.src.workers.tasks.stripe_sync",
"server.src.workers.tasks.monthly_digest",
],
)
celery_app.conf.update(
# Serialisering:
task_serializer="json",
result_serializer="json",
accept_content=["json"],
# Routing — tasks → køer:
task_routes={
"pdf.*": {"queue": "default"},
"email.send_settlement_*": {"queue": "critical"},
"email.send_alarm_*": {"queue": "critical"},
"email.*": {"queue": "default"},
"ocr.*": {"queue": "default"},
"dmi.*": {"queue": "low"},
"stripe.*": {"queue": "critical"},
},
# Retry-adfærd:
task_acks_late=True, # ACK kun efter gennemførelse (ikke ved modtagelse)
task_reject_on_worker_lost=True, # Re-queue ved worker-crash
# Begræns parallelle tasks pr. worker:
worker_prefetch_multiplier=1,
# Timeout:
task_soft_time_limit=300, # 5 min → SoftTimeLimitExceeded (ren shutdown)
task_time_limit=360, # 6 min → hård kill
# Beat schedule:
beat_schedule={
"dmi-sync-daily": {
"task": "dmi.sync_degree_days",
"schedule": crontab(hour=5, minute=0),
},
"monthly-digest": {
"task": "email.monthly_digest",
"schedule": crontab(day_of_month=2, hour=6, minute=0),
},
"subscription-sync": {
"task": "stripe.sync_active_units",
"schedule": crontab(day_of_month=1, hour=5, minute=0),
},
"bek563-deadline-check": {
"task": "email.check_settlement_deadlines",
"schedule": crontab(day_of_month=5, hour=7, minute=0),
},
"onboarding-drip": {
"task": "email.send_onboarding_drip",
"schedule": crontab(hour=8, minute=30),
},
},
)
Idempotente tasks
# server/src/workers/tasks/pdf.py
from celery import shared_task
import structlog
logger = structlog.get_logger()
@shared_task(
bind=True,
max_retries=3,
default_retry_delay=60,
name="pdf.generate_settlement",
# Unik task pr. settlement_id — forhindrer dubletter ved re-queue:
task_id_format="pdf-settlement-{args[0]}",
)
def generate_settlement_pdf(self, settlement_id: str):
"""
Idempotent: Tjek altid om PDF allerede eksisterer inden generering.
"""
from server.src.db.sync_session import get_sync_session
from server.src.settlements.models import Settlement
try:
with get_sync_session() as session:
settlement = session.get(Settlement, settlement_id)
if not settlement:
return # Ikke genforsøg — settlement eksisterer ikke
if settlement.pdf_url:
logger.info("pdf_skip_exists", id=settlement_id)
return # Allerede genereret — idempotent
# ... generer og gem PDF
logger.info("pdf_generated", id=settlement_id)
except Exception as exc:
logger.warning("pdf_retry", id=settlement_id, attempt=self.request.retries)
raise self.retry(
exc=exc,
countdown=60 * (2 ** self.request.retries), # Exponential backoff
)
FastAPI: Dispatch tasks
# server/src/settlements/router.py
from fastapi import APIRouter, Depends
from server.src.workers.tasks.pdf import generate_settlement_pdf
from server.src.workers.tasks.email import send_settlement_email
router = APIRouter(prefix="/settlements", tags=["settlements"])
@router.post("/{settlement_id}/generate")
async def generate_settlement(
settlement_id: str,
session: AsyncSession = Depends(get_session),
user: User = Depends(require_role("landlord", "super_admin")),
):
"""Dispatch PDF-generering og email som baggrundstasks."""
# Prioriteret kæde: PDF-generering → email
# chord: alle tasks i gruppen skal færdiggøres inden callback:
from celery import chain
workflow = chain(
generate_settlement_pdf.si(settlement_id),
send_settlement_email.si(settlement_id),
)
result = workflow.apply_async()
return {"task_id": result.id, "status": "queued"}
@router.get("/{settlement_id}/task-status/{task_id}")
async def get_task_status(task_id: str):
"""Tjek status på baggrundstask (polling fra frontend)."""
from celery.result import AsyncResult
result = AsyncResult(task_id, app=celery_app)
return {
"task_id": task_id,
"status": result.status,
"result": result.result if result.ready() else None,
}
Docker Compose
# docker-compose.yml
services:
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
command: redis-server --appendonly yes # Persistens
worker-default:
build: .
command: celery -A server.src.workers.celery_app worker -Q default -c 4
environment:
- REDIS_URL=redis://redis:6379/0
depends_on:
- redis
worker-critical:
build: .
command: celery -A server.src.workers.celery_app worker -Q critical -c 2
# Færre workers — kritiske tasks må ikke vente
worker-low:
build: .
command: celery -A server.src.workers.celery_app worker -Q low -c 1
beat:
build: .
command: celery -A server.src.workers.celery_app beat --loglevel=info
# KUN ÉN beat-instans — ellers duplikate scheduled tasks
flower:
image: mher/flower:2.0
ports:
- "5555:5555"
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
command: celery --broker=redis://redis:6379/0 flower --port=5555
depends_on:
- redis
volumes:
redis_data:
Flower: Monitoring
Flower dashboard (port 5555):
→ Kø-dybde pr. kø (backlog)
→ Task throughput (tasks/sek)
→ Worker-status (online/offline)
→ Failed tasks med traceback
→ Retry-historik
URL: http://hetzner-ip:5555 (kun via SSH tunnel i prod)
Vigtige metrics at overvåge:
→ critical-kø dybde > 10: Alarm — workers KAN ikke følge med
→ Failed tasks rate > 5%: Tjek ekstern service (Brevo, Stripe)
→ Beat-task missed: beat-worker er nede
Konklusion
Celery + Redis giver M-Bus Gateway platformen én klar regel: Intet tungt i FastAPI-tråden — alt dispatches som tasks. Prioritetskøer sikrer at betalingskritiske emails aldrig venter bag DMI-sync. Idempotens via "tjek om allerede gjort" gør det sikkert at re-køre tasks ved fejl.