Compare commits
No commits in common. '9f3fbd46c7fd6dc1807ec4a14b513dd7c9d1918f' and '378fa1c6442283cdf90adaaf7952a6a2f2406f7f' have entirely different histories.
9f3fbd46c7
...
378fa1c644
@ -1,321 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import math
|
||||
import re
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
try:
|
||||
import numpy as np
|
||||
except Exception as exc:
|
||||
print(f"numpy import failed: {exc}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
from gnuradio import blocks, gr
|
||||
import osmosdr
|
||||
except Exception as exc:
|
||||
print(f"gnuradio/osmosdr import failed: {exc}", file=sys.stderr)
|
||||
print("Run with the SDR venv, e.g. .venv-sdr/bin/python read_energy.py", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
EPS = 1e-20
|
||||
|
||||
|
||||
@dataclass
|
||||
class Target:
|
||||
label: str
|
||||
serial: str
|
||||
freq_hz: float
|
||||
source: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class Row:
|
||||
label: str
|
||||
serial: str
|
||||
index: Optional[int] = None
|
||||
freq_hz: float = 0.0
|
||||
status: str = "INIT"
|
||||
rms: Optional[float] = None
|
||||
power_lin: Optional[float] = None
|
||||
dbfs: Optional[float] = None
|
||||
samples: int = 0
|
||||
updated_at: float = 0.0
|
||||
error: str = ""
|
||||
|
||||
|
||||
class ProbeTop(gr.top_block):
|
||||
def __init__(self, index: int, freq_hz: float, sample_rate: float, vec_len: int,
|
||||
gain: float, if_gain: float, bb_gain: float):
|
||||
super().__init__("hackrf_energy_probe")
|
||||
self.probe = blocks.probe_signal_vc(vec_len)
|
||||
self.stream_to_vec = blocks.stream_to_vector(gr.sizeof_gr_complex * 1, vec_len)
|
||||
self.src = osmosdr.source(args=f"numchan=1 hackrf={index}")
|
||||
self.src.set_time_unknown_pps(osmosdr.time_spec_t())
|
||||
self.src.set_sample_rate(sample_rate)
|
||||
self.src.set_center_freq(freq_hz, 0)
|
||||
try:
|
||||
self.src.set_freq_corr(0, 0)
|
||||
except Exception:
|
||||
pass
|
||||
for fn, val in (("set_gain", gain), ("set_if_gain", if_gain), ("set_bb_gain", bb_gain)):
|
||||
try:
|
||||
getattr(self.src, fn)(val, 0)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
self.src.set_bandwidth(0, 0)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
self.src.set_antenna('', 0)
|
||||
except Exception:
|
||||
pass
|
||||
self.connect((self.src, 0), (self.stream_to_vec, 0))
|
||||
self.connect((self.stream_to_vec, 0), (self.probe, 0))
|
||||
|
||||
def read_metrics(self) -> Tuple[float, float, float, int]:
|
||||
arr = np.asarray(self.probe.level(), dtype=np.complex64)
|
||||
if arr.size == 0:
|
||||
raise RuntimeError("no samples")
|
||||
p = float(np.mean(arr.real * arr.real + arr.imag * arr.imag))
|
||||
rms = math.sqrt(max(p, 0.0))
|
||||
dbfs = 10.0 * math.log10(max(p, EPS))
|
||||
return rms, p, dbfs, int(arr.size)
|
||||
|
||||
|
||||
class Worker(threading.Thread):
|
||||
def __init__(self, target: Target, serial_to_index: Dict[str, int], rows: Dict[str, Row],
|
||||
lock: threading.Lock, stop_event: threading.Event, args: argparse.Namespace):
|
||||
super().__init__(daemon=True)
|
||||
self.target = target
|
||||
self.serial_to_index = serial_to_index
|
||||
self.rows = rows
|
||||
self.lock = lock
|
||||
self.stop_event = stop_event
|
||||
self.args = args
|
||||
self.tb: Optional[ProbeTop] = None
|
||||
|
||||
def _set_row(self, **kwargs):
|
||||
with self.lock:
|
||||
row = self.rows[self.target.label]
|
||||
for k, v in kwargs.items():
|
||||
setattr(row, k, v)
|
||||
row.updated_at = time.time()
|
||||
|
||||
def _open(self) -> bool:
|
||||
idx = self.serial_to_index.get(self.target.serial)
|
||||
if idx is None:
|
||||
self._set_row(status="NOT_FOUND", error="serial not in hackrf_info", index=None)
|
||||
return False
|
||||
self._set_row(index=idx, status="OPENING", error="", freq_hz=self.target.freq_hz)
|
||||
try:
|
||||
self.tb = ProbeTop(
|
||||
idx, self.target.freq_hz, self.args.sample_rate, self.args.vec_len,
|
||||
self.args.gain, self.args.if_gain, self.args.bb_gain
|
||||
)
|
||||
self.tb.start()
|
||||
time.sleep(0.15)
|
||||
self._set_row(status="OK", error="")
|
||||
return True
|
||||
except Exception as exc:
|
||||
msg = str(exc)
|
||||
status = "BUSY" if ("Resource busy" in msg or "-1000" in msg) else "ERR"
|
||||
self._set_row(status=status, error=msg)
|
||||
self.tb = None
|
||||
return False
|
||||
|
||||
def _close(self):
|
||||
if self.tb is not None:
|
||||
try:
|
||||
self.tb.stop()
|
||||
self.tb.wait()
|
||||
except Exception:
|
||||
pass
|
||||
self.tb = None
|
||||
|
||||
def run(self):
|
||||
time.sleep(self.args.stagger)
|
||||
while not self.stop_event.is_set():
|
||||
if self.tb is None and not self._open():
|
||||
if self.stop_event.wait(self.args.reopen_delay):
|
||||
break
|
||||
continue
|
||||
try:
|
||||
rms, p, dbfs, n = self.tb.read_metrics()
|
||||
self._set_row(status="OK", rms=rms, power_lin=p, dbfs=dbfs, samples=n, error="")
|
||||
except Exception as exc:
|
||||
self._set_row(status="ERR", error=str(exc))
|
||||
self._close()
|
||||
if self.stop_event.wait(self.args.reopen_delay):
|
||||
break
|
||||
continue
|
||||
if self.stop_event.wait(self.args.interval):
|
||||
break
|
||||
self._close()
|
||||
|
||||
|
||||
def parse_env(path: Path) -> Dict[str, str]:
|
||||
out: Dict[str, str] = {}
|
||||
if not path.exists():
|
||||
return out
|
||||
for raw in path.read_text(encoding="utf-8", errors="ignore").splitlines():
|
||||
line = raw.strip()
|
||||
if not line or line.startswith("#") or "=" not in line:
|
||||
continue
|
||||
k, v = line.split("=", 1)
|
||||
out[k.strip()] = v.strip().strip('"').strip("'")
|
||||
return out
|
||||
|
||||
|
||||
def collect_targets(env: Dict[str, str], only: Optional[set], override_freq_mhz: Optional[float]) -> List[Target]:
|
||||
targets: List[Target] = []
|
||||
for k, v in env.items():
|
||||
m = re.fullmatch(r"hack_(\d+)", k)
|
||||
if m:
|
||||
label = m.group(1)
|
||||
else:
|
||||
m = re.fullmatch(r"HACKID_(\d+)", k)
|
||||
if not m:
|
||||
continue
|
||||
label = m.group(1)
|
||||
if only and label not in only:
|
||||
continue
|
||||
mhz = override_freq_mhz if override_freq_mhz is not None else float(label)
|
||||
targets.append(Target(label=label, serial=v.lower(), freq_hz=mhz * 1e6, source=k))
|
||||
|
||||
uniq: Dict[str, Target] = {}
|
||||
for t in sorted(targets, key=lambda x: (int(x.label), 0 if x.source.startswith("hack_") else 1)):
|
||||
uniq.setdefault(t.label, t)
|
||||
return [uniq[k] for k in sorted(uniq, key=lambda x: int(x))]
|
||||
|
||||
|
||||
def parse_hackrf_info() -> Dict[str, int]:
|
||||
try:
|
||||
proc = subprocess.run(["hackrf_info"], capture_output=True, text=True, timeout=15)
|
||||
except FileNotFoundError:
|
||||
raise RuntimeError("hackrf_info not found")
|
||||
except subprocess.TimeoutExpired:
|
||||
raise RuntimeError("hackrf_info timeout")
|
||||
text = (proc.stdout or "") + "\n" + (proc.stderr or "")
|
||||
out: Dict[str, int] = {}
|
||||
cur_idx: Optional[int] = None
|
||||
for line in text.splitlines():
|
||||
m = re.search(r"^Index:\s*(\d+)", line)
|
||||
if m:
|
||||
cur_idx = int(m.group(1))
|
||||
continue
|
||||
m = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line)
|
||||
if m and cur_idx is not None:
|
||||
out[m.group(1).lower()] = cur_idx
|
||||
if not out:
|
||||
raise RuntimeError("no devices parsed from hackrf_info")
|
||||
return out
|
||||
|
||||
|
||||
def fmt(v: Optional[float], spec: str) -> str:
|
||||
return "-" if v is None else format(v, spec)
|
||||
|
||||
|
||||
def render(rows: Dict[str, Row], started_at: float, env_path: Path, serial_to_index: Dict[str, int]):
|
||||
now = time.time()
|
||||
print("\x1b[2J\x1b[H", end="")
|
||||
print("HackRF Energy Monitor (relative power: RMS / linear / dBFS, not calibrated dBm)")
|
||||
print(f"env: {env_path} | discovered: {len(serial_to_index)} | uptime: {int(now-started_at)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
print()
|
||||
header = f"{'band':>5} {'idx':>3} {'freq':>7} {'status':>9} {'rms':>10} {'power':>12} {'dBFS':>9} {'N':>5} {'age':>5} {'serial':>12} error"
|
||||
print(header)
|
||||
print('-' * len(header))
|
||||
for label in sorted(rows, key=lambda x: int(x)):
|
||||
r = rows[label]
|
||||
idx = '-' if r.index is None else str(r.index)
|
||||
age = '-' if r.updated_at <= 0 else f"{(now-r.updated_at):.1f}"
|
||||
err = (r.error or "")
|
||||
if len(err) > 64:
|
||||
err = err[:61] + '...'
|
||||
print(
|
||||
f"{r.label:>5} {idx:>3} {r.freq_hz/1e6:>7.1f} {r.status:>9} "
|
||||
f"{fmt(r.rms, '.6f'):>10} {fmt(r.power_lin, '.8f'):>12} {fmt(r.dbfs, '.2f'):>9} "
|
||||
f"{r.samples:>5} {age:>5} {r.serial[-12:]:>12} {err}"
|
||||
)
|
||||
print()
|
||||
print("Ctrl+C to stop. Use --only 2400,5200 or --freq-mhz 2450 to limit/override tuning.")
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
|
||||
p = argparse.ArgumentParser(description="Realtime HackRF relative energy monitor")
|
||||
p.add_argument("--env", default=".env", help="Path to .env (default: repo_root/.env)")
|
||||
p.add_argument("--only", default="", help="Comma-separated labels (e.g. 2400,5200)")
|
||||
p.add_argument("--freq-mhz", type=float, default=None, help="Override tuning frequency for all devices (MHz)")
|
||||
p.add_argument("--sample-rate", type=float, default=2e6, help="Sample rate in Hz (HackRF min ~2e6)")
|
||||
p.add_argument("--vec-len", type=int, default=4096, help="Probe vector length")
|
||||
p.add_argument("--interval", type=float, default=0.5, help="Per-device read interval (s)")
|
||||
p.add_argument("--refresh", type=float, default=0.5, help="Console refresh interval (s)")
|
||||
p.add_argument("--reopen-delay", type=float, default=1.0, help="Retry delay after BUSY/ERR (s)")
|
||||
p.add_argument("--gain", type=float, default=16.0, help="General gain")
|
||||
p.add_argument("--if-gain", type=float, default=16.0, help="IF gain")
|
||||
p.add_argument("--bb-gain", type=float, default=16.0, help="BB gain")
|
||||
return p
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = build_parser().parse_args()
|
||||
only = {x.strip() for x in args.only.split(',') if x.strip()} or None
|
||||
|
||||
env_path = Path(args.env)
|
||||
if not env_path.is_absolute():
|
||||
env_path = (Path(__file__).resolve().parent / env_path).resolve()
|
||||
env = parse_env(env_path)
|
||||
targets = collect_targets(env, only=only, override_freq_mhz=args.freq_mhz)
|
||||
if not targets:
|
||||
print(f"No hack_/HACKID_ entries found in {env_path}", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
try:
|
||||
serial_to_index = parse_hackrf_info()
|
||||
except Exception as exc:
|
||||
print(f"hackrf discovery failed: {exc}", file=sys.stderr)
|
||||
return 3
|
||||
|
||||
rows = {t.label: Row(label=t.label, serial=t.serial, freq_hz=t.freq_hz) for t in targets}
|
||||
lock = threading.Lock()
|
||||
stop_event = threading.Event()
|
||||
|
||||
def on_signal(signum, frame):
|
||||
stop_event.set()
|
||||
|
||||
signal.signal(signal.SIGINT, on_signal)
|
||||
signal.signal(signal.SIGTERM, on_signal)
|
||||
|
||||
workers: List[Worker] = []
|
||||
for i, t in enumerate(targets):
|
||||
wa = argparse.Namespace(**vars(args))
|
||||
wa.stagger = i * 0.15
|
||||
w = Worker(t, serial_to_index, rows, lock, stop_event, wa)
|
||||
workers.append(w)
|
||||
w.start()
|
||||
|
||||
started = time.time()
|
||||
try:
|
||||
while not stop_event.is_set():
|
||||
with lock:
|
||||
snap = {k: Row(**vars(v)) for k, v in rows.items()}
|
||||
render(snap, started, env_path, serial_to_index)
|
||||
stop_event.wait(args.refresh)
|
||||
finally:
|
||||
stop_event.set()
|
||||
for w in workers:
|
||||
w.join(timeout=2)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
raise SystemExit(main())
|
||||
@ -1,282 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import math
|
||||
import shutil
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from collections import deque
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Deque, Dict, List, Optional
|
||||
|
||||
# Reuse SDR reading logic from read_energy.py
|
||||
from read_energy import Row, Worker, parse_env, collect_targets, parse_hackrf_info
|
||||
|
||||
|
||||
@dataclass
|
||||
class MetricHistory:
|
||||
ts: Deque[float]
|
||||
dbfs: Deque[float]
|
||||
rms: Deque[float]
|
||||
power: Deque[float]
|
||||
last_updated_at: float = 0.0
|
||||
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
|
||||
p = argparse.ArgumentParser(description="ASCII realtime graphs for HackRF energy metrics")
|
||||
p.add_argument("--env", default=".env", help="Path to .env (default: repo_root/.env)")
|
||||
p.add_argument("--only", default="", help="Comma-separated labels (e.g. 2400,5200)")
|
||||
p.add_argument("--focus", default="", help="Which label to draw charts for (default: first selected)")
|
||||
p.add_argument("--freq-mhz", type=float, default=None, help="Override tuning frequency for all devices (MHz)")
|
||||
p.add_argument("--sample-rate", type=float, default=2e6, help="Sample rate Hz (HackRF min ~2e6)")
|
||||
p.add_argument("--vec-len", type=int, default=4096, help="Probe vector length")
|
||||
p.add_argument("--interval", type=float, default=0.4, help="Per-device read interval (s)")
|
||||
p.add_argument("--refresh", type=float, default=0.5, help="Screen refresh interval (s)")
|
||||
p.add_argument("--reopen-delay", type=float, default=1.0, help="Retry delay after BUSY/ERR (s)")
|
||||
p.add_argument("--gain", type=float, default=16.0, help="General gain")
|
||||
p.add_argument("--if-gain", type=float, default=16.0, help="IF gain")
|
||||
p.add_argument("--bb-gain", type=float, default=16.0, help="BB gain")
|
||||
p.add_argument("--history-sec", type=float, default=60.0, help="History window in seconds")
|
||||
p.add_argument("--plot-height", type=int, default=10, help="Chart height (rows)")
|
||||
p.add_argument("--plot-width", type=int, default=0, help="Chart width (cols), 0=auto")
|
||||
return p
|
||||
|
||||
|
||||
def prune_history(h: MetricHistory, cutoff: float) -> None:
|
||||
while h.ts and h.ts[0] < cutoff:
|
||||
h.ts.popleft(); h.dbfs.popleft(); h.rms.popleft(); h.power.popleft()
|
||||
|
||||
|
||||
def maybe_append(history: MetricHistory, row: Row) -> None:
|
||||
if row.updated_at <= 0:
|
||||
return
|
||||
if row.updated_at <= history.last_updated_at:
|
||||
return
|
||||
history.last_updated_at = row.updated_at
|
||||
if row.status != "OK":
|
||||
return
|
||||
if row.dbfs is None or row.rms is None or row.power_lin is None:
|
||||
return
|
||||
history.ts.append(row.updated_at)
|
||||
history.dbfs.append(float(row.dbfs))
|
||||
history.rms.append(float(row.rms))
|
||||
history.power.append(float(row.power_lin))
|
||||
|
||||
|
||||
def _map_series(ts: List[float], ys: List[float], now: float, span: float, width: int, height: int,
|
||||
y_floor: Optional[float] = None, clip_floor: bool = False):
|
||||
if not ts or not ys:
|
||||
return None
|
||||
start = now - span
|
||||
pts = [(t, y) for t, y in zip(ts, ys) if t >= start and math.isfinite(y)]
|
||||
if not pts:
|
||||
return None
|
||||
vals = []
|
||||
for _, y in pts:
|
||||
if clip_floor and y_floor is not None and y < y_floor:
|
||||
y = y_floor
|
||||
vals.append(y)
|
||||
ymin = min(vals)
|
||||
ymax = max(vals)
|
||||
if y_floor is not None and ymin < y_floor:
|
||||
ymin = y_floor
|
||||
if not math.isfinite(ymin) or not math.isfinite(ymax):
|
||||
return None
|
||||
if ymax == ymin:
|
||||
pad = abs(ymax) * 0.05 or 1.0
|
||||
ymin -= pad
|
||||
ymax += pad
|
||||
else:
|
||||
pad = (ymax - ymin) * 0.08
|
||||
ymin -= pad
|
||||
ymax += pad
|
||||
if y_floor is not None and ymin < y_floor:
|
||||
ymin = y_floor
|
||||
if ymax <= ymin:
|
||||
ymax = ymin + 1.0
|
||||
|
||||
mapped = []
|
||||
for t, y in pts:
|
||||
if clip_floor and y_floor is not None and y < y_floor:
|
||||
y = y_floor
|
||||
xr = (t - start) / span if span > 0 else 1.0
|
||||
xr = 0.0 if xr < 0 else (1.0 if xr > 1 else xr)
|
||||
c = int(round(xr * (width - 1))) if width > 1 else 0
|
||||
yr = (y - ymin) / (ymax - ymin)
|
||||
yr = 0.0 if yr < 0 else (1.0 if yr > 1 else yr)
|
||||
r = height - 1 - int(round(yr * (height - 1)))
|
||||
mapped.append((c, r))
|
||||
return mapped, ymin, ymax, vals[-1]
|
||||
|
||||
|
||||
def _draw_line(grid: List[List[str]], x0: int, y0: int, x1: int, y1: int, ch: str = '*') -> None:
|
||||
dx = x1 - x0
|
||||
dy = y1 - y0
|
||||
steps = max(abs(dx), abs(dy), 1)
|
||||
for i in range(steps + 1):
|
||||
x = int(round(x0 + dx * i / steps))
|
||||
y = int(round(y0 + dy * i / steps))
|
||||
if 0 <= y < len(grid) and 0 <= x < len(grid[0]):
|
||||
grid[y][x] = ch
|
||||
|
||||
|
||||
def render_chart(title: str, unit: str, ts: List[float], ys: List[float], now: float,
|
||||
span: float, width: int, height: int,
|
||||
y_floor: Optional[float] = None, clip_floor: bool = False) -> List[str]:
|
||||
plot = _map_series(ts, ys, now, span, width, height, y_floor=y_floor, clip_floor=clip_floor)
|
||||
lines: List[str] = []
|
||||
if plot is None:
|
||||
lines.append(f"{title} ({unit}) | no samples in last {span:.0f}s")
|
||||
for _ in range(height):
|
||||
lines.append(" " * (10 + 3 + width))
|
||||
lines.append(f"{'':>10} +{'-' * width}")
|
||||
lines.append(f"{'':>10} t-{int(span)}s{' ' * max(1, width - 8)}now")
|
||||
return lines
|
||||
|
||||
points, ymin, ymax, last = plot
|
||||
grid = [[' ' for _ in range(width)] for _ in range(height)]
|
||||
|
||||
for row_idx in range(height):
|
||||
if row_idx == height // 2:
|
||||
for c in range(width):
|
||||
grid[row_idx][c] = '.'
|
||||
|
||||
prev = None
|
||||
for c, r in points:
|
||||
if prev is not None:
|
||||
_draw_line(grid, prev[0], prev[1], c, r, '*')
|
||||
prev = (c, r)
|
||||
|
||||
ytop = f"{ymax:.2f}" if abs(ymax) < 1e4 else f"{ymax:.2e}"
|
||||
ybot = f"{ymin:.2f}" if abs(ymin) < 1e4 else f"{ymin:.2e}"
|
||||
ylast = f"{last:.2f}" if abs(last) < 1e4 else f"{last:.2e}"
|
||||
lines.append(f"{title} ({unit}) | last={ylast} min={ybot} max={ytop}")
|
||||
|
||||
for i, row in enumerate(grid):
|
||||
if i == 0:
|
||||
lbl = ytop
|
||||
elif i == height - 1:
|
||||
lbl = ybot
|
||||
else:
|
||||
lbl = ""
|
||||
lines.append(f"{lbl:>10} |{''.join(row)}")
|
||||
|
||||
lines.append(f"{'':>10} +{'-' * width}")
|
||||
left = f"t-{int(span)}s"
|
||||
right = "now"
|
||||
spacer = max(1, width - len(left) - len(right))
|
||||
lines.append(f"{'':>10} {left}{' ' * spacer}{right}")
|
||||
return lines
|
||||
|
||||
|
||||
def render_screen(rows: Dict[str, Row], histories: Dict[str, MetricHistory], focus_label: str,
|
||||
started: float, history_sec: float, plot_height: int, plot_width_arg: int,
|
||||
env_path: Path, discovered: int) -> None:
|
||||
now = time.time()
|
||||
term_w = shutil.get_terminal_size((140, 50)).columns
|
||||
plot_width = plot_width_arg if plot_width_arg > 0 else max(40, min(term_w - 14, 140))
|
||||
|
||||
print("\x1b[2J\x1b[H", end="")
|
||||
print("HackRF Energy ASCII Monitor (relative levels: dBFS/RMS/power, not dBm)")
|
||||
print(f"env: {env_path} | discovered: {discovered} | uptime: {int(now-started)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
print()
|
||||
|
||||
header = f"{'band':>5} {'idx':>3} {'status':>9} {'dBFS':>9} {'RMS':>10} {'power':>12} {'N':>5} {'age':>5} {'serial':>12}"
|
||||
print(header)
|
||||
print('-' * len(header))
|
||||
for label in sorted(rows, key=lambda x: int(x)):
|
||||
r = rows[label]
|
||||
idx = '-' if r.index is None else str(r.index)
|
||||
age = '-' if r.updated_at <= 0 else f"{(now-r.updated_at):.1f}"
|
||||
dbfs = '-' if r.dbfs is None else f"{r.dbfs:.2f}"
|
||||
rms = '-' if r.rms is None else f"{r.rms:.6f}"
|
||||
pwr = '-' if r.power_lin is None else (f"{r.power_lin:.8f}" if abs(r.power_lin) < 1e4 else f"{r.power_lin:.3e}")
|
||||
print(f"{label:>5} {idx:>3} {r.status:>9} {dbfs:>9} {rms:>10} {pwr:>12} {r.samples:>5} {age:>5} {r.serial[-12:]:>12}")
|
||||
|
||||
print()
|
||||
if focus_label not in histories:
|
||||
print(f"focus label '{focus_label}' not found")
|
||||
print("Use --only or --focus to select a valid label.")
|
||||
sys.stdout.flush()
|
||||
return
|
||||
|
||||
h = histories[focus_label]
|
||||
ts = list(h.ts)
|
||||
print(f"Focus band: {focus_label}")
|
||||
print()
|
||||
for line in render_chart("dBFS vs time", "dBFS", ts, list(h.dbfs), now, history_sec, plot_width, plot_height, y_floor=-50.0, clip_floor=True):
|
||||
print(line)
|
||||
print()
|
||||
for line in render_chart("RMS vs time", "RMS", ts, list(h.rms), now, history_sec, plot_width, plot_height):
|
||||
print(line)
|
||||
print()
|
||||
for line in render_chart("Power vs time", "|IQ|^2", ts, list(h.power), now, history_sec, plot_width, plot_height):
|
||||
print(line)
|
||||
print()
|
||||
print("Ctrl+C to stop. If status=BUSY, corresponding SDR service is using the HackRF.")
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = build_parser().parse_args()
|
||||
only = {x.strip() for x in args.only.split(',') if x.strip()} or None
|
||||
|
||||
env_path = Path(args.env)
|
||||
if not env_path.is_absolute():
|
||||
env_path = (Path(__file__).resolve().parent / env_path).resolve()
|
||||
|
||||
env = parse_env(env_path)
|
||||
targets = collect_targets(env, only=only, override_freq_mhz=args.freq_mhz)
|
||||
if not targets:
|
||||
print(f"No hack_/HACKID_ entries found in {env_path}", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
serial_to_index = parse_hackrf_info()
|
||||
|
||||
rows: Dict[str, Row] = {t.label: Row(label=t.label, serial=t.serial, freq_hz=t.freq_hz) for t in targets}
|
||||
histories: Dict[str, MetricHistory] = {
|
||||
t.label: MetricHistory(deque(), deque(), deque(), deque()) for t in targets
|
||||
}
|
||||
|
||||
focus_label = (args.focus.strip() if args.focus.strip() else sorted(rows, key=lambda x: int(x))[0])
|
||||
|
||||
lock = threading.Lock()
|
||||
stop_event = threading.Event()
|
||||
|
||||
def _stop(signum, frame):
|
||||
stop_event.set()
|
||||
|
||||
signal.signal(signal.SIGINT, _stop)
|
||||
signal.signal(signal.SIGTERM, _stop)
|
||||
|
||||
workers: List[Worker] = []
|
||||
for i, t in enumerate(targets):
|
||||
wa = argparse.Namespace(**vars(args))
|
||||
wa.stagger = i * 0.15
|
||||
w = Worker(t, serial_to_index, rows, lock, stop_event, wa)
|
||||
w.start()
|
||||
workers.append(w)
|
||||
|
||||
started = time.time()
|
||||
try:
|
||||
while not stop_event.is_set():
|
||||
cutoff = time.time() - args.history_sec
|
||||
with lock:
|
||||
snap = {k: Row(**vars(v)) for k, v in rows.items()}
|
||||
for label, row in snap.items():
|
||||
maybe_append(histories[label], row)
|
||||
prune_history(histories[label], cutoff)
|
||||
render_screen(snap, histories, focus_label, started, args.history_sec,
|
||||
args.plot_height, args.plot_width, env_path, len(serial_to_index))
|
||||
stop_event.wait(args.refresh)
|
||||
finally:
|
||||
stop_event.set()
|
||||
for w in workers:
|
||||
w.join(timeout=2.0)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
raise SystemExit(main())
|
||||
Loading…
Reference in New Issue