Python Celery beat — avanceret opgaveplanlægning til IoT og SaaS
Celery beat avancerede mønstre: crontab scheduling, dynamiske opgaver fra database, chord/chain workflows til PDF-generering, Flower monitoring, fejlhåndtering og IoT-burst scheduling.
Af M-Bus Gateway
Celery beat er SaaS-platformens rytme — daglig DMI-sync, månedlig afregning, gateway-health hvert 30. minut og on-demand PDF-generering. Her er de avancerede mønstre.
Beat schedule — IoT og SaaS tidsplan
# server/src/workers/celery_app.py
from celery import Celery
from celery.schedules import crontab
from server.src.config import get_settings
def create_celery_app() -> Celery:
    """Build and configure the Celery application for the worker processes.

    The broker and the result backend both point at ``settings.redis_url``.
    All beat schedule entries are crontab-based and evaluated in UTC, since
    ``timezone`` is set to "UTC" and ``enable_utc`` is turned on.

    Returns:
        The configured ``Celery`` instance, including its beat schedule.
    """
    redis_url = str(get_settings().redis_url)

    # Modules that Celery imports at start-up so their tasks get registered.
    task_modules = [
        "server.src.workers.tasks.dmi",
        "server.src.workers.tasks.settlement",
        "server.src.workers.tasks.email",
        "server.src.workers.tasks.gateway_health",
        "server.src.workers.tasks.drip",
        "server.src.workers.tasks.stripe_sync",
        "server.src.workers.tasks.onboarding",
    ]

    schedule = {
        # IoT timing: DMI degree days daily at 03:00 UTC
        # (ahead of the gateway data at 06:00).
        "sync-dmi-daily": {
            "task": "server.src.workers.tasks.dmi.sync_degree_days",
            "schedule": crontab(hour=3, minute=0),
        },
        # Gateway health check: every 30 minutes.
        "gateway-health-check": {
            "task": "server.src.workers.tasks.gateway_health.check_all_gateways",
            "schedule": crontab(minute="*/30"),
        },
        # Settlement deadline alert: the 5th of each month at 07:00 UTC.
        "settlement-deadline-alert": {
            "task": "server.src.workers.tasks.settlement.check_settlement_deadlines",
            "schedule": crontab(day_of_month=5, hour=7, minute=0),
        },
        # Stripe subscription sync: the 1st of each month at 05:00 UTC.
        "stripe-subscription-sync": {
            "task": "server.src.workers.tasks.stripe_sync.sync_subscription_quantities",
            "schedule": crontab(day_of_month=1, hour=5, minute=0),
        },
        # Monthly portfolio email: the 2nd of each month at 06:00 UTC.
        "monthly-portfolio-digest": {
            "task": "server.src.workers.tasks.email.send_monthly_digest",
            "schedule": crontab(day_of_month=2, hour=6, minute=0),
        },
        # Onboarding drip emails: daily at 08:30 UTC.
        "onboarding-drip": {
            "task": "server.src.workers.tasks.drip.send_onboarding_drip",
            "schedule": crontab(hour=8, minute=30),
        },
        # Stale gateway notification: daily at 09:30 UTC.
        "stale-gateway-alert": {
            "task": "server.src.workers.tasks.gateway_health.alert_stale_gateways",
            "schedule": crontab(hour=9, minute=30),
        },
    }

    app = Celery(
        "mbus",
        broker=redis_url,
        backend=redis_url,
        include=task_modules,
    )
    app.conf.update(
        beat_schedule=schedule,
        task_serializer="json",
        result_serializer="json",
        accept_content=["json"],
        timezone="UTC",
        enable_utc=True,
        # Results expire after one day (value is in seconds).
        result_expires=86400,
    )
    return app
# Module-level application instance; the task modules import this name
# to declare their tasks with ``@celery_app.task``.
celery_app = create_celery_app()
Chord workflow — PDF-afregning til alle lejere
# server/src/workers/tasks/settlement.py
from celery import chord, group, chain
from server.src.workers.celery_app import celery_app
import structlog
# Module-level structlog logger shared by the tasks and dispatch helpers below.
log = structlog.get_logger()
@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def generate_settlement_pdf(self, settlement_line_id: str) -> dict:
    """Generate the PDF for a single settlement line.

    Used as a header task of the settlement chord, so one instance runs per
    settlement line and they may execute in parallel.

    Args:
        settlement_line_id: Identifier of the settlement line to render.

    Returns:
        A dict with ``settlement_line_id`` and ``pdf_key``, where ``pdf_key``
        is the ``s3_key`` reported by the async PDF generator.

    Raises:
        celery.exceptions.Retry: On any failure, to reschedule the task after
            the configured 60 s delay (at most ``max_retries`` times).
    """
    import asyncio

    try:
        # asyncio.run() creates and tears down its own event loop. The former
        # asyncio.get_event_loop() depended on an implicit current loop, which
        # is deprecated and fails outside the main thread.
        # NOTE(review): if the async helper caches loop-bound resources (e.g.
        # a connection pool) across calls, confirm it tolerates a new loop
        # per task invocation.
        result = asyncio.run(_generate_pdf_async(settlement_line_id))
        return {"settlement_line_id": settlement_line_id, "pdf_key": result["s3_key"]}
    except Exception as exc:
        log.exception("pdf_generation_failed", settlement_line_id=settlement_line_id)
        raise self.retry(exc=exc)
@celery_app.task(bind=True, max_retries=3, default_retry_delay=120)
def send_settlement_emails(self, pdf_results: list[dict], property_id: str) -> dict:
    """Chord callback: email the generated settlement PDFs for a property.

    Runs after all header PDF tasks of the chord have finished and receives
    their collected results as the first argument.

    Args:
        pdf_results: Results of the PDF tasks, one dict per settlement line.
        property_id: Property whose tenants receive the emails.

    Returns:
        A dict with ``property_id`` and ``emails_sent`` (the value returned by
        the async sender).

    Raises:
        celery.exceptions.Retry: On any failure, to reschedule the task after
            the configured 120 s delay (at most ``max_retries`` times).
    """
    import asyncio

    try:
        # asyncio.run() owns its event loop; asyncio.get_event_loop() relied
        # on an implicit current loop, which is deprecated and fails outside
        # the main thread.
        # NOTE(review): a retry re-runs the whole send; confirm the async
        # sender does not email the same tenant twice in that case.
        sent = asyncio.run(_send_emails_async(pdf_results, property_id))
        log.info("settlement_emails_sent", property_id=property_id, count=sent)
        return {"property_id": property_id, "emails_sent": sent}
    except Exception as exc:
        log.exception("email_sending_failed", property_id=property_id)
        raise self.retry(exc=exc)
def dispatch_settlement_for_property(
    property_id: str,
    settlement_line_ids: list[str],
) -> None:
    """Start a chord that renders all PDFs in parallel, then sends the emails.

    One ``generate_settlement_pdf`` signature is created per settlement line
    (100 apartments means 100 parallel PDF tasks). ``send_settlement_emails``
    is attached as the chord callback and receives the collected results.
    Nothing is dispatched when ``settlement_line_ids`` is empty.

    Args:
        property_id: Property the settlement belongs to.
        settlement_line_ids: Settlement lines to generate PDFs for.
    """
    if settlement_line_ids:
        # Chord header: one PDF task per settlement line.
        pdf_signatures = [
            generate_settlement_pdf.s(line_id)
            for line_id in settlement_line_ids
        ]
        # Chord body: invoked with the gathered PDF results.
        on_all_pdfs_done = send_settlement_emails.s(property_id=property_id)

        chord(group(pdf_signatures))(on_all_pdfs_done)

        log.info(
            "settlement_chord_dispatched",
            property_id=property_id,
            pdf_count=len(settlement_line_ids),
        )
Chain workflow — Beregning → PDF → Email
# server/src/workers/tasks/settlement.py
def dispatch_full_settlement_pipeline(
    property_id: str,
    annual_input_id: str,
) -> None:
    """Queue the settlement pipeline as a Celery chain.

    Steps, in order: calculate the distribution, create the settlement
    lines, then start the PDF chord. Each step's return value is passed as
    the first argument of the next step, and a step only starts once the
    previous one has succeeded.

    Args:
        property_id: Property to settle; forwarded to the line-creation step.
        annual_input_id: Annual input that feeds the distribution calculation.
    """
    steps = (
        calculate_distribution.s(annual_input_id=annual_input_id),
        create_settlement_lines.s(property_id=property_id),
        dispatch_pdf_chord.s(),
    )
    chain(*steps).apply_async()
    log.info("settlement_pipeline_dispatched", property_id=property_id)
@celery_app.task(bind=True, max_retries=2)
def calculate_distribution(self, annual_input_id: str) -> dict:
    """Pipeline step 1: compute the distribution statement.

    Args:
        annual_input_id: Annual input to run the distribution engine on.

    Returns:
        A dict with ``annual_input_id`` and ``distribution_result_ids`` (the
        value returned by the async distribution engine).
    """
    import asyncio

    # NOTE(review): max_retries=2 is configured, but nothing here calls
    # self.retry(), so a failure is not retried - confirm this is intended.
    # asyncio.run() owns its event loop; asyncio.get_event_loop() relied on an
    # implicit current loop, which is deprecated and fails outside the main
    # thread.
    distribution_result_ids = asyncio.run(_run_distribution_engine(annual_input_id))
    return {
        "annual_input_id": annual_input_id,
        "distribution_result_ids": distribution_result_ids,
    }
@celery_app.task(bind=True, max_retries=2)
def create_settlement_lines(self, prev_result: dict, property_id: str) -> list[str]:
    """Pipeline step 2: create settlement lines from the distribution result.

    Args:
        prev_result: Return value of the previous chain step; its
            ``distribution_result_ids`` entry is read here.
        property_id: Property the settlement lines are created for.

    Returns:
        The line ids returned by the async line-creation helper.
    """
    import asyncio

    # NOTE(review): max_retries=2 is configured, but nothing here calls
    # self.retry(), so a failure is not retried - confirm this is intended.
    # asyncio.run() owns its event loop; asyncio.get_event_loop() relied on an
    # implicit current loop, which is deprecated and fails outside the main
    # thread.
    return asyncio.run(
        _create_lines(prev_result["distribution_result_ids"], property_id)
    )
@celery_app.task
def dispatch_pdf_chord(line_ids: list[str]) -> None:
    """Pipeline step 3: start the PDF chord, which then completes on its own.

    Does nothing when ``line_ids`` is empty. The property id is taken from
    the first line id, i.e. the text before its first ":".

    Args:
        line_ids: Settlement line ids produced by the previous chain step.
    """
    if not line_ids:
        return

    # Convention: {property_id}:{line_id}
    property_id, _, _ = line_ids[0].partition(":")
    dispatch_settlement_for_property(property_id, line_ids)
Dynamiske tasks fra database
# server/src/workers/tasks/dynamic.py
# Celery beat kan ikke læse DB direkte — brug en "dispatcher task"
# der kører med fast interval (her hvert 5. minut) og tjekker, hvad der skal gøres
@celery_app.task
def dispatch_scheduled_reports() -> None:
    """Find due scheduled reports and queue one task per report.

    Pattern: beat -> dispatcher -> individual tasks. The original note says
    this task runs every 5 minutes from the beat schedule; the schedule entry
    itself is not shown here.

    Selects rows from ``scheduled_reports`` whose ``next_run_at`` has passed
    and whose ``status`` is 'pending', then enqueues ``generate_report`` for
    each of them.
    """
    from server.src.db.engine import get_db_sync

    with get_db_sync() as db:
        # NOTE(review): if ``db`` is a SQLAlchemy 2.x session/connection, a raw
        # SQL string must be wrapped in ``sqlalchemy.text()`` - confirm.
        due_reports = db.execute("""
            SELECT property_id, report_type
            FROM scheduled_reports
            WHERE next_run_at <= NOW()
            AND status = 'pending'
        """).fetchall()

    # Rows are fully fetched above, so dispatching does not need the
    # database handle.
    # NOTE(review): nothing in this task updates ``status`` or
    # ``next_run_at``; unless ``generate_report`` does, the same rows will
    # match again on the next run - confirm.
    for report in due_reports:
        generate_report.delay(
            property_id=report.property_id,
            report_type=report.report_type,
        )

    if due_reports:
        log.info("dynamic_reports_dispatched", count=len(due_reports))
Flower monitoring konfiguration
# docker-compose.yml — Flower dashboard til Celery monitoring
services:
  flower:
    image: mher/flower:2.0.1
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      FLOWER_BASIC_AUTH: admin:${FLOWER_PASSWORD}
      FLOWER_PORT: 5555
      # Quoted on purpose: Compose requires boolean-like environment values
      # to be quoted so YAML does not parse them as booleans.
      FLOWER_PERSISTENT: "true"
      FLOWER_DB: /data/flower.db
    volumes:
      # Named volume; it must also be declared under a top-level `volumes:` key.
      - flower_data:/data
    ports:
      - "5555:5555"  # Behind nginx with basic auth + IP whitelist in prod
    depends_on:
      - redis
Flower dashboard — hvad man overvåger:
Active tasks:
→ generate_settlement_pdf: Antal kørende PDF-tasks
→ send_settlement_emails: Email-dispatcher status
Failed tasks (alert ved > 0):
→ calculate_distribution: Kritisk fejl i fordelingsmotor
→ sync_degree_days: DMI API nede
Queue depths (alert ved > 100):
→ default: Normal arbejdsbelastning
→ priority: Hurtige tasks (gateway alarmer)
Beat scheduler:
→ Bekræft at alle planlagte tasks kører til rette tid
→ sync-dmi-daily: Kørte kl. 03:00? ✅/❌
Konklusion
Celery beat med crontab håndterer alle tidsstyrede opgaver: daglig DMI-sync (03:00 UTC), gateway health (hvert 30 min), månedlig Stripe-sync og afregnings-deadlines. chord er det rette valg til PDF-generering — alle lejligheder parallelt, email kun når alle PDFer er klar. chain sikrer rækkefølge: beregn → opret linjer → start chord. Dynamiske tasks (dispatcher-mønster) er løsningen når beat-schedule ikke kan nå DB. Flower giver realtids-monitoring af task-status og queue-dybde.
Se Python Celery best practices eller FastAPI lifespan guide.