M-Bus Gateway
· Celery · Redis · Python · SaaS · background jobs · task queue · Flower · FastAPI · async

Redis and Celery task queue for SaaS: setup and patterns

Celery + Redis for SaaS background jobs: task routing to priority queues, the beat scheduler, Flower monitoring, retries with exponential backoff, and idempotency.

By M-Bus Gateway

The M-Bus Gateway platform runs all heavy operations (PDF generation, email, OCR, DMI data) as Celery tasks, with Redis as the broker. Here is the architecture.


Task queue architecture

FastAPI endpoint
      ↓
  task.delay() / task.apply_async()
      ↓
Redis broker (one list per queue)
      ↓
Celery worker (tasks run in pool subprocesses)
      ↓
[PDF generation | Email | OCR | DMI sync | Stripe]
      ↓
TimescaleDB / Hetzner Object Storage / Brevo / Stripe
Queues and priorities:

critical   → Payment-critical (Stripe webhooks, settlement and alarm emails)
default    → Normal operation (PDF, OCR, other emails)
low        → Non-urgent (DMI sync, monthly digest)
beat       → Not a queue: the beat process only publishes scheduled tasks, which are routed to the queues above
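Why separate queues help can be shown with a small in-memory model. This is a sketch, not Celery code: each queue is a FIFO list (the Redis broker likewise keeps one list per queue), and each worker only takes messages from the queues it was started with. The task name `stripe.handle_webhook` is hypothetical.

```python
from __future__ import annotations

from collections import deque

# In-memory stand-in for the broker: one FIFO list per queue name.
queues: dict[str, deque[str]] = {"critical": deque(), "default": deque(), "low": deque()}


def publish(queue: str, task_name: str) -> None:
    """Append a message to the end of the named queue."""
    queues[queue].append(task_name)


def consume(worker_queues: list[str]) -> str | None:
    """Take the oldest message from the first non-empty queue this worker listens to."""
    for name in worker_queues:
        if queues[name]:
            return queues[name].popleft()
    return None


# A backlog of 500 PDF jobs in "default", then one payment-critical task.
for _ in range(500):
    publish("default", "pdf.generate_settlement")
publish("critical", "stripe.handle_webhook")  # hypothetical task name

# A worker started with only the critical queue gets it immediately,
# while the default backlog is left for the default workers.
print(consume(["critical"]))   # stripe.handle_webhook
print(len(queues["default"]))  # 500
```

With one shared queue, the Stripe task would sit behind all 500 PDF jobs; with its own queue and worker it is picked up at once.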

Celery configuration

# server/src/workers/celery_app.py
from celery import Celery
from celery.schedules import crontab

celery_app = Celery(
    "mbus",
    broker="redis://redis:6379/0",
    backend="redis://redis:6379/1",  # Task results (separate Redis DB)
    include=[
        "server.src.workers.tasks.pdf",
        "server.src.workers.tasks.email",
        "server.src.workers.tasks.ocr",
        "server.src.workers.tasks.dmi",
        "server.src.workers.tasks.stripe_sync",
        "server.src.workers.tasks.monthly_digest",
    ],
)

celery_app.conf.update(
    # Serialization:
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],

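    # Routing: task name → queue. Exact names are matched first, then
    # glob patterns in the order listed, so the specific patterns come first:
    task_routes={
        "pdf.*": {"queue": "default"},
        "email.send_settlement_*": {"queue": "critical"},
        "email.send_alarm_*": {"queue": "critical"},
        "email.monthly_digest": {"queue": "low"},
        "email.*": {"queue": "default"},
        "ocr.*": {"queue": "default"},
        "dmi.*": {"queue": "low"},
        "stripe.*": {"queue": "critical"},
    },
    # Tasks matching no route would otherwise land in the "celery" queue,
    # which none of the workers consume:
    task_default_queue="default",

    # Delivery guarantees:
    task_acks_late=True,              # ACK only after the task has finished (not on receipt)
    task_reject_on_worker_lost=True,  # Re-queue the message if the worker process dies

    # Each worker process reserves only one message at a time,
    # so long tasks do not sit on a prefetched backlog:
    worker_prefetch_multiplier=1,

    # Timeouts:
    task_soft_time_limit=300,   # 5 min → SoftTimeLimitExceeded is raised in the task (time to clean up)
    task_time_limit=360,        # 6 min → the worker process is killed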

    # Beat schedule (crontab times use the app's timezone setting, UTC by default):
    beat_schedule={
        "dmi-sync-daily": {
            "task": "dmi.sync_degree_days",
            "schedule": crontab(hour=5, minute=0),
        },
        "monthly-digest": {
            "task": "email.monthly_digest",
            "schedule": crontab(day_of_month=2, hour=6, minute=0),
        },
        "subscription-sync": {
            "task": "stripe.sync_active_units",
            "schedule": crontab(day_of_month=1, hour=5, minute=0),
        },
        "bek563-deadline-check": {
            "task": "email.check_settlement_deadlines",
            "schedule": crontab(day_of_month=5, hour=7, minute=0),
        },
        "onboarding-drip": {
            "task": "email.send_onboarding_drip",
            "schedule": crontab(hour=8, minute=30),
        },
    },
)
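The routing rule can be sketched in plain Python. The matcher below is a re-implementation for illustration, modeled on how we understand Celery's dict-based `task_routes` to behave (exact task names first, then glob patterns in insertion order, with `*` matching one or more characters from the start of the name); it is not Celery's own code. A task whose name matches nothing falls back to the default queue, which Celery calls `celery` unless `task_default_queue` is set, and none of the worker services in the Docker Compose file below listens on `celery`. The name `server.src.workers.tasks.ocr.run` is hypothetical.

```python
import re
import string

# A copy of the task_routes table from the configuration (order matters for patterns).
TASK_ROUTES = {
    "pdf.*": {"queue": "default"},
    "email.send_settlement_*": {"queue": "critical"},
    "email.send_alarm_*": {"queue": "critical"},
    "email.*": {"queue": "default"},
    "ocr.*": {"queue": "default"},
    "dmi.*": {"queue": "low"},
    "stripe.*": {"queue": "critical"},
}


def glob_to_regex(pattern: str) -> str:
    """Escape punctuation except '*', then let each '*' match one or more characters."""
    special = string.punctuation.replace("*", "")
    escaped = "".join("\\" + ch if ch in special else ch for ch in pattern)
    return escaped.replace("*", ".+?")


def resolve_queue(task_name: str, routes: dict, default_queue: str = "celery") -> str:
    """Exact task names win; otherwise the first matching glob pattern; else the default queue."""
    if task_name in routes and "*" not in task_name:
        return routes[task_name]["queue"]
    for pattern, route in routes.items():
        # re.match anchors at the start of the name only
        if "*" in pattern and re.match(glob_to_regex(pattern), task_name):
            return route["queue"]
    return default_queue


for name in [
    "pdf.generate_settlement",
    "email.check_settlement_deadlines",
    "dmi.sync_degree_days",
    "stripe.sync_active_units",
    "server.src.workers.tasks.ocr.run",  # hypothetical auto-generated task name
]:
    print(f"{name} -> {resolve_queue(name, TASK_ROUTES)}")
```

Run against the original table, `email.check_settlement_deadlines` lands on `default` (it does not match `email.send_settlement_*`), and the auto-generated name lands on `celery`.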

Idempotent tasks

# server/src/workers/tasks/pdf.py
from celery import shared_task
import structlog

logger = structlog.get_logger()

@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,  # Only used when retry() is called without countdown
    name="pdf.generate_settlement",
    # Note: Celery does not deduplicate tasks by ID, so a re-queued or
    # double-dispatched task can run twice. The check below is what makes that safe.
)
def generate_settlement_pdf(self, settlement_id: str):
    """
    Idempotent: always check whether the PDF already exists before generating it.
    """
    from server.src.db.sync_session import get_sync_session
    from server.src.settlements.models import Settlement

    try:
        with get_sync_session() as session:
            settlement = session.get(Settlement, settlement_id)
            if not settlement:
                return  # Don't retry: the settlement does not exist

            if settlement.pdf_url:
                logger.info("pdf_skip_exists", id=settlement_id)
                return  # Already generated: idempotent

            # ... generate and store the PDF
            logger.info("pdf_generated", id=settlement_id)

    except Exception as exc:
        logger.warning("pdf_retry", id=settlement_id, attempt=self.request.retries)
        raise self.retry(
            exc=exc,
            countdown=60 * (2 ** self.request.retries),  # Exponential backoff: 60, 120, 240 s
        )
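The retry delays produced by `countdown=60 * (2 ** self.request.retries)` can be checked with a small helper. The optional cap and "full jitter" (a random delay between 0 and the computed value, so many tasks failing at once do not all retry at the same second) are additions for illustration, not part of the task above; the function name is hypothetical.

```python
from __future__ import annotations

import random


def backoff_countdown(retries: int, base: int = 60, maximum: int | None = None,
                      jitter: bool = False, rng: random.Random | None = None) -> int:
    """Seconds to wait before the next attempt, given how many retries have happened."""
    countdown = base * (2 ** retries)  # same formula as countdown= in the task above
    if maximum is not None:
        countdown = min(countdown, maximum)  # optional cap (not in the original task)
    if jitter:
        # "Full jitter": pick uniformly in [0, countdown] so simultaneous failures spread out
        countdown = (rng or random).randint(0, countdown)
    return countdown


# max_retries=3 means three retries after the first attempt (four attempts in total).
schedule = [backoff_countdown(r) for r in range(3)]
print(schedule)       # [60, 120, 240]
print(sum(schedule))  # 420
```

Celery can also compute backoff itself through the task options `autoretry_for`, `retry_backoff`, `retry_backoff_max` and `retry_jitter`; since `retry_jitter` is on by default there, the exact delays would differ from the fixed schedule above.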

FastAPI: dispatching tasks

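# server/src/settlements/router.py
from celery import chain
from celery.result import AsyncResult
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from server.src.workers.celery_app import celery_app
from server.src.workers.tasks.pdf import generate_settlement_pdf
from server.src.workers.tasks.email import send_settlement_email
# get_session, require_role and User come from the project's db/auth modules (not shown)

router = APIRouter(prefix="/settlements", tags=["settlements"])

@router.post("/{settlement_id}/generate")
async def generate_settlement(
    settlement_id: str,
    session: AsyncSession = Depends(get_session),
    user: User = Depends(require_role("landlord", "super_admin")),
):
    """Dispatch PDF generation and email as background tasks."""

    # chain: run the tasks one after another (PDF → email).
    # .si() creates immutable signatures, so the email task does not receive
    # the PDF task's return value as an extra argument.
    workflow = chain(
        generate_settlement_pdf.si(settlement_id),
        send_settlement_email.si(settlement_id),
    )
    # apply_async() on a chain returns the result of the LAST task (the email);
    # result.parent refers to the PDF task. If the PDF task fails for good,
    # the email task never runs and keeps reporting PENDING.
    result = workflow.apply_async()

    return {"task_id": result.id, "status": "queued"}


@router.get("/{settlement_id}/task-status/{task_id}")
async def get_task_status(settlement_id: str, task_id: str):
    """Check the status of a background task (polled by the frontend)."""
    result = AsyncResult(task_id, app=celery_app)  # Unknown IDs also report PENDING
    payload = {"task_id": task_id, "status": result.status, "result": None}
    if result.successful():
        payload["result"] = result.result
    elif result.failed():
        # result.result is an exception instance here, which is not JSON-serializable
        payload["result"] = str(result.result)
    return payload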
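A double click on "generate" dispatches the chain twice. The `pdf_url` check makes a second run harmless once the first has finished, but two runs started at the same moment can both see an empty `pdf_url`. One way to guard the endpoint is to claim a key per settlement before dispatching, in the style of Redis `SET key value NX EX seconds`. The sketch below uses a tiny in-memory class in place of a redis-py client (its `set` mimics redis-py's `nx`/`ex` arguments, returning `None` when NX blocks the write); the key format, the 600-second TTL, the `dispatch_once` helper and passing a `task_id` to the chain's `apply_async` are all assumptions.

```python
from __future__ import annotations

import time
import uuid


class InMemoryRedis:
    """Stand-in for a redis-py client; only the set(nx=, ex=) and get() behaviour used here."""

    def __init__(self) -> None:
        self._data: dict[str, tuple[str, float]] = {}  # key -> (value, expires_at)

    def _live(self, key: str) -> bool:
        item = self._data.get(key)
        if item is None:
            return False
        if item[1] <= time.monotonic():
            del self._data[key]  # expired, as with Redis EX
            return False
        return True

    def set(self, key: str, value: str, nx: bool = False, ex: int | None = None) -> bool | None:
        if nx and self._live(key):
            return None  # key already exists: NX prevents the write
        expires_at = float("inf") if ex is None else time.monotonic() + ex
        self._data[key] = (value, expires_at)
        return True

    def get(self, key: str) -> str | None:
        return self._data[key][0] if self._live(key) else None


def dispatch_once(r, settlement_id: str, send, ttl: int = 600) -> dict:
    """Hypothetical helper: dispatch at most once per settlement while the key lives."""
    key = f"dispatch:settlement:{settlement_id}"  # hypothetical key format
    task_id = str(uuid.uuid4())
    if r.set(key, task_id, nx=True, ex=ttl):
        # In the endpoint, `send` could be lambda tid: workflow.apply_async(task_id=tid)
        send(task_id)
        return {"task_id": task_id, "status": "queued"}
    return {"task_id": r.get(key), "status": "already_queued"}


r = InMemoryRedis()
sent: list[str] = []
print(dispatch_once(r, "s-1", sent.append)["status"])  # queued
print(dispatch_once(r, "s-1", sent.append)["status"])  # already_queued
print(len(sent))                                        # 1
```

If dispatching fails after the key has been claimed, the key blocks new attempts until it expires, so a real endpoint would probably delete it in an `except` branch.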

Docker Compose

# docker-compose.yml
services:
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes  # AOF persistence: queued tasks survive a Redis restart

  worker-default:
    build: .
    command: celery -A server.src.workers.celery_app worker -Q default -c 4
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis

  worker-critical:
    build: .
    command: celery -A server.src.workers.celery_app worker -Q critical -c 2
    # Own worker: critical tasks never queue behind PDF/OCR jobs in "default"
    depends_on:
      - redis

  worker-low:
    build: .
    command: celery -A server.src.workers.celery_app worker -Q low -c 1
    depends_on:
      - redis

  beat:
    build: .
    command: celery -A server.src.workers.celery_app beat --loglevel=info
    # ONLY ONE beat instance, otherwise scheduled tasks are sent twice
    depends_on:
      - redis

  flower:
    image: mher/flower:2.0
    ports:
      - "127.0.0.1:5555:5555"  # Bind to localhost only; reach it through an SSH tunnel
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    command: celery --broker=redis://redis:6379/0 flower --port=5555
    depends_on:
      - redis

volumes:
  redis_data:

Flower: Monitoring

Flower dashboard (port 5555):
  → Queue depth per queue (backlog)
  → Task throughput (tasks/s)
  → Worker status (online/offline)
  → Failed tasks with traceback
  → Retry history

URL: http://hetzner-ip:5555 (in production, only through an SSH tunnel)

Key metrics to watch:
  → critical queue depth > 10: alert, the workers cannot keep up
  → Failed-task rate > 5%: check the external service (Brevo, Stripe)
  → A scheduled task did not run: the beat process is down
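The thresholds above can be written as a small check for a cron job or monitoring script. With the Redis broker, a queue's backlog is the length of the Redis list of the same name, so a redis-py `llen("critical")` call is one way to obtain it (an assumption: if task priorities are used, the Redis transport spreads a queue over several keys). The function below only evaluates numbers passed in; `Snapshot`, `check_alerts` and the 25-hour "beat is down" threshold are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class Snapshot:
    critical_depth: int        # backlog of the "critical" queue, e.g. redis-py llen("critical")
    succeeded: int             # tasks that succeeded in the observation window
    failed: int                # tasks that failed in the same window
    last_daily_run: datetime   # last time a daily beat task (e.g. dmi.sync_degree_days) ran


def check_alerts(s: Snapshot, now: datetime) -> list[str]:
    """Return one message per threshold from the list above that is exceeded."""
    alerts = []
    if s.critical_depth > 10:
        alerts.append(f"critical queue depth {s.critical_depth} > 10: workers cannot keep up")
    total = s.succeeded + s.failed
    if total and s.failed / total > 0.05:
        alerts.append(f"failure rate {s.failed / total:.1%} > 5%: check Brevo/Stripe")
    # Assumption: a daily task not seen for more than 25 h means beat has stopped.
    if now - s.last_daily_run > timedelta(hours=25):
        alerts.append("daily beat task overdue: is the beat process running?")
    return alerts


now = datetime(2024, 3, 5, 12, 0, tzinfo=timezone.utc)
snapshot = Snapshot(critical_depth=14, succeeded=180, failed=20,
                    last_daily_run=now - timedelta(hours=30))
for message in check_alerts(snapshot, now):
    print(message)
```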

Conclusion

Celery + Redis gives the M-Bus Gateway platform one clear rule: nothing heavy runs in the FastAPI request path; everything is dispatched as a task. Priority queues ensure that payment-critical emails never wait behind the DMI sync. Idempotency through "check whether it has already been done" makes it safe to re-run tasks after failures.

See the Celery beat guide or the WeasyPrint PDF guide.