M-Bus Gateway
← Tilbage til blog
· Python· subprocess· asyncio· wmbusmeters· IoT· gateway· JSON· sikkerhed

Python subprocess til IoT — wmbusmeters og system-kommandoer

Python subprocess til IoT gateway: wmbusmeters JSON-output parsing, asyncio.create_subprocess_exec, timeout, stderr håndtering, streaming output og sikkerhedshensyn.

Af M-Bus Gateway

Gateway-platformen bruger subprocess til at køre wmbusmeters og system-kommandoer. Her er de sikre, produktionsklare mønstre.


wmbusmeters subprocess — grundlæggende

# gateway/src/wmbus/listener.py

import asyncio
import json
import structlog
from pathlib import Path


log = structlog.get_logger()
WMBUSMETERS_BIN = Path("/usr/local/bin/wmbusmeters")


async def run_wmbusmeters(
    device: str = "/dev/wmbus",
    config_dir: str = "/etc/wmbusmeters.d",
    mode: str = "c1",
) -> asyncio.subprocess.Process:
    """
    Start wmbusmeters som subprocess.
    Returnerer process-objekt — lytter kontinuerligt.
    """
    cmd = [
        str(WMBUSMETERS_BIN),
        "--format=json",           # JSON output til stdout
        "--logfile=stderr",        # Logs til stderr
        "--no-init-file",          # Ingen global config
        f"--conf={config_dir}",    # Per-måler konfig-filer
        device,                    # USB dongle
        mode,                      # c1, t1, s1, c1t1
    ]

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    log.info("wmbusmeters.started", pid=process.pid, mode=mode)
    return process


async def stream_telegrams(process: asyncio.subprocess.Process):
    """
    Læs wmbusmeters JSON-output linje for linje.
    Hver linje er ét komplet telegram i JSON-format.
    """
    while True:
        line = await process.stdout.readline()
        if not line:
            break    # EOF — wmbusmeters stoppet

        line_str = line.decode("utf-8").strip()
        if not line_str:
            continue

        try:
            telegram = json.loads(line_str)
            yield telegram
        except json.JSONDecodeError as e:
            log.warning("telegram.json_parse_error", line=line_str[:100], error=str(e))

Async streaming med timeout

# gateway/src/wmbus/listener.py — udvidet med timeout og restart

import asyncio
from datetime import datetime, timedelta


async def listen_with_restart(
    device: str,
    on_telegram: callable,
    restart_on_silence_minutes: int = 60,
):
    """
    Kør wmbusmeters med automatisk genstart ved:
    - Process crash
    - Ingen telegrams i X minutter (enhed muligvis hængt)
    """
    while True:
        process = await run_wmbusmeters(device)
        last_received = datetime.utcnow()

        try:
            async with asyncio.timeout(restart_on_silence_minutes * 60):
                async for telegram in stream_telegrams(process):
                    last_received = datetime.utcnow()
                    await on_telegram(telegram)

            # Ingen timeout → process stoppede selv → genstart:
            log.warning("wmbusmeters.stopped_unexpectedly")

        except TimeoutError:
            log.warning(
                "wmbusmeters.silence_timeout",
                minutes=restart_on_silence_minutes,
                last_received=last_received.isoformat(),
            )
            process.kill()

        except Exception as e:
            log.error("wmbusmeters.error", error=str(e))
            process.kill()

        finally:
            # Vent på process-afslutning (undgå zombie):
            try:
                await asyncio.wait_for(process.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                process.kill()

        # Kort pause inden genstart (undgå rapid restart loop):
        await asyncio.sleep(5)
        log.info("wmbusmeters.restarting")

Stderr-håndtering og fejldiagnose

# gateway/src/wmbus/diagnostics.py
# Analyser wmbusmeters stderr for fejl og advarsler

import asyncio
import re


AES_ERROR_PATTERN = re.compile(r"DEC_ERR|decryption failed|bad key")
DRIVER_PATTERN = re.compile(r"Using driver (\w+) for meter")
RSSI_PATTERN = re.compile(r"rssi=(-?\d+)")


async def monitor_stderr(process: asyncio.subprocess.Process):
    """Læs stderr fra wmbusmeters og ekstraher diagnostikdata."""
    async for line_bytes in process.stderr:
        line = line_bytes.decode("utf-8").strip()
        if not line:
            continue

        # Detektér AES-fejl:
        if AES_ERROR_PATTERN.search(line):
            log.error("wmbusmeters.aes_error", line=line)
            # Alarm: Måler har forkert AES-nøgle
            await send_alarm("aes_decryption_failed", line)

        # Detektér driver-valg:
        elif m := DRIVER_PATTERN.search(line):
            log.info("wmbusmeters.driver_selected", driver=m.group(1))

        # Generelle fejl:
        elif "ERROR" in line.upper():
            log.error("wmbusmeters.stderr_error", line=line)

        # Debug:
        else:
            log.debug("wmbusmeters.stderr", line=line)


# Kør begge streams parallelt:
async def run_listener():
    process = await run_wmbusmeters("/dev/wmbus")

    # Parallel: telegram-stream + stderr-monitoring
    async with asyncio.TaskGroup() as tg:
        tg.create_task(stream_telegrams_task(process))
        tg.create_task(monitor_stderr(process))

One-shot subprocess-kald

# gateway/src/system/commands.py
# Kortvarige subprocess-kald til system-information

import asyncio


async def get_wmbusmeters_version() -> str:
    """Hent wmbusmeters versionsnummer."""
    result = await asyncio.create_subprocess_exec(
        "/usr/local/bin/wmbusmeters",
        "--version",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await asyncio.wait_for(
        result.communicate(),
        timeout=10.0,
    )

    if result.returncode != 0:
        raise RuntimeError(f"wmbusmeters --version fejlede: {stderr.decode()}")

    # Format: "wmbusmeters: 1.14.0"
    version_line = stdout.decode().strip()
    return version_line.split(":")[-1].strip()


async def restart_systemd_service(service: str) -> bool:
    """Genstart systemd-service (kræver sudo-adgang)."""
    result = await asyncio.create_subprocess_exec(
        "systemctl", "restart", service,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await asyncio.wait_for(
        result.communicate(),
        timeout=30.0,
    )
    return result.returncode == 0


async def check_usb_device_present(device: str = "/dev/wmbus") -> bool:
    """Tjek om USB wM-Bus dongle er tilsluttet."""
    from pathlib import Path
    return Path(device).exists()

Sikkerhed — undgå shell injection

# FARLIGT — shell=True med bruger-input:
import subprocess

meter_id = request.query_params["meter_id"]  # Kan indeholde '; rm -rf /'
result = subprocess.run(
    f"wmbusmeters --filter={meter_id}",  # INJECTION RISIKO
    shell=True,    # Aldrig med bruger-input
)


# SIKKERT — liste af argumenter (ingen shell):
async def filter_by_meter(meter_id: str) -> bytes:
    """Sikkert subprocess-kald med bruger-kontrolleret input."""
    # Valider input strikt:
    if not re.match(r'^[0-9a-fA-F]{8}$', meter_id):
        raise ValueError(f"Ugyldigt meter_id format: {meter_id}")

    # Liste (ikke string) → ingen shell injection mulig:
    result = await asyncio.create_subprocess_exec(
        "/usr/local/bin/wmbusmeters",
        f"--filter-field=id={meter_id}",    # Kontrolleret argument
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, _ = await result.communicate()
    return stdout

# Regel: Brug ALDRIG shell=True med input der kommer fra brugere
# Brug ALTID liste-form: ["cmd", "arg1", "arg2"]
# Valider ALTID input før det bruges som subprocess-argument

OTA-installation via subprocess

# gateway/src/ota/installer.py
# Anvend OTA-pakke via subprocess systemd-kommandoer

import asyncio
from pathlib import Path


async def apply_ota_package(package_path: Path, version: str) -> bool:
    """
    Installer OTA-pakke:
    1. Udpak tar.gz
    2. Kør install-script
    3. Genstart service
    """
    # Udpak pakke:
    extract_result = await asyncio.create_subprocess_exec(
        "tar", "-xzf", str(package_path), "-C", "/opt/mbus-gateway-new/",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await asyncio.wait_for(extract_result.communicate(), timeout=60)

    if extract_result.returncode != 0:
        log.error("ota.extract_failed", version=version, stderr=stderr.decode())
        return False

    # Kør install-script:
    install_result = await asyncio.create_subprocess_exec(
        "/opt/mbus-gateway-new/install.sh",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await asyncio.wait_for(install_result.communicate(), timeout=120)

    if install_result.returncode != 0:
        log.error("ota.install_failed", version=version, stderr=stderr.decode())
        return False

    # Genstart service (systemd vil starte den nye version):
    restart_result = await asyncio.create_subprocess_exec(
        "systemctl", "restart", "mbus-gateway",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    await asyncio.wait_for(restart_result.communicate(), timeout=30)

    log.info("ota.applied", version=version)
    return True

Konklusion

asyncio.create_subprocess_exec er den korrekte tilgang til langvarige processer som wmbusmeters — aldrig subprocess.run() i async kode. Brug altid liste-form (ikke shell=True) til at forhindre injection-angreb, selv med intern input. Stderr-monitoring i parallel asyncio-task fanger AES-fejl og driver-problemer i realtid. Timeout via asyncio.wait_for forhindrer at én hængt subprocess blokerer gateway-systemet. Genstart-logik med 5-sekunders backoff undgår rapid restart loops ved vedvarende fejl.

Se gateway fejlsøgning guide eller wmbusmeters installation guide.