You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
322 lines
11 KiB
Python
322 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
import math
|
|
import re
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
try:
|
|
import numpy as np
|
|
except Exception as exc:
|
|
print(f"numpy import failed: {exc}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
try:
|
|
from gnuradio import blocks, gr
|
|
import osmosdr
|
|
except Exception as exc:
|
|
print(f"gnuradio/osmosdr import failed: {exc}", file=sys.stderr)
|
|
print("Run with the SDR venv, e.g. .venv-sdr/bin/python read_energy.py", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
# Floor applied to the mean linear power before taking log10, so an all-zero
# sample vector yields a finite dBFS value instead of a math domain error.
EPS = 1e-20
|
|
|
|
|
|
@dataclass
class Target:
    """One HackRF that should be monitored, built from a single env entry."""

    # Numeric suffix of the env key (e.g. "2400"); doubles as the band label.
    label: str
    # Device serial taken from the env value (collect_targets lower-cases it).
    serial: str
    # Frequency to tune the device to, in Hz.
    freq_hz: float
    # Name of the env key this target was created from.
    source: str
|
|
|
|
|
|
@dataclass
class Row:
    """Mutable display state for one monitored device (one table line)."""

    # Band label and device serial identifying the row.
    label: str
    serial: str
    # Device index resolved from the serial; None until/unless it is found.
    index: Optional[int] = None
    # Tuned frequency in Hz.
    freq_hz: float = 0.0
    # Short state tag; starts as "INIT" and is overwritten by the worker.
    status: str = "INIT"
    # Latest measurements; None until the first successful read.
    rms: Optional[float] = None
    power_lin: Optional[float] = None
    dbfs: Optional[float] = None
    # Number of samples behind the latest measurement.
    samples: int = 0
    # time.time() of the last update; 0.0 means "never updated".
    updated_at: float = 0.0
    # Last error message, empty when there is none.
    error: str = ""
|
|
|
|
|
|
class ProbeTop(gr.top_block):
    """Flowgraph wiring one osmosdr source into a complex vector probe.

    The chain is: osmosdr source -> stream_to_vector(vec_len) ->
    probe_signal_vc(vec_len). ``read_metrics`` turns the probe's current
    vector into power figures.
    """

    def __init__(self, index: int, freq_hz: float, sample_rate: float, vec_len: int,
                 gain: float, if_gain: float, bb_gain: float):
        """Build (but do not start) the flowgraph for HackRF number ``index``.

        Args:
            index: Device index placed in the osmosdr ``hackrf=`` argument.
            freq_hz: Centre frequency passed to the source.
            sample_rate: Sample rate passed to the source.
            vec_len: Number of complex samples per probe vector.
            gain, if_gain, bb_gain: Values for the source's gain setters.
        """
        super().__init__("hackrf_energy_probe")

        self.probe = blocks.probe_signal_vc(vec_len)
        self.stream_to_vec = blocks.stream_to_vector(gr.sizeof_gr_complex * 1, vec_len)
        self.src = osmosdr.source(args=f"numchan=1 hackrf={index}")

        # Mandatory configuration: any failure here propagates to the caller.
        self.src.set_time_unknown_pps(osmosdr.time_spec_t())
        self.src.set_sample_rate(sample_rate)
        self.src.set_center_freq(freq_hz, 0)

        # Optional configuration, applied in this order. Each call is
        # best-effort: a setter that is missing or raises is skipped.
        optional_calls = (
            ("set_freq_corr", (0, 0)),
            ("set_gain", (gain, 0)),
            ("set_if_gain", (if_gain, 0)),
            ("set_bb_gain", (bb_gain, 0)),
            ("set_bandwidth", (0, 0)),
            ("set_antenna", ('', 0)),
        )
        for setter_name, call_args in optional_calls:
            try:
                getattr(self.src, setter_name)(*call_args)
            except Exception:
                pass

        self.connect((self.src, 0), (self.stream_to_vec, 0))
        self.connect((self.stream_to_vec, 0), (self.probe, 0))

    def read_metrics(self) -> Tuple[float, float, float, int]:
        """Measure the probe's current vector.

        Returns:
            ``(rms, mean_power, dbfs, sample_count)`` where ``mean_power`` is
            the mean of I^2 + Q^2, ``rms`` is its square root and ``dbfs`` is
            ``10 * log10(mean_power)`` with the power floored at ``EPS``.

        Raises:
            RuntimeError: If the probe returned an empty vector.
        """
        samples = np.asarray(self.probe.level(), dtype=np.complex64)
        count = int(samples.size)
        if count == 0:
            raise RuntimeError("no samples")

        i_part = samples.real
        q_part = samples.imag
        mean_power = float(np.mean(i_part * i_part + q_part * q_part))

        rms = math.sqrt(max(mean_power, 0.0))
        dbfs = 10.0 * math.log10(max(mean_power, EPS))
        return rms, mean_power, dbfs, count
|
|
|
|
|
|
class Worker(threading.Thread):
    """Daemon thread that polls one HackRF and publishes results into a Row.

    The thread resolves the target's serial to a device index, opens a
    ``ProbeTop`` flowgraph, then repeatedly reads metrics and writes them to
    ``rows[target.label]`` under ``lock``. On any open/read failure it records
    the error, releases the flowgraph and retries after ``args.reopen_delay``.
    All waits are done on ``stop_event`` so shutdown is not delayed by them.
    """

    # Pause between starting the flowgraph and reporting it as usable.
    _SETTLE_SECONDS = 0.15

    def __init__(self, target: Target, serial_to_index: Dict[str, int], rows: Dict[str, Row],
                 lock: threading.Lock, stop_event: threading.Event, args: argparse.Namespace):
        """Store collaborators; the device is not touched until ``run``.

        Args:
            target: Device description (label, serial, frequency).
            serial_to_index: Mapping from serial to device index.
            rows: Shared label -> Row table; this worker updates its own row.
            lock: Guards every access to ``rows``.
            stop_event: Set by the owner to request shutdown.
            args: Namespace providing sample_rate, vec_len, gain, if_gain,
                bb_gain, interval, reopen_delay and (optionally) stagger.
        """
        super().__init__(daemon=True)
        self.target = target
        self.serial_to_index = serial_to_index
        self.rows = rows
        self.lock = lock
        self.stop_event = stop_event
        self.args = args
        self.tb: Optional[ProbeTop] = None

    def _set_row(self, **kwargs):
        """Assign the given fields on this worker's row and stamp updated_at."""
        with self.lock:
            row = self.rows[self.target.label]
            for k, v in kwargs.items():
                setattr(row, k, v)
            row.updated_at = time.time()

    def _open(self) -> bool:
        """Create and start the flowgraph; return True when it is running.

        On failure the row status becomes "NOT_FOUND", "BUSY" or "ERR" and
        any partially created flowgraph is stopped before returning False.
        """
        idx = self.serial_to_index.get(self.target.serial)
        if idx is None:
            self._set_row(status="NOT_FOUND", error="serial not in hackrf_info", index=None)
            return False

        self._set_row(index=idx, status="OPENING", error="", freq_hz=self.target.freq_hz)
        try:
            self.tb = ProbeTop(
                idx, self.target.freq_hz, self.args.sample_rate, self.args.vec_len,
                self.args.gain, self.args.if_gain, self.args.bb_gain
            )
            self.tb.start()
        except Exception as exc:
            msg = str(exc)
            status = "BUSY" if ("Resource busy" in msg or "-1000" in msg) else "ERR"
            self._set_row(status=status, error=msg)
            # If the constructor succeeded but start() failed, stop the
            # flowgraph explicitly instead of merely dropping the reference.
            self._close()
            return False

        # Settle delay, but wake immediately if shutdown is requested.
        self.stop_event.wait(self._SETTLE_SECONDS)
        self._set_row(status="OK", error="")
        return True

    def _close(self):
        """Stop and forget the current flowgraph, if any (never raises)."""
        tb, self.tb = self.tb, None
        if tb is None:
            return
        try:
            tb.stop()
            tb.wait()
        except Exception:
            # Best-effort teardown: there is nothing useful to do on failure.
            pass

    def run(self):
        """Thread body: open, poll and reopen until ``stop_event`` is set."""
        # Staggered start; returns at once if shutdown is already requested.
        if self.stop_event.wait(getattr(self.args, "stagger", 0.0)):
            return
        try:
            while not self.stop_event.is_set():
                if self.tb is None and not self._open():
                    if self.stop_event.wait(self.args.reopen_delay):
                        break
                    continue

                try:
                    rms, p, dbfs, n = self.tb.read_metrics()
                    self._set_row(status="OK", rms=rms, power_lin=p, dbfs=dbfs, samples=n, error="")
                except Exception as exc:
                    self._set_row(status="ERR", error=str(exc))
                    self._close()
                    delay = self.args.reopen_delay
                else:
                    delay = self.args.interval

                if self.stop_event.wait(delay):
                    break
        finally:
            # Release the device on every exit path, including unexpected errors.
            self._close()
|
|
|
|
|
|
def parse_env(path: Path) -> Dict[str, str]:
    """Read simple ``KEY=VALUE`` pairs from an env-style file.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Keys and values are whitespace-stripped, surrounding double and
    single quotes are removed from values, and a leading ``export`` word on
    the key (``export KEY=VALUE``) is dropped. Later duplicates win.

    Args:
        path: Location of the env file.

    Returns:
        Mapping of keys to values; empty when ``path`` is not a regular file.
    """
    out: Dict[str, str] = {}
    # is_file() rather than exists(): a directory at this path would make
    # read_text() raise instead of simply yielding no entries.
    if not path.is_file():
        return out

    for raw in path.read_text(encoding="utf-8", errors="ignore").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        k, v = line.split("=", 1)
        key = k.strip()
        # Accept shell-style "export KEY=VALUE" by discarding the keyword.
        words = key.split()
        if len(words) == 2 and words[0] == "export":
            key = words[1]
        out[key] = v.strip().strip('"').strip("'")
    return out
|
|
|
|
|
|
def collect_targets(env: Dict[str, str], only: Optional[set], override_freq_mhz: Optional[float]) -> List[Target]:
    """Turn ``hack_<N>`` / ``HACKID_<N>`` env entries into monitor targets.

    ``<N>`` (digits only) becomes the label and, unless ``override_freq_mhz``
    is given, the tuning frequency in MHz. The env value is the device serial
    and is lower-cased. When both key styles exist for the same label the
    ``hack_`` entry is kept.

    Args:
        env: Parsed env mapping.
        only: If non-empty, labels outside this set are ignored.
        override_freq_mhz: Frequency in MHz applied to every target.

    Returns:
        One Target per label, ordered by the label's integer value.
    """
    found: List[Target] = []
    for key, value in env.items():
        match = re.fullmatch(r"hack_(\d+)", key) or re.fullmatch(r"HACKID_(\d+)", key)
        if match is None:
            continue
        label = match.group(1)
        if only and label not in only:
            continue
        if override_freq_mhz is None:
            mhz = float(label)
        else:
            mhz = override_freq_mhz
        found.append(Target(label=label, serial=value.lower(), freq_hz=mhz * 1e6, source=key))

    def preference(target: Target) -> Tuple[int, int]:
        # Numeric label first; within a label, "hack_" keys beat "HACKID_".
        rank = 0 if target.source.startswith("hack_") else 1
        return int(target.label), rank

    found.sort(key=preference)

    by_label: Dict[str, Target] = {}
    for target in found:
        if target.label not in by_label:
            by_label[target.label] = target

    return sorted(by_label.values(), key=lambda target: int(target.label))
|
|
|
|
|
|
def parse_hackrf_info() -> Dict[str, int]:
    """Run ``hackrf_info`` and map each reported serial to its device index.

    Both stdout and stderr are scanned. A line beginning with ``Index:`` sets
    the current index; a following ``Serial number:`` line (hex digits)
    records that serial, lower-cased, against it.

    Returns:
        Mapping of lower-case serial to device index.

    Raises:
        RuntimeError: If the tool is missing, takes longer than 15 seconds,
            or no index/serial pair could be parsed from its output.
    """
    try:
        proc = subprocess.run(["hackrf_info"], capture_output=True, text=True, timeout=15)
    except FileNotFoundError:
        raise RuntimeError("hackrf_info not found")
    except subprocess.TimeoutExpired:
        raise RuntimeError("hackrf_info timeout")

    index_re = re.compile(r"^Index:\s*(\d+)")
    serial_re = re.compile(r"^Serial number:\s*([0-9a-fA-F]+)")

    combined = "\n".join((proc.stdout or "", proc.stderr or ""))
    mapping: Dict[str, int] = {}
    current: Optional[int] = None

    for line in combined.splitlines():
        index_match = index_re.search(line)
        if index_match is not None:
            current = int(index_match.group(1))
            continue
        serial_match = serial_re.search(line)
        if serial_match is not None and current is not None:
            mapping[serial_match.group(1).lower()] = current

    if not mapping:
        raise RuntimeError("no devices parsed from hackrf_info")
    return mapping
|
|
|
|
|
|
def fmt(v: Optional[float], spec: str) -> str:
    """Format ``v`` with format-spec ``spec``; a missing value renders as "-"."""
    if v is None:
        return "-"
    return format(v, spec)
|
|
|
|
|
|
def render(rows: Dict[str, Row], started_at: float, env_path: Path, serial_to_index: Dict[str, int]):
    """Clear the terminal and print the status table for all rows.

    Args:
        rows: Label -> Row snapshot to display, shown in numeric label order.
        started_at: ``time.time()`` value used to compute the uptime.
        env_path: Env file path shown in the header line.
        serial_to_index: Discovered devices; only its size is displayed.
    """
    now = time.time()
    uptime_s = int(now - started_at)
    stamp = time.strftime('%Y-%m-%d %H:%M:%S')
    header = f"{'band':>5} {'idx':>3} {'freq':>7} {'status':>9} {'rms':>10} {'power':>12} {'dBFS':>9} {'N':>5} {'age':>5} {'serial':>12} error"

    screen = [
        "HackRF Energy Monitor (relative power: RMS / linear / dBFS, not calibrated dBm)",
        f"env: {env_path} | discovered: {len(serial_to_index)} | uptime: {uptime_s}s | {stamp}",
        "",
        header,
        '-' * len(header),
    ]

    for label in sorted(rows, key=lambda x: int(x)):
        r = rows[label]

        if r.index is None:
            idx_text = '-'
        else:
            idx_text = str(r.index)

        if r.updated_at <= 0:
            age_text = '-'
        else:
            age_text = f"{(now-r.updated_at):.1f}"

        # Keep the error column to 64 characters, ellipsis included.
        err_text = r.error or ""
        if len(err_text) > 64:
            err_text = err_text[:61] + '...'

        screen.append(
            f"{r.label:>5} {idx_text:>3} {r.freq_hz/1e6:>7.1f} {r.status:>9} "
            f"{fmt(r.rms, '.6f'):>10} {fmt(r.power_lin, '.8f'):>12} {fmt(r.dbfs, '.2f'):>9} "
            f"{r.samples:>5} {age_text:>5} {r.serial[-12:]:>12} {err_text}"
        )

    screen.append("")
    screen.append("Ctrl+C to stop. Use --only 2400,5200 or --freq-mhz 2450 to limit/override tuning.")

    # ANSI "erase display" + "cursor home", then the whole frame in one write.
    print("\x1b[2J\x1b[H" + "\n".join(screen))
    sys.stdout.flush()
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the energy monitor.

    Returns:
        Parser exposing --env, --only, --freq-mhz, --sample-rate, --vec-len,
        --interval, --refresh, --reopen-delay, --gain, --if-gain and --bb-gain.
    """
    parser = argparse.ArgumentParser(description="Realtime HackRF relative energy monitor")

    # (flag, add_argument keyword arguments), registered in display order.
    option_table = (
        ("--env", {"default": ".env", "help": "Path to .env (default: repo_root/.env)"}),
        ("--only", {"default": "", "help": "Comma-separated labels (e.g. 2400,5200)"}),
        ("--freq-mhz", {"type": float, "default": None,
                        "help": "Override tuning frequency for all devices (MHz)"}),
        ("--sample-rate", {"type": float, "default": 2e6,
                           "help": "Sample rate in Hz (HackRF min ~2e6)"}),
        ("--vec-len", {"type": int, "default": 4096, "help": "Probe vector length"}),
        ("--interval", {"type": float, "default": 0.5, "help": "Per-device read interval (s)"}),
        ("--refresh", {"type": float, "default": 0.5, "help": "Console refresh interval (s)"}),
        ("--reopen-delay", {"type": float, "default": 1.0,
                            "help": "Retry delay after BUSY/ERR (s)"}),
        ("--gain", {"type": float, "default": 16.0, "help": "General gain"}),
        ("--if-gain", {"type": float, "default": 16.0, "help": "IF gain"}),
        ("--bb-gain", {"type": float, "default": 16.0, "help": "BB gain"}),
    )
    for flag, options in option_table:
        parser.add_argument(flag, **options)
    return parser
|
|
|
|
|
|
def main() -> int:
    """Program entry point: discover devices, start workers, draw the table.

    Returns:
        0 after a normal stop, 2 when the env file yields no targets, 3 when
        device discovery fails.
    """
    args = build_parser().parse_args()

    # --only: comma-separated labels; an empty selection means "no filter".
    wanted = set()
    for piece in args.only.split(','):
        piece = piece.strip()
        if piece:
            wanted.add(piece)
    only = wanted or None

    # A relative --env path is resolved against this script's directory.
    env_path = Path(args.env)
    if not env_path.is_absolute():
        script_dir = Path(__file__).resolve().parent
        env_path = (script_dir / env_path).resolve()

    targets = collect_targets(parse_env(env_path), only=only, override_freq_mhz=args.freq_mhz)
    if not targets:
        print(f"No hack_/HACKID_ entries found in {env_path}", file=sys.stderr)
        return 2

    try:
        serial_to_index = parse_hackrf_info()
    except Exception as exc:
        print(f"hackrf discovery failed: {exc}", file=sys.stderr)
        return 3

    rows = {}
    for t in targets:
        rows[t.label] = Row(label=t.label, serial=t.serial, freq_hz=t.freq_hz)
    lock = threading.Lock()
    stop_event = threading.Event()

    def on_signal(signum, frame):
        # Only flag the request; the loops below notice it and wind down.
        stop_event.set()

    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, on_signal)

    workers: List[Worker] = []
    for position, t in enumerate(targets):
        # Each worker gets its own copy of args carrying its start offset.
        worker_args = argparse.Namespace(**vars(args))
        worker_args.stagger = position * 0.15
        worker = Worker(t, serial_to_index, rows, lock, stop_event, worker_args)
        workers.append(worker)
        worker.start()

    started = time.time()
    try:
        while not stop_event.is_set():
            # Copy the rows under the lock so rendering happens outside it.
            with lock:
                snap = {label: Row(**vars(row)) for label, row in rows.items()}
            render(snap, started, env_path, serial_to_index)
            stop_event.wait(args.refresh)
    finally:
        stop_event.set()
        for worker in workers:
            worker.join(timeout=2)
    return 0
|
|
|
|
|
|
# Run the monitor when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|