M-Bus Gateway
· Celery · Celery Beat · Redis · background jobs · IoT · periodic tasks · Python · FastAPI · scheduler

Celery Beat for periodic IoT tasks: architecture and error handling

Celery Beat for periodic background tasks in an IoT platform: beat_schedule configuration, retry logic with exponential backoff, task prioritization, monitoring and error handling.

By M-Bus Gateway

The M-Bus Gateway platform uses Celery Beat for 15+ periodic background tasks, from daily MQTT readings to monthly Stripe synchronization. Here are the architecture and the patterns that keep it reliable.


Task list: what runs when?

# server/src/workers/celery_app.py
from celery import Celery
from celery.schedules import crontab

from server.src.config import settings  # hypothetical import path; the original does not show where settings lives

app = Celery("mbus", broker=settings.REDIS_URL)

app.conf.beat_schedule = {
    # Daily tasks:
    "dmi-graddage-sync": {
        "task": "dmi_sync",
        "schedule": crontab(hour=1, minute=0),      # 01:00 UTC
    },
    "reading-coverage-alert": {
        "task": "reading_coverage_alert",
        "schedule": crontab(hour=9, minute=30),     # 09:30 UTC
    },
    "settlement-reminder": {
        "task": "settlement_reminder",
        "schedule": crontab(hour=10, minute=0),     # 10:00 UTC
    },
    "onboarding-drip": {
        "task": "onboarding_drip",
        "schedule": crontab(hour=8, minute=30),     # 08:30 UTC
    },

    # Monthly tasks (minute=0 is required: crontab defaults minute to '*',
    # which would fire every minute of the given hour):
    "subscription-sync": {
        "task": "subscription_sync",
        "schedule": crontab(day_of_month=1, hour=5, minute=0),  # 1st at 05:00 UTC
    },
    "monthly-digest": {
        "task": "monthly_digest",
        "schedule": crontab(day_of_month=2, hour=6, minute=0),  # 2nd at 06:00 UTC
    },
    "settlement-deadline-alert": {
        "task": "settlement_deadline_alert",
        "schedule": crontab(day_of_month=5, hour=7, minute=0),  # 5th at 07:00 UTC
    },
}

app.conf.timezone = "UTC"
app.conf.task_serializer = "json"
app.conf.result_backend = settings.REDIS_URL
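The monthly entries fire on a fixed day of the month at a fixed UTC hour. To make that concrete, here is a stdlib-only sketch of a hypothetical helper, `next_monthly_run` (not Celery's own scheduler code), that computes the next UTC firing time of a monthly entry. It assumes `minute` is pinned to a single value: `crontab(day_of_month=1, hour=5)` alone leaves `minute` at its default `'*'` and would match all 60 minutes of that hour.

```python
from datetime import datetime, timezone


def next_monthly_run(now: datetime, day: int, hour: int, minute: int = 0) -> datetime:
    """Next UTC time strictly after `now` matching day-of-month/hour/minute.

    Hypothetical helper mirroring a monthly crontab entry; `now` must be timezone-aware.
    """
    year, month = now.year, now.month
    for _ in range(48):  # look ahead up to four years; plenty for any valid day
        try:
            candidate = datetime(year, month, day, hour, minute, tzinfo=timezone.utc)
        except ValueError:
            candidate = None  # this month has no such day (e.g. day=31 in April): skip it
        if candidate is not None and candidate > now:
            return candidate
        # Advance to the next calendar month.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    raise ValueError(f"day_of_month={day} never occurs")


# subscription-sync: 1st of the month at 05:00 UTC
print(next_monthly_run(datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc), day=1, hour=5))
# 2024-02-01 05:00:00+00:00
```

Because the schedule runs in UTC, the local Danish time of each run shifts by one hour between winter and summer time; the UTC comments in the schedule stay correct either way.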

Task definition with retry logic

# server/src/workers/tasks/dmi_sync.py
import asyncio
from datetime import datetime, timedelta, timezone

import httpx
import structlog  # assumed: the keyword-style log calls match structlog
from celery import shared_task

from server.src.config import settings  # hypothetical import path

logger = structlog.get_logger(__name__)


@shared_task(
    name="dmi_sync",
    bind=True,
    max_retries=5,
    autoretry_for=(httpx.TimeoutException, httpx.ConnectError),
    retry_backoff=60,         # Exponential backoff, factor 60 s: 60s, 120s, 240s... (True would mean 1s, 2s, 4s...)
    retry_backoff_max=3600,   # At most 1 hour between attempts
    retry_jitter=True,        # Actual delay is random in [0, backoff] (avoids a thundering herd)
)
def sync_dmi_degree_days(self, days_back: int = 3):
    """Sync heating degree days from the DMI Klimagrid API."""
    try:
        asyncio.run(_sync_degree_days(days_back))
    except Exception as exc:
        logger.error("DMI sync failed", error=str(exc), attempt=self.request.retries)
        # Re-raise instead of calling self.retry(): autoretry_for then retries only the
        # transient network errors listed above, with exponential backoff. An explicit
        # self.retry() would retry every error after the fixed default_retry_delay.
        raise


async def _sync_degree_days(days_back: int):
    async with httpx.AsyncClient(timeout=30) as client:
        for day_offset in range(days_back):
            date = (datetime.now(timezone.utc) - timedelta(days=day_offset)).date()
            url = "https://dmigw.govcloud.dk/v2/climategrid/collections/items"
            params = {
                "datetime": date.isoformat(),
                "parameter-id": "heating_degree_days",
                "api-key": settings.DMI_API_KEY,
            }
            resp = await client.get(url, params=params)
            resp.raise_for_status()  # HTTPStatusError is not in autoretry_for, so it fails without retry
            await _upsert_degree_days(resp.json(), date)  # DB upsert helper (not shown)
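To see what these retry settings amount to, the sketch below reproduces the delay formula Celery documents for `retry_backoff` and `retry_jitter`: the delay factor times 2 to the power of the number of retries already made, capped at `retry_backoff_max`, and with jitter a random value between 0 and that cap. It is a standalone re-implementation for illustration (the name `backoff_delay` is ours), not Celery's own code.

```python
import random


def backoff_delay(retries: int, factor: int = 60, maximum: int = 3600,
                  full_jitter: bool = True) -> int:
    """Seconds to wait before the next retry; `retries` counts retries already made (0 first).

    Illustrative re-implementation of the documented behavior of
    retry_backoff / retry_backoff_max / retry_jitter.
    """
    delay = min(maximum, factor * (2 ** retries))  # exponential growth, capped
    if full_jitter:
        # Full jitter: uniform pick in [0, delay], so tasks failing together spread out.
        delay = random.randrange(delay + 1)
    return max(0, delay)


# Upper bounds for the five retries allowed by max_retries=5 with factor 60:
print([backoff_delay(n, full_jitter=False) for n in range(5)])  # [60, 120, 240, 480, 960]
```

With jitter enabled, the numbers above are ceilings rather than exact waits, and the 3600-second cap only starts to matter from the seventh retry onward.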

Prioritized queues

# server/src/workers/celery_app.py
# Tasks without an explicit route (e.g. onboarding_drip) go to the default queue.
# Celery's built-in default is "celery", which none of the workers below consume:
app.conf.task_default_queue = "default"

app.conf.task_routes = {
    # High priority (tenant-facing operations):
    "generate_settlement_pdf": {"queue": "high"},
    "send_settlement_email": {"queue": "high"},
    "send_tenant_welcome_email": {"queue": "high"},

    # Normal priority:
    "dmi_sync": {"queue": "default"},
    "reading_coverage_alert": {"queue": "default"},
    "settlement_reminder": {"queue": "default"},

    # Low priority (batch operations):
    "subscription_sync": {"queue": "low"},
    "monthly_digest": {"queue": "low"},
    "settlement_deadline_alert": {"queue": "low"},
}

# Docker Compose worker configuration:
# worker-high: --queues high --concurrency 4
# worker-default: --queues default --concurrency 2
# worker-low: --queues low --concurrency 1
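A queue that no worker consumes fails silently: messages simply pile up in Redis. The sketch below is a hypothetical pre-deploy check (the names `unconsumed_tasks`, `BEAT_TASKS` and `WORKER_QUEUES` are ours) that resolves each scheduled task to its queue and reports tasks that would land on a queue without a worker. It only resolves exact-name routes, not glob patterns or per-entry `options`. With the routes above, `onboarding_drip` has no route and falls back to Celery's default queue, `celery`, unless `task_default_queue` is set.

```python
# Hypothetical pre-deploy check: does every scheduled task reach a consumed queue?
BEAT_TASKS = [
    "dmi_sync", "reading_coverage_alert", "settlement_reminder", "onboarding_drip",
    "subscription_sync", "monthly_digest", "settlement_deadline_alert",
]
TASK_ROUTES = {
    "dmi_sync": {"queue": "default"},
    "reading_coverage_alert": {"queue": "default"},
    "settlement_reminder": {"queue": "default"},
    "subscription_sync": {"queue": "low"},
    "monthly_digest": {"queue": "low"},
    "settlement_deadline_alert": {"queue": "low"},
}
WORKER_QUEUES = {"high", "default", "low"}  # the --queues of the three workers


def unconsumed_tasks(tasks, routes, consumed, default_queue="celery"):
    """Map task name -> queue for tasks whose queue no worker listens on.

    Only exact-name routes are resolved; "celery" is Celery's built-in default queue name.
    """
    missing = {}
    for name in tasks:
        queue = routes.get(name, {}).get("queue", default_queue)
        if queue not in consumed:
            missing[name] = queue
    return missing


print(unconsumed_tasks(BEAT_TASKS, TASK_ROUTES, WORKER_QUEUES))
# {'onboarding_drip': 'celery'}
print(unconsumed_tasks(BEAT_TASKS, TASK_ROUTES, WORKER_QUEUES, default_queue="default"))
# {}
```

In the real app, the task list could be read from the configuration itself, e.g. `[entry["task"] for entry in app.conf.beat_schedule.values()]`, so the check cannot drift from the schedule.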

Docker Compose: Beat + Workers

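# docker-compose.yml
services:
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    restart: unless-stopped

  celery-beat:
    build: ./server
    command: celery -A server.src.workers.celery_app beat --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
    restart: unless-stopped
    # Exactly ONE beat instance: never scale this service, or every schedule fires once per instance!

  worker-high:
    build: ./server
    command: celery -A server.src.workers.celery_app worker
             --queues high --concurrency 4 --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
    restart: unless-stopped

  worker-default:
    build: ./server
    command: celery -A server.src.workers.celery_app worker
             --queues default --concurrency 2 --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
    restart: unless-stopped

  # Without a worker on "low", the batch tasks routed there are never consumed:
  worker-low:
    build: ./server
    command: celery -A server.src.workers.celery_app worker
             --queues low --concurrency 1 --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
    restart: unless-stopped

volumes:
  redis_data:  # named volumes must be declared at the top level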

Idempotency: critical for periodic tasks

A periodic task can run more than once: Celery Beat may send it again around a restart, and every retry runs the task from the top. All tasks MUST therefore be idempotent:

# WRONG: not idempotent
async def send_monthly_digest(tenant_id: str, month: str):
    email_content = build_digest(tenant_id, month)
    await send_email(email_content)  # Sends on every call!

# CORRECT: idempotent with sent tracking
async def send_monthly_digest(tenant_id: str, month: str):
    # Check whether the digest was already sent for this month:
    already_sent = await check_digest_sent(tenant_id, month)
    if already_sent:
        logger.info("Monthly digest already sent", tenant=tenant_id, month=month)
        return

    email_content = build_digest(tenant_id, month)
    await send_email(email_content)
    await mark_digest_sent(tenant_id, month)  # Atomic upsert
    # Note: check and mark are separate steps, so two runs that overlap in time
    # can both pass the check. This guards against reruns, not against concurrency.
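If overlapping runs are possible, one option is to claim the send before doing it, with a single atomic "set if absent". The sketch below assumes a Redis-style client whose `set(name, value, nx=True, ex=...)` returns a truthy value only when the key was newly written; `send_digest_once`, the key scheme and the TTL are hypothetical, and `InMemoryClient` is a test stand-in so the example runs without Redis. It is synchronous for brevity.

```python
from typing import Callable, Dict, Optional


class InMemoryClient:
    """Stand-in for a Redis client in this sketch; TTLs are accepted but ignored."""

    def __init__(self) -> None:
        self.data: Dict[str, str] = {}

    def set(self, name: str, value: str, nx: bool = False, ex: Optional[int] = None):
        if nx and name in self.data:
            return None  # like redis-py: NX blocked the write
        self.data[name] = value
        return True

    def delete(self, name: str) -> int:
        return 1 if self.data.pop(name, None) is not None else 0


def send_digest_once(client, tenant_id: str, month: str, send: Callable[[], None],
                     ttl_seconds: int = 40 * 24 * 3600) -> bool:
    """Send at most once per (tenant, month); returns False if another run got there first."""
    key = f"digest-sent:{tenant_id}:{month}"  # hypothetical key scheme
    # Claim BEFORE sending: SET with NX is one atomic command, so only one run wins.
    if not client.set(key, "1", nx=True, ex=ttl_seconds):
        return False
    try:
        send()
    except Exception:
        client.delete(key)  # release the claim so a later retry can send
        raise
    return True


client = InMemoryClient()
outbox = []
print(send_digest_once(client, "tenant-1", "2024-05", lambda: outbox.append("digest")))  # True
print(send_digest_once(client, "tenant-1", "2024-05", lambda: outbox.append("digest")))  # False
print(outbox)  # ['digest']
```

A redis-py client (e.g. `redis.Redis.from_url(...)`) fits the same shape. The trade-off: if a worker dies after claiming but before sending, that digest is skipped until the key expires, so this pattern prefers at-most-once delivery, which usually suits emails better than duplicates.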

Task monitoring with Flower

# Start the Flower monitoring UI (Flower is a separate package: pip install flower):
celery -A server.src.workers.celery_app flower --port=5555

# Docker Compose:
services:
  flower:
    build: ./server
    command: celery -A server.src.workers.celery_app flower
             --port=5555 --basic_auth=admin:${FLOWER_PASSWORD}
    environment:
      - REDIS_URL=redis://redis:6379/0
    ports:
      - "5555:5555"
    depends_on:
      - redis

Alternatively: structured logging per task via Celery signals. The handlers must live in a module the worker imports (e.g. celery_app.py):

import structlog
from celery.signals import task_failure, task_retry, task_success

logger = structlog.get_logger(__name__)

@task_success.connect
def on_task_success(sender, result, **kwargs):
    logger.info("Task completed", task=sender.name, result=str(result)[:100])

@task_failure.connect
def on_task_failure(sender, exception, traceback, **kwargs):
    logger.error("Task failed", task=sender.name, error=str(exception))

@task_retry.connect
def on_task_retry(sender, reason, **kwargs):
    logger.warning("Task retrying", task=sender.name, reason=str(reason))
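Flower and the signal handlers show what ran, but not what failed to run: if Beat itself stops, nothing fails and nothing is logged. A hypothetical "dead man's switch" compares each periodic task's last successful run against its expected interval. The sketch assumes the last-success timestamps are recorded somewhere (for example by a `task_success` handler writing to Redis or the database); that storage, the `EXPECTED_INTERVALS` table and the two-hour grace period are assumptions.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional

# Hypothetical table: how often each periodic task is expected to succeed.
EXPECTED_INTERVALS = {
    "dmi_sync": timedelta(days=1),
    "reading_coverage_alert": timedelta(days=1),
    "subscription_sync": timedelta(days=31),
}


def stale_tasks(last_success: Dict[str, datetime], now: datetime,
                intervals: Dict[str, timedelta] = EXPECTED_INTERVALS,
                grace: timedelta = timedelta(hours=2)) -> List[str]:
    """Names of tasks with no success within their interval plus a grace period."""
    stale = []
    for name, interval in intervals.items():
        last: Optional[datetime] = last_success.get(name)
        if last is None or now - last > interval + grace:
            stale.append(name)
    return sorted(stale)


now = datetime(2024, 5, 10, 12, 0, tzinfo=timezone.utc)
last_success = {
    "dmi_sync": datetime(2024, 5, 10, 1, 0, tzinfo=timezone.utc),               # 11 h ago: fine
    "reading_coverage_alert": datetime(2024, 5, 8, 9, 30, tzinfo=timezone.utc),  # ~2 days ago
}
print(stale_tasks(last_success, now))  # ['reading_coverage_alert', 'subscription_sync']
```

Running such a check from outside Celery (e.g. a cron job or an uptime monitor) matters: if it ran as a Beat task, a dead Beat would also silence the alarm.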

Timeouts and long-running tasks

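import structlog  # assumed, as in the other task modules
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded

logger = structlog.get_logger(__name__)

# PDF generation can take 10-30 seconds:
@shared_task(
    name="generate_settlement_pdf",
    bind=True,
    time_limit=300,        # Hard limit: 5 minutes → SIGKILL
    soft_time_limit=240,   # Soft limit: 4 minutes → SoftTimeLimitExceeded raised in the task
    max_retries=3,
)
def generate_settlement_pdf(self, settlement_id: str):
    try:
        pdf_bytes = _generate_pdf(settlement_id)
        _upload_to_storage(settlement_id, pdf_bytes)
        _update_settlement_pdf_url(settlement_id)
    except SoftTimeLimitExceeded as exc:
        # The minute between soft and hard limit is the cleanup window.
        logger.error("PDF generation timed out", settlement_id=settlement_id)
        raise self.retry(exc=exc, countdown=60)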
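The point of a soft limit is that the task gets an exception it can catch while the process is still alive. The stdlib sketch below mimics that idea with a `SIGALRM` timer; it is not how Celery implements soft limits internally, and `run_with_soft_limit`, `SoftTimeLimit` and `slow_pdf_job` are hypothetical names. It works on Unix and only in the main thread.

```python
import signal
import time


class SoftTimeLimit(Exception):
    """Plays the role of celery.exceptions.SoftTimeLimitExceeded in this sketch."""


def run_with_soft_limit(fn, seconds: float):
    """Call fn(); if it runs longer than `seconds`, raise SoftTimeLimit inside it."""
    def _on_alarm(signum, frame):
        raise SoftTimeLimit(f"soft limit of {seconds}s exceeded")

    previous = signal.signal(signal.SIGALRM, _on_alarm)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        return fn()
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)   # cancel a pending alarm
        signal.signal(signal.SIGALRM, previous)   # restore the previous handler


def slow_pdf_job():
    try:
        time.sleep(2)  # stands in for slow PDF rendering
        return "uploaded"
    except SoftTimeLimit:
        # Cleanup window before a hard limit would kill the process,
        # e.g. delete a half-written file, then report or retry.
        return "cleaned up"


print(run_with_soft_limit(lambda: "fast", 1.0))  # fast
print(run_with_soft_limit(slow_pdf_job, 0.1))    # cleaned up
```

The gap between `soft_time_limit` and `time_limit` should be long enough for that cleanup; a hard limit cannot be caught at all.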

Conclusion

Celery Beat + Redis is the right choice for periodic IoT tasks: cron-like scheduling, prioritized queues, exponential backoff and idempotent tasks ensure that the platform handles failures and restarts without sending duplicate emails or charging customers twice. The one rule: never scale the Beat process, only the workers.

See the Celery Redis guide or the server architecture post.