You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
100 lines
3.0 KiB
Python
100 lines
3.0 KiB
Python
import os
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Callable, Dict, Any
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
|
|
class EnvValidationError(RuntimeError):
    """Signal that the environment configuration is unusable.

    Raised when a required environment variable is missing or cannot be
    converted to its expected type.
    """
|
|
|
|
|
|
def load_root_env(file_path: str) -> Path:
    """Find the nearest ``.env`` file above ``file_path`` and load it.

    The search starts in the parent directory of the resolved ``file_path``
    and continues through every ancestor up to the filesystem root. The first
    ``.env`` that is a regular file is passed to ``load_dotenv`` with
    ``override=True``. Note that when ``file_path`` is itself a directory,
    that directory is not searched; only its ancestors are.

    Args:
        file_path: Path whose ancestor directories are searched.

    Returns:
        The path of the ``.env`` file that was loaded.

    Raises:
        EnvValidationError: If no ancestor directory contains a ``.env`` file.
    """
    start = Path(file_path).resolve()
    # ``start.parents`` normally begins with ``start.parent``; listing the
    # parent explicitly keeps the filesystem root covered (where ``parents``
    # is empty), and ``dict.fromkeys`` drops the duplicate while keeping order.
    candidates = dict.fromkeys((start.parent, *start.parents))
    for directory in candidates:
        env_file = directory / ".env"
        # Require a regular file: a directory named ``.env`` (a common
        # virtualenv name) must not end the search or be handed to dotenv.
        if env_file.is_file():
            load_dotenv(env_file, override=True)
            return env_file
    raise EnvValidationError(f"Root .env was not found for {file_path}")
|
|
|
|
|
|
def as_int(raw: str) -> int:
    """Convert ``raw`` to an ``int`` after trimming surrounding whitespace.

    Raises:
        ValueError: If the trimmed text is not a valid integer literal.
    """
    trimmed = str(raw).strip()
    return int(trimmed)
|
|
|
|
|
|
def as_float(raw: str) -> float:
    """Convert ``raw`` to a ``float`` after trimming surrounding whitespace.

    Raises:
        ValueError: If the trimmed text is not a valid float literal.
    """
    trimmed = str(raw).strip()
    return float(trimmed)
|
|
|
|
|
|
def as_str(raw: str) -> str:
    """Return ``raw`` as trimmed text with one layer of wrapping quotes removed.

    Surrounding whitespace is stripped first. A matching pair of double
    quotes is then removed, followed by a matching pair of single quotes, so
    a value wrapped in both (double outside, single inside) loses both pairs.
    """
    text = str(raw).strip()
    # Order matters: double quotes are peeled before single quotes.
    for quote in ('"', "'"):
        if text.startswith(quote) and text.endswith(quote):
            text = text[1:-1]
    return text
|
|
|
|
|
|
def as_bool(raw: str) -> bool:
    """Interpret ``raw`` as a boolean flag.

    The value is normalised with ``as_str`` and lower-cased, then matched
    against the accepted spellings: ``1/true/yes/y/on`` for ``True`` and
    ``0/false/no/n/off`` for ``False``.

    Raises:
        ValueError: If the normalised value is not one of the accepted
            spellings.
    """
    token = as_str(raw).lower()
    truthy = ("1", "true", "yes", "y", "on")
    falsy = ("0", "false", "no", "n", "off")
    if token in truthy:
        return True
    if token not in falsy:
        raise ValueError("expected one of 1/0 true/false yes/no on/off")
    return False
|
|
|
|
|
|
def validate_env(source: str, schema: Dict[str, Callable[[str], Any]]) -> Dict[str, Any]:
    """Read and convert the environment variables named in ``schema``.

    Every key in ``schema`` is looked up with ``os.getenv``. A variable that
    is unset or blank is reported as missing; otherwise its raw text is passed
    to the associated caster. All problems are collected so that a single
    error lists every offending variable.

    Args:
        source: Label placed in the error message to identify the caller.
        schema: Mapping of variable name to a callable converting its text.

    Returns:
        Mapping of variable name to converted value.

    Raises:
        EnvValidationError: If any variable is missing or its caster raises.
    """
    parsed: Dict[str, Any] = {}
    problems = []

    for name, convert in schema.items():
        text = os.getenv(name)
        if text is None or not str(text).strip():
            problems.append(f"{name}: missing")
            continue
        try:
            parsed[name] = convert(text)
        except Exception as err:  # pragma: no cover - used in runtime only
            # Casters are arbitrary callables, so any failure is reported.
            problems.append(f"{name}: invalid value {text!r} ({err})")

    if not problems:
        return parsed

    bullet = "\n - "
    details = bullet + bullet.join(problems)
    raise EnvValidationError(f"[{source}] invalid .env configuration:{details}")
|
|
|
|
|
|
def resolve_hackrf_index(serial_env_key: str, source: str) -> str:
    """Return the position of the configured HackRF serial in ``lsusb`` output.

    The expected serial is read from the environment variable named by
    ``serial_env_key``. ``lsusb -v -d 1d50:6089`` is then run and the last
    token of every ``iSerial`` line is collected, in output order.

    Args:
        serial_env_key: Name of the environment variable holding the serial.
        source: Label placed in error messages to identify the caller.

    Returns:
        The zero-based position of the serial among the listed devices, as a
        string.

    Raises:
        EnvValidationError: If the variable is missing, ``lsusb`` cannot be
            started, no ``iSerial`` lines are reported, or the configured
            serial is not among them.
    """
    serial = validate_env(source, {serial_env_key: as_str})[serial_env_key]

    try:
        # Run lsusb directly instead of through a shell pipeline. With the
        # pipeline, the exit status was grep's, so "no match" surfaced as a
        # CalledProcessError and the dedicated no-devices error below could
        # never fire. The exit status is deliberately not checked here: what
        # matters is whether any iSerial lines were printed.
        result = subprocess.run(
            ["lsusb", "-v", "-d", "1d50:6089"],
            stdout=subprocess.PIPE,
            text=True,
            check=False,
        )
    except OSError as exc:
        # Covers a missing or non-executable lsusb binary.
        raise EnvValidationError(
            f"[{source}] could not read HackRF serials via lsusb: {exc}"
        ) from exc

    # Same filter the former ``grep iSerial`` applied; the serial is the last
    # whitespace-separated token on each matching line.
    # NOTE(review): if lsusb cannot read the string descriptor, the last token
    # may be the descriptor index rather than a serial; confirm on hardware.
    serials = [
        line.split()[-1]
        for line in result.stdout.splitlines()
        if "iSerial" in line
    ]
    if not serials:
        raise EnvValidationError(
            f"[{source}] no HackRF devices found (lsusb returned empty serial list)"
        )

    if serial not in serials:
        raise EnvValidationError(
            f"[{source}] serial {serial!r} not found among connected HackRF devices: {serials}"
        )

    # NOTE(review): this is the position in lsusb enumeration order; verify it
    # matches the device index expected by whatever consumes the result.
    return str(serials.index(serial))
|