#!/usr/bin/env python3 import argparse import math import re import signal import subprocess import sys import threading import time from dataclasses import dataclass from pathlib import Path from typing import Dict, List, Optional, Sequence, Tuple try: import numpy as np except Exception as exc: print(f"numpy import failed: {exc}", file=sys.stderr) sys.exit(1) try: from gnuradio import blocks, gr import osmosdr except Exception as exc: print(f"gnuradio/osmosdr import failed: {exc}", file=sys.stderr) print("Run with the SDR venv, e.g. .venv-sdr/bin/python read_energy.py", file=sys.stderr) sys.exit(1) EPS = 1e-20 IIO_MIN_SAMPLE_RATE = 2083333 @dataclass class Target: label: str device: str freq_hz: float source: str @dataclass class Row: label: str device: str index: Optional[str] = None freq_hz: float = 0.0 status: str = "INIT" rms: Optional[float] = None power_lin: Optional[float] = None dbfs: Optional[float] = None samples: int = 0 updated_at: float = 0.0 error: str = "" def label_sort_key(label: str) -> Tuple[int, float | str]: try: return (0, float(label)) except ValueError: return (1, label) class HackRfProbeTop(gr.top_block): def __init__( self, serial: str, freq_hz: float, sample_rate: float, vec_len: int, gain: float, if_gain: float, bb_gain: float, ): super().__init__("hackrf_energy_probe") self.probe = blocks.probe_signal_vc(vec_len) self.stream_to_vec = blocks.stream_to_vector(gr.sizeof_gr_complex * 1, vec_len) self.src = osmosdr.source(args=f"numchan=1 hackrf={serial}") self.src.set_time_unknown_pps(osmosdr.time_spec_t()) self.src.set_sample_rate(sample_rate) self.src.set_center_freq(freq_hz, 0) try: self.src.set_freq_corr(0, 0) except Exception: pass for fn, val in (("set_gain", gain), ("set_if_gain", if_gain), ("set_bb_gain", bb_gain)): try: getattr(self.src, fn)(val, 0) except Exception as exc: raise RuntimeError(f"failed to set {fn}={val}: {exc}") from exc try: self.src.set_bandwidth(0, 0) except Exception: pass try: self.src.set_antenna("", 0) except Exception: pass self.connect((self.src, 0), (self.stream_to_vec, 0)) self.connect((self.stream_to_vec, 0), (self.probe, 0)) def read_metrics(self) -> Tuple[float, float, float, int]: arr = np.asarray(self.probe.level(), dtype=np.complex64) if arr.size == 0: raise RuntimeError("no samples") power_lin = float(np.mean(arr.real * arr.real + arr.imag * arr.imag)) rms = math.sqrt(max(power_lin, 0.0)) dbfs = 10.0 * math.log10(max(power_lin, EPS)) return rms, power_lin, dbfs, int(arr.size) class HackRfWorker(threading.Thread): def __init__( self, target: Target, serial_to_index: Dict[str, int], rows: Dict[str, Row], lock: threading.Lock, stop_event: threading.Event, args: argparse.Namespace, ): super().__init__(daemon=True) self.target = target self.serial_to_index = serial_to_index self.rows = rows self.lock = lock self.stop_event = stop_event self.args = args self.tb: Optional[HackRfProbeTop] = None def _set_row(self, **kwargs): with self.lock: row = self.rows[self.target.label] for key, value in kwargs.items(): setattr(row, key, value) row.updated_at = time.time() def _open(self) -> bool: if self.target.device not in self.serial_to_index: self._set_row(status="NOT_FOUND", error="serial not in hackrf_info", index=None) return False idx = self.serial_to_index[self.target.device] self._set_row(index=str(idx), status="OPENING", error="", freq_hz=self.target.freq_hz) try: self.tb = HackRfProbeTop( self.target.device, self.target.freq_hz, self.args.sample_rate, self.args.vec_len, self.args.gain, self.args.if_gain, self.args.bb_gain, ) self.tb.start() time.sleep(0.15) self._set_row(status="OK", error="") return True except Exception as exc: message = str(exc) status = "BUSY" if ("Resource busy" in message or "-1000" in message) else "ERR" self._set_row(status=status, error=message) self.tb = None return False def _close(self): if self.tb is None: return try: self.tb.stop() self.tb.wait() except Exception: pass self.tb = None def run(self): time.sleep(self.args.stagger) while not self.stop_event.is_set(): if self.tb is None and not self._open(): if self.stop_event.wait(self.args.reopen_delay): break continue try: rms, power_lin, dbfs, samples = self.tb.read_metrics() self._set_row( status="OK", rms=rms, power_lin=power_lin, dbfs=dbfs, samples=samples, error="", ) except Exception as exc: self._set_row(status="ERR", error=str(exc)) self._close() if self.stop_event.wait(self.args.reopen_delay): break continue if self.stop_event.wait(self.args.interval): break self._close() class IioProbe: def __init__(self, args: argparse.Namespace): self.args = args self._static_signature: Optional[Tuple[float, float, str, Optional[float], str]] = None self._last_freq_hz: Optional[int] = None self._input_channels = [args.iio_i_channel] if args.iio_q_channel not in self._input_channels: self._input_channels.append(args.iio_q_channel) def _run(self, cmd: Sequence[str], binary: bool = False) -> str | bytes: proc = subprocess.run( list(cmd), capture_output=True, text=not binary, ) if proc.returncode != 0: stderr = proc.stderr if binary else (proc.stderr or "") stdout = "" if binary else (proc.stdout or "") details = stderr.strip() or stdout.strip() or f"exit code {proc.returncode}" raise RuntimeError(details) return proc.stdout def _set_input_attr(self, channel: str, attr: str, value: str): self._run( [ "iio_attr", "-u", self.args.uri, "-q", "-i", "-c", self.args.iio_phy_device, channel, attr, value, ] ) def _set_output_attr(self, channel: str, attr: str, value: str): self._run( [ "iio_attr", "-u", self.args.uri, "-q", "-o", "-c", self.args.iio_phy_device, channel, attr, value, ] ) def ensure_configured(self): sample_rate = None if self.args.sample_rate is not None: sample_rate = int(round(max(float(self.args.sample_rate), IIO_MIN_SAMPLE_RATE))) bandwidth = None if self.args.bandwidth is not None: bandwidth = int(round(max(float(self.args.bandwidth), 200000.0))) signature = ( None if sample_rate is None else float(sample_rate), None if bandwidth is None else float(bandwidth), self.args.iio_gain_mode, self.args.iio_hardwaregain, self.args.iio_port_select, ) if signature == self._static_signature: return for channel in self._input_channels: if sample_rate is not None: self._set_input_attr(channel, "sampling_frequency", str(sample_rate)) if bandwidth is not None: self._set_input_attr(channel, "rf_bandwidth", str(bandwidth)) if self.args.iio_port_select: self._set_input_attr(channel, "rf_port_select", self.args.iio_port_select) if self.args.iio_gain_mode: self._set_input_attr(channel, "gain_control_mode", self.args.iio_gain_mode) if self.args.iio_gain_mode == "manual" and self.args.iio_hardwaregain is not None: self._set_input_attr(channel, "hardwaregain", f"{self.args.iio_hardwaregain:.6f}") self._static_signature = signature def tune(self, freq_hz: float): target = int(round(freq_hz)) if self._last_freq_hz == target: return self._set_output_attr(self.args.iio_lo_channel, "frequency", str(target)) self._last_freq_hz = target if self.args.settle > 0: time.sleep(self.args.settle) def read_metrics(self) -> Tuple[float, float, float, int]: cmd = [ "iio_readdev", "-u", self.args.uri, "-T", str(int(self.args.timeout_ms)), "-b", str(max(4, int(self.args.vec_len))), "-s", str(max(4, int(self.args.vec_len))), self.args.iio_device, self.args.iio_i_channel, self.args.iio_q_channel, ] raw = self._run(cmd, binary=True) values = np.frombuffer(raw, dtype=" Dict[str, str]: out: Dict[str, str] = {} if not path.exists(): return out for raw in path.read_text(encoding="utf-8", errors="ignore").splitlines(): line = raw.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) out[key.strip()] = value.strip().strip('"').strip("'") return out def collect_hackrf_targets( env: Dict[str, str], only: Optional[set], override_freq_mhz: Optional[float], ) -> List[Target]: targets: List[Target] = [] for key, value in env.items(): match = re.fullmatch(r"hack_(\d+)", key) if match: label = match.group(1) else: match = re.fullmatch(r"HACKID_(\d+)", key) if not match: continue label = match.group(1) if only and label not in only: continue mhz = override_freq_mhz if override_freq_mhz is not None else float(label) targets.append(Target(label=label, device=value.lower(), freq_hz=mhz * 1e6, source=key)) unique: Dict[str, Target] = {} for target in sorted(targets, key=lambda item: (label_sort_key(item.label), 0 if item.source.startswith("hack_") else 1)): unique.setdefault(target.label, target) return [unique[key] for key in sorted(unique, key=label_sort_key)] def collect_iio_targets( env: Dict[str, str], only: Optional[set], override_freq_mhz: Optional[float], uri: str, ) -> List[Target]: if only: labels = set(only) else: labels = set() for key in env: for pattern in (r"c_freq_(\d+)", r"hack_(\d+)", r"HACKID_(\d+)"): match = re.fullmatch(pattern, key) if match: labels.add(match.group(1)) break if not labels and override_freq_mhz is not None: labels.add(str(int(override_freq_mhz) if float(override_freq_mhz).is_integer() else override_freq_mhz)) targets: List[Target] = [] for label in sorted(labels, key=label_sort_key): raw_freq = env.get(f"c_freq_{label}", label) try: mhz = override_freq_mhz if override_freq_mhz is not None else float(raw_freq) except ValueError as exc: raise ValueError(f"cannot resolve frequency for target {label!r}") from exc targets.append(Target(label=label, device=uri, freq_hz=mhz * 1e6, source="iio")) return targets def parse_hackrf_info() -> Dict[str, int]: try: proc = subprocess.run(["hackrf_info"], capture_output=True, text=True, timeout=15) except FileNotFoundError as exc: raise RuntimeError("hackrf_info not found") from exc except subprocess.TimeoutExpired as exc: raise RuntimeError("hackrf_info timeout") from exc text = (proc.stdout or "") + "\n" + (proc.stderr or "") out: Dict[str, int] = {} cur_idx: Optional[int] = None for line in text.splitlines(): match = re.search(r"^Index:\s*(\d+)", line) if match: cur_idx = int(match.group(1)) continue match = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line) if match and cur_idx is not None: out[match.group(1).lower()] = cur_idx if not out: raise RuntimeError("no devices parsed from hackrf_info") return out def fmt(value: Optional[float], spec: str) -> str: return "-" if value is None else format(value, spec) def render(rows: Dict[str, Row], started_at: float, env_path: Path, summary: str): now = time.time() print("\x1b[2J\x1b[H", end="") print("Read Energy Monitor (relative power: RMS / linear / dBFS, not calibrated dBm)") print(f"env: {env_path} | {summary} | uptime: {int(now - started_at)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}") print() header = f"{'band':>5} {'idx':>4} {'freq':>8} {'status':>9} {'rms':>10} {'power':>12} {'dBFS':>9} {'N':>6} {'age':>5} {'device':>18} error" print(header) print("-" * len(header)) for label in sorted(rows, key=label_sort_key): row = rows[label] idx = "-" if row.index is None else str(row.index) age = "-" if row.updated_at <= 0 else f"{(now - row.updated_at):.1f}" err = row.error or "" if len(err) > 64: err = err[:61] + "..." device = row.device[-18:] print( f"{row.label:>5} {idx:>4} {row.freq_hz / 1e6:>8.1f} {row.status:>9} " f"{fmt(row.rms, '.6f'):>10} {fmt(row.power_lin, '.8f'):>12} {fmt(row.dbfs, '.2f'):>9} " f"{row.samples:>6} {age:>5} {device:>18} {err}" ) print() print("Ctrl+C to stop. Use --backend iio --uri ip:192.168.2.1 --only 2400 for Neptune.") sys.stdout.flush() def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Realtime SDR relative energy monitor") parser.add_argument("--backend", choices=("hackrf", "iio"), default="hackrf", help="SDR backend") parser.add_argument("--env", default=".env", help="Path to .env (default: repo_root/.env)") parser.add_argument("--only", default="", help="Comma-separated labels (e.g. 2400,5200)") parser.add_argument("--freq-mhz", type=float, default=None, help="Override tuning frequency for all targets (MHz)") parser.add_argument( "--sample-rate", type=float, default=None, help="Sample rate in Hz (HackRF default: 2e6, IIO default: keep current device setting)", ) parser.add_argument("--bandwidth", type=float, default=None, help="RF bandwidth in Hz (default: keep device setting)") parser.add_argument("--vec-len", type=int, default=4096, help="Probe vector length / capture size") parser.add_argument("--interval", type=float, default=0.5, help="Per-target read interval (s)") parser.add_argument("--refresh", type=float, default=0.5, help="Console refresh interval (s)") parser.add_argument("--reopen-delay", type=float, default=1.0, help="Retry delay after ERR/BUSY (s)") parser.add_argument("--gain", type=float, default=0.0, help="General gain for HackRF") parser.add_argument("--if-gain", type=float, default=16.0, help="IF gain for HackRF") parser.add_argument("--bb-gain", type=float, default=16.0, help="BB gain for HackRF") parser.add_argument("--uri", default="ip:192.168.2.1", help="IIO URI, e.g. ip:192.168.2.1") parser.add_argument("--iio-device", default="cf-ad9361-lpc", help="IIO RX buffer device") parser.add_argument("--iio-phy-device", default="ad9361-phy", help="IIO PHY device") parser.add_argument("--iio-i-channel", default="voltage0", help="IIO I channel") parser.add_argument("--iio-q-channel", default="voltage1", help="IIO Q channel") parser.add_argument("--iio-lo-channel", default="altvoltage0", help="IIO LO channel for RX frequency") parser.add_argument("--iio-port-select", default="A_BALANCED", help="IIO rf_port_select value") parser.add_argument( "--iio-gain-mode", choices=("manual", "fast_attack", "slow_attack", "hybrid"), default="slow_attack", help="IIO gain control mode", ) parser.add_argument("--iio-hardwaregain", type=float, default=None, help="IIO hardware gain in dB for manual mode") parser.add_argument("--timeout-ms", type=int, default=4000, help="IIO read timeout in milliseconds") parser.add_argument("--settle", type=float, default=0.12, help="Wait after retune before reading in IIO mode (s)") return parser def main() -> int: args = build_parser().parse_args() only = {item.strip() for item in args.only.split(",") if item.strip()} or None env_path = Path(args.env) if not env_path.is_absolute(): env_path = (Path(__file__).resolve().parent / env_path).resolve() env = parse_env(env_path) if args.backend == "hackrf": targets = collect_hackrf_targets(env, only=only, override_freq_mhz=args.freq_mhz) if not targets: print(f"No hack_/HACKID_ entries found in {env_path}", file=sys.stderr) return 2 try: serial_to_index = parse_hackrf_info() except Exception as exc: print(f"hackrf discovery failed: {exc}", file=sys.stderr) return 3 summary = f"backend=hackrf | discovered={len(serial_to_index)}" if args.sample_rate is None: args.sample_rate = 2e6 else: try: targets = collect_iio_targets(env, only=only, override_freq_mhz=args.freq_mhz, uri=args.uri) except ValueError as exc: print(str(exc), file=sys.stderr) return 2 if not targets: print( f"No IIO targets resolved from {env_path}. Use c_freq_* in .env or pass --only / --freq-mhz.", file=sys.stderr, ) return 2 summary = f"backend=iio | uri={args.uri} | device={args.iio_device} | targets={len(targets)}" rows = {target.label: Row(label=target.label, device=target.device, freq_hz=target.freq_hz) for target in targets} lock = threading.Lock() stop_event = threading.Event() def on_signal(signum, frame): stop_event.set() signal.signal(signal.SIGINT, on_signal) signal.signal(signal.SIGTERM, on_signal) workers: List[threading.Thread] = [] if args.backend == "hackrf": for idx, target in enumerate(targets): worker_args = argparse.Namespace(**vars(args)) worker_args.stagger = idx * 0.15 worker = HackRfWorker(target, serial_to_index, rows, lock, stop_event, worker_args) workers.append(worker) worker.start() else: worker = IioWorker(targets, rows, lock, stop_event, args) workers.append(worker) worker.start() started = time.time() try: while not stop_event.is_set(): with lock: snapshot = {key: Row(**vars(value)) for key, value in rows.items()} render(snapshot, started, env_path, summary) stop_event.wait(args.refresh) finally: stop_event.set() for worker in workers: worker.join(timeout=2) return 0 if __name__ == "__main__": raise SystemExit(main())