Celery Beat til periodiske IoT-opgaver — arkitektur og fejlhåndtering
Celery Beat til periodiske baggrundsopgaver i IoT-platform: beat-schedule konfiguration, retry-logik med eksponentiel backoff, task prioritering, monitoring og fejlhåndtering.
Af M-Bus Gateway
M-Bus Gateway platformen bruger Celery Beat til 15+ periodiske baggrundsopgaver — fra daglig MQTT-aflæsning til månedlig Stripe-synkronisering. Her er arkitekturen og mønstrene der sikrer pålidelighed.
Opgaveliste: Hvad kører hvornår?
# server/src/workers/celery_app.py
from celery import Celery
from celery.schedules import crontab
app = Celery("mbus", broker=settings.REDIS_URL)

# NOTE: every crontab() field that is not given defaults to "*" (every value).
# Always pin minute= explicitly: crontab(hour=5) alone fires every minute
# from 05:00 to 05:59, i.e. 60 runs instead of one.
app.conf.beat_schedule = {
    # Daily tasks:
    "dmi-graddage-sync": {
        "task": "dmi_sync",
        "schedule": crontab(hour=1, minute=0),  # 01:00 UTC
    },
    "reading-coverage-alert": {
        "task": "reading_coverage_alert",
        "schedule": crontab(hour=9, minute=30),  # 09:30 UTC
    },
    "settlement-reminder": {
        "task": "settlement_reminder",
        "schedule": crontab(hour=10, minute=0),  # 10:00 UTC
    },
    "onboarding-drip": {
        "task": "onboarding_drip",
        "schedule": crontab(hour=8, minute=30),  # 08:30 UTC
    },
    # Monthly tasks:
    "subscription-sync": {
        "task": "subscription_sync",
        "schedule": crontab(day_of_month=1, hour=5, minute=0),  # 1st at 05:00 UTC
    },
    "monthly-digest": {
        "task": "monthly_digest",
        "schedule": crontab(day_of_month=2, hour=6, minute=0),  # 2nd at 06:00 UTC
    },
    "settlement-deadline-alert": {
        "task": "settlement_deadline_alert",
        "schedule": crontab(day_of_month=5, hour=7, minute=0),  # 5th at 07:00 UTC
    },
}
# Schedules above are interpreted in this timezone.
app.conf.timezone = "UTC"
app.conf.task_serializer = "json"
app.conf.result_backend = settings.REDIS_URL
Task-definition med retry-logik
# server/src/workers/tasks/dmi_sync.py
from celery import shared_task
from celery.exceptions import Retry
import httpx
@shared_task(
    name="dmi_sync",
    bind=True,
    max_retries=5,
    # Only used by explicit self.retry() calls; autoretry uses retry_backoff.
    default_retry_delay=60,
    autoretry_for=(httpx.TimeoutException, httpx.ConnectError),
    # retry_backoff=True would start at 1 s (1, 2, 4, ...). An int is the
    # backoff factor, giving 60 s, 120 s, 240 s, ... (capped below).
    retry_backoff=60,
    retry_backoff_max=3600,  # never wait more than 1 hour between attempts
    retry_jitter=True,  # randomise each delay in [0, computed] to avoid thundering herd
)
def sync_dmi_degree_days(self, days_back: int = 3):
    """Sync heating degree days from the DMI Climate Grid API.

    Transient network failures (``httpx.TimeoutException``,
    ``httpx.ConnectError``) are retried automatically by Celery's
    ``autoretry_for`` machinery with exponential backoff and jitter, up to
    ``max_retries`` times. Any other exception (e.g. ``httpx.HTTPStatusError``
    from ``raise_for_status``) is logged and fails the task instead of being
    retried blindly.

    Args:
        days_back: How many days, counting back from today (UTC), to sync.
    """
    import asyncio

    try:
        asyncio.run(_sync_degree_days(days_back))
    except Exception as exc:
        logger.error("DMI sync fejlede", error=str(exc), attempt=self.request.retries)
        # Re-raise the ORIGINAL exception. Calling self.retry() here (as
        # before) would swallow it before autoretry sees it, bypassing
        # retry_backoff/retry_jitter and retrying even non-transient errors.
        raise
async def _sync_degree_days(days_back: int):
    """Fetch and upsert heating degree days for the last ``days_back`` days.

    Offset 0 is today's UTC date, offset 1 is yesterday, and so on. A non-2xx
    response raises ``httpx.HTTPStatusError`` via ``raise_for_status`` and
    aborts the remaining days.
    """
    from datetime import datetime, timedelta, timezone

    # NOTE(review): OGC API - Features paths usually have the form
    # /collections/{collectionId}/items — confirm this path against DMI docs.
    url = "https://dmigw.govcloud.dk/v2/climategrid/collections/items"
    # Read the clock once: re-reading it per iteration could shift the day
    # window (skip or repeat a day) if the loop runs across midnight UTC.
    today = datetime.now(timezone.utc).date()
    async with httpx.AsyncClient(timeout=30) as client:
        for day_offset in range(days_back):
            day = today - timedelta(days=day_offset)
            params = {
                "datetime": day.isoformat(),
                "parameter-id": "heating_degree_days",
                "api-key": settings.DMI_API_KEY,
            }
            resp = await client.get(url, params=params)
            resp.raise_for_status()
            await _upsert_degree_days(resp.json(), day)
Prioriterede køer
# server/src/workers/celery_app.py
# Every task that can be enqueued must map to a queue some worker consumes.
app.conf.task_routes = {
    # High priority — tenant-facing operations:
    "generate_settlement_pdf": {"queue": "high"},
    "send_settlement_email": {"queue": "high"},
    "send_tenant_welcome_email": {"queue": "high"},
    # Normal priority:
    "dmi_sync": {"queue": "default"},
    "reading_coverage_alert": {"queue": "default"},
    "settlement_reminder": {"queue": "default"},
    "onboarding_drip": {"queue": "default"},
    # Low priority — batch operations:
    "subscription_sync": {"queue": "low"},
    "monthly_digest": {"queue": "low"},
    "settlement_deadline_alert": {"queue": "low"},
}
# Unrouted tasks would otherwise go to Celery's built-in "celery" queue, which
# none of the workers below listen on, so they would never run.
app.conf.task_default_queue = "default"
# Docker Compose worker configuration:
# worker_high: --queues high --concurrency 4
# worker_default: --queues default --concurrency 2
# worker_low: --queues low --concurrency 1
Docker Compose: Beat + Workers
# docker-compose.yml
services:
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    restart: unless-stopped

  celery-beat:
    build: ./server
    command: celery -A server.src.workers.celery_app beat --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
    restart: unless-stopped
    # Exactly ONE beat instance — never scale this service! A second beat
    # enqueues every scheduled task a second time.

  worker-high:
    build: ./server
    command: celery -A server.src.workers.celery_app worker
      --queues high --concurrency 4 --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
    restart: unless-stopped

  worker-default:
    build: ./server
    command: celery -A server.src.workers.celery_app worker
      --queues default --concurrency 2 --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
    restart: unless-stopped

  # Consumes the "low" queue (subscription_sync, monthly_digest,
  # settlement_deadline_alert). Without it, those tasks pile up in Redis
  # and never execute.
  worker-low:
    build: ./server
    command: celery -A server.src.workers.celery_app worker
      --queues low --concurrency 1 --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
    restart: unless-stopped

# Named volumes must be declared at top level; otherwise Compose rejects the
# redis_data reference in the redis service.
volumes:
  redis_data:
Idempotens: Kritisk for periodiske opgaver
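En task kan blive udført mere end én gang — fx hvis Beat eller en worker genstarter midt i en kørsel, eller hvis Redis-brokeren genleverer en besked, der ikke blev kvitteret inden for visibility timeout. Alle periodiske tasks SKAL derfor være idempotente: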
# WRONG — not idempotent:
async def send_monthly_digest(tenant_id: str, month: str):
    """Build the digest for ``tenant_id``/``month`` and send it unconditionally.

    There is no record of what was already sent, so every invocation —
    including a re-delivered or re-run task — sends another email.
    """
    await send_email(build_digest(tenant_id, month))  # sends on every call!
# CORRECT — idempotent via sent-tracking:
async def send_monthly_digest(tenant_id: str, month: str):
    """Send the monthly digest for ``tenant_id``/``month`` unless already sent.

    If ``check_digest_sent`` reports the digest as sent, logs and returns
    without sending. Otherwise builds and sends the digest, then records it
    via ``mark_digest_sent``.

    NOTE(review): check -> send -> mark is not one atomic step. Two concurrent
    runs for the same tenant/month can both pass the check and both send, and
    a crash between send and mark causes a resend on the next run. If that
    matters, claim the (tenant_id, month) key atomically before sending.
    """
    if await check_digest_sent(tenant_id, month):
        logger.info("Månedlig digest allerede sendt", tenant=tenant_id, month=month)
        return

    await send_email(build_digest(tenant_id, month))
    # Upsert of the sent marker (described as atomic in the original).
    await mark_digest_sent(tenant_id, month)
Task-monitoring med Flower
# Run the Flower monitoring UI (web UI served on port 5555):
celery --app=server.src.workers.celery_app flower --port=5555
# Docker Compose:
services:
  flower:
    build: ./server
    # ${VAR:?msg} makes Compose refuse to start if FLOWER_PASSWORD is unset,
    # instead of silently enabling basic auth with an empty password.
    command: celery -A server.src.workers.celery_app flower
      --port=5555 --basic_auth=admin:${FLOWER_PASSWORD:?FLOWER_PASSWORD is required}
    # Flower needs the same broker URL as the workers to see their tasks.
    environment:
      - REDIS_URL=redis://redis:6379/0
    ports:
      # Published on all host interfaces; basic auth travels in clear text
      # over HTTP. Consider "127.0.0.1:5555:5555" or a TLS reverse proxy.
      - "5555:5555"
    depends_on:
      - redis
    restart: unless-stopped
# Alternative: structured logging per task via Celery signals.
from celery.signals import task_success, task_failure, task_retry


@task_success.connect
def on_task_success(sender, result, **kwargs):
    """Log a successful task with its id and a 100-char preview of the result."""
    logger.info(
        "Task fuldført",
        task=sender.name,
        task_id=getattr(sender.request, "id", None),
        result=str(result)[:100],
    )


@task_failure.connect
def on_task_failure(sender, exception, traceback, **kwargs):
    """Log a failed task with its id, the error and the full traceback text.

    The traceback is taken from the ``einfo`` signal argument, whose str()
    is the formatted traceback. Previously it was received but never logged.
    """
    einfo = kwargs.get("einfo")
    logger.error(
        "Task fejlede",
        task=sender.name,
        task_id=kwargs.get("task_id"),
        error=str(exception),
        traceback=str(einfo) if einfo is not None else None,
    )


@task_retry.connect
def on_task_retry(sender, reason, **kwargs):
    """Log a scheduled retry with the task id and the reason for retrying."""
    request = kwargs.get("request")
    logger.warning(
        "Task gentager",
        task=sender.name,
        task_id=getattr(request, "id", None),
        reason=str(reason),
    )
Timeout og long-running tasks
# PDF generation can take 10-30 seconds:
@shared_task(
    name="generate_settlement_pdf",
    bind=True,
    time_limit=300,  # hard limit: 5 minutes -> worker child process is killed
    soft_time_limit=240,  # soft limit: 4 minutes -> SoftTimeLimitExceeded raised in task
    max_retries=3,
)
def generate_settlement_pdf(self, settlement_id: str):
    """Generate, upload and link the settlement PDF for ``settlement_id``.

    If the soft time limit is hit, the timeout is logged and the task is
    retried after 60 s, up to ``max_retries`` times. Once retries are
    exhausted, the ``SoftTimeLimitExceeded`` itself is raised.

    Note: Celery time limits require a pool and platform that support them
    (e.g. prefork on POSIX).
    """
    # Must be imported: without it the except clause raises NameError exactly
    # when the soft limit fires.
    from celery.exceptions import SoftTimeLimitExceeded

    try:
        pdf_bytes = _generate_pdf(settlement_id)
        _upload_to_storage(settlement_id, pdf_bytes)
        _update_settlement_pdf_url(settlement_id)
    except SoftTimeLimitExceeded as exc:
        logger.error("PDF generering timeout", settlement_id=settlement_id)
        # Passing exc makes Celery re-raise the timeout (not a generic
        # MaxRetriesExceededError) once max_retries is exhausted.
        raise self.retry(exc=exc, countdown=60)
Konklusion
Celery Beat + Redis er det rette valg til periodiske IoT-opgaver: cron-lignende scheduling, prioriterede køer, eksponentiel backoff og idempotente tasks sikrer, at platformen håndterer fejl og genstarter uden at sende dobbelte e-mails eller lave dobbelt-opkrævninger. Den vigtigste driftsregel: Kør aldrig mere end én Beat-instans — skalér kun workers, og sørg for, at hver kø har mindst én worker, der lytter på den.