M-Bus Gateway
← Tilbage til blog
· Python· lru_cache· caching· functools· Redis· FastAPI· performance· IoT

Python functools.lru_cache og caching mønstre — IoT platform

Python functools.lru_cache, cache, cached_property og Redis-baseret distributed cache til IoT-platform: TTL-strategier, cache invalidation, async patterns og memory-styring.

Af M-Bus Gateway

Effektiv caching reducerer database-belastning og API-svartider. Her er de produktionsklare mønstre fra Python's functools til Redis-distribueret cache.


lru_cache til statiske konfigurationer

# server/src/config/cache.py

from functools import lru_cache, cache
from datetime import timedelta


# cache = lru_cache(maxsize=None) — unbounded cache
# lru_cache(maxsize=128) — evict ældste ved grænse

@lru_cache(maxsize=256)
def get_meter_manufacturer_config(manufacturer_id: str) -> dict:
    """
    Cachér HCA-fabrikantskonfiguration.
    Indlæses én gang fra DB — ændres sjældent.
    Trådsikker (GIL) — OK til sync kode.
    """
    return _load_from_config_db(manufacturer_id)


@cache    # Unbounded — brug kun til data der aldrig ændres
def get_wmbusmeters_driver(fab_id: str) -> str:
    """FAB ID → wmbusmeters driver-navn (konstant)."""
    drivers = {
        "KAM": "multical21",
        "ELS": "sensostar",
        "APT": "apator162",
        "QDS": "q_caloric",
        "AXI": "qalcosonic_w1",
        "ZEN": "mbus_easy",
    }
    return drivers.get(fab_id, "unknown")


# Cache-info:
print(get_meter_manufacturer_config.cache_info())
# CacheInfo(hits=842, misses=12, maxsize=256, currsize=12)

# Invalidering (manuelt):
get_meter_manufacturer_config.cache_clear()

cached_property til Pydantic models

# cached_property: Beregnes én gang pr. instans

from functools import cached_property
from pydantic import BaseModel, model_validator
from decimal import Decimal


class SettlementCalculation(BaseModel):
    """Afregningsberegning med lazy-beregnede felter."""
    total_heat_cost: Decimal
    fixed_share: Decimal = Decimal("0.30")
    units: list[UnitData]

    model_config = {"arbitrary_types_allowed": True}

    @cached_property
    def fixed_pool(self) -> Decimal:
        """30%-pulje — beregnes kun én gang."""
        return self.total_heat_cost * self.fixed_share

    @cached_property
    def variable_pool(self) -> Decimal:
        """70%-pulje."""
        return self.total_heat_cost * (1 - self.fixed_share)

    @cached_property
    def total_area_m2(self) -> Decimal:
        """Samlet areal — bruges til fast-del fordeling."""
        return sum(u.floor_area_m2 for u in self.units)

    @cached_property
    def total_hca_units(self) -> Decimal:
        """Samlede HCA-enheder — bruges til variabel fordeling."""
        return sum(u.hca_units for u in self.units)

    def calculate_unit_share(self, unit: UnitData) -> Decimal:
        """Beregn én lejligheds andel (bruger cached properties)."""
        fixed = self.fixed_pool * (unit.floor_area_m2 / self.total_area_m2)
        variable = self.variable_pool * (unit.hca_units / self.total_hca_units)
        return fixed + variable

    # OBS: cached_property virker ikke med Pydantic v2 model_fields
    # Alternativ: Brug @property uden caching + dataclass til tunge beregninger

Async TTL cache med Redis

# server/src/cache/redis_cache.py
# Distribueret cache til multi-instance deployment

import json
import hashlib
from functools import wraps
from typing import Callable, Any


def redis_cache(ttl_seconds: int = 300, prefix: str = "mbus"):
    """
    Decorator til Redis-baseret async cache med TTL.
    Understøtter serialisering af Pydantic-modeller.
    """
    def decorator(fn: Callable) -> Callable:
        @wraps(fn)
        async def wrapper(*args, **kwargs):
            # Byg cache-nøgle fra funktion + argumenter:
            key_data = f"{fn.__module__}.{fn.__qualname__}:{args}:{kwargs}"
            cache_key = f"{prefix}:{hashlib.md5(key_data.encode()).hexdigest()}"

            # Tjek cache:
            cached = await redis.get(cache_key)
            if cached is not None:
                return json.loads(cached)

            # Kald original funktion:
            result = await fn(*args, **kwargs)

            # Gem i cache:
            if result is not None:
                serialized = json.dumps(
                    result,
                    default=lambda o: o.model_dump() if hasattr(o, "model_dump") else str(o),
                )
                await redis.setex(cache_key, ttl_seconds, serialized)

            return result

        # Tilføj invaliderings-metode:
        async def invalidate(*args, **kwargs):
            key_data = f"{fn.__module__}.{fn.__qualname__}:{args}:{kwargs}"
            cache_key = f"{prefix}:{hashlib.md5(key_data.encode()).hexdigest()}"
            await redis.delete(cache_key)

        wrapper.invalidate = invalidate
        return wrapper
    return decorator


# Brug:
@redis_cache(ttl_seconds=3600, prefix="property")
async def get_property_config(property_id: str) -> dict:
    """Hent ejendomskonfiguration — caches 1 time."""
    async with get_async_session() as session:
        prop = await session.get(Property, uuid.UUID(property_id))
        return prop.model_dump() if prop else None


# Invalidér ved konfigurationsændring:
await get_property_config.invalidate(str(property_id))

Cache-strategi for IoT-data

# Differentieret TTL baseret på data-volatilitet:

# Statisk data (ændres sjældent):
@redis_cache(ttl_seconds=86400)    # 24 timer
async def get_meter_master_data(meter_id: str) -> dict:
    """Måler-stamdata: Fabrikat, model, MID-dato."""
    ...


# Semi-statisk data (ændres månedligt):
@redis_cache(ttl_seconds=3600)    # 1 time
async def get_property_distribution_config(property_id: str) -> dict:
    """BEK 563-konfiguration, fordelingsmetode, etagekorrektioner."""
    ...


# Dashboard-data (ændres dagligt):
@redis_cache(ttl_seconds=300)     # 5 minutter
async def get_portfolio_kpi(tenant_id: str) -> dict:
    """Portfolio KPI-snapshot: Aktive gateways, stille målere, á conto."""
    ...


# Realtidsdata (aldrig cachés):
async def get_latest_reading(meter_installation_id: str) -> Reading:
    """Seneste aflæsning — altid fra database."""
    ...


# Pattern: Cache-aside
async def get_property_with_cache(property_id: uuid.UUID) -> Property:
    """Cache-aside pattern: Hent fra cache, fallback til DB."""
    cached = await redis.get(f"property:{property_id}")
    if cached:
        return Property.model_validate_json(cached)

    prop = await db_session.get(Property, property_id)
    if prop:
        await redis.setex(
            f"property:{property_id}",
            3600,
            prop.model_dump_json(),
        )
    return prop

Memory-styring og cache-eviction

# lru_cache memory-styring — undgå memory leaks

# Problem: Ubounded lru_cache på instansmetode:
class GatewayService:
    @lru_cache(maxsize=1000)    # FARLIGT: Caches self (holder reference!)
    def get_config(self, gateway_id: str) -> dict: ...


# Løsning A: Brug module-level funktion (ikke metode):
@lru_cache(maxsize=1000)
def get_gateway_config(gateway_id: str) -> dict:
    return _load_gateway_config(gateway_id)


# Løsning B: methodtools.lru_cache (ekstern pakke):
from methodtools import lru_cache as method_lru_cache

class GatewayService:
    @method_lru_cache(maxsize=100)
    def get_config(self, gateway_id: str) -> dict: ...


# Monitorer cache-effektivitet:
def log_cache_stats():
    info = get_meter_manufacturer_config.cache_info()
    hit_rate = info.hits / (info.hits + info.misses) if info.hits + info.misses > 0 else 0

    log.info(
        "cache.stats",
        function="get_meter_manufacturer_config",
        hit_rate=round(hit_rate, 3),
        cache_size=info.currsize,
        max_size=info.maxsize,
    )

# Konfigurer størrelse baseret på unikke keys:
# Antal fabrikanter: ~20 → maxsize=64 er rigeligt
# Antal ejendomme: Kan vokse → brug Redis, ikke lru_cache

Konklusion

lru_cache og cache er optimale til statiske konfigurationer (fabrikant-data, driver-mapping) der ændres sjældent og passer i én process. cached_property beregner afregnings-mellemprodukter (fixed/variable pool, total area) lazy og kun én gang pr. objekt. Redis-baseret distributed cache med TTL er nødvendig for multi-instance deployments (Celery workers + FastAPI) og data der deles på tværs af processer. Cache-strategi differentierer TTL: 24t for stamdata, 1t for konfiguration, 5 min for dashboards, ingen cache for realtids-aflæsninger.

Se Redis Celery guide eller Redis caching FastAPI patterns guide.