import os
import subprocess
from pathlib import Path
from typing import Any, Callable, Dict

from dotenv import load_dotenv

# Literals accepted by as_bool (compared after unquoting and lower-casing).
_TRUE_VALUES = frozenset({"1", "true", "yes", "y", "on"})
_FALSE_VALUES = frozenset({"0", "false", "no", "n", "off"})

# USB vendor:product selector passed to ``lsusb -d`` when listing HackRF serials.
_HACKRF_USB_ID = "1d50:6089"


class EnvValidationError(RuntimeError):
    """Raised when required environment variables are missing or malformed."""


def load_root_env(file_path: str) -> Path:
    """Load the nearest ``.env`` file found by walking up from *file_path*.

    The search starts in the directory containing *file_path* (after
    resolving it to an absolute path) and continues through every ancestor
    directory. The first ``.env`` file found is passed to ``load_dotenv``
    with ``override=True``.

    Args:
        file_path: Path used as the starting point, typically ``__file__``.

    Returns:
        The path of the ``.env`` file that was loaded.

    Raises:
        EnvValidationError: If no ``.env`` file exists in any searched
            directory.
    """
    start = Path(file_path).resolve()
    # ``parents`` already begins with ``start.parent``; it is empty only when
    # ``start`` is a filesystem root, in which case the root itself is probed.
    search_dirs = start.parents or [start.parent]
    for directory in search_dirs:
        env_file = directory / ".env"
        # is_file() rather than exists(): a *directory* named ``.env`` (a
        # common virtualenv name) is not a dotenv file and must not end the
        # search.
        if env_file.is_file():
            load_dotenv(env_file, override=True)
            return env_file
    raise EnvValidationError(f"Root .env was not found for {file_path}")


def as_int(raw: str) -> int:
    """Cast *raw* to ``int`` after trimming surrounding whitespace."""
    return int(str(raw).strip())


def as_float(raw: str) -> float:
    """Cast *raw* to ``float`` after trimming surrounding whitespace."""
    return float(str(raw).strip())


def as_str(raw: str) -> str:
    """Return *raw* trimmed, with surrounding quotes removed.

    One layer of double quotes is removed first, then one layer of single
    quotes. A value is only treated as quoted when it is at least two
    characters long, so a lone quote character is returned unchanged instead
    of being reduced to an empty string.
    """
    value = str(raw).strip()
    for quote in ('"', "'"):
        if len(value) >= 2 and value.startswith(quote) and value.endswith(quote):
            value = value[1:-1]
    return value


def as_bool(raw: str) -> bool:
    """Cast a truthy/falsy literal to ``bool``.

    Accepts ``1/true/yes/y/on`` and ``0/false/no/n/off``, case-insensitively
    and optionally quoted.

    Raises:
        ValueError: If the value is not one of the accepted literals.
    """
    value = as_str(raw).lower()
    if value in _TRUE_VALUES:
        return True
    if value in _FALSE_VALUES:
        return False
    raise ValueError("expected one of 1/0 true/false yes/no on/off")


def validate_env(source: str, schema: Dict[str, Callable[[str], Any]]) -> Dict[str, Any]:
    """Validate required env vars against a simple caster schema.

    Every key in *schema* is read from the process environment and passed
    through its caster. All problems are collected and reported together
    rather than failing on the first one.

    Args:
        source: Label identifying the caller; used as a prefix in the error
            message.
        schema: Mapping of environment variable name to a callable that
            converts the raw string to the desired type.

    Returns:
        Mapping of variable name to its cast value.

    Raises:
        EnvValidationError: If any variable is unset, blank, or rejected by
            its caster.
    """
    values: Dict[str, Any] = {}
    errors = []
    for key, caster in schema.items():
        raw = os.getenv(key)
        # Unset and whitespace-only values are both reported as missing.
        if raw is None or not raw.strip():
            errors.append(f"{key}: missing")
            continue
        try:
            values[key] = caster(raw)
        # Casters are arbitrary callables, so any failure is recorded as an
        # invalid value instead of aborting the whole validation pass.
        except Exception as exc:  # pragma: no cover - used in runtime only
            errors.append(f"{key}: invalid value {raw!r} ({exc})")

    if errors:
        msg = "\n - " + "\n - ".join(errors)
        raise EnvValidationError(f"[{source}] invalid .env configuration:{msg}")
    return values


def resolve_hackrf_index(serial_env_key: str, source: str) -> str:
    """Return the configured HackRF serial after checking it is connected.

    The expected serial is read from the environment variable
    *serial_env_key* and compared against the ``iSerial`` entries printed by
    ``lsusb -v -d 1d50:6089``. Despite the function name, the value returned
    is the serial string itself.

    Args:
        serial_env_key: Name of the environment variable holding the serial.
        source: Label identifying the caller; used as a prefix in error
            messages.

    Returns:
        The serial string read from the environment.

    Raises:
        EnvValidationError: If the variable is missing, ``lsusb`` cannot be
            executed, no ``iSerial`` entries are reported, or the configured
            serial is not among them.
    """
    serial = validate_env(source, {serial_env_key: as_str})[serial_env_key]

    try:
        # Run lsusb directly (no shell) and filter in Python. With the former
        # ``lsusb | grep`` pipeline, "no match" surfaced as a grep failure and
        # hid the dedicated "no devices" error below. The exit status is
        # deliberately not checked: only the listing on stdout matters here.
        result = subprocess.run(
            ["lsusb", "-v", "-d", _HACKRF_USB_ID],
            stdout=subprocess.PIPE,
            text=True,
            errors="replace",
            check=False,
        )
    except OSError as exc:
        # Typically lsusb is not installed or not executable.
        raise EnvValidationError(
            f"[{source}] could not read HackRF serials via lsusb: {exc}"
        ) from exc

    # The last whitespace-separated token of each iSerial line is taken as
    # the serial string.
    serials = [
        line.split()[-1]
        for line in result.stdout.splitlines()
        if "iSerial" in line
    ]
    if not serials:
        raise EnvValidationError(
            f"[{source}] no HackRF devices found (lsusb returned empty serial list)"
        )
    if serial not in serials:
        raise EnvValidationError(
            f"[{source}] serial {serial!r} not found among connected HackRF devices: {serials}"
        )
    return serial