M-Bus Gateway
← Tilbage til blog
· Celery· Redis· Python· scheduler· cron· async· baggrundsjobs· SaaS· Flower· beat

Celery beat: periodiske opgaver — scheduler-mønstre til SaaS

Celery beat til SaaS: RedisScheduler, task-prioritering, idempotente tasks, exponential backoff, dead letter queue, Flower-monitorering og scheduler-mønstre til produktion.

Af M-Bus Gateway

M-Bus Gateway-platformen kører 15+ periodiske Celery-tasks — fra daglig graddage-sync til månedlige afregnings-deadlines. Her er de mønstre, vi bruger i produktion.


Beat-schedule: Oversigt over alle tasks

# server/src/workers/celery_app.py
from celery import Celery
from celery.schedules import crontab

app = Celery("mbus")
app.config_from_object("server.src.core.celery_config")

# Periodic-task table read by Celery beat. Each entry names the registered
# task, the crontab expression that triggers it, and the queue the task
# message is published to.
# Clock times below are UTC unless a `timezone` is configured elsewhere.
app.conf.beat_schedule = {
    # ─── Recurring tasks (daily and intra-day) ──────────────────────
    "dmi-graddage-sync": {
        "task": "workers.tasks.dmi.sync_degree_days",
        "schedule": crontab(hour=2, minute=0),  # 02:00
        "options": {"queue": "low"},
    },
    "gateway-heartbeat-check": {
        "task": "workers.tasks.gateway.check_heartbeats",
        # Every 30 minutes (at :00 and :30). The previous expression,
        # crontab(hour="*", minute=30), only fired once per hour at :30.
        "schedule": crontab(minute="*/30"),
        "options": {"queue": "critical"},
    },
    "reading-coverage-alert": {
        "task": "workers.tasks.alerts.check_reading_coverage",
        "schedule": crontab(hour=9, minute=30),  # 09:30
        "options": {"queue": "default"},
    },
    "settlement-reminder": {
        "task": "workers.tasks.settlement_reminder.send_reminders",
        "schedule": crontab(hour=10, minute=0),  # 10:00
        "options": {"queue": "default"},
    },
    "onboarding-drip": {
        "task": "workers.tasks.onboarding.send_drip_emails",
        "schedule": crontab(hour=8, minute=30),  # 08:30
        "options": {"queue": "low"},
    },
    "daily-db-backup": {
        "task": "backup.daily_db_backup",
        "schedule": crontab(hour=3, minute=0),  # 03:00
        "options": {"queue": "low"},
    },

    # ─── Monthly tasks ──────────────────────────────────────────────
    "stripe-subscription-sync": {
        "task": "workers.tasks.subscription_sync.sync_all_subscriptions",
        "schedule": crontab(hour=5, minute=0, day_of_month=1),  # 1st of the month, 05:00
        "options": {"queue": "default"},
    },
    "bek563-deadline-alert": {
        "task": "workers.tasks.bek563_deadline.check_all_deadlines",
        "schedule": crontab(hour=7, minute=0, day_of_month=5),  # 5th of the month, 07:00
        "options": {"queue": "default"},
    },
    "monthly-portfolio-digest": {
        "task": "workers.tasks.monthly_digest.send_all_digests",
        "schedule": crontab(hour=6, minute=0, day_of_month=2),  # 2nd of the month, 06:00
        "options": {"queue": "low"},
    },
}

Idempotente tasks: Sikker genudførelse

# server/src/workers/tasks/dmi.py
from celery import shared_task
import structlog

logger = structlog.get_logger()

@shared_task(
    name="workers.tasks.dmi.sync_degree_days",
    bind=True,
    max_retries=3,
    default_retry_delay=300,  # seconds; the retry below passes its own countdown
    acks_late=True,           # acknowledge the message after the task body has run
    reject_on_worker_lost=True,
)
def sync_degree_days(self) -> dict:
    """Run the DMI degree-day sync and return whatever ``_sync_impl`` returns.

    The work itself is delegated to ``_sync_impl``; its result mapping is
    logged as keyword fields and returned unchanged.

    Any exception is logged and converted into a Celery retry whose delay
    doubles with each attempt: 300 s, 600 s, 1200 s for retries 0, 1, 2.

    NOTE(review): the task is described as idempotent (ON CONFLICT DO
    UPDATE); that lives in ``_sync_impl`` and is not visible here — confirm.
    """
    try:
        from server.src.workers.tasks._dmi_impl import _sync_impl

        outcome = _sync_impl()
        logger.info("dmi_sync_complete", **outcome)
    except Exception as failure:
        attempts_so_far = self.request.retries
        logger.error("dmi_sync_failed", error=str(failure), retry=attempts_so_far)
        # Exponential backoff based on how many retries have already happened.
        backoff_seconds = 300 * (2 ** attempts_so_far)
        raise self.retry(exc=failure, countdown=backoff_seconds)
    return outcome

Priority queues: Kritisk vs. lav prioritet

# server/src/core/celery_config.py
from kombu import Queue

# Three named queues; each queue's routing key is identical to its name.
#   critical — alarms, gateway checks
#   default  — email, PDF generation
#   low      — backup, degree days, digest
task_queues = tuple(
    Queue(queue_name, routing_key=queue_name)
    for queue_name in ("critical", "default", "low")
)

# Tasks published without an explicit queue/routing key use "default".
task_default_routing_key = "default"
task_default_queue = "default"

# Workers are started consuming all three queues:
# celery worker -Q critical,default,low --concurrency=4
# NOTE(review): the order of names in -Q presumably does not by itself give
# `critical` precedence on a Redis broker — confirm the transport's queue
# order strategy, or run a dedicated worker for `critical`.

# Kritisk email (alarm) via eksplicit kø:
from server.src.workers.celery_app import app

def send_alarm_email_urgent(alarm_data: dict) -> None:
    """Publish the alarm e-mail task on the ``critical`` queue at top priority.

    Args:
        alarm_data: Payload passed as the single positional argument to
            the ``workers.tasks.email.send_alarm_email`` task.

    The task is sent by name via ``app.send_task``, so the task module does
    not need to be importable from the caller.
    """
    app.send_task(
        "workers.tasks.email.send_alarm_email",
        args=[alarm_data],
        queue="critical",
        # Range is 0-9. With the Redis broker used here the scale is
        # reversed compared to RabbitMQ: 0 is the HIGHEST priority, so the
        # previous value 9 put alarms behind everything else in the queue.
        priority=0,
    )

Dead letter queue: Håndtér fejlede tasks

# Tasks der fejler efter max_retries havner i DLQ:

# celery_config.py tilføjelse:
# Glob routes are tried in insertion order and the first match wins, so the
# more specific pattern must come before the catch-all. With
# "workers.tasks.*" listed first, e-mail tasks were routed to "default" and
# the "workers.tasks.email.*" rule never applied.
task_routes = {
    "workers.tasks.email.*": {"queue": "critical"},
    "workers.tasks.*": {"queue": "default"},
    "backup.*": {"queue": "low"},
}

# Dead letter exchange (RabbitMQ concept — simuleret med Redis):
# Fejlede tasks logges til audit_log for manuel intervention:

from celery.signals import task_failure


@shared_task(name="workers.celery_error_handler")
def handle_task_failure(uuid, exc, traceback, einfo, *args, **kwargs):
    """Record a failed task by delegating to ``_log_failure``.

    Args:
        uuid: Id of the task that failed.
        exc: The exception raised by the task; stored as ``str(exc)``.
        traceback: Traceback of the failure, forwarded unchanged.
        einfo: Exception info object; accepted but not used here.
        *args, **kwargs: Accepted for signature compatibility; ignored.

    NOTE(review): ``_log_failure`` is not defined in this snippet. It is
    presumably a coroutine that writes to the audit log and alerts admins
    on critical failures — confirm against its definition.
    """
    import asyncio
    # NOTE(review): unused in this function; kept from the original in case
    # the import is needed for its side effects — confirm.
    from server.src.db.models import AuditLog
    # _log_failure is awaited to completion on a fresh event loop.
    asyncio.run(_log_failure(uuid, str(exc), traceback))


@task_failure.connect
def _forward_task_failure(sender=None, task_id=None, exception=None,
                          traceback=None, einfo=None, **_signal_kwargs):
    """Receive Celery's ``task_failure`` signal and call the handler.

    Celery has no ``task_failure_handler`` setting, so assigning the handler
    to ``app.conf`` alone never invoked it. The signal is the supported hook
    that fires for every failed task.
    """
    handle_task_failure(task_id, exception, traceback, einfo)


# Not a recognised Celery setting; retained only so any code that reads this
# attribute keeps working. The signal receiver above does the actual wiring.
app.conf.task_failure_handler = handle_task_failure

Flower: Real-time monitoring

# docker-compose.yml tilføjelse:
  # Flower monitoring service for the Celery workers.
  flower:
    image: mher/flower:2.0
    # Folded scalar: the lines below are joined into a single command line.
    # The broker password and the basic-auth credentials are interpolated
    # from REDIS_PASSWORD, FLOWER_USER and FLOWER_PASSWORD.
    command: >
      celery
      --broker=redis://:${REDIS_PASSWORD}@redis:6379/0
      flower
      --port=5555
      --basic_auth=${FLOWER_USER}:${FLOWER_PASSWORD}
      --url_prefix=flower
    # Maps host port 5555 to container port 5555 (the --port value above).
    ports:
      - "5555:5555"
    networks:
      - internal
    # Declares a dependency on the redis service named in the broker URL.
    depends_on:
      - redis
# Programmatisk Flower API — dashboard integration:
import httpx

async def get_celery_stats() -> dict:
    """Fetch worker information from the Flower HTTP API and summarise it.

    Returns:
        A dict with three integer entries:
        ``active_tasks`` — total length of every worker's ``active`` list;
        ``processed_today`` — sum of per-task counters for task names that
        start with ``workers.tasks.``;
        ``workers_online`` — number of workers whose ``status`` is ``"OK"``.

    Raises:
        httpx.HTTPStatusError: if Flower answers with a 4xx/5xx status
            (for example on wrong basic-auth credentials).
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "http://flower:5555/api/workers",
            auth=(settings.FLOWER_USER, settings.FLOWER_PASSWORD),
        )
    # Fail loudly instead of trying to summarise an error body.
    response.raise_for_status()
    workers = response.json()

    def _processed(info: dict) -> int:
        # The original looked up the literal key "workers.tasks.*", which is
        # never a task name, so the sum was always 0. Match by prefix instead.
        # NOTE(review): counters are read from a top-level "total" and, if
        # absent, from "stats" -> "total" — confirm which one Flower returns.
        totals = info.get("total") or (info.get("stats") or {}).get("total") or {}
        return sum(
            count
            for task_name, count in totals.items()
            if task_name.startswith("workers.tasks.")
        )

    worker_infos = list(workers.values())
    return {
        "active_tasks": sum(len(info.get("active") or []) for info in worker_infos),
        # NOTE(review): the key says "today", but nothing here limits the
        # counters to the current day — confirm what period they cover.
        "processed_today": sum(_processed(info) for info in worker_infos),
        # NOTE(review): assumes each worker entry carries a "status" field
        # equal to "OK" — confirm against the Flower response.
        "workers_online": sum(1 for info in worker_infos if info.get("status") == "OK"),
    }

RedisScheduler: Persistent beat-tilstand

# Standard beat gemmer schedule i en lokal fil — slettes ved deploy
# RedisScheduler gemmer i Redis — persistent på tværs af deployments

# Installation:
# pip install celery-redbeat

# celery_config.py:
# Scheduler class beat should load (provided by the celery-redbeat package).
beat_scheduler = "redbeat.RedisScheduler"

# RedBeat storage: key namespace and the Redis instance to use.
redbeat_key_prefix = "redbeat:"
redbeat_redis_url = settings.REDIS_URL

# Beat re-checks the schedule at least every 5 seconds.
beat_max_loop_interval = 5

# Start beat:
# celery -A server.src.workers.celery_app beat \
#   --scheduler redbeat.RedisScheduler \
#   --loglevel=info

Konklusion

Celery beat med RedisScheduler giver persistent, deploy-resistent task-scheduling. Priority queues (critical/default/low) sikrer at alarmer ikke blokeres af tungere baggrundstasks. Idempotente tasks med ON CONFLICT + max_retries + exponential backoff håndterer midlertidige fejl. Flower giver real-time monitorering af workers og task-historik.

Se også guiden til Redis- og Celery-køer eller guiden til DMI's graddage-API.