· Celery· Redis· baggrundsjobs· asynkron· PDF· email· IoT· arkitektur· Python
Celery og Redis til IoT baggrundsjobs — arkitektur og best practices
Hvordan M-Bus Gateway bruger Celery + Redis til asynkrone baggrundsjobs: PDF-generering, email-afsendelse, OCR, Stripe-synkronisering, DMI-data og onboarding drip-emails.
Af M-Bus Gateway
En SaaS-platform til IoT-data kræver mange asynkrone jobs: PDF-generering, email-afsendelse, ekstern API-polling, periodiske alarmer. Celery + Redis er standardstakken for dette i Python.
Arkitektur-oversigt
[FastAPI — synkron request/response]
│
│ .delay() / .apply_async()
↓
[Redis broker — job queue]
│
│ worker poll
↓
[Celery workers]
├── PDF-generering (WeasyPrint)
├── Email-afsendelse (Brevo)
├── OCR-scanning (faktura-parsing)
├── DMI graddage-sync (daglig)
├── Stripe-synkronisering (månedlig)
├── Onboarding drip-emails (daglig)
├── BEK 563 deadline-alarm (månedlig)
└── Månedlig portefølje-digest
[Celery Beat — periodiske jobs]
→ Scheduler for cron-lignende tasks; lægger dem i Redis-køen, hvorfra Celery-workerne henter dem
Opsætning
# server/src/workers/celery_app.py
import os

from celery import Celery
from celery.schedules import crontab

# Broker / result-backend URLs come from the environment (docker-compose sets
# REDIS_URL for the worker); fall back to the compose service defaults so the
# app still starts when the variables are unset or empty.
_BROKER_URL = os.environ.get("REDIS_URL") or "redis://redis:6379/0"
_RESULT_BACKEND_URL = os.environ.get("CELERY_RESULT_BACKEND") or "redis://redis:6379/1"

celery = Celery(
    "mbus_workers",
    broker=_BROKER_URL,
    backend=_RESULT_BACKEND_URL,
    include=[
        "server.src.workers.tasks.pdf",
        "server.src.workers.tasks.email",
        "server.src.workers.tasks.ocr",
        "server.src.workers.tasks.dmi",
        "server.src.workers.tasks.stripe",
        "server.src.workers.tasks.onboarding_drip",
        "server.src.workers.tasks.settlement_deadlines",
        "server.src.workers.tasks.monthly_digest",
    ],
)

# Periodic jobs run by Celery Beat. All times are UTC (timezone set explicitly
# below so the comments stay true regardless of Celery defaults).
celery.conf.beat_schedule = {
    # DMI degree days: daily at 04:00 UTC
    "dmi-sync": {
        "task": "server.src.workers.tasks.dmi.sync_degree_days",
        "schedule": crontab(hour=4, minute=0),
    },
    # Onboarding drip: daily at 08:30 UTC
    "onboarding-drip": {
        "task": "server.src.workers.tasks.onboarding_drip.send_drip_emails",
        "schedule": crontab(hour=8, minute=30),
    },
    # BEK 563 deadline alarm: monthly, on the 5th at 07:00 UTC
    "settlement-deadlines": {
        "task": "server.src.workers.tasks.settlement_deadlines.check_deadlines",
        "schedule": crontab(hour=7, minute=0, day_of_month=5),
    },
    # Stripe unit sync: on the 1st at 05:00 UTC
    "stripe-unit-sync": {
        "task": "server.src.workers.tasks.stripe.sync_subscription_units",
        "schedule": crontab(hour=5, minute=0, day_of_month=1),
    },
    # Monthly portfolio digest: on the 2nd at 06:00 UTC
    "monthly-digest": {
        "task": "server.src.workers.tasks.monthly_digest.send_digests",
        "schedule": crontab(hour=6, minute=0, day_of_month=2),
    },
}

celery.conf.update(
    timezone="UTC",
    enable_utc=True,
    task_serializer="json",
    result_expires=86400,  # 24 hours
)
PDF-generering (WeasyPrint)
# server/src/workers/tasks/pdf.py
from celery import shared_task
from weasyprint import HTML, CSS
import asyncio
@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def generate_settlement_pdf(self, settlement_id: str):
    """Generate a settlement PDF and store it in Hetzner Object Storage.

    Steps: fetch settlement data, render the HTML template, render the PDF
    with WeasyPrint, upload it under ``settlements/<id>/afregning.pdf`` and
    record that key on the settlement row.

    Args:
        settlement_id: ID of the settlement to render.

    Returns:
        ``{"status": "ok", "settlement_id": ..., "size_bytes": ...}``.

    Retries:
        Any exception triggers a retry with exponential backoff
        (30 s, 60 s, 120 s); after ``max_retries`` the original exception
        is raised.
    """
    try:
        # Sync wrappers around the async DB helpers. Each asyncio.run() creates
        # and closes its own event loop.
        # NOTE(review): if the async DB engine/pool is shared at module level,
        # confirm it tolerates being used from several short-lived loops.
        settlement_data = asyncio.run(_fetch_settlement(settlement_id))

        html_content = render_settlement_template(settlement_data)
        pdf_bytes = HTML(string=html_content).write_pdf(
            stylesheets=[CSS(filename="templates/settlement.css")]
        )

        s3_key = f"settlements/{settlement_id}/afregning.pdf"
        upload_to_object_storage(s3_key, pdf_bytes, content_type="application/pdf")

        # Point settlement.pdf_url at the uploaded object.
        asyncio.run(_update_settlement_pdf_url(settlement_id, s3_key))
    except Exception as exc:
        # Exponential backoff based on how many retries have already happened.
        countdown = self.default_retry_delay * 2 ** self.request.retries
        raise self.retry(exc=exc, countdown=countdown)

    return {"status": "ok", "settlement_id": settlement_id, "size_bytes": len(pdf_bytes)}
Email med Brevo (retry-logik)
# server/src/workers/tasks/email.py
from celery import shared_task
import sib_api_v3_sdk
@shared_task(bind=True, max_retries=5, default_retry_delay=60)
def send_settlement_email(self, tenant_id: str, settlement_line_id: str):
    """Send the settlement email to a tenant with a magic link to the tenant portal.

    Args:
        tenant_id: Tenant identifier. Not read by the body below; the recipient
            comes from the settlement line. NOTE(review): confirm whether it is
            kept only for logging/routing.
        settlement_line_id: Settlement line whose recipient, period and balance
            are passed to the Brevo template.

    Retries on ``sib_api_v3_sdk.ApiException``:
        * 429 (rate limited): retry after a fixed 5 minutes.
        * Other 4xx except 408: re-raised immediately. The request itself is
          rejected (payload, recipient, API key), so repeating it cannot help.
        * 5xx, 408 or no HTTP status: retry with exponential backoff
          (60 s, 120 s, 240 s, ...) up to ``max_retries`` times.
    """
    # The module-level imports of email.py do not include asyncio.
    import asyncio

    try:
        data = asyncio.run(_fetch_settlement_line(settlement_line_id))
        api = sib_api_v3_sdk.TransactionalEmailsApi(get_brevo_client())
        send_email = sib_api_v3_sdk.SendSmtpEmail(
            to=[{"email": data["email"], "name": data["name"]}],
            template_id=SETTLEMENT_TEMPLATE_ID,
            params={
                "magic_link": generate_magic_link(settlement_line_id),
                "period": data["period"],
                "balance": data["balance_kr"],
                # A negative balance means the tenant gets money back.
                "is_refund": data["balance_kr"] < 0,
            },
        )
        api.send_transac_email(send_email)
    except sib_api_v3_sdk.ApiException as exc:
        status = exc.status
        if status == 429:  # Rate limit
            raise self.retry(exc=exc, countdown=300)  # Wait 5 min
        if status is not None and 400 <= status < 500 and status != 408:
            # Permanent client error: fail now instead of burning 5 retries.
            raise
        countdown = self.default_retry_delay * 2 ** self.request.retries
        raise self.retry(exc=exc, countdown=countdown)
OCR-scanning (faktura-parsing)
@shared_task(bind=True, max_retries=2)
def scan_annual_input_ocr(self, annual_input_id: str, file_url: str):
    """OCR-scan a district-heating invoice and extract its key fields.

    Downloads the file, runs OCR, extracts total amount (kr), total MWh and
    the billing period with regex patterns, adds a confidence score and stores
    the result on the annual input.

    Args:
        annual_input_id: Annual input row to update with the extraction.
        file_url: Object Storage location of the uploaded PDF/image.

    Retries:
        Any exception is retried up to ``max_retries`` times. The input is only
        marked as failed on the final attempt, so its status does not read
        "failed" while a retry is still pending.
    """
    try:
        # Download PDF/image from Object Storage:
        file_bytes = download_from_storage(file_url)
        # Run OCR (Tesseract or a cloud API):
        text = run_ocr(file_bytes)
        # Regex extraction:
        extracted = {
            "total_kr": extract_amount(text, patterns=DANISH_AMOUNT_PATTERNS),
            "total_mwh": extract_value(text, patterns=MWH_PATTERNS),
            "period_start": extract_date(text, patterns=PERIOD_START_PATTERNS),
            "period_end": extract_date(text, patterns=PERIOD_END_PATTERNS),
        }
        # Confidence is scored from the fields above, so it can only be added
        # once the dict exists. Referencing `extracted` inside its own literal
        # raised UnboundLocalError on every run.
        extracted["confidence"] = calculate_confidence(extracted)
        # Store extraction + status:
        asyncio.run(_update_annual_input_ocr(annual_input_id, extracted))
    except Exception as exc:
        is_last_attempt = (
            self.max_retries is not None and self.request.retries >= self.max_retries
        )
        if is_last_attempt:
            asyncio.run(_mark_ocr_failed(annual_input_id, str(exc)))
        # On the last attempt Celery re-raises `exc` instead of scheduling a retry.
        raise self.retry(exc=exc)
DMI graddage-sync
@shared_task
def sync_degree_days(base_temp: float = 17.0):
    """Fetch mean temperatures from DMI and store heating degree days.

    Queries DMI's climateData ``municipalityValue`` collection for
    ``mean_temp`` from ``yesterday()`` to ``today()`` and upserts one row per
    returned (date, municipality) into the degree_days table.

    Args:
        base_temp: Degree-day base temperature in °C (standard base 17.0).
            HDD = max(0, base_temp - mean temperature).

    Returns:
        ``{"synced": <number of rows passed to the upsert>}``.

    Raises:
        httpx.HTTPStatusError: DMI answered with a non-2xx status.
    """
    import httpx

    # DMI's free API (requires an API key registered with DMI):
    url = "https://dmigw.govcloud.dk/v2/climateData/collections/municipalityValue/items"
    params = {
        "parameterId": "mean_temp",
        # Degree days are defined on daily means. Restrict the response to
        # daily values so that other resolutions (hour/month/year) are not
        # mixed in.
        "timeResolution": "day",
        "datetime": f"{yesterday()}/{today()}",
        "api-key": DMI_API_KEY,
    }
    response = httpx.get(url, params=params)
    # Fail loudly on HTTP errors instead of a confusing KeyError on "features".
    response.raise_for_status()

    degree_days = []
    for feature in response.json()["features"]:
        props = feature["properties"]
        degree_days.append({
            "date": props["from"][:10],
            "municipality_id": props["municipalityId"],
            "hdd": max(0, base_temp - props["value"]),
        })

    asyncio.run(_upsert_degree_days(degree_days))
    return {"synced": len(degree_days)}
Docker Compose — Celery i produktion
# docker-compose.yml (excerpt)
services:
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes  # Persistence (AOF)

  celery-worker:
    build: ./server
    command: celery -A server.src.workers.celery_app worker --loglevel=info --concurrency=4
    environment:
      - REDIS_URL=redis://redis:6379/0
      - DATABASE_URL=${DATABASE_URL}
      - BREVO_API_KEY=${BREVO_API_KEY}
    depends_on: [redis, db]

  celery-beat:
    build: ./server
    command: celery -A server.src.workers.celery_app beat --loglevel=info
    # Beat publishes the periodic tasks to the same broker as the worker.
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on: [redis]
    # NOTE: run exactly ONE beat instance, otherwise periodic jobs run twice.

volumes:
  redis_data:
Overvågning med Flower
# Flower — Celery web UI for monitoring jobs.
# Flower has no authentication by default. Its UI shows task arguments and can
# revoke/terminate tasks, so require basic auth and bind to localhost.
# (Inside a container, keep --address off and publish the port as
# "127.0.0.1:5555:5555" instead.)
celery -A server.src.workers.celery_app flower --port=5555 --address=127.0.0.1 \
  --basic-auth="${FLOWER_USER}:${FLOWER_PASSWORD}"
# Access via an SSH tunnel: ssh -L 5555:localhost:5555 user@hetzner-ip
# then open http://localhost:5555
# Shows: active workers, queue depth, job history, failed tasks
Konklusion
Celery + Redis håndterer al asynkron behandling i platformen: PDF-generering (WeasyPrint), email (Brevo), OCR, graddage-sync (DMI), Stripe-synkronisering og onboarding-emails. Celery Beat erstatter cron til de periodiske jobs, og Flower giver overblik over workers, køer og fejlede tasks. Retry-logik med ventetid mellem forsøgene sikrer robusthed ved midlertidige API-fejl.