From 73b0bc3298743721fb83c3e18b0178cd8f55b3e2 Mon Sep 17 00:00:00 2001 From: Sergey Revyakin Date: Thu, 9 Apr 2026 11:29:05 +0700 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D0=BB=20?= =?UTF-8?q?=D1=81=D0=BA=D1=80=D0=B8=D0=BF=D1=82=20=D0=BD=D0=B0=20=D0=BF?= =?UTF-8?q?=D1=80=D0=BE=D0=B2=D0=B5=D1=80=D0=BA=D1=83=20=D0=B4=D0=B8=D0=B0?= =?UTF-8?q?=D0=BF=D0=B0=D0=B7=D0=BE=D0=BD=D0=BE=D0=B2=20=D1=88=D0=B8=D1=80?= =?UTF-8?q?=D0=BE=D0=BA=D0=B8=D0=BC=20=D1=81=D0=BA=D0=B0=D0=BD=D0=B8=D1=80?= =?UTF-8?q?=D0=BE=D0=B2=D0=B0=D0=BD=D0=B8=D0=B5=D0=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- read_energy_wide.py | 399 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 399 insertions(+) create mode 100644 read_energy_wide.py diff --git a/read_energy_wide.py b/read_energy_wide.py new file mode 100644 index 0000000..a594cab --- /dev/null +++ b/read_energy_wide.py @@ -0,0 +1,399 @@ +#!/usr/bin/env python3 +import argparse +import math +import re +import signal +import subprocess +import sys +import time +from dataclasses import dataclass +from typing import Dict, List, Optional, Tuple + +try: + import numpy as np +except Exception as exc: + print(f"numpy import failed: {exc}", file=sys.stderr) + sys.exit(1) + +try: + from gnuradio import blocks, gr + import osmosdr +except Exception as exc: + print(f"gnuradio/osmosdr import failed: {exc}", file=sys.stderr) + print("Run with the SDR venv, e.g. .venv-sdr/bin/python read_energy_wide.py", file=sys.stderr) + sys.exit(1) + +EPS = 1e-20 + + +@dataclass +class ScanWindow: + seq: int + start_mhz: float + end_mhz: float + low_mhz: float + high_mhz: float + center_mhz: float + status: str = "INIT" + rms: Optional[float] = None + power_lin: Optional[float] = None + dbfs: Optional[float] = None + samples: int = 0 + updated_at: float = 0.0 + error: str = "" + pass_no: int = 0 + + +class WideProbeTop(gr.top_block): + def __init__( + self, + index: int, + center_freq_hz: float, + sample_rate: float, + vec_len: int, + gain: float, + if_gain: float, + bb_gain: float, + ): + super().__init__("hackrf_energy_wide_probe") + self.probe = blocks.probe_signal_vc(vec_len) + self.stream_to_vec = blocks.stream_to_vector(gr.sizeof_gr_complex * 1, vec_len) + self.src = osmosdr.source(args=f"numchan=1 hackrf={index}") + self.src.set_time_unknown_pps(osmosdr.time_spec_t()) + self.src.set_sample_rate(sample_rate) + self.src.set_center_freq(center_freq_hz, 0) + try: + self.src.set_freq_corr(0, 0) + except Exception: + pass + try: + self.src.set_gain_mode(False, 0) + except Exception: + pass + for fn, val in (("set_gain", gain), ("set_if_gain", if_gain), ("set_bb_gain", bb_gain)): + try: + getattr(self.src, fn)(val, 0) + except Exception: + pass + try: + self.src.set_bandwidth(0, 0) + except Exception: + pass + try: + self.src.set_antenna("", 0) + except Exception: + pass + self.connect((self.src, 0), (self.stream_to_vec, 0)) + self.connect((self.stream_to_vec, 0), (self.probe, 0)) + + def tune(self, freq_hz: float) -> None: + self.src.set_center_freq(freq_hz, 0) + + def read_metrics(self) -> Tuple[float, float, float, int]: + arr = np.asarray(self.probe.level(), dtype=np.complex64) + if arr.size == 0: + raise RuntimeError("no samples") + power_lin = float(np.mean(arr.real * arr.real + arr.imag * arr.imag)) + rms = math.sqrt(max(power_lin, 0.0)) + dbfs = 10.0 * math.log10(max(power_lin, EPS)) + return rms, power_lin, dbfs, int(arr.size) + + def read_window(self, settle: float, avg_reads: int, pause_between_reads: float) -> Tuple[float, float, float, int]: + if settle > 0: + time.sleep(settle) + + read_count = max(1, avg_reads) + powers: List[float] = [] + sample_sizes: List[int] = [] + last_error: Optional[Exception] = None + + for idx in range(read_count): + deadline = time.time() + 1.0 + while True: + try: + _, power_lin, _, samples = self.read_metrics() + powers.append(power_lin) + sample_sizes.append(samples) + break + except Exception as exc: + last_error = exc + if time.time() >= deadline: + raise RuntimeError(str(last_error) if last_error else "no samples") + time.sleep(0.02) + if idx + 1 < read_count and pause_between_reads > 0: + time.sleep(pause_between_reads) + + power_lin = float(sum(powers) / len(powers)) + rms = math.sqrt(max(power_lin, 0.0)) + dbfs = 10.0 * math.log10(max(power_lin, EPS)) + samples = int(sum(sample_sizes) / len(sample_sizes)) + return rms, power_lin, dbfs, samples + + +def parse_hackrf_info() -> Dict[str, int]: + try: + proc = subprocess.run(["hackrf_info"], capture_output=True, text=True, timeout=15) + except FileNotFoundError: + raise RuntimeError("hackrf_info not found") + except subprocess.TimeoutExpired: + raise RuntimeError("hackrf_info timeout") + text = (proc.stdout or "") + "\n" + (proc.stderr or "") + out: Dict[str, int] = {} + cur_idx: Optional[int] = None + for line in text.splitlines(): + m = re.search(r"^Index:\s*(\d+)", line) + if m: + cur_idx = int(m.group(1)) + continue + m = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line) + if m and cur_idx is not None: + out[m.group(1).lower()] = cur_idx + if not out: + raise RuntimeError("no devices parsed from hackrf_info") + return out + + +def fmt(value: Optional[float], spec: str) -> str: + return "-" if value is None else format(value, spec) + + +def build_windows(base_mhz: float, roof_mhz: float, step_mhz: float) -> List[ScanWindow]: + if step_mhz <= 0: + raise ValueError("step must be > 0") + if base_mhz == roof_mhz: + raise ValueError("base and roof must be different") + + direction = -1.0 if roof_mhz < base_mhz else 1.0 + edge = base_mhz + seq = 1 + windows: List[ScanWindow] = [] + + while True: + next_edge = edge + direction * step_mhz + if direction < 0 and next_edge < roof_mhz: + next_edge = roof_mhz + if direction > 0 and next_edge > roof_mhz: + next_edge = roof_mhz + + low_mhz = min(edge, next_edge) + high_mhz = max(edge, next_edge) + center_mhz = (low_mhz + high_mhz) / 2.0 + windows.append( + ScanWindow( + seq=seq, + start_mhz=edge, + end_mhz=next_edge, + low_mhz=low_mhz, + high_mhz=high_mhz, + center_mhz=center_mhz, + ) + ) + + if next_edge == roof_mhz: + break + edge = next_edge + seq += 1 + + return windows + + +def render( + windows: List[ScanWindow], + serial: str, + index: int, + sample_rate: float, + base_mhz: float, + roof_mhz: float, + step_mhz: float, + started_at: float, + pass_no: int, + current_seq: int, +) -> None: + now = time.time() + capture_bw_mhz = sample_rate / 1e6 + current_row = next((row for row in windows if row.seq == current_seq), None) + best_row = max( + (row for row in windows if row.status == "OK" and row.dbfs is not None), + key=lambda row: row.dbfs if row.dbfs is not None else float("-inf"), + default=None, + ) + print("\x1b[2J\x1b[H", end="") + print("HackRF Wide Energy Monitor (relative power: RMS / linear / dBFS)") + print( + f"serial: {serial} | idx: {index} | sample-rate: {capture_bw_mhz:.3f} MHz | " + f"scan: {base_mhz:.3f}->{roof_mhz:.3f} MHz step {step_mhz:.3f} MHz | " + f"pass: {pass_no} | uptime: {int(now-started_at)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}" + ) + print() + header = ( + f"{'cur':>3} {'seq':>3} {'window MHz':>23} {'center':>9} {'status':>8} " + f"{'dBFS':>9} {'rms':>10} {'power':>12} {'N':>5} {'age':>5} error" + ) + print(header) + print("-" * len(header)) + for row in windows: + age = "-" if row.updated_at <= 0 else f"{(now-row.updated_at):.1f}" + err = row.error + if len(err) > 50: + err = err[:47] + "..." + marker = ">>>" if row.seq == current_seq else "" + print( + f"{marker:>3} {row.seq:>3} " + f"{f'{row.high_mhz:.3f}-{row.low_mhz:.3f}':>23} {row.center_mhz:>9.3f} {row.status:>8} " + f"{fmt(row.dbfs, '.2f'):>9} {fmt(row.rms, '.6f'):>10} {fmt(row.power_lin, '.8f'):>12} " + f"{row.samples:>5} {age:>5} {err}" + ) + print() + if best_row is not None: + best_age = "-" if best_row.updated_at <= 0 else f"{(now-best_row.updated_at):.1f}" + print( + f"{'':>3} {'MAX':>3} " + f"{f'{best_row.high_mhz:.3f}-{best_row.low_mhz:.3f}':>23} {best_row.center_mhz:>9.3f} {best_row.status:>8} " + f"{fmt(best_row.dbfs, '.2f'):>9} {fmt(best_row.rms, '.6f'):>10} {fmt(best_row.power_lin, '.8f'):>12} " + f"{best_row.samples:>5} {best_age:>5} pass={best_row.pass_no}" + ) + elif current_row is not None: + current_age = "-" if current_row.updated_at <= 0 else f"{(now-current_row.updated_at):.1f}" + print( + f"{'':>3} {'MAX':>3} " + f"{f'{current_row.high_mhz:.3f}-{current_row.low_mhz:.3f}':>23} {current_row.center_mhz:>9.3f} {'INIT':>8} " + f"{fmt(None, '.2f'):>9} {fmt(None, '.6f'):>10} {fmt(None, '.8f'):>12} " + f"{0:>5} {current_age:>5} no successful windows yet" + ) + print("Ctrl+C to stop. Window width equals step; sample-rate must be >= step to cover each window.") + sys.stdout.flush() + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Retune one HackRF across a wide frequency range and measure energy") + parser.add_argument("--serial", required=True, help="HackRF serial number from hackrf_info") + parser.add_argument("--sample-rate", type=float, required=True, help="Sample rate in Hz") + parser.add_argument("--base", type=float, required=True, help="Scan start edge in MHz") + parser.add_argument("--roof", type=float, required=True, help="Scan end edge in MHz") + parser.add_argument("--step", type=float, required=True, help="Window width / retune step in MHz") + parser.add_argument("--vec-len", type=int, default=4096, help="Probe vector length") + parser.add_argument("--settle", type=float, default=0.12, help="Wait time after retune before reading (s)") + parser.add_argument("--avg-reads", type=int, default=3, help="How many probe reads to average per window") + parser.add_argument("--pause-between-reads", type=float, default=0.02, help="Pause between averaged reads (s)") + parser.add_argument("--passes", type=int, default=0, help="Number of sweep passes, 0 means infinite") + parser.add_argument("--gain", type=float, default=16.0, help="General gain") + parser.add_argument("--if-gain", type=float, default=16.0, help="IF gain") + parser.add_argument("--bb-gain", type=float, default=16.0, help="BB gain") + return parser + + +def main() -> int: + args = build_parser().parse_args() + serial = args.serial.lower() + + try: + windows = build_windows(args.base, args.roof, args.step) + except ValueError as exc: + print(f"invalid scan range: {exc}", file=sys.stderr) + return 2 + + step_hz = args.step * 1e6 + if args.sample_rate < step_hz: + print( + f"sample-rate {args.sample_rate:.0f} Hz is smaller than step window {step_hz:.0f} Hz; " + "this would leave gaps in the scan", + file=sys.stderr, + ) + return 2 + + try: + serial_to_index = parse_hackrf_info() + except Exception as exc: + print(f"hackrf discovery failed: {exc}", file=sys.stderr) + return 3 + + index = serial_to_index.get(serial) + if index is None: + print(f"serial {serial} not found in hackrf_info", file=sys.stderr) + print("available serials:", file=sys.stderr) + for item_serial, item_index in sorted(serial_to_index.items(), key=lambda item: item[1]): + print(f" idx={item_index} serial={item_serial}", file=sys.stderr) + return 4 + + stop_requested = False + + def on_signal(signum, frame): + nonlocal stop_requested + stop_requested = True + + signal.signal(signal.SIGINT, on_signal) + signal.signal(signal.SIGTERM, on_signal) + + probe: Optional[WideProbeTop] = None + started_at = time.time() + pass_no = 0 + current_seq = windows[0].seq + + try: + probe = WideProbeTop( + index=index, + center_freq_hz=windows[0].center_mhz * 1e6, + sample_rate=args.sample_rate, + vec_len=args.vec_len, + gain=args.gain, + if_gain=args.if_gain, + bb_gain=args.bb_gain, + ) + probe.start() + time.sleep(max(args.settle, 0.12)) + + while not stop_requested: + pass_no += 1 + for row in windows: + if stop_requested: + break + current_seq = row.seq + try: + probe.tune(row.center_mhz * 1e6) + rms, power_lin, dbfs, samples = probe.read_window( + settle=args.settle, + avg_reads=args.avg_reads, + pause_between_reads=args.pause_between_reads, + ) + row.status = "OK" + row.rms = rms + row.power_lin = power_lin + row.dbfs = dbfs + row.samples = samples + row.error = "" + row.updated_at = time.time() + row.pass_no = pass_no + except Exception as exc: + row.status = "ERR" + row.error = str(exc) + row.updated_at = time.time() + render( + windows=windows, + serial=serial, + index=index, + sample_rate=args.sample_rate, + base_mhz=args.base, + roof_mhz=args.roof, + step_mhz=args.step, + started_at=started_at, + pass_no=pass_no, + current_seq=current_seq, + ) + if args.passes > 0 and pass_no >= args.passes: + break + except Exception as exc: + print(f"scanner failed: {exc}", file=sys.stderr) + return 5 + finally: + if probe is not None: + try: + probe.stop() + probe.wait() + except Exception: + pass + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())