#!/usr/bin/env python3
"""ASCII realtime graphs for HackRF energy metrics.

Starts one ``Worker`` (from ``read_energy``) per configured device, keeps a
rolling history of the values each worker publishes, and periodically redraws
a status table plus three time-series charts (dBFS, RMS, power) for one
"focus" band.  All levels are relative (dBFS / RMS / linear power), not dBm.
"""
import argparse
import math
import shutil
import signal
import sys
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from pathlib import Path
from typing import Deque, Dict, List, Optional, Tuple

# Reuse SDR reading logic from read_energy.py
from read_energy import Row, Worker, parse_env, collect_targets, parse_hackrf_info


@dataclass
class MetricHistory:
    """Rolling sample history for one band.

    The four deques are index-aligned: element ``i`` of each belongs to the
    same sample.  ``last_updated_at`` is the ``updated_at`` of the most recent
    row that was examined (whether or not it was stored), used to skip rows
    that have already been seen.
    """

    ts: Deque[float] = field(default_factory=deque)
    dbfs: Deque[float] = field(default_factory=deque)
    rms: Deque[float] = field(default_factory=deque)
    power: Deque[float] = field(default_factory=deque)
    last_updated_at: float = 0.0


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the monitor."""
    p = argparse.ArgumentParser(description="ASCII realtime graphs for HackRF energy metrics")
    p.add_argument("--env", default=".env", help="Path to .env (default: repo_root/.env)")
    p.add_argument("--only", default="", help="Comma-separated labels (e.g. 2400,5200)")
    p.add_argument("--focus", default="", help="Which label to draw charts for (default: first selected)")
    p.add_argument("--freq-mhz", type=float, default=None, help="Override tuning frequency for all devices (MHz)")
    p.add_argument("--sample-rate", type=float, default=2e6, help="Sample rate Hz (HackRF min ~2e6)")
    p.add_argument("--vec-len", type=int, default=4096, help="Probe vector length")
    p.add_argument("--interval", type=float, default=0.4, help="Per-device read interval (s)")
    p.add_argument("--refresh", type=float, default=0.5, help="Screen refresh interval (s)")
    p.add_argument("--reopen-delay", type=float, default=1.0, help="Retry delay after BUSY/ERR (s)")
    p.add_argument("--gain", type=float, default=16.0, help="General gain")
    p.add_argument("--if-gain", type=float, default=16.0, help="IF gain")
    p.add_argument("--bb-gain", type=float, default=16.0, help="BB gain")
    p.add_argument("--history-sec", type=float, default=60.0, help="History window in seconds")
    p.add_argument("--plot-height", type=int, default=10, help="Chart height (rows)")
    p.add_argument("--plot-width", type=int, default=0, help="Chart width (cols), 0=auto")
    return p


def prune_history(h: MetricHistory, cutoff: float) -> None:
    """Drop samples older than *cutoff* from the left of every series."""
    while h.ts and h.ts[0] < cutoff:
        # Pop all four deques together so they stay index-aligned.
        h.ts.popleft()
        h.dbfs.popleft()
        h.rms.popleft()
        h.power.popleft()


def maybe_append(history: MetricHistory, row: Row) -> None:
    """Append *row* to *history* if it is a new, complete, OK reading.

    A row is ignored when it has never been updated, when its ``updated_at``
    is not newer than the last row examined, when its status is not ``"OK"``,
    or when any of its three metrics is ``None``.
    """
    if row.updated_at <= 0:
        return
    if row.updated_at <= history.last_updated_at:
        return
    # Remember the timestamp even for rejected rows so the same non-OK row is
    # not re-examined on every refresh.
    history.last_updated_at = row.updated_at
    if row.status != "OK":
        return
    if row.dbfs is None or row.rms is None or row.power_lin is None:
        return
    history.ts.append(row.updated_at)
    history.dbfs.append(float(row.dbfs))
    history.rms.append(float(row.rms))
    history.power.append(float(row.power_lin))


def _label_sort_key(label: str) -> Tuple[int, object]:
    """Sort key: numeric labels first in numeric order, other labels after.

    For labels that all parse as integers this yields the same order as
    ``key=int``; a non-numeric label no longer raises ``ValueError``.
    """
    try:
        return (0, int(label))
    except (TypeError, ValueError):
        return (1, str(label))


def _map_series(
    ts: List[float],
    ys: List[float],
    now: float,
    span: float,
    width: int,
    height: int,
    y_floor: Optional[float] = None,
    clip_floor: bool = False,
) -> Optional[Tuple[List[Tuple[int, int]], float, float, float]]:
    """Map (time, value) samples onto integer grid cells.

    Only samples with ``t >= now - span`` and a finite value are used.  With
    ``clip_floor`` set and a ``y_floor`` given, values below the floor are
    raised to it.  ``y_floor`` also bounds the lower edge of the y-range.

    Returns ``None`` when there is nothing to plot, otherwise
    ``(cells, ymin, ymax, last)`` where ``cells`` is a list of
    ``(column, row)`` pairs (row 0 is the top), ``ymin``/``ymax`` is the
    padded y-range and ``last`` is the most recent (possibly clipped) value.
    """
    if not ts or not ys:
        return None

    start = now - span
    clip = clip_floor and y_floor is not None

    # Filter to the visible window and apply floor clipping once.
    pts: List[Tuple[float, float]] = []
    for t, y in zip(ts, ys):
        if t < start or not math.isfinite(y):
            continue
        if clip and y < y_floor:
            y = y_floor
        pts.append((t, y))
    if not pts:
        return None

    vals = [y for _, y in pts]
    ymin = min(vals)
    ymax = max(vals)
    if y_floor is not None and ymin < y_floor:
        ymin = y_floor
    if not math.isfinite(ymin) or not math.isfinite(ymax):
        return None

    # Pad the range so the trace does not sit on the chart border; a flat
    # series gets a symmetric band (5% of its magnitude, or 1.0 around zero).
    if ymax == ymin:
        pad = abs(ymax) * 0.05 or 1.0
    else:
        pad = (ymax - ymin) * 0.08
    ymin -= pad
    ymax += pad

    # Padding must not push the lower edge below the floor.
    if y_floor is not None and ymin < y_floor:
        ymin = y_floor
    if ymax <= ymin:
        ymax = ymin + 1.0

    yrange = ymax - ymin
    mapped: List[Tuple[int, int]] = []
    for t, y in pts:
        xr = (t - start) / span if span > 0 else 1.0
        xr = min(1.0, max(0.0, xr))
        c = int(round(xr * (width - 1))) if width > 1 else 0
        yr = min(1.0, max(0.0, (y - ymin) / yrange))
        # Row 0 is the top of the chart, so invert the y axis.
        r = height - 1 - int(round(yr * (height - 1)))
        mapped.append((c, r))
    return mapped, ymin, ymax, vals[-1]


def _draw_line(grid: List[List[str]], x0: int, y0: int, x1: int, y1: int, ch: str = '*') -> None:
    """Draw a straight segment from (x0, y0) to (x1, y1) into *grid*.

    Cells are obtained by linear interpolation over ``max(|dx|, |dy|, 1)``
    steps; cells outside the grid are skipped.  Identical endpoints draw a
    single cell.
    """
    dx = x1 - x0
    dy = y1 - y0
    steps = max(abs(dx), abs(dy), 1)
    for i in range(steps + 1):
        x = int(round(x0 + dx * i / steps))
        y = int(round(y0 + dy * i / steps))
        if 0 <= y < len(grid) and 0 <= x < len(grid[0]):
            grid[y][x] = ch


def _chart_footer(span: float, width: int) -> List[str]:
    """Return the x-axis rule and the ``t-<span>s ... now`` caption line."""
    left = f"t-{int(span)}s"
    right = "now"
    spacer = max(1, width - len(left) - len(right))
    return [
        f"{'':>10} +{'-' * width}",
        f"{'':>10} {left}{' ' * spacer}{right}",
    ]


def _fmt_value(v: float) -> str:
    """Format *v* with two decimals, switching to scientific at >= 1e4."""
    return f"{v:.2f}" if abs(v) < 1e4 else f"{v:.2e}"


def render_chart(
    title: str,
    unit: str,
    ts: List[float],
    ys: List[float],
    now: float,
    span: float,
    width: int,
    height: int,
    y_floor: Optional[float] = None,
    clip_floor: bool = False,
) -> List[str]:
    """Render one time-series chart as a list of text lines.

    The result always has ``height + 3`` lines: a title line, ``height`` plot
    rows, the x-axis rule and the time caption.  When no sample falls in the
    last *span* seconds the plot rows are blank and the title says so.
    """
    plot = _map_series(ts, ys, now, span, width, height, y_floor=y_floor, clip_floor=clip_floor)
    lines: List[str] = []

    if plot is None:
        lines.append(f"{title} ({unit}) | no samples in last {span:.0f}s")
        lines.extend(" " * (10 + 3 + width) for _ in range(height))
        lines.extend(_chart_footer(span, width))
        return lines

    points, ymin, ymax, last = plot
    grid = [[' ' for _ in range(width)] for _ in range(height)]
    if height > 0:
        # Dotted guide line across the middle row.
        grid[height // 2] = ['.'] * width

    prev = None
    for c, r in points:
        # The first point has no predecessor; draw it as a zero-length
        # segment so a history with a single sample is still visible.
        x0, y0 = prev if prev is not None else (c, r)
        _draw_line(grid, x0, y0, c, r, '*')
        prev = (c, r)

    ytop = _fmt_value(ymax)
    ybot = _fmt_value(ymin)
    ylast = _fmt_value(last)
    lines.append(f"{title} ({unit}) | last={ylast} min={ybot} max={ytop}")
    for i, row in enumerate(grid):
        # Only the top and bottom rows carry a y-axis label.
        if i == 0:
            lbl = ytop
        elif i == height - 1:
            lbl = ybot
        else:
            lbl = ""
        lines.append(f"{lbl:>10} |{''.join(row)}")
    lines.extend(_chart_footer(span, width))
    return lines


def render_screen(
    rows: Dict[str, Row],
    histories: Dict[str, MetricHistory],
    focus_label: str,
    started: float,
    history_sec: float,
    plot_height: int,
    plot_width_arg: int,
    env_path: Path,
    discovered: int,
) -> None:
    """Clear the terminal and draw the status table and the focus charts.

    *rows* is the snapshot shown in the table, *histories* holds the series
    for the charts of *focus_label*.  ``plot_width_arg <= 0`` selects an
    automatic width derived from the terminal size (between 40 and 140
    columns).  The whole frame is written to stdout in one call.
    """
    # NOTE(review): "age" below assumes Row.updated_at is a time.time()
    # epoch value — confirm against read_energy.
    now = time.time()
    term_w = shutil.get_terminal_size((140, 50)).columns
    plot_width = plot_width_arg if plot_width_arg > 0 else max(40, min(term_w - 14, 140))

    header = (
        f"{'band':>5} {'idx':>3} {'status':>9} {'dBFS':>9} {'RMS':>10} "
        f"{'power':>12} {'N':>5} {'age':>5} {'serial':>12}"
    )
    out: List[str] = [
        "HackRF Energy ASCII Monitor (relative levels: dBFS/RMS/power, not dBm)",
        f"env: {env_path} | discovered: {discovered} | uptime: {int(now-started)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}",
        "",
        header,
        '-' * len(header),
    ]

    for label in sorted(rows, key=_label_sort_key):
        r = rows[label]
        idx = '-' if r.index is None else str(r.index)
        age = '-' if r.updated_at <= 0 else f"{(now-r.updated_at):.1f}"
        dbfs = '-' if r.dbfs is None else f"{r.dbfs:.2f}"
        rms = '-' if r.rms is None else f"{r.rms:.6f}"
        if r.power_lin is None:
            pwr = '-'
        elif abs(r.power_lin) < 1e4:
            pwr = f"{r.power_lin:.8f}"
        else:
            pwr = f"{r.power_lin:.3e}"
        out.append(
            f"{label:>5} {idx:>3} {r.status:>9} {dbfs:>9} {rms:>10} {pwr:>12} "
            f"{r.samples:>5} {age:>5} {r.serial[-12:]:>12}"
        )
    out.append("")

    if focus_label not in histories:
        out.append(f"focus label '{focus_label}' not found")
        out.append("Use --only or --focus to select a valid label.")
    else:
        h = histories[focus_label]
        ts = list(h.ts)
        out.append(f"Focus band: {focus_label}")
        out.append("")
        # The dBFS chart is floored at -50: lower samples are clipped to it.
        out.extend(render_chart("dBFS vs time", "dBFS", ts, list(h.dbfs), now, history_sec,
                                plot_width, plot_height, y_floor=-50.0, clip_floor=True))
        out.append("")
        out.extend(render_chart("RMS vs time", "RMS", ts, list(h.rms), now, history_sec,
                                plot_width, plot_height))
        out.append("")
        out.extend(render_chart("Power vs time", "|IQ|^2", ts, list(h.power), now, history_sec,
                                plot_width, plot_height))
        out.append("")
        out.append("Ctrl+C to stop. If status=BUSY, corresponding SDR service is using the HackRF.")

    # ANSI "erase display" + "cursor home", then the whole frame in a single
    # write so the terminal is not repainted line by line.
    sys.stdout.write("\x1b[2J\x1b[H" + "\n".join(out) + "\n")
    sys.stdout.flush()


def main() -> int:
    """Run the monitor until SIGINT/SIGTERM.

    Returns 0 on normal shutdown and 2 when the env file yields no targets.
    """
    args = build_parser().parse_args()
    only = {x.strip() for x in args.only.split(',') if x.strip()} or None

    # A relative --env path is resolved against this script's directory,
    # not the current working directory.
    env_path = Path(args.env)
    if not env_path.is_absolute():
        env_path = (Path(__file__).resolve().parent / env_path).resolve()

    env = parse_env(env_path)
    targets = collect_targets(env, only=only, override_freq_mhz=args.freq_mhz)
    if not targets:
        print(f"No hack_/HACKID_ entries found in {env_path}", file=sys.stderr)
        return 2

    serial_to_index = parse_hackrf_info()
    rows: Dict[str, Row] = {
        t.label: Row(label=t.label, serial=t.serial, freq_hz=t.freq_hz) for t in targets
    }
    histories: Dict[str, MetricHistory] = {t.label: MetricHistory() for t in targets}
    focus_label = args.focus.strip() or sorted(rows, key=_label_sort_key)[0]

    lock = threading.Lock()
    stop_event = threading.Event()

    def _stop(signum, frame):
        stop_event.set()

    signal.signal(signal.SIGINT, _stop)
    signal.signal(signal.SIGTERM, _stop)

    workers: List[Worker] = []
    for i, t in enumerate(targets):
        # Each worker gets its own copy of the parsed args plus a per-device
        # "stagger" value (presumably a start offset so devices are not
        # opened at the same instant — confirm in read_energy.Worker).
        wa = argparse.Namespace(**vars(args))
        wa.stagger = i * 0.15
        w = Worker(t, serial_to_index, rows, lock, stop_event, wa)
        w.start()
        workers.append(w)

    started = time.time()
    try:
        while not stop_event.is_set():
            cutoff = time.time() - args.history_sec
            # Copy the shared rows under the lock, then do all history
            # bookkeeping and rendering on the private snapshot.
            with lock:
                snap = {k: Row(**vars(v)) for k, v in rows.items()}
            for label, row in snap.items():
                maybe_append(histories[label], row)
                prune_history(histories[label], cutoff)
            render_screen(snap, histories, focus_label, started, args.history_sec,
                          args.plot_height, args.plot_width, env_path, len(serial_to_index))
            # Event.wait doubles as an interruptible sleep.
            stop_event.wait(args.refresh)
    finally:
        stop_event.set()
        for w in workers:
            w.join(timeout=2.0)
    return 0


if __name__ == '__main__':
    raise SystemExit(main())