diff --git a/read_energy.py b/read_energy.py old mode 100755 new mode 100644 index 781fd50..514d9e4 --- a/read_energy.py +++ b/read_energy.py @@ -9,7 +9,7 @@ import threading import time from dataclasses import dataclass from pathlib import Path -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Sequence, Tuple try: import numpy as np @@ -26,12 +26,13 @@ except Exception as exc: sys.exit(1) EPS = 1e-20 +IIO_MIN_SAMPLE_RATE = 2083333 @dataclass class Target: label: str - serial: str + device: str freq_hz: float source: str @@ -39,8 +40,8 @@ class Target: @dataclass class Row: label: str - serial: str - index: Optional[int] = None + device: str + index: Optional[str] = None freq_hz: float = 0.0 status: str = "INIT" rms: Optional[float] = None @@ -51,13 +52,28 @@ class Row: error: str = "" -class ProbeTop(gr.top_block): - def __init__(self, index: int, freq_hz: float, sample_rate: float, vec_len: int, - gain: float, if_gain: float, bb_gain: float): +def label_sort_key(label: str) -> Tuple[int, float | str]: + try: + return (0, float(label)) + except ValueError: + return (1, label) + + +class HackRfProbeTop(gr.top_block): + def __init__( + self, + serial: str, + freq_hz: float, + sample_rate: float, + vec_len: int, + gain: float, + if_gain: float, + bb_gain: float, + ): super().__init__("hackrf_energy_probe") self.probe = blocks.probe_signal_vc(vec_len) self.stream_to_vec = blocks.stream_to_vector(gr.sizeof_gr_complex * 1, vec_len) - self.src = osmosdr.source(args=f"numchan=1 hackrf={index}") + self.src = osmosdr.source(args=f"numchan=1 hackrf={serial}") self.src.set_time_unknown_pps(osmosdr.time_spec_t()) self.src.set_sample_rate(sample_rate) self.src.set_center_freq(freq_hz, 0) @@ -68,14 +84,14 @@ class ProbeTop(gr.top_block): for fn, val in (("set_gain", gain), ("set_if_gain", if_gain), ("set_bb_gain", bb_gain)): try: getattr(self.src, fn)(val, 0) - except Exception: - raise Exception("не ставится усиление") + except Exception as exc: + raise RuntimeError(f"failed to set {fn}={val}: {exc}") from exc try: self.src.set_bandwidth(0, 0) except Exception: pass try: - self.src.set_antenna('', 0) + self.src.set_antenna("", 0) except Exception: pass self.connect((self.src, 0), (self.stream_to_vec, 0)) @@ -85,15 +101,22 @@ class ProbeTop(gr.top_block): arr = np.asarray(self.probe.level(), dtype=np.complex64) if arr.size == 0: raise RuntimeError("no samples") - p = float(np.mean(arr.real * arr.real + arr.imag * arr.imag)) - rms = math.sqrt(max(p, 0.0)) - dbfs = 10.0 * math.log10(max(p, EPS)) - return rms, p, dbfs, int(arr.size) - - -class Worker(threading.Thread): - def __init__(self, target: Target, serial_to_index: Dict[str, int], rows: Dict[str, Row], - lock: threading.Lock, stop_event: threading.Event, args: argparse.Namespace): + power_lin = float(np.mean(arr.real * arr.real + arr.imag * arr.imag)) + rms = math.sqrt(max(power_lin, 0.0)) + dbfs = 10.0 * math.log10(max(power_lin, EPS)) + return rms, power_lin, dbfs, int(arr.size) + + +class HackRfWorker(threading.Thread): + def __init__( + self, + target: Target, + serial_to_index: Dict[str, int], + rows: Dict[str, Row], + lock: threading.Lock, + stop_event: threading.Event, + args: argparse.Namespace, + ): super().__init__(daemon=True) self.target = target self.serial_to_index = serial_to_index @@ -101,45 +124,52 @@ class Worker(threading.Thread): self.lock = lock self.stop_event = stop_event self.args = args - self.tb: Optional[ProbeTop] = None + self.tb: Optional[HackRfProbeTop] = None def _set_row(self, **kwargs): with self.lock: row = self.rows[self.target.label] - for k, v in kwargs.items(): - setattr(row, k, v) + for key, value in kwargs.items(): + setattr(row, key, value) row.updated_at = time.time() def _open(self) -> bool: - idx = self.serial_to_index.get(self.target.serial) - if idx is None: + if self.target.device not in self.serial_to_index: self._set_row(status="NOT_FOUND", error="serial not in hackrf_info", index=None) return False - self._set_row(index=idx, status="OPENING", error="", freq_hz=self.target.freq_hz) + + idx = self.serial_to_index[self.target.device] + self._set_row(index=str(idx), status="OPENING", error="", freq_hz=self.target.freq_hz) try: - self.tb = ProbeTop( - idx, self.target.freq_hz, self.args.sample_rate, self.args.vec_len, - self.args.gain, self.args.if_gain, self.args.bb_gain + self.tb = HackRfProbeTop( + self.target.device, + self.target.freq_hz, + self.args.sample_rate, + self.args.vec_len, + self.args.gain, + self.args.if_gain, + self.args.bb_gain, ) self.tb.start() time.sleep(0.15) self._set_row(status="OK", error="") return True except Exception as exc: - msg = str(exc) - status = "BUSY" if ("Resource busy" in msg or "-1000" in msg) else "ERR" - self._set_row(status=status, error=msg) + message = str(exc) + status = "BUSY" if ("Resource busy" in message or "-1000" in message) else "ERR" + self._set_row(status=status, error=message) self.tb = None return False def _close(self): - if self.tb is not None: - try: - self.tb.stop() - self.tb.wait() - except Exception: - pass - self.tb = None + if self.tb is None: + return + try: + self.tb.stop() + self.tb.wait() + except Exception: + pass + self.tb = None def run(self): time.sleep(self.args.stagger) @@ -149,8 +179,15 @@ class Worker(threading.Thread): break continue try: - rms, p, dbfs, n = self.tb.read_metrics() - self._set_row(status="OK", rms=rms, power_lin=p, dbfs=dbfs, samples=n, error="") + rms, power_lin, dbfs, samples = self.tb.read_metrics() + self._set_row( + status="OK", + rms=rms, + power_lin=power_lin, + dbfs=dbfs, + samples=samples, + error="", + ) except Exception as exc: self._set_row(status="ERR", error=str(exc)) self._close() @@ -162,6 +199,190 @@ class Worker(threading.Thread): self._close() +class IioProbe: + def __init__(self, args: argparse.Namespace): + self.args = args + self._static_signature: Optional[Tuple[float, float, str, Optional[float], str]] = None + self._last_freq_hz: Optional[int] = None + self._input_channels = [args.iio_i_channel] + if args.iio_q_channel not in self._input_channels: + self._input_channels.append(args.iio_q_channel) + + def _run(self, cmd: Sequence[str], binary: bool = False) -> str | bytes: + proc = subprocess.run( + list(cmd), + capture_output=True, + text=not binary, + ) + if proc.returncode != 0: + stderr = proc.stderr if binary else (proc.stderr or "") + stdout = "" if binary else (proc.stdout or "") + details = stderr.strip() or stdout.strip() or f"exit code {proc.returncode}" + raise RuntimeError(details) + return proc.stdout + + def _set_input_attr(self, channel: str, attr: str, value: str): + self._run( + [ + "iio_attr", + "-u", + self.args.uri, + "-q", + "-i", + "-c", + self.args.iio_phy_device, + channel, + attr, + value, + ] + ) + + def _set_output_attr(self, channel: str, attr: str, value: str): + self._run( + [ + "iio_attr", + "-u", + self.args.uri, + "-q", + "-o", + "-c", + self.args.iio_phy_device, + channel, + attr, + value, + ] + ) + + def ensure_configured(self): + sample_rate = None + if self.args.sample_rate is not None: + sample_rate = int(round(max(float(self.args.sample_rate), IIO_MIN_SAMPLE_RATE))) + + bandwidth = None + if self.args.bandwidth is not None: + bandwidth = int(round(max(float(self.args.bandwidth), 200000.0))) + + signature = ( + None if sample_rate is None else float(sample_rate), + None if bandwidth is None else float(bandwidth), + self.args.iio_gain_mode, + self.args.iio_hardwaregain, + self.args.iio_port_select, + ) + if signature == self._static_signature: + return + + for channel in self._input_channels: + if sample_rate is not None: + self._set_input_attr(channel, "sampling_frequency", str(sample_rate)) + if bandwidth is not None: + self._set_input_attr(channel, "rf_bandwidth", str(bandwidth)) + if self.args.iio_port_select: + self._set_input_attr(channel, "rf_port_select", self.args.iio_port_select) + if self.args.iio_gain_mode: + self._set_input_attr(channel, "gain_control_mode", self.args.iio_gain_mode) + if self.args.iio_gain_mode == "manual" and self.args.iio_hardwaregain is not None: + self._set_input_attr(channel, "hardwaregain", f"{self.args.iio_hardwaregain:.6f}") + + self._static_signature = signature + + def tune(self, freq_hz: float): + target = int(round(freq_hz)) + if self._last_freq_hz == target: + return + self._set_output_attr(self.args.iio_lo_channel, "frequency", str(target)) + self._last_freq_hz = target + if self.args.settle > 0: + time.sleep(self.args.settle) + + def read_metrics(self) -> Tuple[float, float, float, int]: + cmd = [ + "iio_readdev", + "-u", + self.args.uri, + "-T", + str(int(self.args.timeout_ms)), + "-b", + str(max(4, int(self.args.vec_len))), + "-s", + str(max(4, int(self.args.vec_len))), + self.args.iio_device, + self.args.iio_i_channel, + self.args.iio_q_channel, + ] + raw = self._run(cmd, binary=True) + values = np.frombuffer(raw, dtype=" Dict[str, str]: out: Dict[str, str] = {} if not path.exists(): @@ -170,122 +391,198 @@ def parse_env(path: Path) -> Dict[str, str]: line = raw.strip() if not line or line.startswith("#") or "=" not in line: continue - k, v = line.split("=", 1) - out[k.strip()] = v.strip().strip('"').strip("'") + key, value = line.split("=", 1) + out[key.strip()] = value.strip().strip('"').strip("'") return out -def collect_targets(env: Dict[str, str], only: Optional[set], override_freq_mhz: Optional[float]) -> List[Target]: +def collect_hackrf_targets( + env: Dict[str, str], + only: Optional[set], + override_freq_mhz: Optional[float], +) -> List[Target]: targets: List[Target] = [] - for k, v in env.items(): - m = re.fullmatch(r"hack_(\d+)", k) - if m: - label = m.group(1) + for key, value in env.items(): + match = re.fullmatch(r"hack_(\d+)", key) + if match: + label = match.group(1) else: - m = re.fullmatch(r"HACKID_(\d+)", k) - if not m: + match = re.fullmatch(r"HACKID_(\d+)", key) + if not match: continue - label = m.group(1) + label = match.group(1) if only and label not in only: continue mhz = override_freq_mhz if override_freq_mhz is not None else float(label) - targets.append(Target(label=label, serial=v.lower(), freq_hz=mhz * 1e6, source=k)) + targets.append(Target(label=label, device=value.lower(), freq_hz=mhz * 1e6, source=key)) + + unique: Dict[str, Target] = {} + for target in sorted(targets, key=lambda item: (label_sort_key(item.label), 0 if item.source.startswith("hack_") else 1)): + unique.setdefault(target.label, target) + return [unique[key] for key in sorted(unique, key=label_sort_key)] + + +def collect_iio_targets( + env: Dict[str, str], + only: Optional[set], + override_freq_mhz: Optional[float], + uri: str, +) -> List[Target]: + if only: + labels = set(only) + else: + labels = set() + for key in env: + for pattern in (r"c_freq_(\d+)", r"hack_(\d+)", r"HACKID_(\d+)"): + match = re.fullmatch(pattern, key) + if match: + labels.add(match.group(1)) + break + + if not labels and override_freq_mhz is not None: + labels.add(str(int(override_freq_mhz) if float(override_freq_mhz).is_integer() else override_freq_mhz)) - uniq: Dict[str, Target] = {} - for t in sorted(targets, key=lambda x: (int(x.label), 0 if x.source.startswith("hack_") else 1)): - uniq.setdefault(t.label, t) - return [uniq[k] for k in sorted(uniq, key=lambda x: int(x))] + targets: List[Target] = [] + for label in sorted(labels, key=label_sort_key): + raw_freq = env.get(f"c_freq_{label}", label) + try: + mhz = override_freq_mhz if override_freq_mhz is not None else float(raw_freq) + except ValueError as exc: + raise ValueError(f"cannot resolve frequency for target {label!r}") from exc + targets.append(Target(label=label, device=uri, freq_hz=mhz * 1e6, source="iio")) + return targets def parse_hackrf_info() -> Dict[str, int]: try: proc = subprocess.run(["hackrf_info"], capture_output=True, text=True, timeout=15) - except FileNotFoundError: - raise RuntimeError("hackrf_info not found") - except subprocess.TimeoutExpired: - raise RuntimeError("hackrf_info timeout") + except FileNotFoundError as exc: + raise RuntimeError("hackrf_info not found") from exc + except subprocess.TimeoutExpired as exc: + raise RuntimeError("hackrf_info timeout") from exc text = (proc.stdout or "") + "\n" + (proc.stderr or "") out: Dict[str, int] = {} cur_idx: Optional[int] = None for line in text.splitlines(): - m = re.search(r"^Index:\s*(\d+)", line) - if m: - cur_idx = int(m.group(1)) + match = re.search(r"^Index:\s*(\d+)", line) + if match: + cur_idx = int(match.group(1)) continue - m = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line) - if m and cur_idx is not None: - out[m.group(1).lower()] = cur_idx + match = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line) + if match and cur_idx is not None: + out[match.group(1).lower()] = cur_idx if not out: raise RuntimeError("no devices parsed from hackrf_info") return out -def fmt(v: Optional[float], spec: str) -> str: - return "-" if v is None else format(v, spec) +def fmt(value: Optional[float], spec: str) -> str: + return "-" if value is None else format(value, spec) -def render(rows: Dict[str, Row], started_at: float, env_path: Path, serial_to_index: Dict[str, int]): +def render(rows: Dict[str, Row], started_at: float, env_path: Path, summary: str): now = time.time() print("\x1b[2J\x1b[H", end="") - print("HackRF Energy Monitor (relative power: RMS / linear / dBFS, not calibrated dBm)") - print(f"env: {env_path} | discovered: {len(serial_to_index)} | uptime: {int(now-started_at)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}") + print("Read Energy Monitor (relative power: RMS / linear / dBFS, not calibrated dBm)") + print(f"env: {env_path} | {summary} | uptime: {int(now - started_at)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}") print() - header = f"{'band':>5} {'idx':>3} {'freq':>7} {'status':>9} {'rms':>10} {'power':>12} {'dBFS':>9} {'N':>5} {'age':>5} {'serial':>12} error" + header = f"{'band':>5} {'idx':>4} {'freq':>8} {'status':>9} {'rms':>10} {'power':>12} {'dBFS':>9} {'N':>6} {'age':>5} {'device':>18} error" print(header) - print('-' * len(header)) - for label in sorted(rows, key=lambda x: int(x)): - r = rows[label] - idx = '-' if r.index is None else str(r.index) - age = '-' if r.updated_at <= 0 else f"{(now-r.updated_at):.1f}" - err = (r.error or "") + print("-" * len(header)) + for label in sorted(rows, key=label_sort_key): + row = rows[label] + idx = "-" if row.index is None else str(row.index) + age = "-" if row.updated_at <= 0 else f"{(now - row.updated_at):.1f}" + err = row.error or "" if len(err) > 64: - err = err[:61] + '...' + err = err[:61] + "..." + device = row.device[-18:] print( - f"{r.label:>5} {idx:>3} {r.freq_hz/1e6:>7.1f} {r.status:>9} " - f"{fmt(r.rms, '.6f'):>10} {fmt(r.power_lin, '.8f'):>12} {fmt(r.dbfs, '.2f'):>9} " - f"{r.samples:>5} {age:>5} {r.serial[-12:]:>12} {err}" + f"{row.label:>5} {idx:>4} {row.freq_hz / 1e6:>8.1f} {row.status:>9} " + f"{fmt(row.rms, '.6f'):>10} {fmt(row.power_lin, '.8f'):>12} {fmt(row.dbfs, '.2f'):>9} " + f"{row.samples:>6} {age:>5} {device:>18} {err}" ) print() - print("Ctrl+C to stop. Use --only 2400,5200 or --freq-mhz 2450 to limit/override tuning.") + print("Ctrl+C to stop. Use --backend iio --uri ip:192.168.2.1 --only 2400 for Neptune.") sys.stdout.flush() def build_parser() -> argparse.ArgumentParser: - p = argparse.ArgumentParser(description="Realtime HackRF relative energy monitor") - p.add_argument("--env", default=".env", help="Path to .env (default: repo_root/.env)") - p.add_argument("--only", default="", help="Comma-separated labels (e.g. 2400,5200)") - p.add_argument("--freq-mhz", type=float, default=None, help="Override tuning frequency for all devices (MHz)") - p.add_argument("--sample-rate", type=float, default=2e6, help="Sample rate in Hz (HackRF min ~2e6)") - p.add_argument("--vec-len", type=int, default=4096, help="Probe vector length") - p.add_argument("--interval", type=float, default=0.5, help="Per-device read interval (s)") - p.add_argument("--refresh", type=float, default=0.5, help="Console refresh interval (s)") - p.add_argument("--reopen-delay", type=float, default=1.0, help="Retry delay after BUSY/ERR (s)") - p.add_argument("--gain", type=float, default=0.0, help="General gain") - p.add_argument("--if-gain", type=float, default=16.0, help="IF gain") - p.add_argument("--bb-gain", type=float, default=16.0, help="BB gain") - return p + parser = argparse.ArgumentParser(description="Realtime SDR relative energy monitor") + parser.add_argument("--backend", choices=("hackrf", "iio"), default="hackrf", help="SDR backend") + parser.add_argument("--env", default=".env", help="Path to .env (default: repo_root/.env)") + parser.add_argument("--only", default="", help="Comma-separated labels (e.g. 2400,5200)") + parser.add_argument("--freq-mhz", type=float, default=None, help="Override tuning frequency for all targets (MHz)") + parser.add_argument( + "--sample-rate", + type=float, + default=None, + help="Sample rate in Hz (HackRF default: 2e6, IIO default: keep current device setting)", + ) + parser.add_argument("--bandwidth", type=float, default=None, help="RF bandwidth in Hz (default: keep device setting)") + parser.add_argument("--vec-len", type=int, default=4096, help="Probe vector length / capture size") + parser.add_argument("--interval", type=float, default=0.5, help="Per-target read interval (s)") + parser.add_argument("--refresh", type=float, default=0.5, help="Console refresh interval (s)") + parser.add_argument("--reopen-delay", type=float, default=1.0, help="Retry delay after ERR/BUSY (s)") + parser.add_argument("--gain", type=float, default=0.0, help="General gain for HackRF") + parser.add_argument("--if-gain", type=float, default=16.0, help="IF gain for HackRF") + parser.add_argument("--bb-gain", type=float, default=16.0, help="BB gain for HackRF") + parser.add_argument("--uri", default="ip:192.168.2.1", help="IIO URI, e.g. ip:192.168.2.1") + parser.add_argument("--iio-device", default="cf-ad9361-lpc", help="IIO RX buffer device") + parser.add_argument("--iio-phy-device", default="ad9361-phy", help="IIO PHY device") + parser.add_argument("--iio-i-channel", default="voltage0", help="IIO I channel") + parser.add_argument("--iio-q-channel", default="voltage1", help="IIO Q channel") + parser.add_argument("--iio-lo-channel", default="altvoltage0", help="IIO LO channel for RX frequency") + parser.add_argument("--iio-port-select", default="A_BALANCED", help="IIO rf_port_select value") + parser.add_argument( + "--iio-gain-mode", + choices=("manual", "fast_attack", "slow_attack", "hybrid"), + default="slow_attack", + help="IIO gain control mode", + ) + parser.add_argument("--iio-hardwaregain", type=float, default=None, help="IIO hardware gain in dB for manual mode") + parser.add_argument("--timeout-ms", type=int, default=4000, help="IIO read timeout in milliseconds") + parser.add_argument("--settle", type=float, default=0.12, help="Wait after retune before reading in IIO mode (s)") + return parser def main() -> int: args = build_parser().parse_args() - only = {x.strip() for x in args.only.split(',') if x.strip()} or None + only = {item.strip() for item in args.only.split(",") if item.strip()} or None env_path = Path(args.env) if not env_path.is_absolute(): env_path = (Path(__file__).resolve().parent / env_path).resolve() env = parse_env(env_path) - targets = collect_targets(env, only=only, override_freq_mhz=args.freq_mhz) - if not targets: - print(f"No hack_/HACKID_ entries found in {env_path}", file=sys.stderr) - return 2 - try: - serial_to_index = parse_hackrf_info() - except Exception as exc: - print(f"hackrf discovery failed: {exc}", file=sys.stderr) - return 3 + if args.backend == "hackrf": + targets = collect_hackrf_targets(env, only=only, override_freq_mhz=args.freq_mhz) + if not targets: + print(f"No hack_/HACKID_ entries found in {env_path}", file=sys.stderr) + return 2 + try: + serial_to_index = parse_hackrf_info() + except Exception as exc: + print(f"hackrf discovery failed: {exc}", file=sys.stderr) + return 3 + summary = f"backend=hackrf | discovered={len(serial_to_index)}" + if args.sample_rate is None: + args.sample_rate = 2e6 + else: + try: + targets = collect_iio_targets(env, only=only, override_freq_mhz=args.freq_mhz, uri=args.uri) + except ValueError as exc: + print(str(exc), file=sys.stderr) + return 2 + if not targets: + print( + f"No IIO targets resolved from {env_path}. Use c_freq_* in .env or pass --only / --freq-mhz.", + file=sys.stderr, + ) + return 2 + summary = f"backend=iio | uri={args.uri} | device={args.iio_device} | targets={len(targets)}" - rows = {t.label: Row(label=t.label, serial=t.serial, freq_hz=t.freq_hz) for t in targets} + rows = {target.label: Row(label=target.label, device=target.device, freq_hz=target.freq_hz) for target in targets} lock = threading.Lock() stop_event = threading.Event() @@ -295,27 +592,32 @@ def main() -> int: signal.signal(signal.SIGINT, on_signal) signal.signal(signal.SIGTERM, on_signal) - workers: List[Worker] = [] - for i, t in enumerate(targets): - wa = argparse.Namespace(**vars(args)) - wa.stagger = i * 0.15 - w = Worker(t, serial_to_index, rows, lock, stop_event, wa) - workers.append(w) - w.start() + workers: List[threading.Thread] = [] + if args.backend == "hackrf": + for idx, target in enumerate(targets): + worker_args = argparse.Namespace(**vars(args)) + worker_args.stagger = idx * 0.15 + worker = HackRfWorker(target, serial_to_index, rows, lock, stop_event, worker_args) + workers.append(worker) + worker.start() + else: + worker = IioWorker(targets, rows, lock, stop_event, args) + workers.append(worker) + worker.start() started = time.time() try: while not stop_event.is_set(): with lock: - snap = {k: Row(**vars(v)) for k, v in rows.items()} - render(snap, started, env_path, serial_to_index) + snapshot = {key: Row(**vars(value)) for key, value in rows.items()} + render(snapshot, started, env_path, summary) stop_event.wait(args.refresh) finally: stop_event.set() - for w in workers: - w.join(timeout=2) + for worker in workers: + worker.join(timeout=2) return 0 -if __name__ == '__main__': +if __name__ == "__main__": raise SystemExit(main()) diff --git a/read_energy_wide.py b/read_energy_wide.py index d7bdb1b..e7124b6 100644 --- a/read_energy_wide.py +++ b/read_energy_wide.py @@ -1,13 +1,25 @@ +#!/usr/bin/env python3 """ -./.venv-sdr/bin/python read_energy_wide.py \ - --serial 0000000000000000a18c63dc2a83b813 \ - --sample-rate 20000000 \ - --base 6000 \ - --roof 5700 \ - --step 20 - +Examples: + +HackRF: + ./.venv-sdr/bin/python read_energy_wide.py \ + --backend hackrf \ + --serial 0000000000000000a18c63dc2a83b813 \ + --sample-rate 20000000 \ + --base 6000 \ + --roof 5700 \ + --step 20 + +Neptune / Pluto over IIO: + ./.venv-sdr/bin/python read_energy_wide.py \ + --backend iio \ + --uri ip:192.168.2.1 \ + --base 2402 \ + --roof 2398 \ + --step 1 """ -#!/usr/bin/env python3 + import argparse import math import re @@ -16,7 +28,7 @@ import subprocess import sys import time from dataclasses import dataclass -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Sequence, Tuple try: import numpy as np @@ -33,6 +45,7 @@ except Exception as exc: sys.exit(1) EPS = 1e-20 +IIO_MIN_SAMPLE_RATE = 2083333 @dataclass @@ -53,10 +66,10 @@ class ScanWindow: pass_no: int = 0 -class WideProbeTop(gr.top_block): +class HackRfWideProbe(gr.top_block): def __init__( self, - index: int, + serial: str, center_freq_hz: float, sample_rate: float, vec_len: int, @@ -67,7 +80,7 @@ class WideProbeTop(gr.top_block): super().__init__("hackrf_energy_wide_probe") self.probe = blocks.probe_signal_vc(vec_len) self.stream_to_vec = blocks.stream_to_vector(gr.sizeof_gr_complex * 1, vec_len) - self.src = osmosdr.source(args=f"numchan=1 hackrf={index}") + self.src = osmosdr.source(args=f"numchan=1 hackrf={serial}") self.src.set_time_unknown_pps(osmosdr.time_spec_t()) self.src.set_sample_rate(sample_rate) self.src.set_center_freq(center_freq_hz, 0) @@ -82,8 +95,8 @@ class WideProbeTop(gr.top_block): for fn, val in (("set_gain", gain), ("set_if_gain", if_gain), ("set_bb_gain", bb_gain)): try: getattr(self.src, fn)(val, 0) - except Exception: - pass + except Exception as exc: + raise RuntimeError(f"failed to set {fn}={val}: {exc}") from exc try: self.src.set_bandwidth(0, 0) except Exception: @@ -95,7 +108,7 @@ class WideProbeTop(gr.top_block): self.connect((self.src, 0), (self.stream_to_vec, 0)) self.connect((self.stream_to_vec, 0), (self.probe, 0)) - def tune(self, freq_hz: float) -> None: + def tune(self, freq_hz: float): self.src.set_center_freq(freq_hz, 0) def read_metrics(self) -> Tuple[float, float, float, int]: @@ -139,374 +152,154 @@ class WideProbeTop(gr.top_block): return rms, power_lin, dbfs, samples -def parse_hackrf_info() -> Dict[str, int]: - try: - proc = subprocess.run(["hackrf_info"], capture_output=True, text=True, timeout=15) - except FileNotFoundError: - raise RuntimeError("hackrf_info not found") - except subprocess.TimeoutExpired: - raise RuntimeError("hackrf_info timeout") - text = (proc.stdout or "") + "\n" + (proc.stderr or "") - out: Dict[str, int] = {} - cur_idx: Optional[int] = None - for line in text.splitlines(): - m = re.search(r"^Index:\s*(\d+)", line) - if m: - cur_idx = int(m.group(1)) - continue - m = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line) - if m and cur_idx is not None: - out[m.group(1).lower()] = cur_idx - if not out: - raise RuntimeError("no devices parsed from hackrf_info") - return out - - -def fmt(value: Optional[float], spec: str) -> str: - return "-" if value is None else format(value, spec) - - -def build_windows(base_mhz: float, roof_mhz: float, step_mhz: float) -> List[ScanWindow]: - if step_mhz <= 0: - raise ValueError("step must be > 0") - if base_mhz == roof_mhz: - raise ValueError("base and roof must be different") - - direction = -1.0 if roof_mhz < base_mhz else 1.0 - edge = base_mhz - seq = 1 - windows: List[ScanWindow] = [] - - while True: - next_edge = edge + direction * step_mhz - if direction < 0 and next_edge < roof_mhz: - next_edge = roof_mhz - if direction > 0 and next_edge > roof_mhz: - next_edge = roof_mhz - - low_mhz = min(edge, next_edge) - high_mhz = max(edge, next_edge) - center_mhz = (low_mhz + high_mhz) / 2.0 - windows.append( - ScanWindow( - seq=seq, - start_mhz=edge, - end_mhz=next_edge, - low_mhz=low_mhz, - high_mhz=high_mhz, - center_mhz=center_mhz, - ) +class IioWideProbe: + def __init__(self, args: argparse.Namespace): + self.args = args + self._static_signature: Optional[Tuple[Optional[float], Optional[float], str, Optional[float], str]] = None + self._last_freq_hz: Optional[int] = None + self._input_channels = [args.iio_i_channel] + if args.iio_q_channel not in self._input_channels: + self._input_channels.append(args.iio_q_channel) + + def _run(self, cmd: Sequence[str], binary: bool = False) -> str | bytes: + proc = subprocess.run(list(cmd), capture_output=True, text=not binary) + if proc.returncode != 0: + stderr = proc.stderr if binary else (proc.stderr or "") + stdout = "" if binary else (proc.stdout or "") + details = stderr.strip() or stdout.strip() or f"exit code {proc.returncode}" + raise RuntimeError(details) + return proc.stdout + + def _run_binary(self, cmd: Sequence[str]) -> bytes: + proc = subprocess.run(list(cmd), capture_output=True) + if proc.returncode != 0: + details = proc.stderr.decode("utf-8", errors="ignore").strip() or proc.stdout.decode("utf-8", errors="ignore").strip() + raise RuntimeError(details or f"exit code {proc.returncode}") + return proc.stdout + + def _read_input_attr(self, channel: str, attr: str) -> str: + out = self._run( + [ + "iio_attr", + "-u", + self.args.uri, + "-i", + "-c", + self.args.iio_phy_device, + channel, + attr, + ] ) - - if next_edge == roof_mhz: - break - edge = next_edge - seq += 1 - - return windows - - -def render( - windows: List[ScanWindow], - serial: str, - index: int, - sample_rate: float, - base_mhz: float, - roof_mhz: float, - step_mhz: float, - started_at: float, - pass_no: int, - current_seq: int, -) -> None: - now = time.time() - capture_bw_mhz = sample_rate / 1e6 - current_row = next((row for row in windows if row.seq == current_seq), None) - best_row = max( - (row for row in windows if row.status == "OK" and row.dbfs is not None), - key=lambda row: row.dbfs if row.dbfs is not None else float("-inf"), - default=None, - ) - print("\x1b[2J\x1b[H", end="") - print("HackRF Wide Energy Monitor (relative power: RMS / linear / dBFS)") - print( - f"serial: {serial} | idx: {index} | sample-rate: {capture_bw_mhz:.3f} MHz | " - f"scan: {base_mhz:.3f}->{roof_mhz:.3f} MHz step {step_mhz:.3f} MHz | " - f"pass: {pass_no} | uptime: {int(now-started_at)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}" - ) - print() - header = ( - f"{'cur':>3} {'seq':>3} {'window MHz':>23} {'center':>9} {'status':>8} " - f"{'dBFS':>9} {'rms':>10} {'power':>12} {'N':>5} {'age':>5} error" - ) - print(header) - print("-" * len(header)) - for row in windows: - age = "-" if row.updated_at <= 0 else f"{(now-row.updated_at):.1f}" - err = row.error - if len(err) > 50: - err = err[:47] + "..." - marker = ">>>" if row.seq == current_seq else "" - print( - f"{marker:>3} {row.seq:>3} " - f"{f'{row.high_mhz:.3f}-{row.low_mhz:.3f}':>23} {row.center_mhz:>9.3f} {row.status:>8} " - f"{fmt(row.dbfs, '.2f'):>9} {fmt(row.rms, '.6f'):>10} {fmt(row.power_lin, '.8f'):>12} " - f"{row.samples:>5} {age:>5} {err}" + return str(out).strip() + + def _set_input_attr(self, channel: str, attr: str, value: str): + self._run( + [ + "iio_attr", + "-u", + self.args.uri, + "-q", + "-i", + "-c", + self.args.iio_phy_device, + channel, + attr, + value, + ] ) - print() - if best_row is not None: - best_age = "-" if best_row.updated_at <= 0 else f"{(now-best_row.updated_at):.1f}" - print( - f"{'':>3} {'MAX':>3} " - f"{f'{best_row.high_mhz:.3f}-{best_row.low_mhz:.3f}':>23} {best_row.center_mhz:>9.3f} {best_row.status:>8} " - f"{fmt(best_row.dbfs, '.2f'):>9} {fmt(best_row.rms, '.6f'):>10} {fmt(best_row.power_lin, '.8f'):>12} " - f"{best_row.samples:>5} {best_age:>5} pass={best_row.pass_no}" - ) - elif current_row is not None: - current_age = "-" if current_row.updated_at <= 0 else f"{(now-current_row.updated_at):.1f}" - print( - f"{'':>3} {'MAX':>3} " - f"{f'{current_row.high_mhz:.3f}-{current_row.low_mhz:.3f}':>23} {current_row.center_mhz:>9.3f} {'INIT':>8} " - f"{fmt(None, '.2f'):>9} {fmt(None, '.6f'):>10} {fmt(None, '.8f'):>12} " - f"{0:>5} {current_age:>5} no successful windows yet" - ) - print("Ctrl+C to stop. Window width equals step; sample-rate must be >= step to cover each window.") - sys.stdout.flush() - - -def build_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(description="Retune one HackRF across a wide frequency range and measure energy") - parser.add_argument("--serial", required=True, help="HackRF serial number from hackrf_info") - parser.add_argument("--sample-rate", type=float, required=True, help="Sample rate in Hz") - parser.add_argument("--base", type=float, required=True, help="Scan start edge in MHz") - parser.add_argument("--roof", type=float, required=True, help="Scan end edge in MHz") - parser.add_argument("--step", type=float, required=True, help="Window width / retune step in MHz") - parser.add_argument("--vec-len", type=int, default=4096, help="Probe vector length") - parser.add_argument("--settle", type=float, default=0.12, help="Wait time after retune before reading (s)") - parser.add_argument("--avg-reads", type=int, default=3, help="How many probe reads to average per window") - parser.add_argument("--pause-between-reads", type=float, default=0.02, help="Pause between averaged reads (s)") - parser.add_argument("--passes", type=int, default=0, help="Number of sweep passes, 0 means infinite") - parser.add_argument("--gain", type=float, default=16.0, help="General gain") - parser.add_argument("--if-gain", type=float, default=16.0, help="IF gain") - parser.add_argument("--bb-gain", type=float, default=16.0, help="BB gain") - return parser - -def main() -> int: - args = build_parser().parse_args() - serial = args.serial.lower() - - try: - windows = build_windows(args.base, args.roof, args.step) - except ValueError as exc: - print(f"invalid scan range: {exc}", file=sys.stderr) - return 2 - - step_hz = args.step * 1e6 - if args.sample_rate < step_hz: - print( - f"sample-rate {args.sample_rate:.0f} Hz is smaller than step window {step_hz:.0f} Hz; " - "this would leave gaps in the scan", - file=sys.stderr, + def _set_output_attr(self, channel: str, attr: str, value: str): + self._run( + [ + "iio_attr", + "-u", + self.args.uri, + "-q", + "-o", + "-c", + self.args.iio_phy_device, + channel, + attr, + value, + ] ) - return 2 - - try: - serial_to_index = parse_hackrf_info() - except Exception as exc: - print(f"hackrf discovery failed: {exc}", file=sys.stderr) - return 3 - - index = serial_to_index.get(serial) - if index is None: - print(f"serial {serial} not found in hackrf_info", file=sys.stderr) - print("available serials:", file=sys.stderr) - for item_serial, item_index in sorted(serial_to_index.items(), key=lambda item: item[1]): - print(f" idx={item_index} serial={item_serial}", file=sys.stderr) - return 4 - - stop_requested = False - - def on_signal(signum, frame): - nonlocal stop_requested - stop_requested = True - - signal.signal(signal.SIGINT, on_signal) - signal.signal(signal.SIGTERM, on_signal) - - probe: Optional[WideProbeTop] = None - started_at = time.time() - pass_no = 0 - current_seq = windows[0].seq - try: - probe = WideProbeTop( - index=index, - center_freq_hz=windows[0].center_mhz * 1e6, - sample_rate=args.sample_rate, - vec_len=args.vec_len, - gain=args.gain, - if_gain=args.if_gain, - bb_gain=args.bb_gain, + def get_effective_sample_rate(self) -> float: + if self.args.sample_rate is not None: + return float(self.args.sample_rate) + raw = self._read_input_attr(self.args.iio_i_channel, "sampling_frequency") + return float(raw) + + def ensure_configured(self): + sample_rate = None + if self.args.sample_rate is not None: + sample_rate = int(round(max(float(self.args.sample_rate), IIO_MIN_SAMPLE_RATE))) + + bandwidth = None + if self.args.bandwidth is not None: + bandwidth = int(round(max(float(self.args.bandwidth), 200000.0))) + + signature = ( + None if sample_rate is None else float(sample_rate), + None if bandwidth is None else float(bandwidth), + self.args.iio_gain_mode, + self.args.iio_hardwaregain, + self.args.iio_port_select, ) - probe.start() - time.sleep(max(args.settle, 0.12)) - - while not stop_requested: - pass_no += 1 - for row in windows: - if stop_requested: - break - current_seq = row.seq - try: - probe.tune(row.center_mhz * 1e6) - rms, power_lin, dbfs, samples = probe.read_window( - settle=args.settle, - avg_reads=args.avg_reads, - pause_between_reads=args.pause_between_reads, - ) - row.status = "OK" - row.rms = rms - row.power_lin = power_lin - row.dbfs = dbfs - row.samples = samples - row.error = "" - row.updated_at = time.time() - row.pass_no = pass_no - except Exception as exc: - row.status = "ERR" - row.error = str(exc) - row.updated_at = time.time() - render( - windows=windows, - serial=serial, - index=index, - sample_rate=args.sample_rate, - base_mhz=args.base, - roof_mhz=args.roof, - step_mhz=args.step, - started_at=started_at, - pass_no=pass_no, - current_seq=current_seq, - ) - if args.passes > 0 and pass_no >= args.passes: - break - except Exception as exc: - print(f"scanner failed: {exc}", file=sys.stderr) - return 5 - finally: - if probe is not None: - try: - probe.stop() - probe.wait() - except Exception: - pass - - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) -#!/usr/bin/env python3 -import argparse -import math -import re -import signal -import subprocess -import sys -import time -from dataclasses import dataclass -from typing import Dict, List, Optional, Tuple - -try: - import numpy as np -except Exception as exc: - print(f"numpy import failed: {exc}", file=sys.stderr) - sys.exit(1) - -try: - from gnuradio import blocks, gr - import osmosdr -except Exception as exc: - print(f"gnuradio/osmosdr import failed: {exc}", file=sys.stderr) - print("Run with the SDR venv, e.g. .venv-sdr/bin/python read_energy_wide.py", file=sys.stderr) - sys.exit(1) - -EPS = 1e-20 - - -@dataclass -class ScanWindow: - seq: int - start_mhz: float - end_mhz: float - low_mhz: float - high_mhz: float - center_mhz: float - status: str = "INIT" - rms: Optional[float] = None - power_lin: Optional[float] = None - dbfs: Optional[float] = None - samples: int = 0 - updated_at: float = 0.0 - error: str = "" - pass_no: int = 0 - - -class WideProbeTop(gr.top_block): - def __init__( - self, - index: int, - center_freq_hz: float, - sample_rate: float, - vec_len: int, - gain: float, - if_gain: float, - bb_gain: float, - ): - super().__init__("hackrf_energy_wide_probe") - self.probe = blocks.probe_signal_vc(vec_len) - self.stream_to_vec = blocks.stream_to_vector(gr.sizeof_gr_complex * 1, vec_len) - self.src = osmosdr.source(args=f"numchan=1 hackrf={index}") - self.src.set_time_unknown_pps(osmosdr.time_spec_t()) - self.src.set_sample_rate(sample_rate) - self.src.set_center_freq(center_freq_hz, 0) - try: - self.src.set_freq_corr(0, 0) - except Exception: - pass - try: - self.src.set_gain_mode(False, 0) - except Exception: - pass - for fn, val in (("set_gain", gain), ("set_if_gain", if_gain), ("set_bb_gain", bb_gain)): - try: - getattr(self.src, fn)(val, 0) - except Exception: - pass - try: - self.src.set_bandwidth(0, 0) - except Exception: - pass - try: - self.src.set_antenna("", 0) - except Exception: - pass - self.connect((self.src, 0), (self.stream_to_vec, 0)) - self.connect((self.stream_to_vec, 0), (self.probe, 0)) - - def tune(self, freq_hz: float) -> None: - self.src.set_center_freq(freq_hz, 0) - - def read_metrics(self) -> Tuple[float, float, float, int]: - arr = np.asarray(self.probe.level(), dtype=np.complex64) - if arr.size == 0: + if signature == self._static_signature: + return + + for channel in self._input_channels: + if sample_rate is not None: + self._set_input_attr(channel, "sampling_frequency", str(sample_rate)) + if bandwidth is not None: + self._set_input_attr(channel, "rf_bandwidth", str(bandwidth)) + if self.args.iio_port_select: + self._set_input_attr(channel, "rf_port_select", self.args.iio_port_select) + if self.args.iio_gain_mode: + self._set_input_attr(channel, "gain_control_mode", self.args.iio_gain_mode) + if self.args.iio_gain_mode == "manual" and self.args.iio_hardwaregain is not None: + self._set_input_attr(channel, "hardwaregain", f"{self.args.iio_hardwaregain:.6f}") + + self._static_signature = signature + + def tune(self, freq_hz: float): + target = int(round(freq_hz)) + if self._last_freq_hz == target: + return + self._set_output_attr(self.args.iio_lo_channel, "frequency", str(target)) + self._last_freq_hz = target + + def read_metrics(self, samples_per_read: int) -> Tuple[float, float, float, int]: + cmd = [ + "iio_readdev", + "-u", + self.args.uri, + "-T", + str(int(self.args.timeout_ms)), + "-b", + str(max(4, int(samples_per_read))), + "-s", + str(max(4, int(samples_per_read))), + self.args.iio_device, + self.args.iio_i_channel, + self.args.iio_q_channel, + ] + raw = self._run_binary(cmd) + values = np.frombuffer(raw, dtype=" Tuple[float, float, float, int]: + def read_window(self, settle: float, avg_reads: int, pause_between_reads: float, samples_per_read: int) -> Tuple[float, float, float, int]: if settle > 0: time.sleep(settle) @@ -516,10 +309,10 @@ class WideProbeTop(gr.top_block): last_error: Optional[Exception] = None for idx in range(read_count): - deadline = time.time() + 1.0 + deadline = time.time() + max(1.0, self.args.timeout_ms / 1000.0) while True: try: - _, power_lin, _, samples = self.read_metrics() + _, power_lin, _, samples = self.read_metrics(samples_per_read) powers.append(power_lin) sample_sizes.append(samples) break @@ -541,21 +334,21 @@ class WideProbeTop(gr.top_block): def parse_hackrf_info() -> Dict[str, int]: try: proc = subprocess.run(["hackrf_info"], capture_output=True, text=True, timeout=15) - except FileNotFoundError: - raise RuntimeError("hackrf_info not found") - except subprocess.TimeoutExpired: - raise RuntimeError("hackrf_info timeout") + except FileNotFoundError as exc: + raise RuntimeError("hackrf_info not found") from exc + except subprocess.TimeoutExpired as exc: + raise RuntimeError("hackrf_info timeout") from exc text = (proc.stdout or "") + "\n" + (proc.stderr or "") out: Dict[str, int] = {} cur_idx: Optional[int] = None for line in text.splitlines(): - m = re.search(r"^Index:\s*(\d+)", line) - if m: - cur_idx = int(m.group(1)) + match = re.search(r"^Index:\s*(\d+)", line) + if match: + cur_idx = int(match.group(1)) continue - m = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line) - if m and cur_idx is not None: - out[m.group(1).lower()] = cur_idx + match = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line) + if match and cur_idx is not None: + out[match.group(1).lower()] = cur_idx if not out: raise RuntimeError("no devices parsed from hackrf_info") return out @@ -607,8 +400,9 @@ def build_windows(base_mhz: float, roof_mhz: float, step_mhz: float) -> List[Sca def render( windows: List[ScanWindow], - serial: str, - index: int, + backend: str, + device_ref: str, + index_ref: str, sample_rate: float, base_mhz: float, roof_mhz: float, @@ -626,73 +420,93 @@ def render( default=None, ) print("\x1b[2J\x1b[H", end="") - print("HackRF Wide Energy Monitor (relative power: RMS / linear / dBFS)") + print("Wide Energy Monitor (relative power: RMS / linear / dBFS)") print( - f"serial: {serial} | idx: {index} | sample-rate: {capture_bw_mhz:.3f} MHz | " + f"backend: {backend} | device: {device_ref} | idx: {index_ref} | sample-rate: {capture_bw_mhz:.3f} MHz | " f"scan: {base_mhz:.3f}->{roof_mhz:.3f} MHz step {step_mhz:.3f} MHz | " f"pass: {pass_no} | uptime: {int(now-started_at)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}" ) print() header = ( f"{'cur':>3} {'seq':>3} {'window MHz':>23} {'center':>9} {'status':>8} " - f"{'dBFS':>9} {'rms':>10} {'power':>12} {'N':>5} {'age':>5} error" + f"{'dBFS':>9} {'rms':>10} {'power':>12} {'N':>6} {'age':>5} error" ) print(header) print("-" * len(header)) for row in windows: - age = "-" if row.updated_at <= 0 else f"{(now-row.updated_at):.1f}" + age = "-" if row.updated_at <= 0 else f"{(now - row.updated_at):.1f}" err = row.error - if len(err) > 50: - err = err[:47] + "..." + if len(err) > 56: + err = err[:53] + "..." marker = ">>>" if row.seq == current_seq else "" print( f"{marker:>3} {row.seq:>3} " f"{f'{row.high_mhz:.3f}-{row.low_mhz:.3f}':>23} {row.center_mhz:>9.3f} {row.status:>8} " f"{fmt(row.dbfs, '.2f'):>9} {fmt(row.rms, '.6f'):>10} {fmt(row.power_lin, '.8f'):>12} " - f"{row.samples:>5} {age:>5} {err}" + f"{row.samples:>6} {age:>5} {err}" ) print() if best_row is not None: - best_age = "-" if best_row.updated_at <= 0 else f"{(now-best_row.updated_at):.1f}" + best_age = "-" if best_row.updated_at <= 0 else f"{(now - best_row.updated_at):.1f}" print( f"{'':>3} {'MAX':>3} " f"{f'{best_row.high_mhz:.3f}-{best_row.low_mhz:.3f}':>23} {best_row.center_mhz:>9.3f} {best_row.status:>8} " f"{fmt(best_row.dbfs, '.2f'):>9} {fmt(best_row.rms, '.6f'):>10} {fmt(best_row.power_lin, '.8f'):>12} " - f"{best_row.samples:>5} {best_age:>5} pass={best_row.pass_no}" + f"{best_row.samples:>6} {best_age:>5} pass={best_row.pass_no}" ) elif current_row is not None: - current_age = "-" if current_row.updated_at <= 0 else f"{(now-current_row.updated_at):.1f}" + current_age = "-" if current_row.updated_at <= 0 else f"{(now - current_row.updated_at):.1f}" print( f"{'':>3} {'MAX':>3} " f"{f'{current_row.high_mhz:.3f}-{current_row.low_mhz:.3f}':>23} {current_row.center_mhz:>9.3f} {'INIT':>8} " f"{fmt(None, '.2f'):>9} {fmt(None, '.6f'):>10} {fmt(None, '.8f'):>12} " - f"{0:>5} {current_age:>5} no successful windows yet" + f"{0:>6} {current_age:>5} no successful windows yet" ) print("Ctrl+C to stop. Window width equals step; sample-rate must be >= step to cover each window.") sys.stdout.flush() def build_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(description="Retune one HackRF across a wide frequency range and measure energy") - parser.add_argument("--serial", required=True, help="HackRF serial number from hackrf_info") - parser.add_argument("--sample-rate", type=float, required=True, help="Sample rate in Hz") + parser = argparse.ArgumentParser(description="Retune one SDR across a wide frequency range and measure energy") + parser.add_argument("--backend", choices=("hackrf", "iio"), default="hackrf", help="SDR backend") + parser.add_argument("--serial", help="HackRF serial number from hackrf_info") + parser.add_argument( + "--sample-rate", + type=float, + default=None, + help="Sample rate in Hz (required for hackrf, optional for iio to keep device setting)", + ) + parser.add_argument("--bandwidth", type=float, default=None, help="RF bandwidth in Hz (iio only, optional)") parser.add_argument("--base", type=float, required=True, help="Scan start edge in MHz") parser.add_argument("--roof", type=float, required=True, help="Scan end edge in MHz") parser.add_argument("--step", type=float, required=True, help="Window width / retune step in MHz") - parser.add_argument("--vec-len", type=int, default=4096, help="Probe vector length") + parser.add_argument("--vec-len", type=int, default=4096, help="Probe vector length / samples per read") parser.add_argument("--settle", type=float, default=0.12, help="Wait time after retune before reading (s)") - parser.add_argument("--avg-reads", type=int, default=3, help="How many probe reads to average per window") + parser.add_argument("--avg-reads", type=int, default=3, help="How many reads to average per window") parser.add_argument("--pause-between-reads", type=float, default=0.02, help="Pause between averaged reads (s)") - parser.add_argument("--passes", type=int, default=0, help="Number of sweep passes, 0 means infinite") - parser.add_argument("--gain", type=float, default=16.0, help="General gain") - parser.add_argument("--if-gain", type=float, default=16.0, help="IF gain") - parser.add_argument("--bb-gain", type=float, default=16.0, help="BB gain") + parser.add_argument("--gain", type=float, default=16.0, help="General gain for HackRF") + parser.add_argument("--if-gain", type=float, default=16.0, help="IF gain for HackRF") + parser.add_argument("--bb-gain", type=float, default=16.0, help="BB gain for HackRF") + parser.add_argument("--uri", default="ip:192.168.2.1", help="IIO URI, e.g. ip:192.168.2.1") + parser.add_argument("--iio-device", default="cf-ad9361-lpc", help="IIO RX buffer device") + parser.add_argument("--iio-phy-device", default="ad9361-phy", help="IIO PHY device") + parser.add_argument("--iio-i-channel", default="voltage0", help="IIO I channel") + parser.add_argument("--iio-q-channel", default="voltage1", help="IIO Q channel") + parser.add_argument("--iio-lo-channel", default="altvoltage0", help="IIO LO channel for RX frequency") + parser.add_argument("--iio-port-select", default="A_BALANCED", help="IIO rf_port_select value") + parser.add_argument( + "--iio-gain-mode", + choices=("manual", "fast_attack", "slow_attack", "hybrid"), + default="slow_attack", + help="IIO gain control mode", + ) + parser.add_argument("--iio-hardwaregain", type=float, default=None, help="IIO hardware gain in dB for manual mode") + parser.add_argument("--timeout-ms", type=int, default=4000, help="IIO read timeout in milliseconds") return parser def main() -> int: args = build_parser().parse_args() - serial = args.serial.lower() try: windows = build_windows(args.base, args.roof, args.step) @@ -700,28 +514,72 @@ def main() -> int: print(f"invalid scan range: {exc}", file=sys.stderr) return 2 - step_hz = args.step * 1e6 - if args.sample_rate < step_hz: - print( - f"sample-rate {args.sample_rate:.0f} Hz is smaller than step window {step_hz:.0f} Hz; " - "this would leave gaps in the scan", - file=sys.stderr, - ) - return 2 + probe: Optional[HackRfWideProbe | IioWideProbe] = None + device_ref = "" + index_ref = "-" + + if args.backend == "hackrf": + if not args.serial: + print("--serial is required for --backend hackrf", file=sys.stderr) + return 2 + if args.sample_rate is None: + print("--sample-rate is required for --backend hackrf", file=sys.stderr) + return 2 + + serial = args.serial.lower() + step_hz = args.step * 1e6 + if args.sample_rate < step_hz: + print( + f"sample-rate {args.sample_rate:.0f} Hz is smaller than step window {step_hz:.0f} Hz; this would leave gaps in the scan", + file=sys.stderr, + ) + return 2 - try: - serial_to_index = parse_hackrf_info() - except Exception as exc: - print(f"hackrf discovery failed: {exc}", file=sys.stderr) - return 3 + try: + serial_to_index = parse_hackrf_info() + except Exception as exc: + print(f"hackrf discovery failed: {exc}", file=sys.stderr) + return 3 + + index = serial_to_index.get(serial) + if index is None: + print(f"serial {serial} not found in hackrf_info", file=sys.stderr) + print("available serials:", file=sys.stderr) + for item_serial, item_index in sorted(serial_to_index.items(), key=lambda item: item[1]): + print(f" idx={item_index} serial={item_serial}", file=sys.stderr) + return 4 + + probe = HackRfWideProbe( + serial=serial, + center_freq_hz=windows[0].center_mhz * 1e6, + sample_rate=args.sample_rate, + vec_len=args.vec_len, + gain=args.gain, + if_gain=args.if_gain, + bb_gain=args.bb_gain, + ) + effective_sample_rate = float(args.sample_rate) + device_ref = serial + index_ref = str(index) + else: + probe = IioWideProbe(args) + try: + probe.ensure_configured() + effective_sample_rate = probe.get_effective_sample_rate() + except Exception as exc: + print(f"iio setup failed: {exc}", file=sys.stderr) + return 3 + + step_hz = args.step * 1e6 + if effective_sample_rate < step_hz: + print( + f"effective sample-rate {effective_sample_rate:.0f} Hz is smaller than step window {step_hz:.0f} Hz; this would leave gaps in the scan", + file=sys.stderr, + ) + return 2 - index = serial_to_index.get(serial) - if index is None: - print(f"serial {serial} not found in hackrf_info", file=sys.stderr) - print("available serials:", file=sys.stderr) - for item_serial, item_index in sorted(serial_to_index.items(), key=lambda item: item[1]): - print(f" idx={item_index} serial={item_serial}", file=sys.stderr) - return 4 + device_ref = args.uri + index_ref = "iio" stop_requested = False @@ -732,23 +590,14 @@ def main() -> int: signal.signal(signal.SIGINT, on_signal) signal.signal(signal.SIGTERM, on_signal) - probe: Optional[WideProbeTop] = None started_at = time.time() pass_no = 0 current_seq = windows[0].seq try: - probe = WideProbeTop( - index=index, - center_freq_hz=windows[0].center_mhz * 1e6, - sample_rate=args.sample_rate, - vec_len=args.vec_len, - gain=args.gain, - if_gain=args.if_gain, - bb_gain=args.bb_gain, - ) - probe.start() - time.sleep(max(args.settle, 0.12)) + if isinstance(probe, HackRfWideProbe): + probe.start() + time.sleep(max(args.settle, 0.12)) while not stop_requested: pass_no += 1 @@ -758,11 +607,19 @@ def main() -> int: current_seq = row.seq try: probe.tune(row.center_mhz * 1e6) - rms, power_lin, dbfs, samples = probe.read_window( - settle=args.settle, - avg_reads=args.avg_reads, - pause_between_reads=args.pause_between_reads, - ) + if isinstance(probe, HackRfWideProbe): + rms, power_lin, dbfs, samples = probe.read_window( + settle=args.settle, + avg_reads=args.avg_reads, + pause_between_reads=args.pause_between_reads, + ) + else: + rms, power_lin, dbfs, samples = probe.read_window( + settle=args.settle, + avg_reads=args.avg_reads, + pause_between_reads=args.pause_between_reads, + samples_per_read=args.vec_len, + ) row.status = "OK" row.rms = rms row.power_lin = power_lin @@ -775,11 +632,13 @@ def main() -> int: row.status = "ERR" row.error = str(exc) row.updated_at = time.time() + render( windows=windows, - serial=serial, - index=index, - sample_rate=args.sample_rate, + backend=args.backend, + device_ref=device_ref, + index_ref=index_ref, + sample_rate=effective_sample_rate, base_mhz=args.base, roof_mhz=args.roof, step_mhz=args.step, @@ -787,13 +646,12 @@ def main() -> int: pass_no=pass_no, current_seq=current_seq, ) - if args.passes > 0 and pass_no >= args.passes: - break + except Exception as exc: print(f"scanner failed: {exc}", file=sys.stderr) return 5 finally: - if probe is not None: + if isinstance(probe, HackRfWideProbe): try: probe.stop() probe.wait()