Python subprocess til IoT — wmbusmeters og system-kommandoer
Python subprocess til IoT gateway: wmbusmeters JSON-output parsing, asyncio.create_subprocess_exec, timeout, stderr håndtering, streaming output og sikkerhedshensyn.
Af M-Bus Gateway
Gateway-platformen bruger subprocess til at køre wmbusmeters og system-kommandoer. Her er de sikre, produktionsklare mønstre.
wmbusmeters subprocess — grundlæggende
# gateway/src/wmbus/listener.py
import asyncio
import json
import structlog
from pathlib import Path
log = structlog.get_logger()
WMBUSMETERS_BIN = Path("/usr/local/bin/wmbusmeters")
async def run_wmbusmeters(
    device: str = "/dev/wmbus",
    config_dir: str = "/etc/wmbusmeters.d",
    mode: str = "c1",
) -> asyncio.subprocess.Process:
    """Spawn wmbusmeters as a child process and hand back its handle.

    The function returns as soon as the child has been started; it does not
    wait for it to finish.  Both stdout and stderr are opened as pipes, so
    the caller is responsible for reading them.

    Args:
        device: Device argument passed to wmbusmeters (the USB dongle).
        config_dir: Directory passed via ``--conf`` (per-meter config files).
        mode: Link mode argument, e.g. ``c1``, ``t1``, ``s1`` or ``c1t1``.

    Returns:
        The running ``asyncio.subprocess.Process``.
    """
    pipe = asyncio.subprocess.PIPE
    options = (
        "--format=json",  # telegrams are written to stdout as JSON
        "--logfile=stderr",  # log output goes to stderr
        "--no-init-file",  # skip the global configuration
        f"--conf={config_dir}",  # per-meter configuration files
    )
    child = await asyncio.create_subprocess_exec(
        str(WMBUSMETERS_BIN),
        *options,
        device,  # USB dongle
        mode,  # c1, t1, s1, c1t1
        stdout=pipe,
        stderr=pipe,
    )
    log.info("wmbusmeters.started", pid=child.pid, mode=mode)
    return child
async def stream_telegrams(process: asyncio.subprocess.Process):
    """Yield parsed telegrams from the stdout pipe of a wmbusmeters process.

    stdout is read line by line; every non-blank line is parsed as one JSON
    document and yielded.  Lines that are not valid JSON are logged and
    skipped.  The generator finishes when stdout reaches EOF.

    Args:
        process: Child process whose ``stdout`` was opened as a pipe.

    Yields:
        The object produced by ``json.loads`` for each valid line.
    """
    while True:
        raw = await process.stdout.readline()
        if not raw:
            break  # EOF — stdout closed, wmbusmeters has stopped

        # Decode leniently: a single stray byte must not raise
        # UnicodeDecodeError and tear down the whole telegram stream.
        text = raw.decode("utf-8", errors="replace").strip()
        if not text:
            continue

        # Keep the try body minimal so only the parse itself is guarded.
        try:
            telegram = json.loads(text)
        except json.JSONDecodeError as e:
            log.warning("telegram.json_parse_error", line=text[:100], error=str(e))
            continue
        yield telegram
Async streaming med timeout
# gateway/src/wmbus/listener.py — udvidet med timeout og restart
import asyncio
from datetime import datetime, timedelta
async def _drain_stderr(process: asyncio.subprocess.Process) -> None:
    """Consume the child's stderr pipe so it never blocks on a full buffer."""
    if process.stderr is None:
        return
    async for raw in process.stderr:
        text = raw.decode("utf-8", errors="replace").strip()
        if text:
            log.debug("wmbusmeters.stderr", line=text)


async def _reap_process(process: asyncio.subprocess.Process, grace: float = 5.0) -> None:
    """Kill the child if it is still running and wait for it (no zombie)."""
    if process.returncode is None:
        try:
            process.kill()
        except ProcessLookupError:
            pass  # exited between the check and the kill
    try:
        await asyncio.wait_for(process.wait(), timeout=grace)
    except asyncio.TimeoutError:
        log.error("wmbusmeters.reap_timeout", pid=process.pid)


async def listen_with_restart(
    device: str,
    on_telegram: callable,
    restart_on_silence_minutes: int = 60,
):
    """Run wmbusmeters forever, restarting it when it stops or goes silent.

    A new wmbusmeters process is started in each loop iteration.  It is
    restarted when:
      - the process ends on its own (stdout reaches EOF),
      - no telegram has arrived for ``restart_on_silence_minutes`` minutes,
      - ``on_telegram`` or the stream raises an ``Exception``.

    Args:
        device: Device argument forwarded to ``run_wmbusmeters``.
        on_telegram: Async callable awaited once per received telegram.
        restart_on_silence_minutes: Allowed silence before a forced restart.

    This coroutine only ends through cancellation; the child process is
    killed and reaped on that path as well.
    """
    from datetime import timezone

    silence_seconds = restart_on_silence_minutes * 60
    loop = asyncio.get_running_loop()

    while True:
        process = await run_wmbusmeters(device)
        # stderr is a pipe; if nobody reads it the child eventually blocks.
        stderr_task = asyncio.create_task(_drain_stderr(process))
        # Timezone-aware UTC; the logged ISO string carries a "+00:00" suffix.
        last_received = datetime.now(timezone.utc)
        try:
            async with asyncio.timeout(silence_seconds) as silence:
                async for telegram in stream_telegrams(process):
                    last_received = datetime.now(timezone.utc)
                    # Push the deadline forward so it measures silence since
                    # the last telegram, not total runtime of the process.
                    silence.reschedule(loop.time() + silence_seconds)
                    await on_telegram(telegram)
            # No timeout → the process stopped by itself → restart:
            log.warning("wmbusmeters.stopped_unexpectedly")
        except TimeoutError:
            log.warning(
                "wmbusmeters.silence_timeout",
                minutes=restart_on_silence_minutes,
                last_received=last_received.isoformat(),
            )
        except Exception as e:
            log.error("wmbusmeters.error", error=str(e))
        finally:
            # Runs on every exit path, including task cancellation.
            await _reap_process(process)
            stderr_task.cancel()
            await asyncio.gather(stderr_task, return_exceptions=True)

        # Short pause before restarting (avoid a rapid restart loop):
        await asyncio.sleep(5)
        log.info("wmbusmeters.restarting")
Stderr-håndtering og fejldiagnose
# gateway/src/wmbus/diagnostics.py
# Analyser wmbusmeters stderr for fejl og advarsler
import asyncio
import re
AES_ERROR_PATTERN = re.compile(r"DEC_ERR|decryption failed|bad key")
DRIVER_PATTERN = re.compile(r"Using driver (\w+) for meter")
RSSI_PATTERN = re.compile(r"rssi=(-?\d+)")
async def monitor_stderr(process: asyncio.subprocess.Process):
    """Read wmbusmeters stderr line by line and classify each line.

    Each non-blank line is matched, in order, against:
      1. ``AES_ERROR_PATTERN`` — logged as an error and forwarded to
         ``send_alarm`` (defined outside the visible source),
      2. ``DRIVER_PATTERN`` — the captured driver name is logged,
      3. the substring "ERROR" (case-insensitive) — logged as an error,
      4. anything else — logged at debug level.

    Args:
        process: Child process whose ``stderr`` was opened as a pipe.

    Returns when stderr reaches EOF.
    """
    async for line_bytes in process.stderr:
        # Decode leniently: log output is not guaranteed to be valid UTF-8,
        # and a UnicodeDecodeError here would end the monitoring task.
        line = line_bytes.decode("utf-8", errors="replace").strip()
        if not line:
            continue

        # Decryption problems:
        if AES_ERROR_PATTERN.search(line):
            log.error("wmbusmeters.aes_error", line=line)
            # Alarm: the meter has a wrong AES key
            await send_alarm("aes_decryption_failed", line)
        # Driver selection:
        elif m := DRIVER_PATTERN.search(line):
            log.info("wmbusmeters.driver_selected", driver=m.group(1))
        # Generic errors:
        elif "ERROR" in line.upper():
            log.error("wmbusmeters.stderr_error", line=line)
        # Everything else:
        else:
            log.debug("wmbusmeters.stderr", line=line)
# Run both streams concurrently:
async def run_listener():
    """Start wmbusmeters and service its stdout and stderr side by side.

    NOTE(review): ``stream_telegrams_task`` is not defined in the visible
    source — presumably a coroutine that consumes ``stream_telegrams``;
    confirm it exists.
    """
    wmbus = await run_wmbusmeters("/dev/wmbus")
    # One task per pipe; the TaskGroup block exits once both have finished.
    async with asyncio.TaskGroup() as group:
        for worker in (stream_telegrams_task, monitor_stderr):
            group.create_task(worker(wmbus))
One-shot subprocess-kald
# gateway/src/system/commands.py
# Kortvarige subprocess-kald til system-information
import asyncio
async def get_wmbusmeters_version() -> str:
    """Return the version reported by ``wmbusmeters --version``.

    Returns:
        The text after the last ``:`` of the output, stripped
        (e.g. ``"1.14.0"`` for ``"wmbusmeters: 1.14.0"``).

    Raises:
        TimeoutError: The command did not finish within 10 seconds; the
            child is killed and reaped before the error propagates.
        RuntimeError: The command exited with a non-zero status.
    """
    proc = await asyncio.create_subprocess_exec(
        "/usr/local/bin/wmbusmeters",
        "--version",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10.0)
    except asyncio.TimeoutError:
        # wait_for only cancels communicate(); the child itself keeps
        # running unless it is killed and waited for explicitly.
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # already exited
        await proc.wait()
        raise

    if proc.returncode != 0:
        # Lossy decode so a bad byte cannot mask the real failure.
        raise RuntimeError(
            f"wmbusmeters --version fejlede: {stderr.decode(errors='replace')}"
        )

    # Format: "wmbusmeters: 1.14.0"
    version_line = stdout.decode(errors="replace").strip()
    return version_line.split(":")[-1].strip()
async def restart_systemd_service(service: str) -> bool:
    """Restart a systemd unit via ``systemctl restart``.

    The original note says this requires sudo access.

    Args:
        service: Unit name passed to ``systemctl restart``.

    Returns:
        True when ``systemctl`` exited with status 0, otherwise False.

    Raises:
        TimeoutError: ``systemctl`` did not finish within 30 seconds; the
            child is killed and reaped before the error propagates.
    """
    proc = await asyncio.create_subprocess_exec(
        "systemctl", "restart", service,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        # Output is captured only to keep the pipes drained; it is not used.
        await asyncio.wait_for(proc.communicate(), timeout=30.0)
    except asyncio.TimeoutError:
        # wait_for cancels communicate() but leaves the child running.
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # already exited
        await proc.wait()
        raise
    return proc.returncode == 0
async def check_usb_device_present(device: str = "/dev/wmbus") -> bool:
    """Report whether the given device path exists on the filesystem.

    Used to check that the USB wM-Bus dongle is plugged in.
    """
    from pathlib import Path

    dongle = Path(device)
    return dongle.exists()
Sikkerhed — undgå shell injection
# DANGEROUS — shell=True with user input.
# This snippet is a deliberate anti-pattern shown for contrast; do not copy it.
import subprocess
meter_id = request.query_params["meter_id"] # May contain '; rm -rf /'
result = subprocess.run(
f"wmbusmeters --filter={meter_id}", # INJECTION RISK: input is interpolated into a shell string
shell=True, # Never with user input
)
# SAFE — list of arguments (no shell):
async def filter_by_meter(meter_id: str) -> bytes:
    """Run wmbusmeters with a filter on one meter id and return its stdout.

    Args:
        meter_id: User-controlled meter id; must be exactly 8 hex digits.

    Returns:
        The raw bytes wmbusmeters wrote to stdout.

    Raises:
        ValueError: ``meter_id`` is not exactly 8 hexadecimal characters.
    """
    # Validate strictly.  fullmatch is used instead of match with "$",
    # because "$" also matches just before a trailing newline and would
    # let "12345678\n" through.
    if not re.fullmatch(r"[0-9a-fA-F]{8}", meter_id):
        raise ValueError(f"Ugyldigt meter_id format: {meter_id}")

    # Argument list (not a command string) → no shell is involved:
    proc = await asyncio.create_subprocess_exec(
        "/usr/local/bin/wmbusmeters",
        f"--filter-field=id={meter_id}",  # validated argument
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, _ = await proc.communicate()
    return stdout
# Regel: Brug ALDRIG shell=True med input der kommer fra brugere
# Brug ALTID liste-form: ["cmd", "arg1", "arg2"]
# Valider ALTID input før det bruges som subprocess-argument
OTA-installation via subprocess
# gateway/src/ota/installer.py
# Anvend OTA-pakke via subprocess systemd-kommandoer
import asyncio
from pathlib import Path
async def _run_ota_step(*cmd: str, timeout: float) -> tuple[int, bytes]:
    """Run one OTA command and return ``(exit code, captured stderr)``."""
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        _, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels communicate() but leaves the child running;
        # kill and reap it so a hung step does not linger.
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # already exited
        await proc.wait()
        raise
    return proc.returncode, stderr


async def apply_ota_package(package_path: Path, version: str) -> bool:
    """Install an OTA package and restart the gateway service.

    Steps, executed in order; the first failing step aborts the update:
      1. Unpack the tar.gz into ``/opt/mbus-gateway-new/``.
      2. Run ``/opt/mbus-gateway-new/install.sh``.
      3. ``systemctl restart mbus-gateway``.

    Args:
        package_path: Path of the ``.tar.gz`` package to unpack.
        version: Version label, used only in log events.

    Returns:
        True when all three steps exited with status 0, otherwise False.

    Raises:
        TimeoutError: A step exceeded its time limit (60 s / 120 s / 30 s);
            the hung child is killed before the error propagates.

    NOTE(review): no signature or checksum verification of the package is
    visible here — confirm the caller validates it before this is invoked.
    NOTE(review): if this coroutine runs inside the mbus-gateway service
    itself, the restart presumably ends it before "ota.applied" is logged —
    confirm.
    """
    steps = (
        # (failure log event, timeout in seconds, command)
        (
            "ota.extract_failed",
            60,
            ("tar", "-xzf", str(package_path), "-C", "/opt/mbus-gateway-new/"),
        ),
        ("ota.install_failed", 120, ("/opt/mbus-gateway-new/install.sh",)),
        # The restart exit code is checked as well, so a failed restart is
        # no longer reported as a successful update.
        ("ota.restart_failed", 30, ("systemctl", "restart", "mbus-gateway")),
    )

    for failure_event, timeout, cmd in steps:
        returncode, stderr = await _run_ota_step(*cmd, timeout=timeout)
        if returncode != 0:
            log.error(
                failure_event,
                version=version,
                stderr=stderr.decode(errors="replace"),
            )
            return False

    log.info("ota.applied", version=version)
    return True
Konklusion
asyncio.create_subprocess_exec er den korrekte tilgang til langvarige processer som wmbusmeters — aldrig subprocess.run() i async kode. Brug altid liste-form (ikke shell=True) til at forhindre injection-angreb, selv med internt input. Stderr-monitoring i parallel asyncio-task fanger AES-fejl og driver-problemer i realtid. Timeout via asyncio.wait_for forhindrer at én hængt subprocess blokerer gateway-systemet. Genstart-logik med 5-sekunders backoff undgår rapid restart loops ved vedvarende fejl.
Se gateway fejlsøgning guide eller wmbusmeters installation guide.