You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
DroneDetector/common/runtime.py

100 lines
3.0 KiB
Python

import os
import subprocess
from pathlib import Path
from typing import Callable, Dict, Any
from dotenv import load_dotenv
class EnvValidationError(RuntimeError):
"""Raised when required environment variables are missing or malformed."""
def load_root_env(file_path: str) -> Path:
"""Load repository root .env by walking up from file_path."""
start = Path(file_path).resolve()
for parent in [start.parent, *start.parents]:
env_file = parent / ".env"
if env_file.exists():
load_dotenv(env_file, override=True)
return env_file
raise EnvValidationError(f"Root .env was not found for {file_path}")
def as_int(raw: str) -> int:
return int(str(raw).strip())
def as_float(raw: str) -> float:
return float(str(raw).strip())
def as_str(raw: str) -> str:
value = str(raw).strip()
if value.startswith("\"") and value.endswith("\""):
value = value[1:-1]
if value.startswith("'") and value.endswith("'"):
value = value[1:-1]
return value
def as_bool(raw: str) -> bool:
value = as_str(raw).lower()
if value in {"1", "true", "yes", "y", "on"}:
return True
if value in {"0", "false", "no", "n", "off"}:
return False
raise ValueError("expected one of 1/0 true/false yes/no on/off")
def validate_env(source: str, schema: Dict[str, Callable[[str], Any]]) -> Dict[str, Any]:
"""Validate required env vars against simple caster schema."""
values: Dict[str, Any] = {}
errors = []
for key, caster in schema.items():
raw = os.getenv(key)
if raw is None or str(raw).strip() == "":
errors.append(f"{key}: missing")
continue
try:
values[key] = caster(raw)
except Exception as exc: # pragma: no cover - used in runtime only
errors.append(f"{key}: invalid value {raw!r} ({exc})")
if errors:
msg = "\n - " + "\n - ".join(errors)
raise EnvValidationError(f"[{source}] invalid .env configuration:{msg}")
return values
def resolve_hackrf_index(serial_env_key: str, source: str) -> str:
"""Resolve HackRF osmosdr selector from expected serial in env."""
serial = validate_env(source, {serial_env_key: as_str})[serial_env_key]
try:
output = subprocess.check_output(
"lsusb -v -d 1d50:6089 | grep iSerial",
shell=True,
text=True,
)
except subprocess.CalledProcessError as exc:
raise EnvValidationError(
f"[{source}] could not read HackRF serials via lsusb: {exc}"
) from exc
lines = [line.strip() for line in output.splitlines() if line.strip()]
if not lines:
raise EnvValidationError(
f"[{source}] no HackRF devices found (lsusb returned empty serial list)"
)
serials = [line.split()[-1] for line in lines]
if serial not in serials:
raise EnvValidationError(
f"[{source}] serial {serial!r} not found among connected HackRF devices: {serials}"
)
return serial