Python Protocol og structural subtyping — duck typing med type safety
Python Protocol klasser til strukturel subtyping: dependency injection, IoT sensor interfaces, runtime checking med runtime_checkable, vs ABC og duck typing, og FastAPI Depends() integration.
Af M-Bus Gateway
Protocol fra Python 3.8+ giver duck typing med fuld mypy-support. Her er de praktiske mønstre til IoT-platform og FastAPI.
Protocol vs. ABC
# ABC (Abstract Base Class) — nominativ typing:
from abc import ABC, abstractmethod
class MeterReader(ABC):
@abstractmethod
async def read(self, meter_id: str) -> float: ...
# Problem: Klasse skal eksplicit arve fra MeterReader
class WmbusReader(MeterReader): # Kræver explicit arv
async def read(self, meter_id: str) -> float:
return await _wmbus_read(meter_id)
# Protocol — strukturel subtyping (duck typing + type safety):
from typing import Protocol, runtime_checkable
@runtime_checkable
class MeterReader(Protocol):
async def read(self, meter_id: str) -> float: ...
# Klasse behøver IKKE arve fra Protocol — blot have read()-metoden:
class WmbusReader: # Ingen arv påkrævet
async def read(self, meter_id: str) -> float:
return await _wmbus_read(meter_id)
class MockReader: # Til tests — ingen arv
async def read(self, meter_id: str) -> float:
return 1234.5
# Begge satisfier MeterReader Protocol:
reader: MeterReader = WmbusReader() # mypy: OK
reader: MeterReader = MockReader() # mypy: OK
IoT sensor Protocol-hierarki
# gateway/src/protocols.py
from typing import Protocol, AsyncIterator
from dataclasses import dataclass
from datetime import datetime
@dataclass
class Reading:
meter_id: str
timestamp: datetime
value: float
unit: str
class MetricSource(Protocol):
"""Alt der kan levere en aflæsning."""
async def read(self, meter_id: str) -> Reading: ...
class StreamingSource(Protocol):
"""Alt der kan streame kontinuerlige aflæsninger."""
def stream(self) -> AsyncIterator[Reading]: ...
class ConfigurableSource(MetricSource, Protocol):
"""MetricSource med konfigurationsevne."""
async def configure(self, config: dict) -> None: ...
async def read(self, meter_id: str) -> Reading: ...
# Konkrete implementeringer:
class WmbusListener:
"""Lytter på wM-Bus via USB-dongle — satisfier MetricSource."""
async def read(self, meter_id: str) -> Reading:
data = await _read_from_wmbus(meter_id)
return Reading(meter_id=meter_id, **data)
def stream(self) -> AsyncIterator[Reading]:
return _wmbus_stream()
class ModbusReader:
"""Modbus TCP læser — satisfier MetricSource."""
async def read(self, meter_id: str) -> Reading:
data = await _modbus_read(meter_id)
return Reading(meter_id=meter_id, **data)
Dependency injection med Protocol
# server/src/readings/service.py
from typing import Protocol
class ReadingRepository(Protocol):
"""Database-interface — satisfies af SQLAlchemy repo og mock."""
async def save(self, reading: Reading) -> None: ...
async def get_latest(self, meter_id: str) -> Reading | None: ...
async def bulk_save(self, readings: list[Reading]) -> int: ...
class AlertNotifier(Protocol):
"""Notifikationsinterface — email, Slack eller webhook."""
async def notify(self, message: str, severity: str) -> None: ...
class ReadingService:
"""
Afhænger af Protocols — ikke konkrete klasser.
Giver ren separation og nem testbarhed.
"""
def __init__(
self,
repository: ReadingRepository,
notifier: AlertNotifier,
source: MetricSource,
) -> None:
self._repo = repository
self._notifier = notifier
self._source = source
async def ingest(self, meter_id: str) -> Reading:
reading = await self._source.read(meter_id)
await self._repo.save(reading)
if reading.value > ALARM_THRESHOLD:
await self._notifier.notify(
f"Høj aflæsning: {reading.value} {reading.unit}",
severity="warning",
)
return reading
FastAPI Depends() med Protocol
# server/src/dependencies.py
from fastapi import Depends
from functools import lru_cache
# Konkret implementation (produktion):
class SQLReadingRepository:
async def save(self, reading: Reading) -> None:
async with get_async_session() as session:
session.add(ReadingModel(**asdict(reading)))
await session.commit()
async def get_latest(self, meter_id: str) -> Reading | None:
...
async def bulk_save(self, readings: list[Reading]) -> int:
...
@lru_cache
def get_reading_repository() -> ReadingRepository:
return SQLReadingRepository()
@lru_cache
def get_alert_notifier() -> AlertNotifier:
return BrevoEmailNotifier()
# FastAPI endpoint bruger Protocols (ikke konkrete klasser):
@router.post("/readings/ingest/{meter_id}")
async def ingest_reading(
meter_id: str,
repo: ReadingRepository = Depends(get_reading_repository),
notifier: AlertNotifier = Depends(get_alert_notifier),
) -> ReadingOut:
service = ReadingService(repo, notifier, source=WmbusListener())
reading = await service.ingest(meter_id)
return ReadingOut.model_validate(asdict(reading))
Test override med Protocol
# server/tests/conftest.py
class MockReadingRepository:
"""In-memory implementation — satisfier ReadingRepository Protocol."""
def __init__(self):
self._store: list[Reading] = []
async def save(self, reading: Reading) -> None:
self._store.append(reading)
async def get_latest(self, meter_id: str) -> Reading | None:
matches = [r for r in self._store if r.meter_id == meter_id]
return matches[-1] if matches else None
async def bulk_save(self, readings: list[Reading]) -> int:
self._store.extend(readings)
return len(readings)
@pytest.fixture
def mock_repo() -> MockReadingRepository:
return MockReadingRepository()
# Test bruger mock — ingen database:
@pytest.mark.asyncio
async def test_ingest_saves_reading(mock_repo):
notifier = MockAlertNotifier()
source = MockMeterSource(value=1234.5)
service = ReadingService(mock_repo, notifier, source)
reading = await service.ingest("12345678")
assert len(mock_repo._store) == 1
assert mock_repo._store[0].value == 1234.5
runtime_checkable og isinstance
# @runtime_checkable muliggør isinstance-check:
@runtime_checkable
class HasRead(Protocol):
async def read(self, meter_id: str) -> Reading: ...
wmbus = WmbusListener()
mock = MockMeterSource()
# isinstance virker med runtime_checkable:
assert isinstance(wmbus, HasRead) # True (har read-metode)
assert isinstance(mock, HasRead) # True
# Bruges i factory-funktion:
def create_source(config: dict) -> MetricSource:
source_type = config.get("type", "wmbus")
sources = {
"wmbus": WmbusListener,
"modbus": ModbusReader,
"mock": MockMeterSource,
}
cls = sources.get(source_type, WmbusListener)
source = cls()
if not isinstance(source, MetricSource):
raise TypeError(f"{cls} satisfier ikke MetricSource Protocol")
return source
Konklusion
Protocol giver strukturel subtyping — klasser behøver ikke arve, blot implementere de nødvendige metoder. Dette er idéelt til IoT-platforme med multiple sensor-sources (wM-Bus, Modbus, Mock) og til dependency injection i FastAPI via Depends(). @runtime_checkable muliggør isinstance-check i factory-funktioner. Til tests erstatter simple mock-klasser komplekse stubs — ingen unittest.mock.MagicMock nødvendig.
Se Python type hints avancerede mønstre eller FastAPI dependency injection guide.