· Celery· Python· async· workflow· canvas· chain· chord· baggrundsjobs
Celery canvas: chain, chord og group til komplekse workflows
Celery canvas primitiver i Python: chain, chord, group og starargs til komplekse asynkrone workflows — PDF-generering, email-udsendelse og parallel databehandling.
Af M-Bus Gateway
Celery canvas primitiver gør det muligt at komponere komplekse workflows af simple tasks. Her er de mønstre, der bruges i produktions-SaaS.
De fire canvas primitiver
from celery import chain, chord, group, signature
# chain: sequential — the output of task N is the input to task N+1
result = chain(
fetch_data.s(property_id),
process_readings.s(),
generate_pdf.s(),
send_email.s(recipient="lejer@example.com"),
)()
# group: parallel — all tasks are dispatched at once
result = group(
process_unit.s(unit_id)
for unit_id in unit_ids
)()
# chord: parallel with a callback — a group plus one final task
result = chord(
group(generate_line.s(occ_id) for occ_id in occ_ids),
finalize_settlement.s(settlement_id),
)()
# Signatures: a task call captured as an object that can be invoked later
sig = send_email.s(recipient="test@example.com")
sig.delay() # asynchronous
sig.apply() # synchronous, runs locally (handy in tests)
sig.apply_async(countdown=60) # delayed by 60 seconds
Praktisk eksempel: Årsafregning workflow
# server/src/workers/tasks/settlement.py
from celery import chain, chord, group
from server.src.celery_app import celery_app
@celery_app.task(bind=True, acks_late=True, max_retries=3)
def generate_settlement_workflow(self, property_id: str, period_end: str):
    """Enqueue the complete annual settlement workflow.

    Steps, in order:
      1. Fetch and validate the annual input (sequential).
      2. Calculate the cost distribution (sequential).
      3. Fan out: generate one PDF per tenant in parallel, then run
         ``finalize_and_send_all`` as the chord callback.

    The occupancy IDs are only known once step 1 has run, so the parallel
    part cannot be built here. It is built at runtime by
    ``_fan_out_tenant_pdfs`` from the output of step 2.

    Args:
        property_id: Identifier of the property being settled.
        period_end: End of the settlement period, passed through to
            ``validate_annual_input``.

    Raises:
        celery.exceptions.Retry: If enqueuing the workflow fails; retried
            with an exponentially growing countdown (60s, 120s, 240s).
    """
    try:
        workflow = chain(
            validate_annual_input.s(property_id, period_end),
            calculate_distributions.s(),
            _fan_out_tenant_pdfs.s(property_id),
        )
        workflow.delay()
    except Exception as exc:
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)


@celery_app.task(bind=True, acks_late=True)
def _fan_out_tenant_pdfs(self, distributions_data: dict, property_id: str):
    """Replace this task with a chord that renders one PDF per occupancy.

    Receives the output of ``calculate_distributions`` and reads the
    occupancy IDs from it, which is the first point in the workflow where
    they are available.

    Args:
        distributions_data: Output of ``calculate_distributions``; must
            contain ``"occupancy_ids"``.
        property_id: Forwarded to the ``finalize_and_send_all`` callback.
    """
    occupancy_ids = distributions_data.get("occupancy_ids", [])
    if not occupancy_ids:
        # Nothing to render: go straight to the finalizer with no results
        # instead of building a chord with an empty header.
        return self.replace(finalize_and_send_all.s([], property_id))

    header = group(
        generate_tenant_pdf.s(distributions_data, occ_id)
        for occ_id in occupancy_ids
    )
    # The callback receives the list of header results as its first argument.
    return self.replace(chord(header, finalize_and_send_all.s(property_id)))
@celery_app.task(bind=True, acks_late=True)
def validate_annual_input(self, property_id: str, period_end: str):
    """Load the validated annual input and return it as a plain dict.

    The returned dict is the input of the next task in the chain.

    Raises:
        ValueError: If ``get_validated_input`` returns nothing for the
            given property and period.
    """
    from server.src.db import get_sync_session
    from server.src.annual_input.service import get_validated_input

    with get_sync_session() as db:
        annual_input = get_validated_input(db, property_id, period_end)
        if not annual_input:
            raise ValueError(f"Ingen valideret annual_input for {property_id}")

        # Build the payload while the session is still open.
        payload = {
            "property_id": property_id,
            "period_end": period_end,
            "total_kwh": annual_input.total_kwh,
            "total_cost": float(annual_input.total_cost),
            "occupancy_ids": [str(occ.id) for occ in annual_input.occupancies],
        }
        return payload
@celery_app.task(bind=True, acks_late=True)
def calculate_distributions(self, annual_data: dict):
    """Run the cost distribution and attach it to the incoming data.

    ``annual_data`` is the dict returned by ``validate_annual_input``. The
    result is a copy of it with an added ``"distributions"`` entry.
    """
    from server.src.settlements.engine import run_distribution

    enriched = dict(annual_data)
    enriched["distributions"] = run_distribution(
        property_id=annual_data["property_id"],
        total_cost=annual_data["total_cost"],
        occupancy_ids=annual_data["occupancy_ids"],
    )
    return enriched
@celery_app.task(bind=True, acks_late=True, max_retries=5)
def generate_tenant_pdf(self, distributions_data: dict, occupancy_id: str):
    """Render and upload the settlement PDF for a single occupancy.

    Returns a dict with the occupancy ID, the storage key and
    ``"success": True``. Any exception triggers a retry with a linearly
    growing countdown (30s, 60s, 90s, ...).
    """
    try:
        from server.src.settlements.pdf import render_settlement_pdf
        from server.src.storage import upload_pdf

        rendered = render_settlement_pdf(
            occupancy_id=occupancy_id,
            distributions=distributions_data["distributions"],
        )
        object_key = f"settlements/{occupancy_id}/afregning.pdf"
        upload_pdf(object_key, rendered)
    except Exception as exc:
        wait_seconds = (self.request.retries + 1) * 30
        raise self.retry(exc=exc, countdown=wait_seconds)
    else:
        return {"occupancy_id": occupancy_id, "s3_key": object_key, "success": True}
@celery_app.task(bind=True, acks_late=True)
def finalize_and_send_all(self, pdf_results: list[dict], property_id: str):
    """Chord callback: dispatch settlement emails for all rendered PDFs.

    Args:
        pdf_results: Results collected from ``generate_tenant_pdf``, one
            dict per occupancy with ``occupancy_id``, ``s3_key`` and
            ``success`` keys.
        property_id: Identifier of the property being settled.

    Returns:
        A dict with the property ID and the number of emails dispatched.
    """
    from server.src.workers.tasks.email import send_settlement_email

    succeeded = [r for r in pdf_results if r.get("success")]
    failed = [r for r in pdf_results if not r.get("success")]

    # Log failures but carry on with the successful ones. The standard
    # library logger rejects arbitrary keyword arguments, so the occupancy
    # ID goes through lazy %-formatting instead.
    logger = self.app.log.get_default_logger()
    for result in failed:
        logger.error(
            "PDF generation failed (occupancy_id=%s)",
            result.get("occupancy_id"),
        )

    # Only dispatch when there is something to send; an empty group is a no-op.
    if succeeded:
        group(
            send_settlement_email.s(r["occupancy_id"], r["s3_key"])
            for r in succeeded
        ).delay()

    return {"property_id": property_id, "sent": len(succeeded)}
Error handling i chord
# Chord failure: per the Celery canvas docs, if any task in the header group
# raises, the chord body (callback) is not executed and its result moves to
# the failure state with a ChordError.
# Attach an error callback to react to that:
from celery import chord
result = chord(
group(generate_tenant_pdf.s(occ_id) for occ_id in occ_ids),
finalize_and_send_all.s(property_id),
).on_error(
handle_settlement_error.s(property_id)
)()
@celery_app.task
def handle_settlement_error(request, exc, traceback, property_id: str):
    """Error callback: log the failure and send an alert email to the admin."""
    import structlog

    logger = structlog.get_logger()
    logger.error("Settlement workflow failed", property_id=property_id, exc=str(exc))

    # Notify the admin with the error and its traceback.
    alert_subject = f"Settlement fejlet: {property_id}"
    alert_body = f"Fejl: {exc}\n\nTraceback:\n{traceback}"
    send_admin_alert.delay(subject=alert_subject, body=alert_body)
Retry-strategi med eksponentiel backoff
# server/src/workers/tasks/base.py
from celery import Task
import structlog
log = structlog.get_logger()
class RetryableTask(Task):
    """Base task with failure/retry logging and an exponential backoff helper.

    Backoff is not applied automatically: task bodies call
    ``retry_with_backoff`` from their exception handler.
    """

    abstract = True
    acks_late = True
    max_retries = 5

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Log the failure of the task."""
        fields = {"task": self.name, "task_id": task_id, "exc": str(exc)}
        log.error("Task permanent failure", **fields)

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Log that the task is about to be retried."""
        fields = {
            "task": self.name,
            "task_id": task_id,
            "attempt": self.request.retries,
            "exc": str(exc),
        }
        log.warning("Task retry", **fields)

    def retry_with_backoff(self, exc):
        """Retry the task, doubling the delay on every attempt.

        Delays are 30s, 60s, 120s, 240s and 480s for attempts 0 to 4.
        This method always raises.
        """
        delay_seconds = 30 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=delay_seconds)
@celery_app.task(bind=True, base=RetryableTask)
def fetch_dmi_degree_days(self, zip_code: str, date: str):
    """Fetch degree days from the DMI API, retrying only when rate limited.

    An HTTP 429 response triggers a retry with exponential backoff. Every
    other HTTP error is re-raised unchanged.
    """
    try:
        degree_days = _fetch_dmi_api(zip_code, date)
    except requests.HTTPError as exc:
        if exc.response.status_code != 429:
            raise  # Permanent failure for other HTTP errors
        # Rate limited: retry_with_backoff always raises, so the trailing
        # raise is only a safeguard.
        self.retry_with_backoff(exc)
        raise
    return degree_days
Priority queues til settlement
# server/src/celery_app.py
CELERY_TASK_ROUTES = {
"workers.tasks.settlement.*": {"queue": "settlement"},
"workers.tasks.email.*": {"queue": "email"},
"workers.tasks.pdf.*": {"queue": "pdf"},
"workers.tasks.dmi.*": {"queue": "low_priority"},
}
# Start workers bound to specific queues:
# celery -A server.src.celery_app worker -Q settlement,pdf -c 4
# celery -A server.src.celery_app worker -Q email -c 8
# celery -A server.src.celery_app worker -Q low_priority -c 1
# When enqueuing: set a priority.
# NOTE(review): the meaning of the number depends on the broker. Per the
# Celery routing docs, RabbitMQ treats larger numbers as higher priority,
# while Redis and SQS sort in reverse (0 = highest). Confirm that priority=0
# is what you want for your broker.
generate_settlement_workflow.apply_async(
args=[property_id, period_end],
priority=0, # Critical — run immediately (assumes 0 is highest on your broker; see note above)
queue="settlement",
)
Test af canvas workflows
# server/tests/test_settlement_workflow.py
import pytest
from unittest.mock import patch, MagicMock
from celery import chain
def test_validate_annual_input_returns_correct_structure(mock_session):
    """validate_annual_input returns the expected keys for a valid input."""
    fake_input = MagicMock(
        total_kwh=50000,
        total_cost=25000.0,
        occupancies=[MagicMock(id="occ-1"), MagicMock(id="occ-2")],
    )
    target = "server.src.annual_input.service.get_validated_input"
    with patch(target, return_value=fake_input):
        eager_result = validate_annual_input.apply(args=["prop-1", "2026-05-31"])
        result = eager_result.get()

    assert result["total_kwh"] == 50000
    assert "occ-1" in result["occupancy_ids"]
def test_chord_collects_all_pdf_results():
    """finalize_and_send_all reports one sent email per successful PDF."""
    pdf_results = [
        {
            "occupancy_id": occ_id,
            "s3_key": f"settlements/{occ_id}/afregning.pdf",
            "success": True,
        }
        for occ_id in ("occ-1", "occ-2")
    ]
    target = "server.src.workers.tasks.email.send_settlement_email"
    with patch(target) as mock_send:
        mock_send.delay = MagicMock()
        eager_result = finalize_and_send_all.apply(args=[pdf_results, "prop-1"])
        result = eager_result.get()

    assert result["sent"] == 2
Konklusion
Celery canvas giver strukturerede workflows til komplekse asynkrone operationer. chain håndterer afhængige trin, chord paralleliserer og kører en afsluttende callback, når alle tasks i gruppen er lykkedes (fejl håndteres med en error callback), og group distribuerer uafhængige tasks. Eksponentiel backoff og priority queues gør produktions-workloads robuste over for midlertidige fejl.