M-Bus Gateway
· Celery· Redis· background jobs· asynchronous· PDF· email· IoT· architecture· Python

Celery and Redis for IoT background jobs: architecture and best practices

How M-Bus Gateway uses Celery + Redis for asynchronous background jobs: PDF generation, email delivery, OCR, Stripe synchronization, DMI data, and onboarding drip emails.

By M-Bus Gateway

A SaaS platform for IoT data needs many asynchronous jobs: PDF generation, email delivery, polling of external APIs, and periodic alerts. Celery + Redis is the standard stack for this in Python.


Architecture overview

[FastAPI: synchronous request/response]
        │
        │ .delay() / .apply_async()
        ↓
[Redis broker: job queue]
        │
        │ workers pull jobs
        ↓
[Celery workers]
    ├── PDF generation (WeasyPrint)
    ├── Email delivery (Brevo)
    ├── OCR scanning (invoice parsing)
    ├── DMI degree-day sync (daily)
    ├── Stripe synchronization (monthly)
    ├── Onboarding drip emails (daily)
    ├── BEK 563 deadline alert (monthly)
    └── Monthly portfolio digest

[Celery Beat: periodic jobs]
    → Scheduler for cron-like tasks
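
The flow in the diagram can be mimicked in-process with only the standard library. The sketch below is an analogy, not Celery itself: `queue.Queue` stands in for the Redis broker, a thread stands in for a Celery worker, and `enqueue_and_process` plays the role of the FastAPI side calling `.delay()`. All names in it are hypothetical.

```python
import queue
import threading

STOP = None  # sentinel telling the worker to shut down


def worker(jobs: queue.Queue, results: list) -> None:
    """Stand-in for a Celery worker: take jobs off the queue until told to stop."""
    while True:
        job = jobs.get()
        if job is STOP:
            break
        task_name, payload = job
        # A real worker would render a PDF or send an email here.
        results.append((task_name, payload.upper()))


def enqueue_and_process(payloads: list) -> list:
    """Stand-in for the web process: enqueue jobs; the worker thread does the work."""
    jobs: queue.Queue = queue.Queue()
    results: list = []
    thread = threading.Thread(target=worker, args=(jobs, results))
    thread.start()
    for payload in payloads:
        jobs.put(("generate_pdf", payload))  # comparable to task.delay(payload)
    jobs.put(STOP)
    thread.join()  # only for the demo; a web request would not wait for the worker
    return results


print(enqueue_and_process(["a", "b"]))
```

The point of the split is that the producer only pays the cost of `put()`, while the slow work happens elsewhere. With Celery, the queue additionally lives outside the web process, so jobs are not tied to the lifetime of the process that enqueued them.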

Setup

# server/src/workers/celery_app.py
from celery import Celery
from celery.schedules import crontab

celery = Celery(
    "mbus_workers",
    broker="redis://redis:6379/0",
    backend="redis://redis:6379/1",
    include=[
        "server.src.workers.tasks.pdf",
        "server.src.workers.tasks.email",
        "server.src.workers.tasks.ocr",
        "server.src.workers.tasks.dmi",
        "server.src.workers.tasks.stripe",
        "server.src.workers.tasks.onboarding_drip",
        "server.src.workers.tasks.settlement_deadlines",
        "server.src.workers.tasks.monthly_digest",
    ],
)

celery.conf.beat_schedule = {
    # DMI degree days: daily at 04:00 UTC
    "dmi-sync": {
        "task": "server.src.workers.tasks.dmi.sync_degree_days",
        "schedule": crontab(hour=4, minute=0),
    },
    # Onboarding drip: daily at 08:30 UTC
    "onboarding-drip": {
        "task": "server.src.workers.tasks.onboarding_drip.send_drip_emails",
        "schedule": crontab(hour=8, minute=30),
    },
    # BEK 563 deadline alert: monthly, on the 5th at 07:00 UTC
    "settlement-deadlines": {
        "task": "server.src.workers.tasks.settlement_deadlines.check_deadlines",
        "schedule": crontab(hour=7, minute=0, day_of_month=5),
    },
    # Stripe unit synchronization: monthly, on the 1st at 05:00 UTC
    "stripe-unit-sync": {
        "task": "server.src.workers.tasks.stripe.sync_subscription_units",
        "schedule": crontab(hour=5, minute=0, day_of_month=1),
    },
    # Monthly portfolio digest: on the 2nd at 06:00 UTC
    "monthly-digest": {
        "task": "server.src.workers.tasks.monthly_digest.send_digests",
        "schedule": crontab(hour=6, minute=0, day_of_month=2),
    },
}

celery.conf.task_serializer = "json"
celery.conf.result_expires = 86400  # 24 hours
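
To make the monthly entries concrete, here is a small standard-library sketch of when a schedule like `crontab(hour=7, minute=0, day_of_month=5)` would next fire. `next_monthly_run` is a hypothetical helper, not part of Celery, and it assumes UTC and a day of month between 1 and 28.

```python
from datetime import datetime, timezone


def next_monthly_run(now: datetime, day_of_month: int, hour: int, minute: int) -> datetime:
    """Return the first run strictly after `now` for a fixed day/hour/minute each month."""
    if not 1 <= day_of_month <= 28:
        raise ValueError("this sketch only handles days that exist in every month")
    candidate = now.replace(day=day_of_month, hour=hour, minute=minute, second=0, microsecond=0)
    if candidate > now:
        return candidate
    # Otherwise roll over to the same day next month (and next year after December).
    if now.month == 12:
        return candidate.replace(year=now.year + 1, month=1)
    return candidate.replace(month=now.month + 1)


# BEK 563 deadline alert: the 5th at 07:00 UTC
now = datetime(2024, 3, 10, 12, 0, tzinfo=timezone.utc)
next_run = next_monthly_run(now, day_of_month=5, hour=7, minute=0)
print(next_run.isoformat())  # 2024-04-05T07:00:00+00:00
```

Celery uses UTC by default, which is why the schedule comments state UTC times. Local Danish time is one or two hours ahead depending on the season.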

PDF generation (WeasyPrint)

# server/src/workers/tasks/pdf.py
from celery import shared_task
from weasyprint import HTML, CSS
import asyncio

@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def generate_settlement_pdf(self, settlement_id: str):
    """Generate the settlement PDF and store it in Hetzner Object Storage."""
    try:
        # Fetch settlement data (sync wrapper around the async DB layer):
        settlement_data = asyncio.run(_fetch_settlement(settlement_id))

        # Render the HTML template:
        html_content = render_settlement_template(settlement_data)

        # Generate the PDF:
        pdf_bytes = HTML(string=html_content).write_pdf(
            stylesheets=[CSS(filename="templates/settlement.css")]
        )

        # Upload to Hetzner Object Storage:
        s3_key = f"settlements/{settlement_id}/afregning.pdf"
        upload_to_object_storage(s3_key, pdf_bytes, content_type="application/pdf")

        # Update settlement.pdf_url in the DB:
        asyncio.run(_update_settlement_pdf_url(settlement_id, s3_key))

        return {"status": "ok", "settlement_id": settlement_id, "size_bytes": len(pdf_bytes)}

    except Exception as exc:
        raise self.retry(exc=exc)

Email with Brevo (retry logic)

# server/src/workers/tasks/email.py
import asyncio

from celery import shared_task
import sib_api_v3_sdk
from sib_api_v3_sdk.rest import ApiException

@shared_task(bind=True, max_retries=5, default_retry_delay=60)
def send_settlement_email(self, tenant_id: str, settlement_line_id: str):
    """Send the settlement email to the tenant with a magic link to the tenant portal."""
    try:
        data = asyncio.run(_fetch_settlement_line(settlement_line_id))

        api = sib_api_v3_sdk.TransactionalEmailsApi(get_brevo_client())
        send_email = sib_api_v3_sdk.SendSmtpEmail(
            to=[{"email": data["email"], "name": data["name"]}],
            template_id=SETTLEMENT_TEMPLATE_ID,
            params={
                "magic_link": generate_magic_link(settlement_line_id),
                "period": data["period"],
                "balance": data["balance_kr"],
                "is_refund": data["balance_kr"] < 0,
            },
        )
        api.send_transac_email(send_email)

    except ApiException as exc:
        if exc.status == 429:  # Rate limit
            raise self.retry(exc=exc, countdown=300)  # Wait 5 minutes
        raise self.retry(exc=exc)  # Other API errors: default 60-second delay
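
The task above retries with fixed delays (60 seconds, or 300 seconds on a 429). If growing delays are preferred, one option is to compute the countdown from the retry count. The helper below is a hypothetical sketch; `backoff_delay`, `base`, and `cap` are assumptions and not part of the platform's code.

```python
def backoff_delay(retries: int, base: int = 60, cap: int = 3600) -> int:
    """Exponential backoff in seconds: base * 2**retries, never more than cap."""
    return min(cap, base * 2 ** retries)


# Delays for the five retries allowed by max_retries=5:
schedule = [backoff_delay(n) for n in range(5)]
print(schedule)  # [60, 120, 240, 480, 960]
```

Inside a bound task this could be used as `raise self.retry(exc=exc, countdown=backoff_delay(self.request.retries))`. Celery also offers the task options `autoretry_for` and `retry_backoff` as a built-in alternative.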

OCR scanning (invoice parsing)

import asyncio

from celery import shared_task

@shared_task(bind=True, max_retries=2)
def scan_annual_input_ocr(self, annual_input_id: str, file_url: str):
    """OCR-scan a district heating invoice and extract key fields."""
    try:
        # Download the PDF/image from Object Storage:
        file_bytes = download_from_storage(file_url)

        # Run OCR (Tesseract or a cloud API):
        text = run_ocr(file_bytes)

        # Regex extraction:
        extracted = {
            "total_kr": extract_amount(text, patterns=DANISH_AMOUNT_PATTERNS),
            "total_mwh": extract_value(text, patterns=MWH_PATTERNS),
            "period_start": extract_date(text, patterns=PERIOD_START_PATTERNS),
            "period_end": extract_date(text, patterns=PERIOD_END_PATTERNS),
        }
        # Confidence is derived from the extracted fields, so compute it afterwards:
        extracted["confidence"] = calculate_confidence(extracted)

        # Save the extraction + status:
        asyncio.run(_update_annual_input_ocr(annual_input_id, extracted))

    except Exception as exc:
        asyncio.run(_mark_ocr_failed(annual_input_id, str(exc)))
        raise self.retry(exc=exc)
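
The helpers `extract_amount` and `calculate_confidence` are not shown in the post. As an illustration only, the sketch below shows one way they might work for Danish number formatting (`12.345,67 kr`). The regex pattern and both function bodies are assumptions, not the platform's actual implementation.

```python
import re
from decimal import Decimal
from typing import Optional, Sequence

# Hypothetical pattern: digits with optional "." thousands separators and
# an optional "," decimal part, followed by "kr".
DANISH_AMOUNT_PATTERNS = [
    re.compile(r"(?<![\d.,])((?:\d{1,3}(?:\.\d{3})+|\d+)(?:,\d{1,2})?)\s*kr\b", re.IGNORECASE),
]


def parse_danish_number(raw: str) -> Decimal:
    """Convert '12.345,67' to Decimal('12345.67')."""
    return Decimal(raw.replace(".", "").replace(",", "."))


def extract_amount(text: str, patterns: Sequence[re.Pattern]) -> Optional[Decimal]:
    """Return the first amount matched by any pattern, or None if nothing matches."""
    for pattern in patterns:
        match = pattern.search(text)
        if match:
            return parse_danish_number(match.group(1))
    return None


def calculate_confidence(fields: dict) -> float:
    """Naive confidence: the share of fields that were actually found."""
    if not fields:
        return 0.0
    return sum(value is not None for value in fields.values()) / len(fields)


sample = "I alt at betale: 12.345,67 kr."
amount = extract_amount(sample, DANISH_AMOUNT_PATTERNS)
print(amount)  # 12345.67
print(calculate_confidence({"total_kr": amount, "total_mwh": None}))  # 0.5
```

`Decimal` is used instead of `float` so that currency amounts are not subject to binary rounding.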

DMI degree-day sync

import asyncio

import httpx
from celery import shared_task

@shared_task
def sync_degree_days():
    """Fetch degree days from DMI Klimagrid and store them in the degree_days table."""
    # DMI's free API (requires an API key registered with DMI):
    url = "https://dmigw.govcloud.dk/v2/climateData/collections/municipalityValue/items"
    params = {
        "parameterId": "mean_temp",
        "datetime": f"{yesterday()}/{today()}",
        "api-key": DMI_API_KEY,
    }
    response = httpx.get(url, params=params)
    response.raise_for_status()  # Fail on HTTP errors instead of parsing an error body
    items = response.json()["features"]

    base_temp = 17.0  # Standard degree-day base temperature (°C)
    degree_days = []
    for item in items:
        temp = item["properties"]["value"]
        hdd = max(0.0, base_temp - temp)  # Heating degree days, never negative
        degree_days.append({
            "date": item["properties"]["from"][:10],
            "municipality_id": item["properties"]["municipalityId"],
            "hdd": hdd,
        })

    asyncio.run(_upsert_degree_days(degree_days))
    return {"synced": len(degree_days)}
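
The degree-day arithmetic in the loop can be isolated into a pure function, which makes it easy to check without calling the API. The sketch below uses a made-up payload shaped like the fields the task reads (`value`, `from`, `municipalityId`); the values are illustrative, not real DMI data, and the function names are hypothetical.

```python
BASE_TEMP_C = 17.0  # same base temperature as in the task above


def heating_degree_days(mean_temp_c: float, base_temp_c: float = BASE_TEMP_C) -> float:
    """Degrees below the base temperature for one day; zero on warm days."""
    return max(0.0, base_temp_c - mean_temp_c)


def to_degree_day_row(feature: dict) -> dict:
    """Map one GeoJSON-style feature to a row for the degree_days table."""
    props = feature["properties"]
    return {
        "date": props["from"][:10],
        "municipality_id": props["municipalityId"],
        "hdd": heating_degree_days(props["value"]),
    }


# Illustrative input only:
features = [
    {"properties": {"value": 4.5, "from": "2024-01-15T00:00:00Z", "municipalityId": "0101"}},
    {"properties": {"value": 19.0, "from": "2024-07-15T00:00:00Z", "municipalityId": "0101"}},
]
rows = [to_degree_day_row(f) for f in features]
print([row["hdd"] for row in rows])  # [12.5, 0.0]
```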

Docker Compose: Celery in production

# docker-compose.yml (excerpt)
services:
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes  # Persistence

  celery-worker:
    build: ./server
    command: celery -A server.src.workers.celery_app worker --loglevel=info --concurrency=4
    environment:
      - REDIS_URL=redis://redis:6379/0
      - DATABASE_URL=${DATABASE_URL}
      - BREVO_API_KEY=${BREVO_API_KEY}
    depends_on: [redis, db]

  celery-beat:
    build: ./server
    command: celery -A server.src.workers.celery_app beat --loglevel=info
    depends_on: [redis]
    # NOTE: Run only ONE beat instance, otherwise periodic jobs run twice

volumes:
  redis_data:
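
Running a single beat instance is the primary safeguard against duplicate runs. As an extra layer, periodic tasks can be made idempotent so that an accidental second trigger does nothing. The sketch below is hypothetical and keeps its state in memory; in a real deployment the claim could be stored in Redis (for example with `SET ... NX`) or in the database.

```python
from datetime import date


def period_key(task_name: str, run_date: date, monthly: bool = False) -> str:
    """Build a key that is identical for every trigger within the same period."""
    period = run_date.strftime("%Y-%m") if monthly else run_date.isoformat()
    return f"{task_name}:{period}"


class InMemoryRunLog:
    """Toy stand-in for a shared store that records which periods have run."""

    def __init__(self) -> None:
        self._claimed: set = set()

    def claim(self, key: str) -> bool:
        """Return True only for the first caller that claims a given key."""
        if key in self._claimed:
            return False
        self._claimed.add(key)
        return True


run_log = InMemoryRunLog()
key = period_key("monthly-digest", date(2024, 5, 2), monthly=True)
first = run_log.claim(key)
second = run_log.claim(key)
print(key, first, second)  # monthly-digest:2024-05 True False
```

A task would call `claim()` first and return early when it gets `False`, so a duplicate trigger never sends a second digest.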

Monitoring with Flower

# Flower: Celery web UI for monitoring jobs:
celery -A server.src.workers.celery_app flower --port=5555

# Accessible at: http://hetzner-ip:5555
# Shows: active workers, queue depth, job history, failed tasks

Conclusion

Celery + Redis handles all asynchronous processing in the platform: PDF generation (WeasyPrint), email (Brevo), OCR, degree-day sync (DMI), Stripe synchronization, and onboarding emails. Celery Beat replaces cron for periodic jobs, and Flower provides visibility into workers, queues, and failed tasks. Per-task retry limits and delays, with a longer wait on rate limits, keep the jobs robust against API errors.

See the technical architecture or the API documentation.