· Celery· Redis· Python· scheduler· cron· async· baggrundsjobs· SaaS· Flower· beat
Celery beat periodiske opgaver — scheduler mønstre til SaaS
Celery beat til SaaS: RedisScheduler, task-prioritering, idempotente tasks, exponential backoff, dead letter queue, Flower-monitoring og scheduler-mønstre til produktion.
Af M-Bus Gateway
M-Bus Gateway platformen kører 15+ periodiske Celery-tasks — fra daglig graddage-sync til månedlige afregnings-deadlines. Her er produktionsmønstrene.
Beat-schedule: Oversigt over alle tasks
# server/src/workers/celery_app.py
from celery import Celery
from celery.schedules import crontab
app = Celery("mbus")
app.config_from_object("server.src.core.celery_config")
# Periodic task table read by Celery beat. Each entry names the task to
# enqueue, the crontab schedule it runs on, and the queue it is sent to
# (via "options").
app.conf.beat_schedule = {
    # ─── Daily / intraday tasks ─────────────────────────────────────
    "dmi-graddage-sync": {
        "task": "workers.tasks.dmi.sync_degree_days",
        "schedule": crontab(hour=2, minute=0),  # 02:00 UTC
        "options": {"queue": "low"},
    },
    "gateway-heartbeat-check": {
        "task": "workers.tasks.gateway.check_heartbeats",
        # Every 30 minutes (at :00 and :30). The previous
        # crontab(hour="*", minute=30) fired only once per hour, at :30,
        # which contradicted the stated 30-minute interval.
        "schedule": crontab(minute="*/30"),
        "options": {"queue": "critical"},
    },
    "reading-coverage-alert": {
        "task": "workers.tasks.alerts.check_reading_coverage",
        "schedule": crontab(hour=9, minute=30),
        "options": {"queue": "default"},
    },
    "settlement-reminder": {
        "task": "workers.tasks.settlement_reminder.send_reminders",
        "schedule": crontab(hour=10, minute=0),
        "options": {"queue": "default"},
    },
    "onboarding-drip": {
        "task": "workers.tasks.onboarding.send_drip_emails",
        "schedule": crontab(hour=8, minute=30),
        "options": {"queue": "low"},
    },
    "daily-db-backup": {
        "task": "backup.daily_db_backup",
        "schedule": crontab(hour=3, minute=0),  # 03:00 UTC
        "options": {"queue": "low"},
    },
    # ─── Monthly tasks ──────────────────────────────────────────────
    "stripe-subscription-sync": {
        "task": "workers.tasks.subscription_sync.sync_all_subscriptions",
        "schedule": crontab(hour=5, minute=0, day_of_month=1),  # 1st of the month
        "options": {"queue": "default"},
    },
    "bek563-deadline-alert": {
        "task": "workers.tasks.bek563_deadline.check_all_deadlines",
        "schedule": crontab(hour=7, minute=0, day_of_month=5),  # 5th of the month
        "options": {"queue": "default"},
    },
    "monthly-portfolio-digest": {
        "task": "workers.tasks.monthly_digest.send_all_digests",
        "schedule": crontab(hour=6, minute=0, day_of_month=2),  # 2nd of the month
        "options": {"queue": "low"},
    },
}
Idempotente tasks: Sikker genudførelse
# server/src/workers/tasks/dmi.py
from celery import shared_task
import structlog
logger = structlog.get_logger()
@shared_task(
    bind=True,
    name="workers.tasks.dmi.sync_degree_days",
    acks_late=True,  # acknowledge only once the task has finished, not when it starts
    reject_on_worker_lost=True,
    max_retries=3,
    default_retry_delay=300,  # 5 minutes between attempts
)
def sync_degree_days(self) -> dict:
    """Fetch degree days from DMI and persist them.

    The actual work is delegated to ``_sync_impl``; whatever it returns is
    logged and handed back to the caller. Per the original author's note the
    write is an ``ON CONFLICT DO UPDATE`` upsert, which is what makes a re-run
    safe — that lives in ``_sync_impl`` and is not visible here.

    Returns:
        The result mapping produced by ``_sync_impl``.

    Raises:
        celery.exceptions.Retry: on any exception, to reschedule the task
            with an exponentially growing countdown.
    """
    try:
        # Imported inside the try so that an import failure is retried
        # exactly like any other error raised by the sync.
        from server.src.workers.tasks._dmi_impl import _sync_impl

        summary = _sync_impl()
        logger.info("dmi_sync_complete", **summary)
        return summary
    except Exception as err:
        attempt = self.request.retries
        logger.error("dmi_sync_failed", error=str(err), retry=attempt)
        # Exponential backoff: 5 min, 10 min, 20 min.
        backoff_seconds = 300 * (2 ** attempt)
        raise self.retry(exc=err, countdown=backoff_seconds)
Priority queues: Kritisk vs. lav prioritet
# server/src/core/celery_config.py
from kombu import Queue
# 3 køer med prioritet:
# Three queues, one per priority tier; each queue's routing key equals its name:
#   critical – alarms, gateway checks
#   default  – email, PDF generation
#   low      – backups, degree days, digests
task_queues = tuple(
    Queue(queue_name, routing_key=queue_name)
    for queue_name in ("critical", "default", "low")
)

# Tasks published without an explicit queue/routing key use "default".
task_default_routing_key = "default"
task_default_queue = "default"
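# Workers startes med alle 3 køer. Bemærk: med Redis som broker bestemmes rækkefølgen, køerne
# tømmes i, af transport-indstillingen queue_order_strategy (standard: round_robin) — critical
# får altså ikke automatisk forrang, blot fordi den står først i -Q:
# celery worker -Q critical,default,low --concurrency=4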
# Kritisk email (alarm) via eksplicit kø:
from server.src.workers.celery_app import app
def send_alarm_email_urgent(alarm_data: dict) -> None:
    """Enqueue the alarm email task on the "critical" queue with top priority.

    Args:
        alarm_data: Payload passed as the single positional argument to the
            ``workers.tasks.email.send_alarm_email`` task.
    """
    app.send_task(
        "workers.tasks.email.send_alarm_email",
        args=[alarm_data],
        queue="critical",
        # The broker here is Redis, where Celery's priority scale is reversed
        # compared to RabbitMQ: 0 is the highest priority and 9 the lowest.
        # The previous value of 9 therefore requested the *lowest* priority.
        priority=0,
    )
Dead letter queue: Håndtér fejlede tasks
# Tasks der fejler efter max_retries havner i DLQ:
# celery_config.py tilføjelse:
# Automatic routing table. Glob patterns are tried in insertion order and the
# first match wins, so the more specific "workers.tasks.email.*" must come
# before the catch-all "workers.tasks.*" — otherwise email tasks would match
# the catch-all and never reach the "critical" queue.
task_routes = {
    "workers.tasks.email.*": {"queue": "critical"},
    "workers.tasks.*": {"queue": "default"},
    "backup.*": {"queue": "low"},
}
# Dead letter exchange (RabbitMQ concept — simuleret med Redis):
# Fejlede tasks logges til audit_log for manuel intervention:
@shared_task(name="workers.celery_error_handler")
def handle_task_failure(uuid, exc, traceback, einfo, *args, **kwargs):
    """Record a failed task by delegating to ``_log_failure``.

    Args:
        uuid: Id of the task that failed.
        exc: The exception the task raised; stored as ``str(exc)``.
        traceback: Traceback of the failure, forwarded unchanged.
        einfo: Extra exception info; accepted for signature compatibility
            and not used here.
        *args, **kwargs: Ignored.
    """
    import asyncio
    # NOTE(review): AuditLog is not referenced in this function; presumably it
    # is imported so the model is loaded before _log_failure runs — confirm or
    # remove.
    from server.src.db.models import AuditLog  # noqa: F401

    # _log_failure is a coroutine function defined elsewhere; run it to
    # completion on a fresh event loop from this synchronous worker context.
    asyncio.run(_log_failure(uuid, str(exc), traceback))


# Celery reports task failures through the ``task_failure`` signal. Assigning
# a callable to ``app.conf.task_failure_handler`` only stores an attribute
# that Celery does not read, so the handler has to be connected to the signal
# to be called at all.
from celery.signals import task_failure  # noqa: E402


@task_failure.connect
def _forward_task_failure(sender=None, task_id=None, exception=None,
                          traceback=None, einfo=None, **extra):
    """Signal receiver: pass every task failure on to ``handle_task_failure``."""
    handle_task_failure(task_id, exception, traceback, einfo)


# Kept for backward compatibility with any code that reads this attribute.
app.conf.task_failure_handler = handle_task_failure
Flower: Real-time monitoring
# docker-compose.yml tilføjelse:
# Flower service: web monitoring for the Celery workers.
flower:
  image: mher/flower:2.0
  depends_on: [redis]
  networks: [internal]
  ports: ["5555:5555"]
  # Flower is started as a Celery sub-command against the Redis broker.
  # The UI is protected with HTTP basic auth and uses the "flower" URL prefix.
  # REDIS_PASSWORD, FLOWER_USER and FLOWER_PASSWORD come from the environment.
  command: >
    celery
    --broker=redis://:${REDIS_PASSWORD}@redis:6379/0
    flower
    --port=5555
    --basic_auth=${FLOWER_USER}:${FLOWER_PASSWORD}
    --url_prefix=flower
# Programmatisk Flower API — dashboard integration:
import httpx
async def get_celery_stats() -> dict:
    """Fetch Celery worker statistics from the Flower HTTP API.

    Returns:
        A dict with three integer entries:
        ``active_tasks`` (total length of every worker's ``active`` list),
        ``processed_today`` (sum of per-task counters for tasks whose name
        starts with ``workers.tasks.``) and ``workers_online`` (number of
        workers whose ``status`` equals ``"OK"``).

    Raises:
        httpx.HTTPStatusError: if Flower answers with a 4xx/5xx status.
    """
    async with httpx.AsyncClient() as client:
        # NOTE(review): the Flower container is started with
        # --url_prefix=flower; confirm whether this path needs a /flower
        # prefix in front of /api/workers.
        response = await client.get(
            "http://flower:5555/api/workers",
            auth=(settings.FLOWER_USER, settings.FLOWER_PASSWORD),
        )
        # Fail with a clear HTTP error instead of trying to parse an error page.
        response.raise_for_status()
        workers = response.json()

    processed = 0
    for worker in workers.values():
        # NOTE(review): the counters are looked up under "total" as before,
        # with a fallback to "stats" -> "total" — verify against the payload
        # your Flower version actually returns.
        totals = worker.get("total") or (worker.get("stats") or {}).get("total") or {}
        # The counters are keyed by exact task name, so the previous lookup of
        # the literal key "workers.tasks.*" never matched and always gave 0.
        processed += sum(
            count
            for task_name, count in totals.items()
            if task_name.startswith("workers.tasks.")
        )

    return {
        "active_tasks": sum(len(w.get("active", [])) for w in workers.values()),
        "processed_today": processed,
        "workers_online": sum(1 for w in workers.values() if w.get("status") == "OK"),
    }
RedisScheduler: Persistent beat-tilstand
# Standard-beat gemmer schedule i en lokal fil — slettes ved deploy
# RedisScheduler gemmer i Redis — persistent på tværs af deployments
# Installation:
# pip install celery-redbeat
# celery_config.py:
# Beat scheduler settings: use the scheduler class shipped with celery-redbeat
# (given as a dotted path) instead of Celery's default scheduler.
beat_scheduler = "redbeat.RedisScheduler"
beat_max_loop_interval = 5 # Check every 5 seconds
# Redis URL for RedBeat, taken from the application settings object.
redbeat_redis_url = settings.REDIS_URL
# Prefix setting for RedBeat's keys.
redbeat_key_prefix = "redbeat:"
# Start beat:
# celery -A server.src.workers.celery_app beat \
# --scheduler redbeat.RedisScheduler \
# --loglevel=info
Konklusion
Celery beat med RedisScheduler giver persistent, deploy-resistent task-scheduling. Priority queues (critical/default/low) sikrer at alarmer ikke blokeres af tungere baggrundstasks. Idempotente tasks med ON CONFLICT + max_retries + exponential backoff håndterer midlertidige fejl. Flower giver real-time monitorering af workers og task-historik.