From ea3dd8afc4d44c5e85e7af8d779bd5b3cbb2c172 Mon Sep 17 00:00:00 2001 From: Sergey Revyakin Date: Wed, 25 Feb 2026 11:58:34 +0700 Subject: [PATCH] =?UTF-8?q?=D1=81=D0=BA=D1=80=D0=B8=D0=BF=D1=82=D1=8B=20?= =?UTF-8?q?=D0=B4=D0=BB=D1=8F=20=D0=BF=D1=80=D0=BE=D0=B2=D0=B5=D1=80=D0=BA?= =?UTF-8?q?=D0=B8=20=D1=80=D0=B0=D0=B1=D0=BE=D1=82=D1=8B=20=D1=85=D0=B0?= =?UTF-8?q?=D0=BA=D0=BE=D0=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- read_energy.py | 321 +++++++++++++++++++++++++++++++++++++++++++ read_energy_ascii.py | 282 +++++++++++++++++++++++++++++++++++++ 2 files changed, 603 insertions(+) create mode 100755 read_energy.py create mode 100755 read_energy_ascii.py diff --git a/read_energy.py b/read_energy.py new file mode 100755 index 0000000..6bcc56d --- /dev/null +++ b/read_energy.py @@ -0,0 +1,321 @@ +#!/usr/bin/env python3 +import argparse +import math +import re +import signal +import subprocess +import sys +import threading +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, List, Optional, Tuple + +try: + import numpy as np +except Exception as exc: + print(f"numpy import failed: {exc}", file=sys.stderr) + sys.exit(1) + +try: + from gnuradio import blocks, gr + import osmosdr +except Exception as exc: + print(f"gnuradio/osmosdr import failed: {exc}", file=sys.stderr) + print("Run with the SDR venv, e.g. .venv-sdr/bin/python read_energy.py", file=sys.stderr) + sys.exit(1) + +EPS = 1e-20 + + +@dataclass +class Target: + label: str + serial: str + freq_hz: float + source: str + + +@dataclass +class Row: + label: str + serial: str + index: Optional[int] = None + freq_hz: float = 0.0 + status: str = "INIT" + rms: Optional[float] = None + power_lin: Optional[float] = None + dbfs: Optional[float] = None + samples: int = 0 + updated_at: float = 0.0 + error: str = "" + + +class ProbeTop(gr.top_block): + def __init__(self, index: int, freq_hz: float, sample_rate: float, vec_len: int, + gain: float, if_gain: float, bb_gain: float): + super().__init__("hackrf_energy_probe") + self.probe = blocks.probe_signal_vc(vec_len) + self.stream_to_vec = blocks.stream_to_vector(gr.sizeof_gr_complex * 1, vec_len) + self.src = osmosdr.source(args=f"numchan=1 hackrf={index}") + self.src.set_time_unknown_pps(osmosdr.time_spec_t()) + self.src.set_sample_rate(sample_rate) + self.src.set_center_freq(freq_hz, 0) + try: + self.src.set_freq_corr(0, 0) + except Exception: + pass + for fn, val in (("set_gain", gain), ("set_if_gain", if_gain), ("set_bb_gain", bb_gain)): + try: + getattr(self.src, fn)(val, 0) + except Exception: + pass + try: + self.src.set_bandwidth(0, 0) + except Exception: + pass + try: + self.src.set_antenna('', 0) + except Exception: + pass + self.connect((self.src, 0), (self.stream_to_vec, 0)) + self.connect((self.stream_to_vec, 0), (self.probe, 0)) + + def read_metrics(self) -> Tuple[float, float, float, int]: + arr = np.asarray(self.probe.level(), dtype=np.complex64) + if arr.size == 0: + raise RuntimeError("no samples") + p = float(np.mean(arr.real * arr.real + arr.imag * arr.imag)) + rms = math.sqrt(max(p, 0.0)) + dbfs = 10.0 * math.log10(max(p, EPS)) + return rms, p, dbfs, int(arr.size) + + +class Worker(threading.Thread): + def __init__(self, target: Target, serial_to_index: Dict[str, int], rows: Dict[str, Row], + lock: threading.Lock, stop_event: threading.Event, args: argparse.Namespace): + super().__init__(daemon=True) + self.target = target + self.serial_to_index = serial_to_index + self.rows = rows + self.lock = lock + self.stop_event = stop_event + self.args = args + self.tb: Optional[ProbeTop] = None + + def _set_row(self, **kwargs): + with self.lock: + row = self.rows[self.target.label] + for k, v in kwargs.items(): + setattr(row, k, v) + row.updated_at = time.time() + + def _open(self) -> bool: + idx = self.serial_to_index.get(self.target.serial) + if idx is None: + self._set_row(status="NOT_FOUND", error="serial not in hackrf_info", index=None) + return False + self._set_row(index=idx, status="OPENING", error="", freq_hz=self.target.freq_hz) + try: + self.tb = ProbeTop( + idx, self.target.freq_hz, self.args.sample_rate, self.args.vec_len, + self.args.gain, self.args.if_gain, self.args.bb_gain + ) + self.tb.start() + time.sleep(0.15) + self._set_row(status="OK", error="") + return True + except Exception as exc: + msg = str(exc) + status = "BUSY" if ("Resource busy" in msg or "-1000" in msg) else "ERR" + self._set_row(status=status, error=msg) + self.tb = None + return False + + def _close(self): + if self.tb is not None: + try: + self.tb.stop() + self.tb.wait() + except Exception: + pass + self.tb = None + + def run(self): + time.sleep(self.args.stagger) + while not self.stop_event.is_set(): + if self.tb is None and not self._open(): + if self.stop_event.wait(self.args.reopen_delay): + break + continue + try: + rms, p, dbfs, n = self.tb.read_metrics() + self._set_row(status="OK", rms=rms, power_lin=p, dbfs=dbfs, samples=n, error="") + except Exception as exc: + self._set_row(status="ERR", error=str(exc)) + self._close() + if self.stop_event.wait(self.args.reopen_delay): + break + continue + if self.stop_event.wait(self.args.interval): + break + self._close() + + +def parse_env(path: Path) -> Dict[str, str]: + out: Dict[str, str] = {} + if not path.exists(): + return out + for raw in path.read_text(encoding="utf-8", errors="ignore").splitlines(): + line = raw.strip() + if not line or line.startswith("#") or "=" not in line: + continue + k, v = line.split("=", 1) + out[k.strip()] = v.strip().strip('"').strip("'") + return out + + +def collect_targets(env: Dict[str, str], only: Optional[set], override_freq_mhz: Optional[float]) -> List[Target]: + targets: List[Target] = [] + for k, v in env.items(): + m = re.fullmatch(r"hack_(\d+)", k) + if m: + label = m.group(1) + else: + m = re.fullmatch(r"HACKID_(\d+)", k) + if not m: + continue + label = m.group(1) + if only and label not in only: + continue + mhz = override_freq_mhz if override_freq_mhz is not None else float(label) + targets.append(Target(label=label, serial=v.lower(), freq_hz=mhz * 1e6, source=k)) + + uniq: Dict[str, Target] = {} + for t in sorted(targets, key=lambda x: (int(x.label), 0 if x.source.startswith("hack_") else 1)): + uniq.setdefault(t.label, t) + return [uniq[k] for k in sorted(uniq, key=lambda x: int(x))] + + +def parse_hackrf_info() -> Dict[str, int]: + try: + proc = subprocess.run(["hackrf_info"], capture_output=True, text=True, timeout=15) + except FileNotFoundError: + raise RuntimeError("hackrf_info not found") + except subprocess.TimeoutExpired: + raise RuntimeError("hackrf_info timeout") + text = (proc.stdout or "") + "\n" + (proc.stderr or "") + out: Dict[str, int] = {} + cur_idx: Optional[int] = None + for line in text.splitlines(): + m = re.search(r"^Index:\s*(\d+)", line) + if m: + cur_idx = int(m.group(1)) + continue + m = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line) + if m and cur_idx is not None: + out[m.group(1).lower()] = cur_idx + if not out: + raise RuntimeError("no devices parsed from hackrf_info") + return out + + +def fmt(v: Optional[float], spec: str) -> str: + return "-" if v is None else format(v, spec) + + +def render(rows: Dict[str, Row], started_at: float, env_path: Path, serial_to_index: Dict[str, int]): + now = time.time() + print("\x1b[2J\x1b[H", end="") + print("HackRF Energy Monitor (relative power: RMS / linear / dBFS, not calibrated dBm)") + print(f"env: {env_path} | discovered: {len(serial_to_index)} | uptime: {int(now-started_at)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}") + print() + header = f"{'band':>5} {'idx':>3} {'freq':>7} {'status':>9} {'rms':>10} {'power':>12} {'dBFS':>9} {'N':>5} {'age':>5} {'serial':>12} error" + print(header) + print('-' * len(header)) + for label in sorted(rows, key=lambda x: int(x)): + r = rows[label] + idx = '-' if r.index is None else str(r.index) + age = '-' if r.updated_at <= 0 else f"{(now-r.updated_at):.1f}" + err = (r.error or "") + if len(err) > 64: + err = err[:61] + '...' + print( + f"{r.label:>5} {idx:>3} {r.freq_hz/1e6:>7.1f} {r.status:>9} " + f"{fmt(r.rms, '.6f'):>10} {fmt(r.power_lin, '.8f'):>12} {fmt(r.dbfs, '.2f'):>9} " + f"{r.samples:>5} {age:>5} {r.serial[-12:]:>12} {err}" + ) + print() + print("Ctrl+C to stop. Use --only 2400,5200 or --freq-mhz 2450 to limit/override tuning.") + sys.stdout.flush() + + +def build_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser(description="Realtime HackRF relative energy monitor") + p.add_argument("--env", default=".env", help="Path to .env (default: repo_root/.env)") + p.add_argument("--only", default="", help="Comma-separated labels (e.g. 2400,5200)") + p.add_argument("--freq-mhz", type=float, default=None, help="Override tuning frequency for all devices (MHz)") + p.add_argument("--sample-rate", type=float, default=2e6, help="Sample rate in Hz (HackRF min ~2e6)") + p.add_argument("--vec-len", type=int, default=4096, help="Probe vector length") + p.add_argument("--interval", type=float, default=0.5, help="Per-device read interval (s)") + p.add_argument("--refresh", type=float, default=0.5, help="Console refresh interval (s)") + p.add_argument("--reopen-delay", type=float, default=1.0, help="Retry delay after BUSY/ERR (s)") + p.add_argument("--gain", type=float, default=16.0, help="General gain") + p.add_argument("--if-gain", type=float, default=16.0, help="IF gain") + p.add_argument("--bb-gain", type=float, default=16.0, help="BB gain") + return p + + +def main() -> int: + args = build_parser().parse_args() + only = {x.strip() for x in args.only.split(',') if x.strip()} or None + + env_path = Path(args.env) + if not env_path.is_absolute(): + env_path = (Path(__file__).resolve().parent / env_path).resolve() + env = parse_env(env_path) + targets = collect_targets(env, only=only, override_freq_mhz=args.freq_mhz) + if not targets: + print(f"No hack_/HACKID_ entries found in {env_path}", file=sys.stderr) + return 2 + + try: + serial_to_index = parse_hackrf_info() + except Exception as exc: + print(f"hackrf discovery failed: {exc}", file=sys.stderr) + return 3 + + rows = {t.label: Row(label=t.label, serial=t.serial, freq_hz=t.freq_hz) for t in targets} + lock = threading.Lock() + stop_event = threading.Event() + + def on_signal(signum, frame): + stop_event.set() + + signal.signal(signal.SIGINT, on_signal) + signal.signal(signal.SIGTERM, on_signal) + + workers: List[Worker] = [] + for i, t in enumerate(targets): + wa = argparse.Namespace(**vars(args)) + wa.stagger = i * 0.15 + w = Worker(t, serial_to_index, rows, lock, stop_event, wa) + workers.append(w) + w.start() + + started = time.time() + try: + while not stop_event.is_set(): + with lock: + snap = {k: Row(**vars(v)) for k, v in rows.items()} + render(snap, started, env_path, serial_to_index) + stop_event.wait(args.refresh) + finally: + stop_event.set() + for w in workers: + w.join(timeout=2) + return 0 + + +if __name__ == '__main__': + raise SystemExit(main()) diff --git a/read_energy_ascii.py b/read_energy_ascii.py new file mode 100755 index 0000000..b04e2fa --- /dev/null +++ b/read_energy_ascii.py @@ -0,0 +1,282 @@ +#!/usr/bin/env python3 +import argparse +import math +import shutil +import signal +import sys +import threading +import time +from collections import deque +from dataclasses import dataclass +from pathlib import Path +from typing import Deque, Dict, List, Optional + +# Reuse SDR reading logic from read_energy.py +from read_energy import Row, Worker, parse_env, collect_targets, parse_hackrf_info + + +@dataclass +class MetricHistory: + ts: Deque[float] + dbfs: Deque[float] + rms: Deque[float] + power: Deque[float] + last_updated_at: float = 0.0 + + +def build_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser(description="ASCII realtime graphs for HackRF energy metrics") + p.add_argument("--env", default=".env", help="Path to .env (default: repo_root/.env)") + p.add_argument("--only", default="", help="Comma-separated labels (e.g. 2400,5200)") + p.add_argument("--focus", default="", help="Which label to draw charts for (default: first selected)") + p.add_argument("--freq-mhz", type=float, default=None, help="Override tuning frequency for all devices (MHz)") + p.add_argument("--sample-rate", type=float, default=2e6, help="Sample rate Hz (HackRF min ~2e6)") + p.add_argument("--vec-len", type=int, default=4096, help="Probe vector length") + p.add_argument("--interval", type=float, default=0.4, help="Per-device read interval (s)") + p.add_argument("--refresh", type=float, default=0.5, help="Screen refresh interval (s)") + p.add_argument("--reopen-delay", type=float, default=1.0, help="Retry delay after BUSY/ERR (s)") + p.add_argument("--gain", type=float, default=16.0, help="General gain") + p.add_argument("--if-gain", type=float, default=16.0, help="IF gain") + p.add_argument("--bb-gain", type=float, default=16.0, help="BB gain") + p.add_argument("--history-sec", type=float, default=60.0, help="History window in seconds") + p.add_argument("--plot-height", type=int, default=10, help="Chart height (rows)") + p.add_argument("--plot-width", type=int, default=0, help="Chart width (cols), 0=auto") + return p + + +def prune_history(h: MetricHistory, cutoff: float) -> None: + while h.ts and h.ts[0] < cutoff: + h.ts.popleft(); h.dbfs.popleft(); h.rms.popleft(); h.power.popleft() + + +def maybe_append(history: MetricHistory, row: Row) -> None: + if row.updated_at <= 0: + return + if row.updated_at <= history.last_updated_at: + return + history.last_updated_at = row.updated_at + if row.status != "OK": + return + if row.dbfs is None or row.rms is None or row.power_lin is None: + return + history.ts.append(row.updated_at) + history.dbfs.append(float(row.dbfs)) + history.rms.append(float(row.rms)) + history.power.append(float(row.power_lin)) + + +def _map_series(ts: List[float], ys: List[float], now: float, span: float, width: int, height: int, + y_floor: Optional[float] = None, clip_floor: bool = False): + if not ts or not ys: + return None + start = now - span + pts = [(t, y) for t, y in zip(ts, ys) if t >= start and math.isfinite(y)] + if not pts: + return None + vals = [] + for _, y in pts: + if clip_floor and y_floor is not None and y < y_floor: + y = y_floor + vals.append(y) + ymin = min(vals) + ymax = max(vals) + if y_floor is not None and ymin < y_floor: + ymin = y_floor + if not math.isfinite(ymin) or not math.isfinite(ymax): + return None + if ymax == ymin: + pad = abs(ymax) * 0.05 or 1.0 + ymin -= pad + ymax += pad + else: + pad = (ymax - ymin) * 0.08 + ymin -= pad + ymax += pad + if y_floor is not None and ymin < y_floor: + ymin = y_floor + if ymax <= ymin: + ymax = ymin + 1.0 + + mapped = [] + for t, y in pts: + if clip_floor and y_floor is not None and y < y_floor: + y = y_floor + xr = (t - start) / span if span > 0 else 1.0 + xr = 0.0 if xr < 0 else (1.0 if xr > 1 else xr) + c = int(round(xr * (width - 1))) if width > 1 else 0 + yr = (y - ymin) / (ymax - ymin) + yr = 0.0 if yr < 0 else (1.0 if yr > 1 else yr) + r = height - 1 - int(round(yr * (height - 1))) + mapped.append((c, r)) + return mapped, ymin, ymax, vals[-1] + + +def _draw_line(grid: List[List[str]], x0: int, y0: int, x1: int, y1: int, ch: str = '*') -> None: + dx = x1 - x0 + dy = y1 - y0 + steps = max(abs(dx), abs(dy), 1) + for i in range(steps + 1): + x = int(round(x0 + dx * i / steps)) + y = int(round(y0 + dy * i / steps)) + if 0 <= y < len(grid) and 0 <= x < len(grid[0]): + grid[y][x] = ch + + +def render_chart(title: str, unit: str, ts: List[float], ys: List[float], now: float, + span: float, width: int, height: int, + y_floor: Optional[float] = None, clip_floor: bool = False) -> List[str]: + plot = _map_series(ts, ys, now, span, width, height, y_floor=y_floor, clip_floor=clip_floor) + lines: List[str] = [] + if plot is None: + lines.append(f"{title} ({unit}) | no samples in last {span:.0f}s") + for _ in range(height): + lines.append(" " * (10 + 3 + width)) + lines.append(f"{'':>10} +{'-' * width}") + lines.append(f"{'':>10} t-{int(span)}s{' ' * max(1, width - 8)}now") + return lines + + points, ymin, ymax, last = plot + grid = [[' ' for _ in range(width)] for _ in range(height)] + + for row_idx in range(height): + if row_idx == height // 2: + for c in range(width): + grid[row_idx][c] = '.' + + prev = None + for c, r in points: + if prev is not None: + _draw_line(grid, prev[0], prev[1], c, r, '*') + prev = (c, r) + + ytop = f"{ymax:.2f}" if abs(ymax) < 1e4 else f"{ymax:.2e}" + ybot = f"{ymin:.2f}" if abs(ymin) < 1e4 else f"{ymin:.2e}" + ylast = f"{last:.2f}" if abs(last) < 1e4 else f"{last:.2e}" + lines.append(f"{title} ({unit}) | last={ylast} min={ybot} max={ytop}") + + for i, row in enumerate(grid): + if i == 0: + lbl = ytop + elif i == height - 1: + lbl = ybot + else: + lbl = "" + lines.append(f"{lbl:>10} |{''.join(row)}") + + lines.append(f"{'':>10} +{'-' * width}") + left = f"t-{int(span)}s" + right = "now" + spacer = max(1, width - len(left) - len(right)) + lines.append(f"{'':>10} {left}{' ' * spacer}{right}") + return lines + + +def render_screen(rows: Dict[str, Row], histories: Dict[str, MetricHistory], focus_label: str, + started: float, history_sec: float, plot_height: int, plot_width_arg: int, + env_path: Path, discovered: int) -> None: + now = time.time() + term_w = shutil.get_terminal_size((140, 50)).columns + plot_width = plot_width_arg if plot_width_arg > 0 else max(40, min(term_w - 14, 140)) + + print("\x1b[2J\x1b[H", end="") + print("HackRF Energy ASCII Monitor (relative levels: dBFS/RMS/power, not dBm)") + print(f"env: {env_path} | discovered: {discovered} | uptime: {int(now-started)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}") + print() + + header = f"{'band':>5} {'idx':>3} {'status':>9} {'dBFS':>9} {'RMS':>10} {'power':>12} {'N':>5} {'age':>5} {'serial':>12}" + print(header) + print('-' * len(header)) + for label in sorted(rows, key=lambda x: int(x)): + r = rows[label] + idx = '-' if r.index is None else str(r.index) + age = '-' if r.updated_at <= 0 else f"{(now-r.updated_at):.1f}" + dbfs = '-' if r.dbfs is None else f"{r.dbfs:.2f}" + rms = '-' if r.rms is None else f"{r.rms:.6f}" + pwr = '-' if r.power_lin is None else (f"{r.power_lin:.8f}" if abs(r.power_lin) < 1e4 else f"{r.power_lin:.3e}") + print(f"{label:>5} {idx:>3} {r.status:>9} {dbfs:>9} {rms:>10} {pwr:>12} {r.samples:>5} {age:>5} {r.serial[-12:]:>12}") + + print() + if focus_label not in histories: + print(f"focus label '{focus_label}' not found") + print("Use --only or --focus to select a valid label.") + sys.stdout.flush() + return + + h = histories[focus_label] + ts = list(h.ts) + print(f"Focus band: {focus_label}") + print() + for line in render_chart("dBFS vs time", "dBFS", ts, list(h.dbfs), now, history_sec, plot_width, plot_height, y_floor=-50.0, clip_floor=True): + print(line) + print() + for line in render_chart("RMS vs time", "RMS", ts, list(h.rms), now, history_sec, plot_width, plot_height): + print(line) + print() + for line in render_chart("Power vs time", "|IQ|^2", ts, list(h.power), now, history_sec, plot_width, plot_height): + print(line) + print() + print("Ctrl+C to stop. If status=BUSY, corresponding SDR service is using the HackRF.") + sys.stdout.flush() + + +def main() -> int: + args = build_parser().parse_args() + only = {x.strip() for x in args.only.split(',') if x.strip()} or None + + env_path = Path(args.env) + if not env_path.is_absolute(): + env_path = (Path(__file__).resolve().parent / env_path).resolve() + + env = parse_env(env_path) + targets = collect_targets(env, only=only, override_freq_mhz=args.freq_mhz) + if not targets: + print(f"No hack_/HACKID_ entries found in {env_path}", file=sys.stderr) + return 2 + + serial_to_index = parse_hackrf_info() + + rows: Dict[str, Row] = {t.label: Row(label=t.label, serial=t.serial, freq_hz=t.freq_hz) for t in targets} + histories: Dict[str, MetricHistory] = { + t.label: MetricHistory(deque(), deque(), deque(), deque()) for t in targets + } + + focus_label = (args.focus.strip() if args.focus.strip() else sorted(rows, key=lambda x: int(x))[0]) + + lock = threading.Lock() + stop_event = threading.Event() + + def _stop(signum, frame): + stop_event.set() + + signal.signal(signal.SIGINT, _stop) + signal.signal(signal.SIGTERM, _stop) + + workers: List[Worker] = [] + for i, t in enumerate(targets): + wa = argparse.Namespace(**vars(args)) + wa.stagger = i * 0.15 + w = Worker(t, serial_to_index, rows, lock, stop_event, wa) + w.start() + workers.append(w) + + started = time.time() + try: + while not stop_event.is_set(): + cutoff = time.time() - args.history_sec + with lock: + snap = {k: Row(**vars(v)) for k, v in rows.items()} + for label, row in snap.items(): + maybe_append(histories[label], row) + prune_history(histories[label], cutoff) + render_screen(snap, histories, focus_label, started, args.history_sec, + args.plot_height, args.plot_width, env_path, len(serial_to_index)) + stop_event.wait(args.refresh) + finally: + stop_event.set() + for w in workers: + w.join(timeout=2.0) + return 0 + + +if __name__ == '__main__': + raise SystemExit(main())