M-Bus Gateway
← Tilbage til blog
· Celery· Python· async· workflow· canvas· chain· chord· baggrundsjobs

Celery canvas: chain, chord og group til komplekse workflows

Celery canvas primitiver i Python: chain, chord, group og signatures til komplekse asynkrone workflows — PDF-generering, email-udsendelse og parallel databehandling.

Af M-Bus Gateway

Celery canvas primitiver gør det muligt at komponere komplekse workflows af simple tasks. Her er de mønstre der bruges i produktions-SaaS.


De fire canvas primitiver

from celery import chain, chord, group, signature

# chain: sequential — the return value of task N is prepended to the
# arguments of task N+1
result = chain(
    fetch_data.s(property_id),
    process_readings.s(),
    generate_pdf.s(),
    send_email.s(recipient="lejer@example.com"),
)()

# group: parallel — all tasks are dispatched at once and may run
# concurrently, depending on available workers
result = group(
    process_unit.s(unit_id)
    for unit_id in unit_ids
)()

# chord: group + one callback — the callback receives the list of header
# results and only runs if every header task succeeds
result = chord(
    group(generate_line.s(occ_id) for occ_id in occ_ids),
    finalize_settlement.s(settlement_id),
)()

# signature: a task call bundled with (partial) arguments and options
sig = send_email.s(recipient="test@example.com")
sig.delay()          # asynchronous (sent via the broker)
sig.apply()          # synchronous, in the current process (handy in tests)
sig.apply_async(countdown=60)  # delayed by 60 seconds

Praktisk eksempel: Årsafregning workflow

# server/src/workers/tasks/settlement.py

from celery import chain, chord, group
from server.src.celery_app import celery_app

@celery_app.task(bind=True, acks_late=True)
def _fan_out_tenant_pdfs(self, distributions_data: dict, property_id: str):
    """Replace this task with a chord of one PDF task per occupancy.

    The occupancy ids are only known after ``validate_annual_input`` has run,
    so the chord cannot be built when the workflow is assembled. This task
    builds it at runtime from ``distributions_data["occupancy_ids"]``.

    Args:
        distributions_data: Output of ``calculate_distributions`` (the previous
            step in the chain).
        property_id: Passed through to the ``finalize_and_send_all`` callback.
    """
    occupancy_ids = distributions_data.get("occupancy_ids") or []
    if not occupancy_ids:
        # Nothing to render: skip the chord and finalize with no PDF results.
        return self.replace(finalize_and_send_all.s([], property_id))

    pdf_chord = chord(
        group(
            # Not chained to a parent here, so pass the data explicitly.
            generate_tenant_pdf.s(distributions_data, occ_id)
            for occ_id in occupancy_ids
        ),
        finalize_and_send_all.s(property_id),
    )
    # Task.replace() raises Ignore inside a worker; ``return`` documents that
    # this task does not continue past this point.
    return self.replace(pdf_chord)


@celery_app.task(bind=True, acks_late=True, max_retries=3)
def generate_settlement_workflow(self, property_id: str, period_end: str):
    """Dispatch the complete annual settlement workflow for one property.

    Steps:
    1. Fetch and validate the data (sequential).
    2. Compute the distributions (sequential).
    3. Generate one PDF per tenant (parallel chord header, built at runtime by
       ``_fan_out_tenant_pdfs``), then send all emails (chord callback).

    Dispatch failures are retried with exponential backoff
    (60 s, 120 s, 240 s).
    """
    workflow = chain(
        validate_annual_input.s(property_id, period_end),
        calculate_distributions.s(),
        # The previous result (distributions dict) is prepended as first arg.
        _fan_out_tenant_pdfs.s(property_id),
    )
    try:
        workflow.delay()
    except Exception as exc:
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)


@celery_app.task(bind=True, acks_late=True)
def validate_annual_input(self, property_id: str, period_end: str):
    """Load the validated annual input and flatten it into a plain dict.

    The dict is the input for the next task in the chain.

    Raises:
        ValueError: if ``get_validated_input`` returns nothing for the property.
    """
    from server.src.db import get_sync_session
    from server.src.annual_input.service import get_validated_input

    with get_sync_session() as session:
        record = get_validated_input(session, property_id, period_end)
        if not record:
            raise ValueError(f"Ingen valideret annual_input for {property_id}")
        kwh = record.total_kwh
        cost = float(record.total_cost)
        occupancy_ids = [str(occupancy.id) for occupancy in record.occupancies]
        return dict(
            property_id=property_id,
            period_end=period_end,
            total_kwh=kwh,
            total_cost=cost,
            occupancy_ids=occupancy_ids,
        )


@celery_app.task(bind=True, acks_late=True)
def calculate_distributions(self, annual_data: dict):
    """Compute the BEK 563 distribution for the property in ``annual_data``.

    ``annual_data`` is the dict returned by ``validate_annual_input``. The
    result is a copy of it extended with a ``"distributions"`` entry holding
    whatever ``run_distribution`` returns.
    """
    from server.src.settlements.engine import run_distribution

    computed = run_distribution(
        property_id=annual_data["property_id"],
        total_cost=annual_data["total_cost"],
        occupancy_ids=annual_data["occupancy_ids"],
    )
    merged = dict(annual_data)
    merged["distributions"] = computed
    return merged


@celery_app.task(bind=True, acks_late=True, max_retries=5)
def generate_tenant_pdf(self, distributions_data: dict, occupancy_id: str):
    """Render and upload the settlement PDF for one occupancy.

    Runs once per tenant in the chord header. Failures are retried with a
    linearly growing delay (30 s, 60 s, ...). Once retries are exhausted the
    task returns a ``success: False`` result instead of raising. A raising
    header task would stop the chord callback (``finalize_and_send_all``) from
    running at all, so the successful tenants would never get their emails.

    Returns:
        dict with ``occupancy_id``, ``s3_key`` and ``success``. On permanent
        failure ``s3_key`` is ``None`` and ``error`` holds the exception text.
    """
    try:
        from server.src.settlements.pdf import render_settlement_pdf
        from server.src.storage import upload_pdf

        pdf_bytes = render_settlement_pdf(
            occupancy_id=occupancy_id,
            distributions=distributions_data["distributions"],
        )
        s3_key = f"settlements/{occupancy_id}/afregning.pdf"
        upload_pdf(s3_key, pdf_bytes)
        return {"occupancy_id": occupancy_id, "s3_key": s3_key, "success": True}
    except Exception as exc:
        retries_exhausted = (
            self.max_retries is not None
            and self.request.retries >= self.max_retries
        )
        if retries_exhausted:
            # Report the failure as data so the chord callback still runs.
            return {
                "occupancy_id": occupancy_id,
                "s3_key": None,
                "success": False,
                "error": str(exc),
            }
        raise self.retry(exc=exc, countdown=30 * (self.request.retries + 1))


@celery_app.task(bind=True, acks_late=True)
def finalize_and_send_all(self, pdf_results: list[dict], property_id: str):
    """Chord callback: queue settlement emails for every successful PDF.

    Failed results are logged and skipped. The successful ones are still
    emailed.

    Args:
        pdf_results: One dict per ``generate_tenant_pdf`` run, each with
            ``occupancy_id``, ``s3_key`` and a ``success`` flag.
        property_id: The property this settlement belongs to.

    Returns:
        ``{"property_id": ..., "sent": n}``, where ``n`` is the number of
        emails queued (not confirmed delivered).

    NOTE(review): the settlement is not marked as sent in this task — confirm
    where that state change is supposed to happen.
    """
    from server.src.workers.tasks.email import send_settlement_email

    logger = self.app.log.get_default_logger()
    succeeded = [r for r in pdf_results if r.get("success")]

    for result in pdf_results:
        if not result.get("success"):
            # get_default_logger() returns a stdlib logging.Logger, which
            # rejects arbitrary keyword arguments; use %-style args instead.
            logger.error(
                "PDF generation failed for occupancy %s",
                result.get("occupancy_id"),
            )

    if succeeded:
        group(
            send_settlement_email.s(r["occupancy_id"], r["s3_key"])
            for r in succeeded
        ).delay()

    return {"property_id": property_id, "sent": len(succeeded)}

Error handling i chord

# Chord failure semantics: if any header task fails (after its own retries),
# the callback is NOT executed and the chord result fails with a ChordError.
# Attach an error callback to be notified of that:

from celery import chord

result = chord(
    # distributions_data: the dict produced by calculate_distributions;
    # generate_tenant_pdf needs it in addition to the occupancy id.
    group(generate_tenant_pdf.s(distributions_data, occ_id) for occ_id in occ_ids),
    finalize_and_send_all.s(property_id),
).on_error(
    handle_settlement_error.s(property_id)
)()

@celery_app.task
def handle_settlement_error(request, exc, traceback, property_id: str):
    """Error callback for the settlement chord: log the failure and alert an admin.

    Celery invokes the errback with the failed task's ``request``, the
    exception and the traceback. ``property_id`` is the partial argument bound
    via ``handle_settlement_error.s(property_id)``.

    NOTE(review): nothing here marks the settlement as failed; add that
    persistence step if it is required.
    """
    import structlog

    log = structlog.get_logger()
    log.error(
        "Settlement workflow failed",
        property_id=property_id,
        task_id=getattr(request, "id", None),
        exc=str(exc),
    )
    # Alert email to the admin
    send_admin_alert.delay(
        subject=f"Settlement fejlet: {property_id}",
        body=f"Fejl: {exc}\n\nTraceback:\n{traceback}",
    )

Retry-strategi med eksponentiel backoff

# server/src/workers/tasks/base.py

from celery import Task
import structlog

log = structlog.get_logger()

class RetryableTask(Task):
    """Shared base task: structured failure/retry logging plus an opt-in
    ``retry_with_backoff`` helper that doubles the delay on each attempt."""

    abstract = True
    acks_late = True
    max_retries = 5

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Celery handler invoked when the task has failed.
        log.error(
            "Task permanent failure",
            task=self.name,
            task_id=task_id,
            exc=str(exc),
        )

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        # Celery handler invoked when the task is about to be retried.
        log.warning(
            "Task retry",
            task=self.name,
            task_id=task_id,
            attempt=self.request.retries,
            exc=str(exc),
        )

    def retry_with_backoff(self, exc):
        """Raise a retry of the current task with an exponential countdown.

        Delay = 30 s * 2**retries, i.e. 30, 60, 120, 240, 480 s for the five
        allowed retries.
        """
        delay_seconds = 30 * 2 ** self.request.retries
        raise self.retry(exc=exc, countdown=delay_seconds)


# HTTP statuses treated as transient: rate limiting and server/gateway errors.
_TRANSIENT_HTTP_STATUSES = frozenset({429, 500, 502, 503, 504})


@celery_app.task(bind=True, base=RetryableTask)
def fetch_dmi_degree_days(self, zip_code: str, date: str):
    """Fetch DMI degree-day data for ``zip_code`` on ``date``.

    Transient failures are retried through ``retry_with_backoff``:
    - rate limiting (429) and 5xx responses
    - connection errors and timeouts

    Any other HTTP error is re-raised immediately as a permanent failure.
    """
    try:
        return _fetch_dmi_api(zip_code, date)
    except (requests.ConnectionError, requests.Timeout) as exc:
        # Network-level failures are transient by nature.
        self.retry_with_backoff(exc)
        raise  # not reached: retry_with_backoff always raises
    except requests.HTTPError as exc:
        # An HTTPError raised by hand may carry no response object.
        response = exc.response
        if response is not None and response.status_code in _TRANSIENT_HTTP_STATUSES:
            self.retry_with_backoff(exc)
        raise  # Permanent failure for all other HTTP errors

Priority queues til settlement

# server/src/celery_app.py

# Celery derives task names from the module path (e.g.
# "server.src.workers.tasks.settlement.generate_settlement_workflow"), and glob
# routes are matched from the start of the name, so the package prefix is needed.
CELERY_TASK_ROUTES = {
    "server.src.workers.tasks.settlement.*": {"queue": "settlement"},
    "server.src.workers.tasks.email.*":      {"queue": "email"},
    "server.src.workers.tasks.pdf.*":        {"queue": "pdf"},
    "server.src.workers.tasks.dmi.*":        {"queue": "low_priority"},
}

# Start workers bound to specific queues:
# celery -A server.src.celery_app worker -Q settlement,pdf -c 4
# celery -A server.src.celery_app worker -Q email -c 8
# celery -A server.src.celery_app worker -Q low_priority -c 1

# Message priority:
# - RabbitMQ: a HIGHER number means higher priority (0 is the lowest). It only
#   takes effect if the queue is declared with the "x-max-priority" argument.
# - Redis transport: the scale is inverted (0 is the highest).
generate_settlement_workflow.apply_async(
    args=[property_id, period_end],
    priority=9,  # Critical — top priority on a RabbitMQ queue with x-max-priority=10
    queue="settlement",
)

Test af canvas workflows

# server/tests/test_settlement_workflow.py

import pytest
from unittest.mock import patch, MagicMock
from celery import chain

def test_validate_annual_input_returns_correct_structure(mock_session):
    """validate_annual_input should flatten the validated record into a dict."""
    # NOTE(review): get_sync_session is not patched here; presumably the
    # mock_session fixture (not shown) isolates the database — confirm.
    fake_input = MagicMock(
        total_kwh=50000,
        total_cost=25000.0,
        occupancies=[MagicMock(id="occ-1"), MagicMock(id="occ-2")],
    )
    target = "server.src.annual_input.service.get_validated_input"
    with patch(target, return_value=fake_input):
        outcome = validate_annual_input.apply(args=["prop-1", "2026-05-31"]).get()
    assert outcome["total_kwh"] == 50000
    assert "occ-1" in outcome["occupancy_ids"]

def test_chord_collects_all_pdf_results():
    """The chord callback should queue one email per successful PDF result."""
    pdf_results = [
        {"occupancy_id": "occ-1", "s3_key": "settlements/occ-1/afregning.pdf", "success": True},
        {"occupancy_id": "occ-2", "s3_key": "settlements/occ-2/afregning.pdf", "success": True},
    ]
    # finalize_and_send_all builds signatures with send_settlement_email.s(...)
    # and dispatches them through the module-level `group`. Mocking only
    # `.delay` on the email task would leave group(...).delay() talking to the
    # real broker, so patch `group` as well.
    email_target = "server.src.workers.tasks.email.send_settlement_email"
    group_target = "server.src.workers.tasks.settlement.group"
    with patch(email_target) as mock_send, patch(group_target) as mock_group:
        result = finalize_and_send_all.apply(
            args=[pdf_results, "prop-1"]
        ).get()
        # group() receives a lazy iterable; consume it to build the signatures.
        signatures = list(mock_group.call_args.args[0])

    assert result["sent"] == 2
    assert len(signatures) == 2
    mock_send.s.assert_any_call("occ-1", "settlements/occ-1/afregning.pdf")
    mock_send.s.assert_any_call("occ-2", "settlements/occ-2/afregning.pdf")
    mock_group.return_value.delay.assert_called_once()

Konklusion

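Celery canvas giver strukturerede workflows til komplekse asynkrone operationer. chain håndterer afhængige trin, og group distribuerer uafhængige tasks. chord paralleliserer med en callback, der kun kører, når alle header-tasks er lykkedes; ellers kaldes error callbacken. Eksponentiel backoff gør produktions-workloads robuste over for midlertidige fejl. Priority queues sikrer, at kritiske jobs som årsafregninger ikke står i kø bag lavprioritetsarbejde.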

Se også vores guides til FastAPI background tasks og Celery beat.