You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
DroneDetector/read_energy_ascii.py

283 lines
11 KiB
Python

#!/usr/bin/env python3
import argparse
import math
import shutil
import signal
import sys
import threading
import time
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Deque, Dict, List, Optional
# Reuse SDR reading logic from read_energy.py
from read_energy import Row, Worker, parse_env, collect_targets, parse_hackrf_info
@dataclass
class MetricHistory:
    """Rolling sample history for one band, used as the data source for charts.

    The four deques are kept in lockstep: they are appended to and popped from
    together, so position ``i`` in each one refers to the same sample.
    """

    # Timestamp of each stored sample (the ``updated_at`` of the originating row).
    ts: Deque[float]
    # Metric series, index-aligned with ``ts``.
    dbfs: Deque[float]
    rms: Deque[float]
    power: Deque[float]
    # ``updated_at`` of the newest row already processed, so that the same
    # reading is not recorded twice.
    last_updated_at: float = 0.0
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the ASCII monitor.

    Returns:
        An ``argparse.ArgumentParser`` exposing target selection, tuning/gain,
        timing and chart-geometry options.
    """
    parser = argparse.ArgumentParser(description="ASCII realtime graphs for HackRF energy metrics")

    # (flag, type, default, help) in the order they appear in ``--help``.
    # ``type=None`` is argparse's own default, i.e. the value stays a string.
    option_table = (
        ("--env", None, ".env", "Path to .env (default: repo_root/.env)"),
        ("--only", None, "", "Comma-separated labels (e.g. 2400,5200)"),
        ("--focus", None, "", "Which label to draw charts for (default: first selected)"),
        ("--freq-mhz", float, None, "Override tuning frequency for all devices (MHz)"),
        ("--sample-rate", float, 2e6, "Sample rate Hz (HackRF min ~2e6)"),
        ("--vec-len", int, 4096, "Probe vector length"),
        ("--interval", float, 0.4, "Per-device read interval (s)"),
        ("--refresh", float, 0.5, "Screen refresh interval (s)"),
        ("--reopen-delay", float, 1.0, "Retry delay after BUSY/ERR (s)"),
        ("--gain", float, 16.0, "General gain"),
        ("--if-gain", float, 16.0, "IF gain"),
        ("--bb-gain", float, 16.0, "BB gain"),
        ("--history-sec", float, 60.0, "History window in seconds"),
        ("--plot-height", int, 10, "Chart height (rows)"),
        ("--plot-width", int, 0, "Chart width (cols), 0=auto"),
    )
    for flag, value_type, default, help_text in option_table:
        parser.add_argument(flag, type=value_type, default=default, help=help_text)
    return parser
def prune_history(h: MetricHistory, cutoff: float) -> None:
    """Discard every sample whose timestamp is older than ``cutoff``.

    Samples are removed from the left (oldest) end, and all four series are
    popped together so they stay index-aligned.
    """
    timestamps = h.ts
    aligned_series = (h.ts, h.dbfs, h.rms, h.power)
    while timestamps and timestamps[0] < cutoff:
        for series in aligned_series:
            series.popleft()
def maybe_append(history: MetricHistory, row: Row) -> None:
    """Record ``row`` in ``history`` if it is a new, usable reading.

    A row is ignored when it has never been updated or when its ``updated_at``
    is not newer than the last row already seen. A newer row always advances
    ``history.last_updated_at``, but its metrics are stored only when the
    status is ``"OK"`` and none of dbfs / rms / power_lin is ``None``.
    """
    stamp = row.updated_at
    if stamp <= 0 or stamp <= history.last_updated_at:
        return

    # Remember the row as seen even if it turns out to carry no usable data.
    history.last_updated_at = stamp

    if row.status != "OK":
        return
    readings = (row.dbfs, row.rms, row.power_lin)
    if any(value is None for value in readings):
        return

    history.ts.append(stamp)
    for series, value in zip((history.dbfs, history.rms, history.power), readings):
        series.append(float(value))
def _map_series(ts: List[float], ys: List[float], now: float, span: float, width: int, height: int,
                y_floor: Optional[float] = None, clip_floor: bool = False):
    """Project a time series onto a ``width`` x ``height`` character grid.

    Only samples with ``t >= now - span`` and a finite value are used.

    Args:
        ts: Sample timestamps.
        ys: Sample values, paired with ``ts`` by position.
        now: Timestamp mapped to the right-hand edge.
        span: Length of the visible time window; the left edge is ``now - span``.
        width: Grid width in columns.
        height: Grid height in rows.
        y_floor: Optional lower bound for the vertical axis.
        clip_floor: When true (and ``y_floor`` is given), values below the
            floor are replaced by the floor before scaling.

    Returns:
        ``None`` if there is nothing to plot, otherwise a tuple
        ``(cells, ymin, ymax, last)`` where ``cells`` is a list of
        ``(column, row)`` pairs (row 0 is the top of the grid), ``ymin`` /
        ``ymax`` are the padded axis limits and ``last`` is the most recent
        (possibly clipped) value.
    """
    if not ts or not ys:
        return None

    window_start = now - span
    clipping = clip_floor and y_floor is not None

    # Keep in-window finite samples, applying the floor clip once up front.
    samples = []
    for t, y in zip(ts, ys):
        if t >= window_start and math.isfinite(y):
            samples.append((t, y_floor if clipping and y < y_floor else y))
    if not samples:
        return None

    values = [y for _, y in samples]
    low, high = min(values), max(values)
    if y_floor is not None and low < y_floor:
        low = y_floor
    if not (math.isfinite(low) and math.isfinite(high)):
        return None

    # Pad the axis: 5% of the magnitude (or 1.0) for a flat series, otherwise
    # 8% of the value range on each side.
    margin = (abs(high) * 0.05 or 1.0) if high == low else (high - low) * 0.08
    low -= margin
    high += margin
    if y_floor is not None and low < y_floor:
        low = y_floor
    if high <= low:
        high = low + 1.0

    def unit(fraction):
        # Clamp to the closed interval [0, 1].
        if fraction < 0:
            return 0.0
        if fraction > 1:
            return 1.0
        return fraction

    last_col = width - 1
    last_row = height - 1
    y_range = high - low
    cells = []
    for t, y in samples:
        x_frac = unit((t - window_start) / span if span > 0 else 1.0)
        y_frac = unit((y - low) / y_range)
        col = int(round(x_frac * last_col)) if width > 1 else 0
        # Row 0 is the top line, so larger values get smaller row numbers.
        row = last_row - int(round(y_frac * last_row))
        cells.append((col, row))
    return cells, low, high, values[-1]
def _draw_line(grid: List[List[str]], x0: int, y0: int, x1: int, y1: int, ch: str = '*') -> None:
    """Rasterise the segment ``(x0, y0)`` -> ``(x1, y1)`` into ``grid`` in place.

    ``grid`` is indexed as ``grid[row][column]``. The segment is sampled at
    evenly spaced steps along its longer axis; cells that fall outside the
    grid are skipped. Column bounds are taken from the first row.
    """
    delta_x = x1 - x0
    delta_y = y1 - y0
    # At least one step so that a zero-length segment still marks its cell.
    steps = max(abs(delta_x), abs(delta_y), 1)

    row_count = len(grid)
    col_count = len(grid[0]) if row_count else 0

    for i in range(steps + 1):
        col = int(round(x0 + delta_x * i / steps))
        row = int(round(y0 + delta_y * i / steps))
        if 0 <= row < row_count and 0 <= col < col_count:
            grid[row][col] = ch
def render_chart(title: str, unit: str, ts: List[float], ys: List[float], now: float,
                 span: float, width: int, height: int,
                 y_floor: Optional[float] = None, clip_floor: bool = False) -> List[str]:
    """Render one metric as an ASCII chart.

    Args:
        title: Chart title shown in the header line.
        unit: Unit label shown in parentheses after the title.
        ts: Sample timestamps.
        ys: Sample values, paired with ``ts`` by position.
        now: Timestamp of the right-hand edge of the chart.
        span: Width of the time window; also printed in the header and footer.
        width: Plot area width in columns.
        height: Plot area height in rows.
        y_floor: Optional lower bound for the vertical axis.
        clip_floor: Forwarded to ``_map_series``; clips values to ``y_floor``.

    Returns:
        A list of text lines: one header line, ``height`` plot rows, the
        horizontal axis and the time-label line. When no sample falls inside
        the window the plot rows are blank and the header says so.
    """
    gutter = f"{'':>10}"

    # The footer is identical for empty and populated charts. The spacer is
    # derived from the actual label lengths so that "now" stays right-aligned
    # for any span, not only two-digit ones.
    left, right = f"t-{int(span)}s", "now"
    spacer = max(1, width - len(left) - len(right))
    footer = [
        f"{gutter} +{'-' * width}",
        f"{gutter} {left}{' ' * spacer}{right}",
    ]

    plot = _map_series(ts, ys, now, span, width, height, y_floor=y_floor, clip_floor=clip_floor)
    if plot is None:
        lines: List[str] = [f"{title} ({unit}) | no samples in last {span:.0f}s"]
        lines.extend(" " * (10 + 3 + width) for _ in range(height))
        return lines + footer

    points, ymin, ymax, last = plot

    grid = [[' '] * width for _ in range(height)]
    if height > 0:
        # Dotted guide line across the middle row.
        grid[height // 2] = ['.'] * width

    # Start from the first point itself so that it is always marked: a series
    # with a single sample would otherwise leave the plot area empty.
    prev = points[0]
    for cur in points:
        _draw_line(grid, prev[0], prev[1], cur[0], cur[1], '*')
        prev = cur

    def fmt(value: float) -> str:
        # Fixed-point for ordinary magnitudes, scientific for very large ones.
        return f"{value:.2f}" if abs(value) < 1e4 else f"{value:.2e}"

    ytop, ybot, ylast = fmt(ymax), fmt(ymin), fmt(last)

    lines = [f"{title} ({unit}) | last={ylast} min={ybot} max={ytop}"]
    for i, row in enumerate(grid):
        # Only the top and bottom rows carry an axis label.
        if i == 0:
            lbl = ytop
        elif i == height - 1:
            lbl = ybot
        else:
            lbl = ""
        lines.append(f"{lbl:>10} |{''.join(row)}")
    return lines + footer
def render_screen(rows: Dict[str, Row], histories: Dict[str, MetricHistory], focus_label: str,
                  started: float, history_sec: float, plot_height: int, plot_width_arg: int,
                  env_path: Path, discovered: int) -> None:
    """Clear the terminal and redraw the status table and the focus-band charts.

    Args:
        rows: Latest reading per band label.
        histories: Sample history per band label.
        focus_label: Label whose history is charted.
        started: Start timestamp, used to display uptime.
        history_sec: Time window shown by each chart.
        plot_height: Chart height in rows.
        plot_width_arg: Chart width in columns; values <= 0 derive the width
            from the terminal size.
        env_path: Path of the env file, shown in the header.
        discovered: Device count shown in the header.
    """
    now = time.time()
    terminal_columns = shutil.get_terminal_size((140, 50)).columns
    if plot_width_arg > 0:
        plot_width = plot_width_arg
    else:
        plot_width = max(40, min(terminal_columns - 14, 140))

    # ANSI: erase the screen, then move the cursor to the top-left corner.
    print("\x1b[2J\x1b[H", end="")
    print("HackRF Energy ASCII Monitor (relative levels: dBFS/RMS/power, not dBm)")
    print(f"env: {env_path} | discovered: {discovered} | uptime: {int(now-started)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}")
    print()

    header = f"{'band':>5} {'idx':>3} {'status':>9} {'dBFS':>9} {'RMS':>10} {'power':>12} {'N':>5} {'age':>5} {'serial':>12}"
    print(header)
    print('-' * len(header))

    # Labels are ordered by their integer value.
    for label in sorted(rows, key=int):
        r = rows[label]
        idx = '-' if r.index is None else str(r.index)
        age = '-' if r.updated_at <= 0 else f"{(now-r.updated_at):.1f}"
        dbfs = '-' if r.dbfs is None else f"{r.dbfs:.2f}"
        rms = '-' if r.rms is None else f"{r.rms:.6f}"
        if r.power_lin is None:
            pwr = '-'
        elif abs(r.power_lin) < 1e4:
            pwr = f"{r.power_lin:.8f}"
        else:
            pwr = f"{r.power_lin:.3e}"
        print(f"{label:>5} {idx:>3} {r.status:>9} {dbfs:>9} {rms:>10} {pwr:>12} {r.samples:>5} {age:>5} {r.serial[-12:]:>12}")
    print()

    focus_history = histories.get(focus_label) if focus_label in histories else None
    if focus_history is None:
        print(f"focus label '{focus_label}' not found")
        print("Use --only or --focus to select a valid label.")
        sys.stdout.flush()
        return

    ts = list(focus_history.ts)
    print(f"Focus band: {focus_label}")
    print()

    # (title, unit, series, extra keyword arguments for render_chart)
    chart_specs = (
        ("dBFS vs time", "dBFS", focus_history.dbfs, {"y_floor": -50.0, "clip_floor": True}),
        ("RMS vs time", "RMS", focus_history.rms, {}),
        ("Power vs time", "|IQ|^2", focus_history.power, {}),
    )
    for title, unit, series, extra in chart_specs:
        chart = render_chart(title, unit, ts, list(series), now, history_sec,
                             plot_width, plot_height, **extra)
        for line in chart:
            print(line)
        print()

    print("Ctrl+C to stop. If status=BUSY, corresponding SDR service is using the HackRF.")
    sys.stdout.flush()
def main() -> int:
    """Run the monitor: start one worker per target and redraw until stopped.

    Returns:
        ``2`` when the env file yields no targets, ``0`` after a normal stop
        (SIGINT / SIGTERM).
    """
    args = build_parser().parse_args()

    wanted = {part.strip() for part in args.only.split(',')}
    wanted.discard("")
    only = wanted or None

    env_path = Path(args.env)
    if not env_path.is_absolute():
        # Relative paths are resolved against this script's directory.
        env_path = (Path(__file__).resolve().parent / env_path).resolve()

    env = parse_env(env_path)
    targets = collect_targets(env, only=only, override_freq_mhz=args.freq_mhz)
    if not targets:
        print(f"No hack_/HACKID_ entries found in {env_path}", file=sys.stderr)
        return 2

    serial_to_index = parse_hackrf_info()

    rows: Dict[str, Row] = {
        target.label: Row(label=target.label, serial=target.serial, freq_hz=target.freq_hz)
        for target in targets
    }
    histories: Dict[str, MetricHistory] = {
        target.label: MetricHistory(deque(), deque(), deque(), deque())
        for target in targets
    }

    requested_focus = args.focus.strip()
    focus_label = requested_focus if requested_focus else sorted(rows, key=int)[0]

    lock = threading.Lock()
    stop_event = threading.Event()

    def _stop(signum, frame):
        stop_event.set()

    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, _stop)

    workers: List[Worker] = []
    for position, target in enumerate(targets):
        # Each worker gets its own copy of the options plus a start offset.
        worker_args = argparse.Namespace(**vars(args))
        worker_args.stagger = position * 0.15
        worker = Worker(target, serial_to_index, rows, lock, stop_event, worker_args)
        worker.start()
        workers.append(worker)

    started = time.time()
    try:
        while not stop_event.is_set():
            cutoff = time.time() - args.history_sec
            # Copy the shared rows under the lock; everything after works on the copy.
            with lock:
                snapshot = {label: Row(**vars(row)) for label, row in rows.items()}
            for label, row in snapshot.items():
                history = histories[label]
                maybe_append(history, row)
                prune_history(history, cutoff)
            render_screen(snapshot, histories, focus_label, started, args.history_sec,
                          args.plot_height, args.plot_width, env_path, len(serial_to_index))
            stop_event.wait(args.refresh)
    finally:
        stop_event.set()
        for worker in workers:
            worker.join(timeout=2.0)
    return 0
# Script entry point: the return value of main() becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())