M-Bus Gateway
← Tilbage til blog
· python· celery· celery-beat· scheduling· chord· chain· Flower· IoT· SaaS· PDF· crontab

Python Celery beat — avanceret opgaveplanlægning til IoT og SaaS

Celery beat avancerede mønstre: crontab scheduling, dynamiske opgaver fra database, chord/chain workflows til PDF-generering, Flower monitoring, fejlhåndtering og IoT-burst scheduling.

Af M-Bus Gateway

Celery beat er SaaS-platformens rytme — daglig DMI-sync, månedlig afregning, gateway-health hvert 30. minut og on-demand PDF-generering. Her er de avancerede mønstre.


Beat schedule — IoT og SaaS tidsplan

# server/src/workers/celery_app.py
from celery import Celery
from celery.schedules import crontab
from server.src.config import get_settings


def create_celery_app() -> Celery:
    """Build and configure the Celery application used by the workers.

    Redis (``settings.redis_url``) is used as both the broker and the result
    backend. The returned app carries the periodic beat schedule, accepts and
    emits JSON only, runs on UTC, and expires task results after one day.
    """
    redis_url = str(get_settings().redis_url)

    task_modules = [
        "server.src.workers.tasks.dmi",
        "server.src.workers.tasks.settlement",
        "server.src.workers.tasks.email",
        "server.src.workers.tasks.gateway_health",
        "server.src.workers.tasks.drip",
        "server.src.workers.tasks.stripe_sync",
        "server.src.workers.tasks.onboarding",
    ]
    app = Celery("mbus", broker=redis_url, backend=redis_url, include=task_modules)

    # Every crontab below is evaluated in UTC (timezone settings further down).
    periodic_tasks = {
        # DMI degree days: daily at 03:00 (scheduled ahead of the 06:00
        # gateway data, per the original note).
        "sync-dmi-daily": {
            "task": "server.src.workers.tasks.dmi.sync_degree_days",
            "schedule": crontab(hour=3, minute=0),
        },
        # Gateway health check: every 30 minutes.
        "gateway-health-check": {
            "task": "server.src.workers.tasks.gateway_health.check_all_gateways",
            "schedule": crontab(minute="*/30"),
        },
        # Settlement deadline alert: the 5th of each month at 07:00.
        "settlement-deadline-alert": {
            "task": "server.src.workers.tasks.settlement.check_settlement_deadlines",
            "schedule": crontab(day_of_month=5, hour=7, minute=0),
        },
        # Stripe subscription sync: the 1st of each month at 05:00.
        "stripe-subscription-sync": {
            "task": "server.src.workers.tasks.stripe_sync.sync_subscription_quantities",
            "schedule": crontab(day_of_month=1, hour=5, minute=0),
        },
        # Monthly portfolio e-mail: the 2nd of each month at 06:00.
        "monthly-portfolio-digest": {
            "task": "server.src.workers.tasks.email.send_monthly_digest",
            "schedule": crontab(day_of_month=2, hour=6, minute=0),
        },
        # Onboarding drip e-mails: daily at 08:30.
        "onboarding-drip": {
            "task": "server.src.workers.tasks.drip.send_onboarding_drip",
            "schedule": crontab(hour=8, minute=30),
        },
        # Stale gateway notification: daily at 09:30.
        "stale-gateway-alert": {
            "task": "server.src.workers.tasks.gateway_health.alert_stale_gateways",
            "schedule": crontab(hour=9, minute=30),
        },
    }

    app.conf.update(
        beat_schedule=periodic_tasks,
        task_serializer="json",
        result_serializer="json",
        accept_content=["json"],
        timezone="UTC",
        enable_utc=True,
        result_expires=86400,  # seconds, i.e. results expire after one day
    )
    return app


# Module-level app instance; task modules import it to register their tasks.
celery_app = create_celery_app()

Chord workflow — PDF-afregning til alle lejere

# server/src/workers/tasks/settlement.py
from celery import chord, group, chain
from server.src.workers.celery_app import celery_app
import structlog

log = structlog.get_logger()


@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def generate_settlement_pdf(self, settlement_line_id: str) -> dict:
    """Generate the PDF for a single settlement line.

    Used as a header task of the settlement chord: one task is queued per
    settlement line.

    Args:
        settlement_line_id: Identifier of the settlement line to render.

    Returns:
        A dict with ``settlement_line_id`` and ``pdf_key`` (the ``s3_key``
        reported by ``_generate_pdf_async``).

    Raises:
        celery.exceptions.Retry: On any failure, to reschedule the task
            (up to 3 retries, 60 seconds apart). Once retries are exhausted
            the original exception propagates.
    """
    import asyncio

    try:
        # asyncio.get_event_loop() raises RuntimeError when the thread has no
        # current loop (non-main threads, and every thread on Python 3.14+).
        # Fall back to creating and registering one so it is reused by later
        # tasks on this thread, exactly as the implicit loop used to be.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        result = loop.run_until_complete(_generate_pdf_async(settlement_line_id))
        return {"settlement_line_id": settlement_line_id, "pdf_key": result["s3_key"]}
    except Exception as exc:
        log.exception("pdf_generation_failed", settlement_line_id=settlement_line_id)
        raise self.retry(exc=exc)


@celery_app.task(bind=True, max_retries=3, default_retry_delay=120)
def send_settlement_emails(self, pdf_results: list[dict], property_id: str) -> dict:
    """Chord callback: e-mail the generated settlement PDFs for a property.

    Celery invokes this after the chord's header tasks have finished and
    passes their collected return values as ``pdf_results``.

    Args:
        pdf_results: Return values of the header tasks (one dict per PDF).
        property_id: Property whose tenants are being notified.

    Returns:
        A dict with ``property_id`` and ``emails_sent`` (the value returned
        by ``_send_emails_async``).

    Raises:
        celery.exceptions.Retry: On any failure, to reschedule the task
            (up to 3 retries, 120 seconds apart). Once retries are exhausted
            the original exception propagates.
    """
    import asyncio

    try:
        # asyncio.get_event_loop() raises RuntimeError when the thread has no
        # current loop (non-main threads, and every thread on Python 3.14+).
        # Fall back to creating and registering one so it is reused by later
        # tasks on this thread.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        sent = loop.run_until_complete(_send_emails_async(pdf_results, property_id))
        log.info("settlement_emails_sent", property_id=property_id, count=sent)
        return {"property_id": property_id, "emails_sent": sent}
    except Exception as exc:
        log.exception("email_sending_failed", property_id=property_id)
        raise self.retry(exc=exc)


def dispatch_settlement_for_property(
    property_id: str,
    settlement_line_ids: list[str],
) -> None:
    """Fan out PDF generation for a property and e-mail the tenants afterwards.

    Builds a Celery chord: one ``generate_settlement_pdf`` task per settlement
    line forms the header, and ``send_settlement_emails`` is the callback that
    receives the collected header results. Nothing is dispatched when
    ``settlement_line_ids`` is empty.
    """
    if settlement_line_ids:
        # Header: one PDF task per settlement line.
        pdf_jobs = group(
            [generate_settlement_pdf.s(line_id) for line_id in settlement_line_ids]
        )
        # Body: gets the header results prepended as its first argument.
        notify_tenants = send_settlement_emails.s(property_id=property_id)

        # Calling the chord with its body submits it right away.
        chord(pdf_jobs)(notify_tenants)

        log.info(
            "settlement_chord_dispatched",
            property_id=property_id,
            pdf_count=len(settlement_line_ids),
        )

Chain workflow — Beregning → PDF → Email

# server/src/workers/tasks/settlement.py

def dispatch_full_settlement_pipeline(
    property_id: str,
    annual_input_id: str,
) -> None:
    """Queue the full settlement pipeline for a property as a Celery chain.

    Steps, in order: ``calculate_distribution`` -> ``create_settlement_lines``
    -> ``dispatch_pdf_chord``. Each step is handed the previous step's return
    value, and a step only starts once the one before it has succeeded.
    """
    steps = (
        calculate_distribution.s(annual_input_id=annual_input_id),
        create_settlement_lines.s(property_id=property_id),
        dispatch_pdf_chord.s(),
    )
    chain(*steps).apply_async()
    log.info("settlement_pipeline_dispatched", property_id=property_id)


@celery_app.task(bind=True, max_retries=2)
def calculate_distribution(self, annual_input_id: str) -> dict:
    """Step 1 of the settlement chain: run the distribution calculation.

    Args:
        annual_input_id: Identifier of the annual input to calculate from.

    Returns:
        A dict with ``annual_input_id`` and ``distribution_result_ids`` (the
        value returned by ``_run_distribution_engine``).
    """
    import asyncio

    # NOTE(review): ``max_retries=2`` currently has no effect because this
    # task never calls ``self.retry``; a failure fails the task immediately.
    # Wire up retries only after confirming the engine is safe to re-run.

    # asyncio.get_event_loop() raises RuntimeError when the thread has no
    # current loop (non-main threads, and every thread on Python 3.14+).
    # Fall back to creating and registering one so it is reused afterwards.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    result = loop.run_until_complete(_run_distribution_engine(annual_input_id))
    return {"annual_input_id": annual_input_id, "distribution_result_ids": result}


@celery_app.task(bind=True, max_retries=2)
def create_settlement_lines(self, prev_result: dict, property_id: str) -> list[str]:
    """Step 2 of the settlement chain: create settlement lines.

    Args:
        prev_result: Return value of the previous chain step; its
            ``distribution_result_ids`` entry is passed on to ``_create_lines``.
        property_id: Property the settlement lines belong to.

    Returns:
        The line ids returned by ``_create_lines``.

    Raises:
        KeyError: If ``prev_result`` has no ``distribution_result_ids`` key.
    """
    import asyncio

    # NOTE(review): ``max_retries=2`` currently has no effect because this
    # task never calls ``self.retry``. Wire up retries only after confirming
    # that creating lines twice cannot produce duplicates.

    # asyncio.get_event_loop() raises RuntimeError when the thread has no
    # current loop (non-main threads, and every thread on Python 3.14+).
    # Fall back to creating and registering one so it is reused afterwards.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    line_ids = loop.run_until_complete(
        _create_lines(prev_result["distribution_result_ids"], property_id)
    )
    return line_ids


@celery_app.task
def dispatch_pdf_chord(line_ids: list[str]) -> None:
    """Step 3 of the settlement chain: start the PDF chord for the new lines.

    The property id is read from the first line id, relying on the
    ``{property_id}:{line_id}`` naming convention. An empty list is a no-op.
    The chord then completes on its own; this task does not wait for it.
    """
    if not line_ids:
        return

    # Everything before the first ":" is the property id.
    owner_id, _, _ = line_ids[0].partition(":")
    dispatch_settlement_for_property(owner_id, line_ids)

Dynamiske tasks fra database

# server/src/workers/tasks/dynamic.py
# Celery beat kan ikke læse DB direkte — brug en "dispatcher task"
# der kører hvert 5. minut og tjekker hvad der skal gøres

@celery_app.task
def dispatch_scheduled_reports() -> None:
    """Dispatch one ``generate_report`` task per due scheduled report.

    Dispatcher pattern: beat -> this task -> individual report tasks. Meant to
    be triggered periodically by Celery beat (every 5 minutes according to the
    original docstring; the beat entry itself is not defined in this snippet).

    Selects rows from ``scheduled_reports`` whose ``next_run_at`` has passed
    and whose status is ``'pending'``, then queues a task for each of them.

    NOTE(review): this task only reads the rows. Unless ``generate_report``
    (or something else) updates ``status``/``next_run_at`` before the next
    run, the same reports will be dispatched again — confirm.
    """
    from server.src.db.engine import get_db_sync

    # NOTE(review): a raw SQL string is passed to ``execute``; if ``db`` is a
    # SQLAlchemy 2.x connection/session this needs ``text(...)`` — confirm.
    with get_db_sync() as db:
        due_reports = db.execute("""
            SELECT property_id, report_type
            FROM scheduled_reports
            WHERE next_run_at <= NOW()
                AND status = 'pending'
        """).fetchall()

    # Rows are fully fetched above, so dispatching happens after the DB
    # context has been released.
    for report in due_reports:
        generate_report.delay(
            property_id=report.property_id,
            report_type=report.report_type,
        )

    if due_reports:
        log.info("dynamic_reports_dispatched", count=len(due_reports))

Flower monitoring konfiguration

# docker-compose.yml — Flower dashboard for Celery monitoring
services:
  flower:
    image: mher/flower:2.0.1
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      FLOWER_BASIC_AUTH: admin:${FLOWER_PASSWORD}
      FLOWER_PORT: 5555
      # Quoted on purpose: Compose requires boolean-looking environment
      # values to be strings, otherwise YAML parses them as booleans.
      FLOWER_PERSISTENT: "true"
      FLOWER_DB: /data/flower.db
    volumes:
      # Named volume; it must also be declared under the top-level `volumes:`
      # key of the compose file (not shown in this snippet).
      - flower_data:/data
    ports:
      - "5555:5555"    # In prod: behind nginx with basic auth + IP whitelist
    depends_on:
      - redis
Flower dashboard — hvad man overvåger:

Active tasks:
  → generate_settlement_pdf: Antal kørende PDF-tasks
  → send_settlement_emails: Email-dispatcher status

Failed tasks (alert ved > 0):
  → calculate_distribution: Kritisk fejl i fordelingsmotor
  → sync_degree_days: DMI API nede

Queue depths (alert ved > 100):
  → default: Normal arbejdsbelastning
  → priority: Hurtige tasks (gateway alarmer)

Beat scheduler:
  → Bekræft at alle planlagte tasks kører til rette tid
  → sync-dmi-daily: Kørte kl. 03:00? ✅/❌

Konklusion

Celery beat med crontab håndterer alle tidsstyrede opgaver: daglig DMI-sync (03:00 UTC), gateway health (hvert 30 min), månedlig Stripe-sync og afregnings-deadlines. chord er det rette valg til PDF-generering — alle lejligheder parallelt, email kun når alle PDFer er klar. chain sikrer rækkefølge: beregn → opret linjer → start chord. Dynamiske tasks (dispatcher-mønster) er løsningen når beat-schedule ikke kan nå DB. Flower giver realtids-monitoring af task-status og queue-dybde.

Se Python Celery best practices eller FastAPI lifespan guide.