You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
283 lines
11 KiB
Python
283 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
import math
|
|
import shutil
|
|
import signal
|
|
import sys
|
|
import threading
|
|
import time
|
|
from collections import deque
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Deque, Dict, List, Optional
|
|
|
|
# Reuse SDR reading logic from read_energy.py
|
|
from read_energy import Row, Worker, parse_env, collect_targets, parse_hackrf_info
|
|
|
|
|
|
@dataclass
class MetricHistory:
    """Rolling sample history for one band, stored as four parallel deques.

    ``maybe_append`` pushes one value onto the right of every deque per
    accepted sample and ``prune_history`` pops from the left of all four
    together, so index ``i`` of each deque describes the same sample and
    the oldest sample sits at index 0.
    """

    # Sample timestamps, copied from ``Row.updated_at``
    # (presumably ``time.time()`` epoch seconds -- confirm in read_energy.py).
    ts: Deque[float]
    # Level readings copied from ``Row.dbfs``.
    dbfs: Deque[float]
    # RMS readings copied from ``Row.rms``.
    rms: Deque[float]
    # Linear power readings copied from ``Row.power_lin``.
    power: Deque[float]
    # ``updated_at`` of the newest row already examined by ``maybe_append``;
    # it advances even for rows that were rejected (non-OK or incomplete).
    last_updated_at: float = 0.0
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the ASCII monitor.

    Returns:
        An ``ArgumentParser`` exposing device selection (``--env``,
        ``--only``, ``--focus``), tuning and gain options, timing options
        and chart geometry options, each with the default listed below.
    """
    parser = argparse.ArgumentParser(description="ASCII realtime graphs for HackRF energy metrics")

    # (flag, add_argument keyword arguments), registered in display order.
    options = (
        ("--env", dict(default=".env", help="Path to .env (default: repo_root/.env)")),
        ("--only", dict(default="", help="Comma-separated labels (e.g. 2400,5200)")),
        ("--focus", dict(default="", help="Which label to draw charts for (default: first selected)")),
        ("--freq-mhz", dict(type=float, default=None, help="Override tuning frequency for all devices (MHz)")),
        ("--sample-rate", dict(type=float, default=2e6, help="Sample rate Hz (HackRF min ~2e6)")),
        ("--vec-len", dict(type=int, default=4096, help="Probe vector length")),
        ("--interval", dict(type=float, default=0.4, help="Per-device read interval (s)")),
        ("--refresh", dict(type=float, default=0.5, help="Screen refresh interval (s)")),
        ("--reopen-delay", dict(type=float, default=1.0, help="Retry delay after BUSY/ERR (s)")),
        ("--gain", dict(type=float, default=16.0, help="General gain")),
        ("--if-gain", dict(type=float, default=16.0, help="IF gain")),
        ("--bb-gain", dict(type=float, default=16.0, help="BB gain")),
        ("--history-sec", dict(type=float, default=60.0, help="History window in seconds")),
        ("--plot-height", dict(type=int, default=10, help="Chart height (rows)")),
        ("--plot-width", dict(type=int, default=0, help="Chart width (cols), 0=auto")),
    )
    for flag, settings in options:
        parser.add_argument(flag, **settings)
    return parser
|
|
|
|
|
|
def prune_history(h: MetricHistory, cutoff: float) -> None:
    """Discard every sample whose timestamp is older than *cutoff*.

    Samples are removed from the left (oldest) end, and the same number of
    entries is removed from each value deque so the series stay aligned.

    Args:
        h: History to trim in place.
        cutoff: Samples with ``ts < cutoff`` are dropped.
    """
    stamps = h.ts
    value_series = (h.dbfs, h.rms, h.power)
    while stamps and stamps[0] < cutoff:
        stamps.popleft()
        for series in value_series:
            series.popleft()
|
|
|
|
|
|
def maybe_append(history: MetricHistory, row: Row) -> None:
    """Record *row* in *history* if it is a new, complete, OK reading.

    A row is ignored when it has never been updated (``updated_at <= 0``)
    or when its ``updated_at`` is not newer than the last row examined.
    Otherwise ``history.last_updated_at`` advances, and the sample is stored
    only if ``status`` is ``"OK"`` and none of the three readings is ``None``.

    Args:
        history: Per-band history, mutated in place.
        row: Latest snapshot for the same band.
    """
    stamp = row.updated_at
    if stamp <= 0 or stamp <= history.last_updated_at:
        return

    # Mark the row as seen before validating it, so a rejected row is not
    # re-examined on the next refresh.
    history.last_updated_at = stamp

    if row.status != "OK":
        return
    readings = (row.dbfs, row.rms, row.power_lin)
    if any(value is None for value in readings):
        return

    history.ts.append(stamp)
    targets = (history.dbfs, history.rms, history.power)
    for series, value in zip(targets, readings):
        series.append(float(value))
|
|
|
|
|
|
def _map_series(ts: List[float], ys: List[float], now: float, span: float, width: int, height: int,
|
|
y_floor: Optional[float] = None, clip_floor: bool = False):
|
|
if not ts or not ys:
|
|
return None
|
|
start = now - span
|
|
pts = [(t, y) for t, y in zip(ts, ys) if t >= start and math.isfinite(y)]
|
|
if not pts:
|
|
return None
|
|
vals = []
|
|
for _, y in pts:
|
|
if clip_floor and y_floor is not None and y < y_floor:
|
|
y = y_floor
|
|
vals.append(y)
|
|
ymin = min(vals)
|
|
ymax = max(vals)
|
|
if y_floor is not None and ymin < y_floor:
|
|
ymin = y_floor
|
|
if not math.isfinite(ymin) or not math.isfinite(ymax):
|
|
return None
|
|
if ymax == ymin:
|
|
pad = abs(ymax) * 0.05 or 1.0
|
|
ymin -= pad
|
|
ymax += pad
|
|
else:
|
|
pad = (ymax - ymin) * 0.08
|
|
ymin -= pad
|
|
ymax += pad
|
|
if y_floor is not None and ymin < y_floor:
|
|
ymin = y_floor
|
|
if ymax <= ymin:
|
|
ymax = ymin + 1.0
|
|
|
|
mapped = []
|
|
for t, y in pts:
|
|
if clip_floor and y_floor is not None and y < y_floor:
|
|
y = y_floor
|
|
xr = (t - start) / span if span > 0 else 1.0
|
|
xr = 0.0 if xr < 0 else (1.0 if xr > 1 else xr)
|
|
c = int(round(xr * (width - 1))) if width > 1 else 0
|
|
yr = (y - ymin) / (ymax - ymin)
|
|
yr = 0.0 if yr < 0 else (1.0 if yr > 1 else yr)
|
|
r = height - 1 - int(round(yr * (height - 1)))
|
|
mapped.append((c, r))
|
|
return mapped, ymin, ymax, vals[-1]
|
|
|
|
|
|
def _draw_line(grid: List[List[str]], x0: int, y0: int, x1: int, y1: int, ch: str = '*') -> None:
|
|
dx = x1 - x0
|
|
dy = y1 - y0
|
|
steps = max(abs(dx), abs(dy), 1)
|
|
for i in range(steps + 1):
|
|
x = int(round(x0 + dx * i / steps))
|
|
y = int(round(y0 + dy * i / steps))
|
|
if 0 <= y < len(grid) and 0 <= x < len(grid[0]):
|
|
grid[y][x] = ch
|
|
|
|
|
|
def render_chart(title: str, unit: str, ts: List[float], ys: List[float], now: float,
                 span: float, width: int, height: int,
                 y_floor: Optional[float] = None, clip_floor: bool = False) -> List[str]:
    """Render one time-series chart as a list of text lines.

    The result always has ``height + 3`` lines: a title line, ``height``
    plot rows, a horizontal axis and a time-label line (``t-<span>s`` on the
    left, ``now`` on the right).

    Args:
        title: Chart title shown on the first line.
        unit: Unit label shown in parentheses after the title.
        ts: Sample timestamps.
        ys: Sample values, paired with ``ts`` by position.
        now: Right edge of the time window.
        span: Window length in seconds.
        width: Plot width in columns.
        height: Plot height in rows.
        y_floor: Optional lower bound for the vertical axis.
        clip_floor: Raise values below ``y_floor`` to the floor.

    Returns:
        The chart lines, without trailing newlines.
    """
    gutter = f"{'':>10}"

    # Axis and time-label lines are built once and shared by the empty and
    # the populated chart, so "now" stays in the same column in both states
    # for any span (the empty chart used to assume a 5-character label).
    axis_line = f"{gutter} +{'-' * width}"
    left = f"t-{int(span)}s"
    right = "now"
    spacer = max(1, width - len(left) - len(right))
    time_line = f"{gutter} {left}{' ' * spacer}{right}"

    # An empty series can never yield a plot, so skip the mapping step.
    plot = None
    if ts and ys:
        plot = _map_series(ts, ys, now, span, width, height, y_floor=y_floor, clip_floor=clip_floor)

    if plot is None:
        lines: List[str] = [f"{title} ({unit}) | no samples in last {span:.0f}s"]
        lines.extend(" " * (10 + 3 + width) for _ in range(height))
        lines.append(axis_line)
        lines.append(time_line)
        return lines

    points, ymin, ymax, last = plot
    grid = [[' '] * width for _ in range(height)]

    # Dotted guide line across the middle row (only when there are rows).
    if grid:
        grid[height // 2] = ['.'] * width

    # Mark the first sample on its own so a window holding a single sample
    # is still visible, then connect consecutive samples.
    if points:
        first_col, first_row = points[0]
        _draw_line(grid, first_col, first_row, first_col, first_row, '*')
    for (c0, r0), (c1, r1) in zip(points, points[1:]):
        _draw_line(grid, c0, r0, c1, r1, '*')

    def _fmt(value: float) -> str:
        # Fixed notation for ordinary magnitudes, scientific for large ones.
        return f"{value:.2f}" if abs(value) < 1e4 else f"{value:.2e}"

    ytop, ybot, ylast = _fmt(ymax), _fmt(ymin), _fmt(last)
    lines = [f"{title} ({unit}) | last={ylast} min={ybot} max={ytop}"]

    for i, cells in enumerate(grid):
        # Only the top and bottom rows carry an axis label.
        if i == 0:
            lbl = ytop
        elif i == height - 1:
            lbl = ybot
        else:
            lbl = ""
        lines.append(f"{lbl:>10} |{''.join(cells)}")

    lines.append(axis_line)
    lines.append(time_line)
    return lines
|
|
|
|
|
|
def render_screen(rows: Dict[str, Row], histories: Dict[str, MetricHistory], focus_label: str,
                  started: float, history_sec: float, plot_height: int, plot_width_arg: int,
                  env_path: Path, discovered: int) -> None:
    """Clear the terminal and print the status table plus the focus charts.

    Args:
        rows: Latest snapshot per band label; labels must be convertible
            with ``int`` because the table is sorted numerically.
        histories: Sample history per band label.
        focus_label: Label whose history is charted.
        started: Start time used to compute the uptime shown in the header.
        history_sec: Time window passed to each chart.
        plot_height: Chart height in rows.
        plot_width_arg: Chart width in columns; ``0`` or less selects a
            width derived from the terminal size (between 40 and 140).
        env_path: Path shown in the header line.
        discovered: Device count shown in the header line.
    """
    now = time.time()
    term_w = shutil.get_terminal_size((140, 50)).columns
    if plot_width_arg > 0:
        plot_width = plot_width_arg
    else:
        plot_width = max(40, min(term_w - 14, 140))

    # ANSI: erase the screen and move the cursor to the top-left corner.
    print("\x1b[2J\x1b[H", end="")
    print("HackRF Energy ASCII Monitor (relative levels: dBFS/RMS/power, not dBm)")
    clock = time.strftime('%Y-%m-%d %H:%M:%S')
    print(f"env: {env_path} | discovered: {discovered} | uptime: {int(now-started)}s | {clock}")
    print()

    # (column title, right-aligned width) for the status table.
    columns = (
        ("band", 5), ("idx", 3), ("status", 9), ("dBFS", 9), ("RMS", 10),
        ("power", 12), ("N", 5), ("age", 5), ("serial", 12),
    )
    widths = [w for _, w in columns]
    header = " ".join(f"{name:>{w}}" for name, w in columns)
    print(header)
    print('-' * len(header))

    for label in sorted(rows, key=int):
        r = rows[label]
        if r.power_lin is None:
            pwr = '-'
        elif abs(r.power_lin) < 1e4:
            pwr = f"{r.power_lin:.8f}"
        else:
            pwr = f"{r.power_lin:.3e}"
        cells = (
            label,
            '-' if r.index is None else str(r.index),
            r.status,
            '-' if r.dbfs is None else f"{r.dbfs:.2f}",
            '-' if r.rms is None else f"{r.rms:.6f}",
            pwr,
            r.samples,
            '-' if r.updated_at <= 0 else f"{(now-r.updated_at):.1f}",
            r.serial[-12:],  # keep only the last 12 characters of the serial
        )
        print(" ".join(f"{cell:>{w}}" for cell, w in zip(cells, widths)))

    print()
    if focus_label not in histories:
        print(f"focus label '{focus_label}' not found")
        print("Use --only or --focus to select a valid label.")
        sys.stdout.flush()
        return

    h = histories[focus_label]
    ts = list(h.ts)
    print(f"Focus band: {focus_label}")
    print()

    # (title, unit, series, extra render_chart keyword arguments); only the
    # dBFS chart uses a floor, clipping readings below -50.
    charts = (
        ("dBFS vs time", "dBFS", h.dbfs, {"y_floor": -50.0, "clip_floor": True}),
        ("RMS vs time", "RMS", h.rms, {}),
        ("Power vs time", "|IQ|^2", h.power, {}),
    )
    for title, unit, series, extra in charts:
        chart_lines = render_chart(title, unit, ts, list(series), now, history_sec,
                                   plot_width, plot_height, **extra)
        for line in chart_lines:
            print(line)
        print()

    print("Ctrl+C to stop. If status=BUSY, corresponding SDR service is using the HackRF.")
    sys.stdout.flush()
|
|
|
|
|
|
def main() -> int:
    """Run the monitor until SIGINT/SIGTERM and return the exit status.

    Reads the ``.env`` file, starts one ``Worker`` per configured target and
    then redraws the screen every ``--refresh`` seconds from a snapshot of
    the shared rows.

    Returns:
        ``2`` when the env file yields no targets, otherwise ``0``.
    """
    args = build_parser().parse_args()
    only = {x.strip() for x in args.only.split(',') if x.strip()} or None

    # A relative --env path is resolved against this script's directory,
    # not the current working directory.
    env_path = Path(args.env)
    if not env_path.is_absolute():
        env_path = (Path(__file__).resolve().parent / env_path).resolve()

    env = parse_env(env_path)
    targets = collect_targets(env, only=only, override_freq_mhz=args.freq_mhz)
    if not targets:
        print(f"No hack_/HACKID_ entries found in {env_path}", file=sys.stderr)
        return 2

    serial_to_index = parse_hackrf_info()

    rows: Dict[str, Row] = {t.label: Row(label=t.label, serial=t.serial, freq_hz=t.freq_hz) for t in targets}
    histories: Dict[str, MetricHistory] = {
        t.label: MetricHistory(deque(), deque(), deque(), deque()) for t in targets
    }

    # Default focus: the numerically smallest label.
    focus_label = args.focus.strip() or sorted(rows, key=int)[0]

    lock = threading.Lock()
    stop_event = threading.Event()

    def _stop(signum, frame):
        # Signal handler: request shutdown of the render loop and workers.
        stop_event.set()

    signal.signal(signal.SIGINT, _stop)
    signal.signal(signal.SIGTERM, _stop)

    workers: List[Worker] = []
    try:
        # Workers are started inside the try block so that a failure while
        # creating or starting a later worker still sets stop_event and
        # joins the workers that are already running.
        for i, t in enumerate(targets):
            # Each worker gets its own copy of the options plus a
            # per-device ``stagger`` value (presumably a start-up delay in
            # seconds consumed by Worker -- confirm in read_energy.py).
            wa = argparse.Namespace(**vars(args))
            wa.stagger = i * 0.15
            w = Worker(t, serial_to_index, rows, lock, stop_event, wa)
            w.start()
            # Only started workers are recorded, so the join below never
            # sees one that was not started.
            workers.append(w)

        started = time.time()
        while not stop_event.is_set():
            cutoff = time.time() - args.history_sec
            # Copy the shared rows under the lock; rendering then works on
            # the snapshot without holding it.
            with lock:
                snap = {k: Row(**vars(v)) for k, v in rows.items()}
            for label, row in snap.items():
                maybe_append(histories[label], row)
                prune_history(histories[label], cutoff)
            render_screen(snap, histories, focus_label, started, args.history_sec,
                          args.plot_height, args.plot_width, env_path, len(serial_to_index))
            # Sleep for one refresh period, waking early on shutdown.
            stop_event.wait(args.refresh)
    finally:
        stop_event.set()
        for w in workers:
            w.join(timeout=2.0)
    return 0
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: main()'s return value becomes the exit status.
    sys.exit(main())
|