M-Bus Gateway
← Tilbage til blog
· Python· Protocol· typing· structural subtyping· dependency injection· FastAPI· backend

Python Protocol og structural subtyping — duck typing med type safety

Python Protocol klasser til strukturel subtyping: dependency injection, IoT sensor interfaces, runtime checking med runtime_checkable, vs ABC og duck typing, og FastAPI Depends() integration.

Af M-Bus Gateway

Protocol fra Python 3.8+ giver duck typing med fuld mypy-support. Her er de praktiske mønstre til IoT-platform og FastAPI.


Protocol vs. ABC

# ABC (Abstract Base Class) — nominativ typing:
from abc import ABC, abstractmethod

class MeterReader(ABC):
    @abstractmethod
    async def read(self, meter_id: str) -> float: ...

# Problem: Klasse skal eksplicit arve fra MeterReader
class WmbusReader(MeterReader):    # Kræver explicit arv
    async def read(self, meter_id: str) -> float:
        return await _wmbus_read(meter_id)


# Protocol — strukturel subtyping (duck typing + type safety):
from typing import Protocol, runtime_checkable

@runtime_checkable
class MeterReader(Protocol):
    async def read(self, meter_id: str) -> float: ...

# Klasse behøver IKKE arve fra Protocol — blot have read()-metoden:
class WmbusReader:                  # Ingen arv påkrævet
    async def read(self, meter_id: str) -> float:
        return await _wmbus_read(meter_id)

class MockReader:                   # Til tests — ingen arv
    async def read(self, meter_id: str) -> float:
        return 1234.5

# Begge satisfier MeterReader Protocol:
reader: MeterReader = WmbusReader()    # mypy: OK
reader: MeterReader = MockReader()     # mypy: OK

IoT sensor Protocol-hierarki

# gateway/src/protocols.py

from typing import Protocol, AsyncIterator
from dataclasses import dataclass
from datetime import datetime

@dataclass
class Reading:
    meter_id: str
    timestamp: datetime
    value: float
    unit: str


class MetricSource(Protocol):
    """Alt der kan levere en aflæsning."""
    async def read(self, meter_id: str) -> Reading: ...


class StreamingSource(Protocol):
    """Alt der kan streame kontinuerlige aflæsninger."""
    def stream(self) -> AsyncIterator[Reading]: ...


class ConfigurableSource(MetricSource, Protocol):
    """MetricSource med konfigurationsevne."""
    async def configure(self, config: dict) -> None: ...
    async def read(self, meter_id: str) -> Reading: ...


# Konkrete implementeringer:
class WmbusListener:
    """Lytter på wM-Bus via USB-dongle — satisfier MetricSource."""
    async def read(self, meter_id: str) -> Reading:
        data = await _read_from_wmbus(meter_id)
        return Reading(meter_id=meter_id, **data)

    def stream(self) -> AsyncIterator[Reading]:
        return _wmbus_stream()


class ModbusReader:
    """Modbus TCP læser — satisfier MetricSource."""
    async def read(self, meter_id: str) -> Reading:
        data = await _modbus_read(meter_id)
        return Reading(meter_id=meter_id, **data)

Dependency injection med Protocol

# server/src/readings/service.py

from typing import Protocol


class ReadingRepository(Protocol):
    """Database-interface — satisfies af SQLAlchemy repo og mock."""
    async def save(self, reading: Reading) -> None: ...
    async def get_latest(self, meter_id: str) -> Reading | None: ...
    async def bulk_save(self, readings: list[Reading]) -> int: ...


class AlertNotifier(Protocol):
    """Notifikationsinterface — email, Slack eller webhook."""
    async def notify(self, message: str, severity: str) -> None: ...


class ReadingService:
    """
    Afhænger af Protocols — ikke konkrete klasser.
    Giver ren separation og nem testbarhed.
    """
    def __init__(
        self,
        repository: ReadingRepository,
        notifier: AlertNotifier,
        source: MetricSource,
    ) -> None:
        self._repo = repository
        self._notifier = notifier
        self._source = source

    async def ingest(self, meter_id: str) -> Reading:
        reading = await self._source.read(meter_id)
        await self._repo.save(reading)

        if reading.value > ALARM_THRESHOLD:
            await self._notifier.notify(
                f"Høj aflæsning: {reading.value} {reading.unit}",
                severity="warning",
            )

        return reading

FastAPI Depends() med Protocol

# server/src/dependencies.py

from fastapi import Depends
from functools import lru_cache

# Konkret implementation (produktion):
class SQLReadingRepository:
    async def save(self, reading: Reading) -> None:
        async with get_async_session() as session:
            session.add(ReadingModel(**asdict(reading)))
            await session.commit()

    async def get_latest(self, meter_id: str) -> Reading | None:
        ...

    async def bulk_save(self, readings: list[Reading]) -> int:
        ...


@lru_cache
def get_reading_repository() -> ReadingRepository:
    return SQLReadingRepository()


@lru_cache
def get_alert_notifier() -> AlertNotifier:
    return BrevoEmailNotifier()


# FastAPI endpoint bruger Protocols (ikke konkrete klasser):
@router.post("/readings/ingest/{meter_id}")
async def ingest_reading(
    meter_id: str,
    repo: ReadingRepository = Depends(get_reading_repository),
    notifier: AlertNotifier = Depends(get_alert_notifier),
) -> ReadingOut:
    service = ReadingService(repo, notifier, source=WmbusListener())
    reading = await service.ingest(meter_id)
    return ReadingOut.model_validate(asdict(reading))

Test override med Protocol

# server/tests/conftest.py

class MockReadingRepository:
    """In-memory implementation — satisfier ReadingRepository Protocol."""
    def __init__(self):
        self._store: list[Reading] = []

    async def save(self, reading: Reading) -> None:
        self._store.append(reading)

    async def get_latest(self, meter_id: str) -> Reading | None:
        matches = [r for r in self._store if r.meter_id == meter_id]
        return matches[-1] if matches else None

    async def bulk_save(self, readings: list[Reading]) -> int:
        self._store.extend(readings)
        return len(readings)


@pytest.fixture
def mock_repo() -> MockReadingRepository:
    return MockReadingRepository()


# Test bruger mock — ingen database:
@pytest.mark.asyncio
async def test_ingest_saves_reading(mock_repo):
    notifier = MockAlertNotifier()
    source = MockMeterSource(value=1234.5)
    service = ReadingService(mock_repo, notifier, source)

    reading = await service.ingest("12345678")

    assert len(mock_repo._store) == 1
    assert mock_repo._store[0].value == 1234.5

runtime_checkable og isinstance

# @runtime_checkable muliggør isinstance-check:

@runtime_checkable
class HasRead(Protocol):
    async def read(self, meter_id: str) -> Reading: ...


wmbus = WmbusListener()
mock = MockMeterSource()

# isinstance virker med runtime_checkable:
assert isinstance(wmbus, HasRead)   # True (har read-metode)
assert isinstance(mock, HasRead)    # True

# Bruges i factory-funktion:
def create_source(config: dict) -> MetricSource:
    source_type = config.get("type", "wmbus")
    sources = {
        "wmbus": WmbusListener,
        "modbus": ModbusReader,
        "mock": MockMeterSource,
    }
    cls = sources.get(source_type, WmbusListener)
    source = cls()
    
    if not isinstance(source, MetricSource):
        raise TypeError(f"{cls} satisfier ikke MetricSource Protocol")
    
    return source

Konklusion

Protocol giver strukturel subtyping — klasser behøver ikke arve, blot implementere de nødvendige metoder. Dette er idéelt til IoT-platforme med multiple sensor-sources (wM-Bus, Modbus, Mock) og til dependency injection i FastAPI via Depends(). @runtime_checkable muliggør isinstance-check i factory-funktioner. Til tests erstatter simple mock-klasser komplekse stubs — ingen unittest.mock.MagicMock nødvendig.

Se Python type hints avancerede mønstre eller FastAPI dependency injection guide.