Python functools.lru_cache og caching mønstre — IoT platform
Python functools.lru_cache, cache, cached_property og Redis-baseret distributed cache til IoT-platform: TTL-strategier, cache invalidation, async patterns og memory-styring.
Af M-Bus Gateway
Effektiv caching reducerer database-belastning og API-svartider. Her er de produktionsklare mønstre fra Python's functools til Redis-distribueret cache.
lru_cache til statiske konfigurationer
# server/src/config/cache.py
from functools import lru_cache, cache
from datetime import timedelta
# cache = lru_cache(maxsize=None) — unbounded cache
# lru_cache(maxsize=128) — evict ældste ved grænse
@lru_cache(maxsize=256)
def get_meter_manufacturer_config(manufacturer_id: str) -> dict:
"""
Cachér HCA-fabrikantskonfiguration.
Indlæses én gang fra DB — ændres sjældent.
Trådsikker (GIL) — OK til sync kode.
"""
return _load_from_config_db(manufacturer_id)
@cache # Unbounded — brug kun til data der aldrig ændres
def get_wmbusmeters_driver(fab_id: str) -> str:
"""FAB ID → wmbusmeters driver-navn (konstant)."""
drivers = {
"KAM": "multical21",
"ELS": "sensostar",
"APT": "apator162",
"QDS": "q_caloric",
"AXI": "qalcosonic_w1",
"ZEN": "mbus_easy",
}
return drivers.get(fab_id, "unknown")
# Cache-info:
print(get_meter_manufacturer_config.cache_info())
# CacheInfo(hits=842, misses=12, maxsize=256, currsize=12)
# Invalidering (manuelt):
get_meter_manufacturer_config.cache_clear()
cached_property til Pydantic models
# cached_property: Beregnes én gang pr. instans
from functools import cached_property
from pydantic import BaseModel, model_validator
from decimal import Decimal
class SettlementCalculation(BaseModel):
"""Afregningsberegning med lazy-beregnede felter."""
total_heat_cost: Decimal
fixed_share: Decimal = Decimal("0.30")
units: list[UnitData]
model_config = {"arbitrary_types_allowed": True}
@cached_property
def fixed_pool(self) -> Decimal:
"""30%-pulje — beregnes kun én gang."""
return self.total_heat_cost * self.fixed_share
@cached_property
def variable_pool(self) -> Decimal:
"""70%-pulje."""
return self.total_heat_cost * (1 - self.fixed_share)
@cached_property
def total_area_m2(self) -> Decimal:
"""Samlet areal — bruges til fast-del fordeling."""
return sum(u.floor_area_m2 for u in self.units)
@cached_property
def total_hca_units(self) -> Decimal:
"""Samlede HCA-enheder — bruges til variabel fordeling."""
return sum(u.hca_units for u in self.units)
def calculate_unit_share(self, unit: UnitData) -> Decimal:
"""Beregn én lejligheds andel (bruger cached properties)."""
fixed = self.fixed_pool * (unit.floor_area_m2 / self.total_area_m2)
variable = self.variable_pool * (unit.hca_units / self.total_hca_units)
return fixed + variable
# OBS: cached_property virker ikke med Pydantic v2 model_fields
# Alternativ: Brug @property uden caching + dataclass til tunge beregninger
Async TTL cache med Redis
# server/src/cache/redis_cache.py
# Distribueret cache til multi-instance deployment
import json
import hashlib
from functools import wraps
from typing import Callable, Any
def redis_cache(ttl_seconds: int = 300, prefix: str = "mbus"):
"""
Decorator til Redis-baseret async cache med TTL.
Understøtter serialisering af Pydantic-modeller.
"""
def decorator(fn: Callable) -> Callable:
@wraps(fn)
async def wrapper(*args, **kwargs):
# Byg cache-nøgle fra funktion + argumenter:
key_data = f"{fn.__module__}.{fn.__qualname__}:{args}:{kwargs}"
cache_key = f"{prefix}:{hashlib.md5(key_data.encode()).hexdigest()}"
# Tjek cache:
cached = await redis.get(cache_key)
if cached is not None:
return json.loads(cached)
# Kald original funktion:
result = await fn(*args, **kwargs)
# Gem i cache:
if result is not None:
serialized = json.dumps(
result,
default=lambda o: o.model_dump() if hasattr(o, "model_dump") else str(o),
)
await redis.setex(cache_key, ttl_seconds, serialized)
return result
# Tilføj invaliderings-metode:
async def invalidate(*args, **kwargs):
key_data = f"{fn.__module__}.{fn.__qualname__}:{args}:{kwargs}"
cache_key = f"{prefix}:{hashlib.md5(key_data.encode()).hexdigest()}"
await redis.delete(cache_key)
wrapper.invalidate = invalidate
return wrapper
return decorator
# Brug:
@redis_cache(ttl_seconds=3600, prefix="property")
async def get_property_config(property_id: str) -> dict:
"""Hent ejendomskonfiguration — caches 1 time."""
async with get_async_session() as session:
prop = await session.get(Property, uuid.UUID(property_id))
return prop.model_dump() if prop else None
# Invalidér ved konfigurationsændring:
await get_property_config.invalidate(str(property_id))
Cache-strategi for IoT-data
# Differentieret TTL baseret på data-volatilitet:
# Statisk data (ændres sjældent):
@redis_cache(ttl_seconds=86400) # 24 timer
async def get_meter_master_data(meter_id: str) -> dict:
"""Måler-stamdata: Fabrikat, model, MID-dato."""
...
# Semi-statisk data (ændres månedligt):
@redis_cache(ttl_seconds=3600) # 1 time
async def get_property_distribution_config(property_id: str) -> dict:
"""BEK 563-konfiguration, fordelingsmetode, etagekorrektioner."""
...
# Dashboard-data (ændres dagligt):
@redis_cache(ttl_seconds=300) # 5 minutter
async def get_portfolio_kpi(tenant_id: str) -> dict:
"""Portfolio KPI-snapshot: Aktive gateways, stille målere, á conto."""
...
# Realtidsdata (aldrig cachés):
async def get_latest_reading(meter_installation_id: str) -> Reading:
"""Seneste aflæsning — altid fra database."""
...
# Pattern: Cache-aside
async def get_property_with_cache(property_id: uuid.UUID) -> Property:
"""Cache-aside pattern: Hent fra cache, fallback til DB."""
cached = await redis.get(f"property:{property_id}")
if cached:
return Property.model_validate_json(cached)
prop = await db_session.get(Property, property_id)
if prop:
await redis.setex(
f"property:{property_id}",
3600,
prop.model_dump_json(),
)
return prop
Memory-styring og cache-eviction
# lru_cache memory-styring — undgå memory leaks
# Problem: Ubounded lru_cache på instansmetode:
class GatewayService:
@lru_cache(maxsize=1000) # FARLIGT: Caches self (holder reference!)
def get_config(self, gateway_id: str) -> dict: ...
# Løsning A: Brug module-level funktion (ikke metode):
@lru_cache(maxsize=1000)
def get_gateway_config(gateway_id: str) -> dict:
return _load_gateway_config(gateway_id)
# Løsning B: methodtools.lru_cache (ekstern pakke):
from methodtools import lru_cache as method_lru_cache
class GatewayService:
@method_lru_cache(maxsize=100)
def get_config(self, gateway_id: str) -> dict: ...
# Monitorer cache-effektivitet:
def log_cache_stats():
info = get_meter_manufacturer_config.cache_info()
hit_rate = info.hits / (info.hits + info.misses) if info.hits + info.misses > 0 else 0
log.info(
"cache.stats",
function="get_meter_manufacturer_config",
hit_rate=round(hit_rate, 3),
cache_size=info.currsize,
max_size=info.maxsize,
)
# Konfigurer størrelse baseret på unikke keys:
# Antal fabrikanter: ~20 → maxsize=64 er rigeligt
# Antal ejendomme: Kan vokse → brug Redis, ikke lru_cache
Konklusion
lru_cache og cache er optimale til statiske konfigurationer (fabrikant-data, driver-mapping) der ændres sjældent og passer i én process. cached_property beregner afregnings-mellemprodukter (fixed/variable pool, total area) lazy og kun én gang pr. objekt. Redis-baseret distributed cache med TTL er nødvendig for multi-instance deployments (Celery workers + FastAPI) og data der deles på tværs af processer. Cache-strategi differentierer TTL: 24t for stamdata, 1t for konfiguration, 5 min for dashboards, ingen cache for realtids-aflæsninger.
Se Redis Celery guide eller Redis caching FastAPI patterns guide.