добавил поддержку neptune в read_energy

neptune
Sergey Revyakin 2 weeks ago
parent f4c4a0bd22
commit 260638ec43

@ -9,7 +9,7 @@ import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Sequence, Tuple
try:
import numpy as np
@ -26,12 +26,13 @@ except Exception as exc:
sys.exit(1)
EPS = 1e-20
IIO_MIN_SAMPLE_RATE = 2083333
@dataclass
class Target:
label: str
serial: str
device: str
freq_hz: float
source: str
@ -39,8 +40,8 @@ class Target:
@dataclass
class Row:
label: str
serial: str
index: Optional[int] = None
device: str
index: Optional[str] = None
freq_hz: float = 0.0
status: str = "INIT"
rms: Optional[float] = None
@ -51,13 +52,28 @@ class Row:
error: str = ""
class ProbeTop(gr.top_block):
def __init__(self, index: int, freq_hz: float, sample_rate: float, vec_len: int,
gain: float, if_gain: float, bb_gain: float):
def label_sort_key(label: str) -> Tuple[int, float | str]:
try:
return (0, float(label))
except ValueError:
return (1, label)
class HackRfProbeTop(gr.top_block):
def __init__(
self,
serial: str,
freq_hz: float,
sample_rate: float,
vec_len: int,
gain: float,
if_gain: float,
bb_gain: float,
):
super().__init__("hackrf_energy_probe")
self.probe = blocks.probe_signal_vc(vec_len)
self.stream_to_vec = blocks.stream_to_vector(gr.sizeof_gr_complex * 1, vec_len)
self.src = osmosdr.source(args=f"numchan=1 hackrf={index}")
self.src = osmosdr.source(args=f"numchan=1 hackrf={serial}")
self.src.set_time_unknown_pps(osmosdr.time_spec_t())
self.src.set_sample_rate(sample_rate)
self.src.set_center_freq(freq_hz, 0)
@ -68,14 +84,14 @@ class ProbeTop(gr.top_block):
for fn, val in (("set_gain", gain), ("set_if_gain", if_gain), ("set_bb_gain", bb_gain)):
try:
getattr(self.src, fn)(val, 0)
except Exception:
raise Exception("не ставится усиление")
except Exception as exc:
raise RuntimeError(f"failed to set {fn}={val}: {exc}") from exc
try:
self.src.set_bandwidth(0, 0)
except Exception:
pass
try:
self.src.set_antenna('', 0)
self.src.set_antenna("", 0)
except Exception:
pass
self.connect((self.src, 0), (self.stream_to_vec, 0))
@ -85,15 +101,22 @@ class ProbeTop(gr.top_block):
arr = np.asarray(self.probe.level(), dtype=np.complex64)
if arr.size == 0:
raise RuntimeError("no samples")
p = float(np.mean(arr.real * arr.real + arr.imag * arr.imag))
rms = math.sqrt(max(p, 0.0))
dbfs = 10.0 * math.log10(max(p, EPS))
return rms, p, dbfs, int(arr.size)
class Worker(threading.Thread):
def __init__(self, target: Target, serial_to_index: Dict[str, int], rows: Dict[str, Row],
lock: threading.Lock, stop_event: threading.Event, args: argparse.Namespace):
power_lin = float(np.mean(arr.real * arr.real + arr.imag * arr.imag))
rms = math.sqrt(max(power_lin, 0.0))
dbfs = 10.0 * math.log10(max(power_lin, EPS))
return rms, power_lin, dbfs, int(arr.size)
class HackRfWorker(threading.Thread):
def __init__(
self,
target: Target,
serial_to_index: Dict[str, int],
rows: Dict[str, Row],
lock: threading.Lock,
stop_event: threading.Event,
args: argparse.Namespace,
):
super().__init__(daemon=True)
self.target = target
self.serial_to_index = serial_to_index
@ -101,39 +124,46 @@ class Worker(threading.Thread):
self.lock = lock
self.stop_event = stop_event
self.args = args
self.tb: Optional[ProbeTop] = None
self.tb: Optional[HackRfProbeTop] = None
def _set_row(self, **kwargs):
with self.lock:
row = self.rows[self.target.label]
for k, v in kwargs.items():
setattr(row, k, v)
for key, value in kwargs.items():
setattr(row, key, value)
row.updated_at = time.time()
def _open(self) -> bool:
idx = self.serial_to_index.get(self.target.serial)
if idx is None:
if self.target.device not in self.serial_to_index:
self._set_row(status="NOT_FOUND", error="serial not in hackrf_info", index=None)
return False
self._set_row(index=idx, status="OPENING", error="", freq_hz=self.target.freq_hz)
idx = self.serial_to_index[self.target.device]
self._set_row(index=str(idx), status="OPENING", error="", freq_hz=self.target.freq_hz)
try:
self.tb = ProbeTop(
idx, self.target.freq_hz, self.args.sample_rate, self.args.vec_len,
self.args.gain, self.args.if_gain, self.args.bb_gain
self.tb = HackRfProbeTop(
self.target.device,
self.target.freq_hz,
self.args.sample_rate,
self.args.vec_len,
self.args.gain,
self.args.if_gain,
self.args.bb_gain,
)
self.tb.start()
time.sleep(0.15)
self._set_row(status="OK", error="")
return True
except Exception as exc:
msg = str(exc)
status = "BUSY" if ("Resource busy" in msg or "-1000" in msg) else "ERR"
self._set_row(status=status, error=msg)
message = str(exc)
status = "BUSY" if ("Resource busy" in message or "-1000" in message) else "ERR"
self._set_row(status=status, error=message)
self.tb = None
return False
def _close(self):
if self.tb is not None:
if self.tb is None:
return
try:
self.tb.stop()
self.tb.wait()
@ -149,8 +179,15 @@ class Worker(threading.Thread):
break
continue
try:
rms, p, dbfs, n = self.tb.read_metrics()
self._set_row(status="OK", rms=rms, power_lin=p, dbfs=dbfs, samples=n, error="")
rms, power_lin, dbfs, samples = self.tb.read_metrics()
self._set_row(
status="OK",
rms=rms,
power_lin=power_lin,
dbfs=dbfs,
samples=samples,
error="",
)
except Exception as exc:
self._set_row(status="ERR", error=str(exc))
self._close()
@ -162,6 +199,190 @@ class Worker(threading.Thread):
self._close()
class IioProbe:
def __init__(self, args: argparse.Namespace):
self.args = args
self._static_signature: Optional[Tuple[float, float, str, Optional[float], str]] = None
self._last_freq_hz: Optional[int] = None
self._input_channels = [args.iio_i_channel]
if args.iio_q_channel not in self._input_channels:
self._input_channels.append(args.iio_q_channel)
def _run(self, cmd: Sequence[str], binary: bool = False) -> str | bytes:
proc = subprocess.run(
list(cmd),
capture_output=True,
text=not binary,
)
if proc.returncode != 0:
stderr = proc.stderr if binary else (proc.stderr or "")
stdout = "" if binary else (proc.stdout or "")
details = stderr.strip() or stdout.strip() or f"exit code {proc.returncode}"
raise RuntimeError(details)
return proc.stdout
def _set_input_attr(self, channel: str, attr: str, value: str):
self._run(
[
"iio_attr",
"-u",
self.args.uri,
"-q",
"-i",
"-c",
self.args.iio_phy_device,
channel,
attr,
value,
]
)
def _set_output_attr(self, channel: str, attr: str, value: str):
self._run(
[
"iio_attr",
"-u",
self.args.uri,
"-q",
"-o",
"-c",
self.args.iio_phy_device,
channel,
attr,
value,
]
)
def ensure_configured(self):
sample_rate = None
if self.args.sample_rate is not None:
sample_rate = int(round(max(float(self.args.sample_rate), IIO_MIN_SAMPLE_RATE)))
bandwidth = None
if self.args.bandwidth is not None:
bandwidth = int(round(max(float(self.args.bandwidth), 200000.0)))
signature = (
None if sample_rate is None else float(sample_rate),
None if bandwidth is None else float(bandwidth),
self.args.iio_gain_mode,
self.args.iio_hardwaregain,
self.args.iio_port_select,
)
if signature == self._static_signature:
return
for channel in self._input_channels:
if sample_rate is not None:
self._set_input_attr(channel, "sampling_frequency", str(sample_rate))
if bandwidth is not None:
self._set_input_attr(channel, "rf_bandwidth", str(bandwidth))
if self.args.iio_port_select:
self._set_input_attr(channel, "rf_port_select", self.args.iio_port_select)
if self.args.iio_gain_mode:
self._set_input_attr(channel, "gain_control_mode", self.args.iio_gain_mode)
if self.args.iio_gain_mode == "manual" and self.args.iio_hardwaregain is not None:
self._set_input_attr(channel, "hardwaregain", f"{self.args.iio_hardwaregain:.6f}")
self._static_signature = signature
def tune(self, freq_hz: float):
target = int(round(freq_hz))
if self._last_freq_hz == target:
return
self._set_output_attr(self.args.iio_lo_channel, "frequency", str(target))
self._last_freq_hz = target
if self.args.settle > 0:
time.sleep(self.args.settle)
def read_metrics(self) -> Tuple[float, float, float, int]:
cmd = [
"iio_readdev",
"-u",
self.args.uri,
"-T",
str(int(self.args.timeout_ms)),
"-b",
str(max(4, int(self.args.vec_len))),
"-s",
str(max(4, int(self.args.vec_len))),
self.args.iio_device,
self.args.iio_i_channel,
self.args.iio_q_channel,
]
raw = self._run(cmd, binary=True)
values = np.frombuffer(raw, dtype="<i2")
if values.size == 0:
raise RuntimeError("no samples")
if values.size % 2 != 0:
raise RuntimeError(f"unexpected IQ sample payload: {values.size} int16 values")
iq = values.reshape(-1, 2).astype(np.float32, copy=False)
i = iq[:, 0]
q = iq[:, 1]
power_lin = float(np.mean(i * i + q * q))
rms = math.sqrt(max(power_lin, 0.0))
dbfs = 10.0 * math.log10(max(power_lin, EPS))
return rms, power_lin, dbfs, int(iq.shape[0])
class IioWorker(threading.Thread):
def __init__(
self,
targets: List[Target],
rows: Dict[str, Row],
lock: threading.Lock,
stop_event: threading.Event,
args: argparse.Namespace,
):
super().__init__(daemon=True)
self.targets = targets
self.rows = rows
self.lock = lock
self.stop_event = stop_event
self.args = args
self.probe = IioProbe(args)
def _set_row(self, label: str, **kwargs):
with self.lock:
row = self.rows[label]
for key, value in kwargs.items():
setattr(row, key, value)
row.updated_at = time.time()
def run(self):
while not self.stop_event.is_set():
for target in self.targets:
if self.stop_event.is_set():
break
self._set_row(
target.label,
index="iio",
status="OPENING",
error="",
freq_hz=target.freq_hz,
)
try:
self.probe.ensure_configured()
self.probe.tune(target.freq_hz)
rms, power_lin, dbfs, samples = self.probe.read_metrics()
self._set_row(
target.label,
status="OK",
rms=rms,
power_lin=power_lin,
dbfs=dbfs,
samples=samples,
error="",
)
except Exception as exc:
self._set_row(target.label, status="ERR", error=str(exc))
if self.stop_event.wait(self.args.reopen_delay):
return
continue
if self.stop_event.wait(self.args.interval):
return
def parse_env(path: Path) -> Dict[str, str]:
out: Dict[str, str] = {}
if not path.exists():
@ -170,122 +391,198 @@ def parse_env(path: Path) -> Dict[str, str]:
line = raw.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, v = line.split("=", 1)
out[k.strip()] = v.strip().strip('"').strip("'")
key, value = line.split("=", 1)
out[key.strip()] = value.strip().strip('"').strip("'")
return out
def collect_targets(env: Dict[str, str], only: Optional[set], override_freq_mhz: Optional[float]) -> List[Target]:
def collect_hackrf_targets(
env: Dict[str, str],
only: Optional[set],
override_freq_mhz: Optional[float],
) -> List[Target]:
targets: List[Target] = []
for k, v in env.items():
m = re.fullmatch(r"hack_(\d+)", k)
if m:
label = m.group(1)
for key, value in env.items():
match = re.fullmatch(r"hack_(\d+)", key)
if match:
label = match.group(1)
else:
m = re.fullmatch(r"HACKID_(\d+)", k)
if not m:
match = re.fullmatch(r"HACKID_(\d+)", key)
if not match:
continue
label = m.group(1)
label = match.group(1)
if only and label not in only:
continue
mhz = override_freq_mhz if override_freq_mhz is not None else float(label)
targets.append(Target(label=label, serial=v.lower(), freq_hz=mhz * 1e6, source=k))
targets.append(Target(label=label, device=value.lower(), freq_hz=mhz * 1e6, source=key))
unique: Dict[str, Target] = {}
for target in sorted(targets, key=lambda item: (label_sort_key(item.label), 0 if item.source.startswith("hack_") else 1)):
unique.setdefault(target.label, target)
return [unique[key] for key in sorted(unique, key=label_sort_key)]
def collect_iio_targets(
env: Dict[str, str],
only: Optional[set],
override_freq_mhz: Optional[float],
uri: str,
) -> List[Target]:
if only:
labels = set(only)
else:
labels = set()
for key in env:
for pattern in (r"c_freq_(\d+)", r"hack_(\d+)", r"HACKID_(\d+)"):
match = re.fullmatch(pattern, key)
if match:
labels.add(match.group(1))
break
uniq: Dict[str, Target] = {}
for t in sorted(targets, key=lambda x: (int(x.label), 0 if x.source.startswith("hack_") else 1)):
uniq.setdefault(t.label, t)
return [uniq[k] for k in sorted(uniq, key=lambda x: int(x))]
if not labels and override_freq_mhz is not None:
labels.add(str(int(override_freq_mhz) if float(override_freq_mhz).is_integer() else override_freq_mhz))
targets: List[Target] = []
for label in sorted(labels, key=label_sort_key):
raw_freq = env.get(f"c_freq_{label}", label)
try:
mhz = override_freq_mhz if override_freq_mhz is not None else float(raw_freq)
except ValueError as exc:
raise ValueError(f"cannot resolve frequency for target {label!r}") from exc
targets.append(Target(label=label, device=uri, freq_hz=mhz * 1e6, source="iio"))
return targets
def parse_hackrf_info() -> Dict[str, int]:
try:
proc = subprocess.run(["hackrf_info"], capture_output=True, text=True, timeout=15)
except FileNotFoundError:
raise RuntimeError("hackrf_info not found")
except subprocess.TimeoutExpired:
raise RuntimeError("hackrf_info timeout")
except FileNotFoundError as exc:
raise RuntimeError("hackrf_info not found") from exc
except subprocess.TimeoutExpired as exc:
raise RuntimeError("hackrf_info timeout") from exc
text = (proc.stdout or "") + "\n" + (proc.stderr or "")
out: Dict[str, int] = {}
cur_idx: Optional[int] = None
for line in text.splitlines():
m = re.search(r"^Index:\s*(\d+)", line)
if m:
cur_idx = int(m.group(1))
match = re.search(r"^Index:\s*(\d+)", line)
if match:
cur_idx = int(match.group(1))
continue
m = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line)
if m and cur_idx is not None:
out[m.group(1).lower()] = cur_idx
match = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line)
if match and cur_idx is not None:
out[match.group(1).lower()] = cur_idx
if not out:
raise RuntimeError("no devices parsed from hackrf_info")
return out
def fmt(v: Optional[float], spec: str) -> str:
return "-" if v is None else format(v, spec)
def fmt(value: Optional[float], spec: str) -> str:
return "-" if value is None else format(value, spec)
def render(rows: Dict[str, Row], started_at: float, env_path: Path, serial_to_index: Dict[str, int]):
def render(rows: Dict[str, Row], started_at: float, env_path: Path, summary: str):
now = time.time()
print("\x1b[2J\x1b[H", end="")
print("HackRF Energy Monitor (relative power: RMS / linear / dBFS, not calibrated dBm)")
print(f"env: {env_path} | discovered: {len(serial_to_index)} | uptime: {int(now-started_at)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}")
print("Read Energy Monitor (relative power: RMS / linear / dBFS, not calibrated dBm)")
print(f"env: {env_path} | {summary} | uptime: {int(now - started_at)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}")
print()
header = f"{'band':>5} {'idx':>3} {'freq':>7} {'status':>9} {'rms':>10} {'power':>12} {'dBFS':>9} {'N':>5} {'age':>5} {'serial':>12} error"
header = f"{'band':>5} {'idx':>4} {'freq':>8} {'status':>9} {'rms':>10} {'power':>12} {'dBFS':>9} {'N':>6} {'age':>5} {'device':>18} error"
print(header)
print('-' * len(header))
for label in sorted(rows, key=lambda x: int(x)):
r = rows[label]
idx = '-' if r.index is None else str(r.index)
age = '-' if r.updated_at <= 0 else f"{(now-r.updated_at):.1f}"
err = (r.error or "")
print("-" * len(header))
for label in sorted(rows, key=label_sort_key):
row = rows[label]
idx = "-" if row.index is None else str(row.index)
age = "-" if row.updated_at <= 0 else f"{(now - row.updated_at):.1f}"
err = row.error or ""
if len(err) > 64:
err = err[:61] + '...'
err = err[:61] + "..."
device = row.device[-18:]
print(
f"{r.label:>5} {idx:>3} {r.freq_hz/1e6:>7.1f} {r.status:>9} "
f"{fmt(r.rms, '.6f'):>10} {fmt(r.power_lin, '.8f'):>12} {fmt(r.dbfs, '.2f'):>9} "
f"{r.samples:>5} {age:>5} {r.serial[-12:]:>12} {err}"
f"{row.label:>5} {idx:>4} {row.freq_hz / 1e6:>8.1f} {row.status:>9} "
f"{fmt(row.rms, '.6f'):>10} {fmt(row.power_lin, '.8f'):>12} {fmt(row.dbfs, '.2f'):>9} "
f"{row.samples:>6} {age:>5} {device:>18} {err}"
)
print()
print("Ctrl+C to stop. Use --only 2400,5200 or --freq-mhz 2450 to limit/override tuning.")
print("Ctrl+C to stop. Use --backend iio --uri ip:192.168.2.1 --only 2400 for Neptune.")
sys.stdout.flush()
def build_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(description="Realtime HackRF relative energy monitor")
p.add_argument("--env", default=".env", help="Path to .env (default: repo_root/.env)")
p.add_argument("--only", default="", help="Comma-separated labels (e.g. 2400,5200)")
p.add_argument("--freq-mhz", type=float, default=None, help="Override tuning frequency for all devices (MHz)")
p.add_argument("--sample-rate", type=float, default=2e6, help="Sample rate in Hz (HackRF min ~2e6)")
p.add_argument("--vec-len", type=int, default=4096, help="Probe vector length")
p.add_argument("--interval", type=float, default=0.5, help="Per-device read interval (s)")
p.add_argument("--refresh", type=float, default=0.5, help="Console refresh interval (s)")
p.add_argument("--reopen-delay", type=float, default=1.0, help="Retry delay after BUSY/ERR (s)")
p.add_argument("--gain", type=float, default=0.0, help="General gain")
p.add_argument("--if-gain", type=float, default=16.0, help="IF gain")
p.add_argument("--bb-gain", type=float, default=16.0, help="BB gain")
return p
parser = argparse.ArgumentParser(description="Realtime SDR relative energy monitor")
parser.add_argument("--backend", choices=("hackrf", "iio"), default="hackrf", help="SDR backend")
parser.add_argument("--env", default=".env", help="Path to .env (default: repo_root/.env)")
parser.add_argument("--only", default="", help="Comma-separated labels (e.g. 2400,5200)")
parser.add_argument("--freq-mhz", type=float, default=None, help="Override tuning frequency for all targets (MHz)")
parser.add_argument(
"--sample-rate",
type=float,
default=None,
help="Sample rate in Hz (HackRF default: 2e6, IIO default: keep current device setting)",
)
parser.add_argument("--bandwidth", type=float, default=None, help="RF bandwidth in Hz (default: keep device setting)")
parser.add_argument("--vec-len", type=int, default=4096, help="Probe vector length / capture size")
parser.add_argument("--interval", type=float, default=0.5, help="Per-target read interval (s)")
parser.add_argument("--refresh", type=float, default=0.5, help="Console refresh interval (s)")
parser.add_argument("--reopen-delay", type=float, default=1.0, help="Retry delay after ERR/BUSY (s)")
parser.add_argument("--gain", type=float, default=0.0, help="General gain for HackRF")
parser.add_argument("--if-gain", type=float, default=16.0, help="IF gain for HackRF")
parser.add_argument("--bb-gain", type=float, default=16.0, help="BB gain for HackRF")
parser.add_argument("--uri", default="ip:192.168.2.1", help="IIO URI, e.g. ip:192.168.2.1")
parser.add_argument("--iio-device", default="cf-ad9361-lpc", help="IIO RX buffer device")
parser.add_argument("--iio-phy-device", default="ad9361-phy", help="IIO PHY device")
parser.add_argument("--iio-i-channel", default="voltage0", help="IIO I channel")
parser.add_argument("--iio-q-channel", default="voltage1", help="IIO Q channel")
parser.add_argument("--iio-lo-channel", default="altvoltage0", help="IIO LO channel for RX frequency")
parser.add_argument("--iio-port-select", default="A_BALANCED", help="IIO rf_port_select value")
parser.add_argument(
"--iio-gain-mode",
choices=("manual", "fast_attack", "slow_attack", "hybrid"),
default="slow_attack",
help="IIO gain control mode",
)
parser.add_argument("--iio-hardwaregain", type=float, default=None, help="IIO hardware gain in dB for manual mode")
parser.add_argument("--timeout-ms", type=int, default=4000, help="IIO read timeout in milliseconds")
parser.add_argument("--settle", type=float, default=0.12, help="Wait after retune before reading in IIO mode (s)")
return parser
def main() -> int:
args = build_parser().parse_args()
only = {x.strip() for x in args.only.split(',') if x.strip()} or None
only = {item.strip() for item in args.only.split(",") if item.strip()} or None
env_path = Path(args.env)
if not env_path.is_absolute():
env_path = (Path(__file__).resolve().parent / env_path).resolve()
env = parse_env(env_path)
targets = collect_targets(env, only=only, override_freq_mhz=args.freq_mhz)
if args.backend == "hackrf":
targets = collect_hackrf_targets(env, only=only, override_freq_mhz=args.freq_mhz)
if not targets:
print(f"No hack_/HACKID_ entries found in {env_path}", file=sys.stderr)
return 2
try:
serial_to_index = parse_hackrf_info()
except Exception as exc:
print(f"hackrf discovery failed: {exc}", file=sys.stderr)
return 3
summary = f"backend=hackrf | discovered={len(serial_to_index)}"
if args.sample_rate is None:
args.sample_rate = 2e6
else:
try:
targets = collect_iio_targets(env, only=only, override_freq_mhz=args.freq_mhz, uri=args.uri)
except ValueError as exc:
print(str(exc), file=sys.stderr)
return 2
if not targets:
print(
f"No IIO targets resolved from {env_path}. Use c_freq_* in .env or pass --only / --freq-mhz.",
file=sys.stderr,
)
return 2
summary = f"backend=iio | uri={args.uri} | device={args.iio_device} | targets={len(targets)}"
rows = {t.label: Row(label=t.label, serial=t.serial, freq_hz=t.freq_hz) for t in targets}
rows = {target.label: Row(label=target.label, device=target.device, freq_hz=target.freq_hz) for target in targets}
lock = threading.Lock()
stop_event = threading.Event()
@ -295,27 +592,32 @@ def main() -> int:
signal.signal(signal.SIGINT, on_signal)
signal.signal(signal.SIGTERM, on_signal)
workers: List[Worker] = []
for i, t in enumerate(targets):
wa = argparse.Namespace(**vars(args))
wa.stagger = i * 0.15
w = Worker(t, serial_to_index, rows, lock, stop_event, wa)
workers.append(w)
w.start()
workers: List[threading.Thread] = []
if args.backend == "hackrf":
for idx, target in enumerate(targets):
worker_args = argparse.Namespace(**vars(args))
worker_args.stagger = idx * 0.15
worker = HackRfWorker(target, serial_to_index, rows, lock, stop_event, worker_args)
workers.append(worker)
worker.start()
else:
worker = IioWorker(targets, rows, lock, stop_event, args)
workers.append(worker)
worker.start()
started = time.time()
try:
while not stop_event.is_set():
with lock:
snap = {k: Row(**vars(v)) for k, v in rows.items()}
render(snap, started, env_path, serial_to_index)
snapshot = {key: Row(**vars(value)) for key, value in rows.items()}
render(snapshot, started, env_path, summary)
stop_event.wait(args.refresh)
finally:
stop_event.set()
for w in workers:
w.join(timeout=2)
for worker in workers:
worker.join(timeout=2)
return 0
if __name__ == '__main__':
if __name__ == "__main__":
raise SystemExit(main())

@ -1,13 +1,25 @@
#!/usr/bin/env python3
"""
./.venv-sdr/bin/python read_energy_wide.py \
Examples:
HackRF:
./.venv-sdr/bin/python read_energy_wide.py \
--backend hackrf \
--serial 0000000000000000a18c63dc2a83b813 \
--sample-rate 20000000 \
--base 6000 \
--roof 5700 \
--step 20
Neptune / Pluto over IIO:
./.venv-sdr/bin/python read_energy_wide.py \
--backend iio \
--uri ip:192.168.2.1 \
--base 2402 \
--roof 2398 \
--step 1
"""
#!/usr/bin/env python3
import argparse
import math
import re
@ -16,7 +28,7 @@ import subprocess
import sys
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Sequence, Tuple
try:
import numpy as np
@ -33,6 +45,7 @@ except Exception as exc:
sys.exit(1)
EPS = 1e-20
IIO_MIN_SAMPLE_RATE = 2083333
@dataclass
@ -53,10 +66,10 @@ class ScanWindow:
pass_no: int = 0
class WideProbeTop(gr.top_block):
class HackRfWideProbe(gr.top_block):
def __init__(
self,
index: int,
serial: str,
center_freq_hz: float,
sample_rate: float,
vec_len: int,
@ -67,7 +80,7 @@ class WideProbeTop(gr.top_block):
super().__init__("hackrf_energy_wide_probe")
self.probe = blocks.probe_signal_vc(vec_len)
self.stream_to_vec = blocks.stream_to_vector(gr.sizeof_gr_complex * 1, vec_len)
self.src = osmosdr.source(args=f"numchan=1 hackrf={index}")
self.src = osmosdr.source(args=f"numchan=1 hackrf={serial}")
self.src.set_time_unknown_pps(osmosdr.time_spec_t())
self.src.set_sample_rate(sample_rate)
self.src.set_center_freq(center_freq_hz, 0)
@ -82,8 +95,8 @@ class WideProbeTop(gr.top_block):
for fn, val in (("set_gain", gain), ("set_if_gain", if_gain), ("set_bb_gain", bb_gain)):
try:
getattr(self.src, fn)(val, 0)
except Exception:
pass
except Exception as exc:
raise RuntimeError(f"failed to set {fn}={val}: {exc}") from exc
try:
self.src.set_bandwidth(0, 0)
except Exception:
@ -95,7 +108,7 @@ class WideProbeTop(gr.top_block):
self.connect((self.src, 0), (self.stream_to_vec, 0))
self.connect((self.stream_to_vec, 0), (self.probe, 0))
def tune(self, freq_hz: float) -> None:
def tune(self, freq_hz: float):
self.src.set_center_freq(freq_hz, 0)
def read_metrics(self) -> Tuple[float, float, float, int]:
@ -139,374 +152,154 @@ class WideProbeTop(gr.top_block):
return rms, power_lin, dbfs, samples
def parse_hackrf_info() -> Dict[str, int]:
try:
proc = subprocess.run(["hackrf_info"], capture_output=True, text=True, timeout=15)
except FileNotFoundError:
raise RuntimeError("hackrf_info not found")
except subprocess.TimeoutExpired:
raise RuntimeError("hackrf_info timeout")
text = (proc.stdout or "") + "\n" + (proc.stderr or "")
out: Dict[str, int] = {}
cur_idx: Optional[int] = None
for line in text.splitlines():
m = re.search(r"^Index:\s*(\d+)", line)
if m:
cur_idx = int(m.group(1))
continue
m = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line)
if m and cur_idx is not None:
out[m.group(1).lower()] = cur_idx
if not out:
raise RuntimeError("no devices parsed from hackrf_info")
return out
def fmt(value: Optional[float], spec: str) -> str:
return "-" if value is None else format(value, spec)
def build_windows(base_mhz: float, roof_mhz: float, step_mhz: float) -> List[ScanWindow]:
if step_mhz <= 0:
raise ValueError("step must be > 0")
if base_mhz == roof_mhz:
raise ValueError("base and roof must be different")
direction = -1.0 if roof_mhz < base_mhz else 1.0
edge = base_mhz
seq = 1
windows: List[ScanWindow] = []
while True:
next_edge = edge + direction * step_mhz
if direction < 0 and next_edge < roof_mhz:
next_edge = roof_mhz
if direction > 0 and next_edge > roof_mhz:
next_edge = roof_mhz
low_mhz = min(edge, next_edge)
high_mhz = max(edge, next_edge)
center_mhz = (low_mhz + high_mhz) / 2.0
windows.append(
ScanWindow(
seq=seq,
start_mhz=edge,
end_mhz=next_edge,
low_mhz=low_mhz,
high_mhz=high_mhz,
center_mhz=center_mhz,
class IioWideProbe:
def __init__(self, args: argparse.Namespace):
self.args = args
self._static_signature: Optional[Tuple[Optional[float], Optional[float], str, Optional[float], str]] = None
self._last_freq_hz: Optional[int] = None
self._input_channels = [args.iio_i_channel]
if args.iio_q_channel not in self._input_channels:
self._input_channels.append(args.iio_q_channel)
def _run(self, cmd: Sequence[str], binary: bool = False) -> str | bytes:
proc = subprocess.run(list(cmd), capture_output=True, text=not binary)
if proc.returncode != 0:
stderr = proc.stderr if binary else (proc.stderr or "")
stdout = "" if binary else (proc.stdout or "")
details = stderr.strip() or stdout.strip() or f"exit code {proc.returncode}"
raise RuntimeError(details)
return proc.stdout
def _run_binary(self, cmd: Sequence[str]) -> bytes:
proc = subprocess.run(list(cmd), capture_output=True)
if proc.returncode != 0:
details = proc.stderr.decode("utf-8", errors="ignore").strip() or proc.stdout.decode("utf-8", errors="ignore").strip()
raise RuntimeError(details or f"exit code {proc.returncode}")
return proc.stdout
def _read_input_attr(self, channel: str, attr: str) -> str:
out = self._run(
[
"iio_attr",
"-u",
self.args.uri,
"-i",
"-c",
self.args.iio_phy_device,
channel,
attr,
]
)
return str(out).strip()
def _set_input_attr(self, channel: str, attr: str, value: str):
self._run(
[
"iio_attr",
"-u",
self.args.uri,
"-q",
"-i",
"-c",
self.args.iio_phy_device,
channel,
attr,
value,
]
)
if next_edge == roof_mhz:
break
edge = next_edge
seq += 1
return windows
def render(
windows: List[ScanWindow],
serial: str,
index: int,
sample_rate: float,
base_mhz: float,
roof_mhz: float,
step_mhz: float,
started_at: float,
pass_no: int,
current_seq: int,
) -> None:
now = time.time()
capture_bw_mhz = sample_rate / 1e6
current_row = next((row for row in windows if row.seq == current_seq), None)
best_row = max(
(row for row in windows if row.status == "OK" and row.dbfs is not None),
key=lambda row: row.dbfs if row.dbfs is not None else float("-inf"),
default=None,
def _set_output_attr(self, channel: str, attr: str, value: str):
self._run(
[
"iio_attr",
"-u",
self.args.uri,
"-q",
"-o",
"-c",
self.args.iio_phy_device,
channel,
attr,
value,
]
)
print("\x1b[2J\x1b[H", end="")
print("HackRF Wide Energy Monitor (relative power: RMS / linear / dBFS)")
print(
f"serial: {serial} | idx: {index} | sample-rate: {capture_bw_mhz:.3f} MHz | "
f"scan: {base_mhz:.3f}->{roof_mhz:.3f} MHz step {step_mhz:.3f} MHz | "
f"pass: {pass_no} | uptime: {int(now-started_at)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}"
)
print()
header = (
f"{'cur':>3} {'seq':>3} {'window MHz':>23} {'center':>9} {'status':>8} "
f"{'dBFS':>9} {'rms':>10} {'power':>12} {'N':>5} {'age':>5} error"
)
print(header)
print("-" * len(header))
for row in windows:
age = "-" if row.updated_at <= 0 else f"{(now-row.updated_at):.1f}"
err = row.error
if len(err) > 50:
err = err[:47] + "..."
marker = ">>>" if row.seq == current_seq else ""
print(
f"{marker:>3} {row.seq:>3} "
f"{f'{row.high_mhz:.3f}-{row.low_mhz:.3f}':>23} {row.center_mhz:>9.3f} {row.status:>8} "
f"{fmt(row.dbfs, '.2f'):>9} {fmt(row.rms, '.6f'):>10} {fmt(row.power_lin, '.8f'):>12} "
f"{row.samples:>5} {age:>5} {err}"
)
print()
if best_row is not None:
best_age = "-" if best_row.updated_at <= 0 else f"{(now-best_row.updated_at):.1f}"
print(
f"{'':>3} {'MAX':>3} "
f"{f'{best_row.high_mhz:.3f}-{best_row.low_mhz:.3f}':>23} {best_row.center_mhz:>9.3f} {best_row.status:>8} "
f"{fmt(best_row.dbfs, '.2f'):>9} {fmt(best_row.rms, '.6f'):>10} {fmt(best_row.power_lin, '.8f'):>12} "
f"{best_row.samples:>5} {best_age:>5} pass={best_row.pass_no}"
)
elif current_row is not None:
current_age = "-" if current_row.updated_at <= 0 else f"{(now-current_row.updated_at):.1f}"
print(
f"{'':>3} {'MAX':>3} "
f"{f'{current_row.high_mhz:.3f}-{current_row.low_mhz:.3f}':>23} {current_row.center_mhz:>9.3f} {'INIT':>8} "
f"{fmt(None, '.2f'):>9} {fmt(None, '.6f'):>10} {fmt(None, '.8f'):>12} "
f"{0:>5} {current_age:>5} no successful windows yet"
)
print("Ctrl+C to stop. Window width equals step; sample-rate must be >= step to cover each window.")
sys.stdout.flush()
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Retune one HackRF across a wide frequency range and measure energy")
parser.add_argument("--serial", required=True, help="HackRF serial number from hackrf_info")
parser.add_argument("--sample-rate", type=float, required=True, help="Sample rate in Hz")
parser.add_argument("--base", type=float, required=True, help="Scan start edge in MHz")
parser.add_argument("--roof", type=float, required=True, help="Scan end edge in MHz")
parser.add_argument("--step", type=float, required=True, help="Window width / retune step in MHz")
parser.add_argument("--vec-len", type=int, default=4096, help="Probe vector length")
parser.add_argument("--settle", type=float, default=0.12, help="Wait time after retune before reading (s)")
parser.add_argument("--avg-reads", type=int, default=3, help="How many probe reads to average per window")
parser.add_argument("--pause-between-reads", type=float, default=0.02, help="Pause between averaged reads (s)")
parser.add_argument("--passes", type=int, default=0, help="Number of sweep passes, 0 means infinite")
parser.add_argument("--gain", type=float, default=16.0, help="General gain")
parser.add_argument("--if-gain", type=float, default=16.0, help="IF gain")
parser.add_argument("--bb-gain", type=float, default=16.0, help="BB gain")
return parser
def main() -> int:
args = build_parser().parse_args()
serial = args.serial.lower()
try:
windows = build_windows(args.base, args.roof, args.step)
except ValueError as exc:
print(f"invalid scan range: {exc}", file=sys.stderr)
return 2
step_hz = args.step * 1e6
if args.sample_rate < step_hz:
print(
f"sample-rate {args.sample_rate:.0f} Hz is smaller than step window {step_hz:.0f} Hz; "
"this would leave gaps in the scan",
file=sys.stderr,
)
return 2
try:
serial_to_index = parse_hackrf_info()
except Exception as exc:
print(f"hackrf discovery failed: {exc}", file=sys.stderr)
return 3
index = serial_to_index.get(serial)
if index is None:
print(f"serial {serial} not found in hackrf_info", file=sys.stderr)
print("available serials:", file=sys.stderr)
for item_serial, item_index in sorted(serial_to_index.items(), key=lambda item: item[1]):
print(f" idx={item_index} serial={item_serial}", file=sys.stderr)
return 4
stop_requested = False
def on_signal(signum, frame):
nonlocal stop_requested
stop_requested = True
signal.signal(signal.SIGINT, on_signal)
signal.signal(signal.SIGTERM, on_signal)
probe: Optional[WideProbeTop] = None
started_at = time.time()
pass_no = 0
current_seq = windows[0].seq
try:
probe = WideProbeTop(
index=index,
center_freq_hz=windows[0].center_mhz * 1e6,
sample_rate=args.sample_rate,
vec_len=args.vec_len,
gain=args.gain,
if_gain=args.if_gain,
bb_gain=args.bb_gain,
)
probe.start()
time.sleep(max(args.settle, 0.12))
while not stop_requested:
pass_no += 1
for row in windows:
if stop_requested:
break
current_seq = row.seq
try:
probe.tune(row.center_mhz * 1e6)
rms, power_lin, dbfs, samples = probe.read_window(
settle=args.settle,
avg_reads=args.avg_reads,
pause_between_reads=args.pause_between_reads,
def get_effective_sample_rate(self) -> float:
if self.args.sample_rate is not None:
return float(self.args.sample_rate)
raw = self._read_input_attr(self.args.iio_i_channel, "sampling_frequency")
return float(raw)
def ensure_configured(self):
sample_rate = None
if self.args.sample_rate is not None:
sample_rate = int(round(max(float(self.args.sample_rate), IIO_MIN_SAMPLE_RATE)))
bandwidth = None
if self.args.bandwidth is not None:
bandwidth = int(round(max(float(self.args.bandwidth), 200000.0)))
signature = (
None if sample_rate is None else float(sample_rate),
None if bandwidth is None else float(bandwidth),
self.args.iio_gain_mode,
self.args.iio_hardwaregain,
self.args.iio_port_select,
)
row.status = "OK"
row.rms = rms
row.power_lin = power_lin
row.dbfs = dbfs
row.samples = samples
row.error = ""
row.updated_at = time.time()
row.pass_no = pass_no
except Exception as exc:
row.status = "ERR"
row.error = str(exc)
row.updated_at = time.time()
render(
windows=windows,
serial=serial,
index=index,
sample_rate=args.sample_rate,
base_mhz=args.base,
roof_mhz=args.roof,
step_mhz=args.step,
started_at=started_at,
pass_no=pass_no,
current_seq=current_seq,
)
if args.passes > 0 and pass_no >= args.passes:
break
except Exception as exc:
print(f"scanner failed: {exc}", file=sys.stderr)
return 5
finally:
if probe is not None:
try:
probe.stop()
probe.wait()
except Exception:
pass
return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
import argparse
import math
import re
import signal
import subprocess
import sys
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
try:
import numpy as np
except Exception as exc:
print(f"numpy import failed: {exc}", file=sys.stderr)
sys.exit(1)
try:
from gnuradio import blocks, gr
import osmosdr
except Exception as exc:
print(f"gnuradio/osmosdr import failed: {exc}", file=sys.stderr)
print("Run with the SDR venv, e.g. .venv-sdr/bin/python read_energy_wide.py", file=sys.stderr)
sys.exit(1)
EPS = 1e-20
@dataclass
class ScanWindow:
seq: int
start_mhz: float
end_mhz: float
low_mhz: float
high_mhz: float
center_mhz: float
status: str = "INIT"
rms: Optional[float] = None
power_lin: Optional[float] = None
dbfs: Optional[float] = None
samples: int = 0
updated_at: float = 0.0
error: str = ""
pass_no: int = 0
class WideProbeTop(gr.top_block):
def __init__(
self,
index: int,
center_freq_hz: float,
sample_rate: float,
vec_len: int,
gain: float,
if_gain: float,
bb_gain: float,
):
super().__init__("hackrf_energy_wide_probe")
self.probe = blocks.probe_signal_vc(vec_len)
self.stream_to_vec = blocks.stream_to_vector(gr.sizeof_gr_complex * 1, vec_len)
self.src = osmosdr.source(args=f"numchan=1 hackrf={index}")
self.src.set_time_unknown_pps(osmosdr.time_spec_t())
self.src.set_sample_rate(sample_rate)
self.src.set_center_freq(center_freq_hz, 0)
try:
self.src.set_freq_corr(0, 0)
except Exception:
pass
try:
self.src.set_gain_mode(False, 0)
except Exception:
pass
for fn, val in (("set_gain", gain), ("set_if_gain", if_gain), ("set_bb_gain", bb_gain)):
try:
getattr(self.src, fn)(val, 0)
except Exception:
pass
try:
self.src.set_bandwidth(0, 0)
except Exception:
pass
try:
self.src.set_antenna("", 0)
except Exception:
pass
self.connect((self.src, 0), (self.stream_to_vec, 0))
self.connect((self.stream_to_vec, 0), (self.probe, 0))
def tune(self, freq_hz: float) -> None:
self.src.set_center_freq(freq_hz, 0)
def read_metrics(self) -> Tuple[float, float, float, int]:
arr = np.asarray(self.probe.level(), dtype=np.complex64)
if arr.size == 0:
if signature == self._static_signature:
return
for channel in self._input_channels:
if sample_rate is not None:
self._set_input_attr(channel, "sampling_frequency", str(sample_rate))
if bandwidth is not None:
self._set_input_attr(channel, "rf_bandwidth", str(bandwidth))
if self.args.iio_port_select:
self._set_input_attr(channel, "rf_port_select", self.args.iio_port_select)
if self.args.iio_gain_mode:
self._set_input_attr(channel, "gain_control_mode", self.args.iio_gain_mode)
if self.args.iio_gain_mode == "manual" and self.args.iio_hardwaregain is not None:
self._set_input_attr(channel, "hardwaregain", f"{self.args.iio_hardwaregain:.6f}")
self._static_signature = signature
def tune(self, freq_hz: float):
target = int(round(freq_hz))
if self._last_freq_hz == target:
return
self._set_output_attr(self.args.iio_lo_channel, "frequency", str(target))
self._last_freq_hz = target
def read_metrics(self, samples_per_read: int) -> Tuple[float, float, float, int]:
cmd = [
"iio_readdev",
"-u",
self.args.uri,
"-T",
str(int(self.args.timeout_ms)),
"-b",
str(max(4, int(samples_per_read))),
"-s",
str(max(4, int(samples_per_read))),
self.args.iio_device,
self.args.iio_i_channel,
self.args.iio_q_channel,
]
raw = self._run_binary(cmd)
values = np.frombuffer(raw, dtype="<i2")
if values.size == 0:
raise RuntimeError("no samples")
power_lin = float(np.mean(arr.real * arr.real + arr.imag * arr.imag))
if values.size % 2 != 0:
raise RuntimeError(f"unexpected IQ sample payload: {values.size} int16 values")
iq = values.reshape(-1, 2).astype(np.float32, copy=False)
i = iq[:, 0]
q = iq[:, 1]
power_lin = float(np.mean(i * i + q * q))
rms = math.sqrt(max(power_lin, 0.0))
dbfs = 10.0 * math.log10(max(power_lin, EPS))
return rms, power_lin, dbfs, int(arr.size)
return rms, power_lin, dbfs, int(iq.shape[0])
def read_window(self, settle: float, avg_reads: int, pause_between_reads: float) -> Tuple[float, float, float, int]:
def read_window(self, settle: float, avg_reads: int, pause_between_reads: float, samples_per_read: int) -> Tuple[float, float, float, int]:
if settle > 0:
time.sleep(settle)
@ -516,10 +309,10 @@ class WideProbeTop(gr.top_block):
last_error: Optional[Exception] = None
for idx in range(read_count):
deadline = time.time() + 1.0
deadline = time.time() + max(1.0, self.args.timeout_ms / 1000.0)
while True:
try:
_, power_lin, _, samples = self.read_metrics()
_, power_lin, _, samples = self.read_metrics(samples_per_read)
powers.append(power_lin)
sample_sizes.append(samples)
break
@ -541,21 +334,21 @@ class WideProbeTop(gr.top_block):
def parse_hackrf_info() -> Dict[str, int]:
try:
proc = subprocess.run(["hackrf_info"], capture_output=True, text=True, timeout=15)
except FileNotFoundError:
raise RuntimeError("hackrf_info not found")
except subprocess.TimeoutExpired:
raise RuntimeError("hackrf_info timeout")
except FileNotFoundError as exc:
raise RuntimeError("hackrf_info not found") from exc
except subprocess.TimeoutExpired as exc:
raise RuntimeError("hackrf_info timeout") from exc
text = (proc.stdout or "") + "\n" + (proc.stderr or "")
out: Dict[str, int] = {}
cur_idx: Optional[int] = None
for line in text.splitlines():
m = re.search(r"^Index:\s*(\d+)", line)
if m:
cur_idx = int(m.group(1))
match = re.search(r"^Index:\s*(\d+)", line)
if match:
cur_idx = int(match.group(1))
continue
m = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line)
if m and cur_idx is not None:
out[m.group(1).lower()] = cur_idx
match = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line)
if match and cur_idx is not None:
out[match.group(1).lower()] = cur_idx
if not out:
raise RuntimeError("no devices parsed from hackrf_info")
return out
@ -607,8 +400,9 @@ def build_windows(base_mhz: float, roof_mhz: float, step_mhz: float) -> List[Sca
def render(
windows: List[ScanWindow],
serial: str,
index: int,
backend: str,
device_ref: str,
index_ref: str,
sample_rate: float,
base_mhz: float,
roof_mhz: float,
@ -626,73 +420,93 @@ def render(
default=None,
)
print("\x1b[2J\x1b[H", end="")
print("HackRF Wide Energy Monitor (relative power: RMS / linear / dBFS)")
print("Wide Energy Monitor (relative power: RMS / linear / dBFS)")
print(
f"serial: {serial} | idx: {index} | sample-rate: {capture_bw_mhz:.3f} MHz | "
f"backend: {backend} | device: {device_ref} | idx: {index_ref} | sample-rate: {capture_bw_mhz:.3f} MHz | "
f"scan: {base_mhz:.3f}->{roof_mhz:.3f} MHz step {step_mhz:.3f} MHz | "
f"pass: {pass_no} | uptime: {int(now-started_at)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}"
)
print()
header = (
f"{'cur':>3} {'seq':>3} {'window MHz':>23} {'center':>9} {'status':>8} "
f"{'dBFS':>9} {'rms':>10} {'power':>12} {'N':>5} {'age':>5} error"
f"{'dBFS':>9} {'rms':>10} {'power':>12} {'N':>6} {'age':>5} error"
)
print(header)
print("-" * len(header))
for row in windows:
age = "-" if row.updated_at <= 0 else f"{(now-row.updated_at):.1f}"
age = "-" if row.updated_at <= 0 else f"{(now - row.updated_at):.1f}"
err = row.error
if len(err) > 50:
err = err[:47] + "..."
if len(err) > 56:
err = err[:53] + "..."
marker = ">>>" if row.seq == current_seq else ""
print(
f"{marker:>3} {row.seq:>3} "
f"{f'{row.high_mhz:.3f}-{row.low_mhz:.3f}':>23} {row.center_mhz:>9.3f} {row.status:>8} "
f"{fmt(row.dbfs, '.2f'):>9} {fmt(row.rms, '.6f'):>10} {fmt(row.power_lin, '.8f'):>12} "
f"{row.samples:>5} {age:>5} {err}"
f"{row.samples:>6} {age:>5} {err}"
)
print()
if best_row is not None:
best_age = "-" if best_row.updated_at <= 0 else f"{(now-best_row.updated_at):.1f}"
best_age = "-" if best_row.updated_at <= 0 else f"{(now - best_row.updated_at):.1f}"
print(
f"{'':>3} {'MAX':>3} "
f"{f'{best_row.high_mhz:.3f}-{best_row.low_mhz:.3f}':>23} {best_row.center_mhz:>9.3f} {best_row.status:>8} "
f"{fmt(best_row.dbfs, '.2f'):>9} {fmt(best_row.rms, '.6f'):>10} {fmt(best_row.power_lin, '.8f'):>12} "
f"{best_row.samples:>5} {best_age:>5} pass={best_row.pass_no}"
f"{best_row.samples:>6} {best_age:>5} pass={best_row.pass_no}"
)
elif current_row is not None:
current_age = "-" if current_row.updated_at <= 0 else f"{(now-current_row.updated_at):.1f}"
current_age = "-" if current_row.updated_at <= 0 else f"{(now - current_row.updated_at):.1f}"
print(
f"{'':>3} {'MAX':>3} "
f"{f'{current_row.high_mhz:.3f}-{current_row.low_mhz:.3f}':>23} {current_row.center_mhz:>9.3f} {'INIT':>8} "
f"{fmt(None, '.2f'):>9} {fmt(None, '.6f'):>10} {fmt(None, '.8f'):>12} "
f"{0:>5} {current_age:>5} no successful windows yet"
f"{0:>6} {current_age:>5} no successful windows yet"
)
print("Ctrl+C to stop. Window width equals step; sample-rate must be >= step to cover each window.")
sys.stdout.flush()
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Retune one HackRF across a wide frequency range and measure energy")
parser.add_argument("--serial", required=True, help="HackRF serial number from hackrf_info")
parser.add_argument("--sample-rate", type=float, required=True, help="Sample rate in Hz")
parser = argparse.ArgumentParser(description="Retune one SDR across a wide frequency range and measure energy")
parser.add_argument("--backend", choices=("hackrf", "iio"), default="hackrf", help="SDR backend")
parser.add_argument("--serial", help="HackRF serial number from hackrf_info")
parser.add_argument(
"--sample-rate",
type=float,
default=None,
help="Sample rate in Hz (required for hackrf, optional for iio to keep device setting)",
)
parser.add_argument("--bandwidth", type=float, default=None, help="RF bandwidth in Hz (iio only, optional)")
parser.add_argument("--base", type=float, required=True, help="Scan start edge in MHz")
parser.add_argument("--roof", type=float, required=True, help="Scan end edge in MHz")
parser.add_argument("--step", type=float, required=True, help="Window width / retune step in MHz")
parser.add_argument("--vec-len", type=int, default=4096, help="Probe vector length")
parser.add_argument("--vec-len", type=int, default=4096, help="Probe vector length / samples per read")
parser.add_argument("--settle", type=float, default=0.12, help="Wait time after retune before reading (s)")
parser.add_argument("--avg-reads", type=int, default=3, help="How many probe reads to average per window")
parser.add_argument("--avg-reads", type=int, default=3, help="How many reads to average per window")
parser.add_argument("--pause-between-reads", type=float, default=0.02, help="Pause between averaged reads (s)")
parser.add_argument("--passes", type=int, default=0, help="Number of sweep passes, 0 means infinite")
parser.add_argument("--gain", type=float, default=16.0, help="General gain")
parser.add_argument("--if-gain", type=float, default=16.0, help="IF gain")
parser.add_argument("--bb-gain", type=float, default=16.0, help="BB gain")
parser.add_argument("--gain", type=float, default=16.0, help="General gain for HackRF")
parser.add_argument("--if-gain", type=float, default=16.0, help="IF gain for HackRF")
parser.add_argument("--bb-gain", type=float, default=16.0, help="BB gain for HackRF")
parser.add_argument("--uri", default="ip:192.168.2.1", help="IIO URI, e.g. ip:192.168.2.1")
parser.add_argument("--iio-device", default="cf-ad9361-lpc", help="IIO RX buffer device")
parser.add_argument("--iio-phy-device", default="ad9361-phy", help="IIO PHY device")
parser.add_argument("--iio-i-channel", default="voltage0", help="IIO I channel")
parser.add_argument("--iio-q-channel", default="voltage1", help="IIO Q channel")
parser.add_argument("--iio-lo-channel", default="altvoltage0", help="IIO LO channel for RX frequency")
parser.add_argument("--iio-port-select", default="A_BALANCED", help="IIO rf_port_select value")
parser.add_argument(
"--iio-gain-mode",
choices=("manual", "fast_attack", "slow_attack", "hybrid"),
default="slow_attack",
help="IIO gain control mode",
)
parser.add_argument("--iio-hardwaregain", type=float, default=None, help="IIO hardware gain in dB for manual mode")
parser.add_argument("--timeout-ms", type=int, default=4000, help="IIO read timeout in milliseconds")
return parser
def main() -> int:
args = build_parser().parse_args()
serial = args.serial.lower()
try:
windows = build_windows(args.base, args.roof, args.step)
@ -700,11 +514,23 @@ def main() -> int:
print(f"invalid scan range: {exc}", file=sys.stderr)
return 2
probe: Optional[HackRfWideProbe | IioWideProbe] = None
device_ref = ""
index_ref = "-"
if args.backend == "hackrf":
if not args.serial:
print("--serial is required for --backend hackrf", file=sys.stderr)
return 2
if args.sample_rate is None:
print("--sample-rate is required for --backend hackrf", file=sys.stderr)
return 2
serial = args.serial.lower()
step_hz = args.step * 1e6
if args.sample_rate < step_hz:
print(
f"sample-rate {args.sample_rate:.0f} Hz is smaller than step window {step_hz:.0f} Hz; "
"this would leave gaps in the scan",
f"sample-rate {args.sample_rate:.0f} Hz is smaller than step window {step_hz:.0f} Hz; this would leave gaps in the scan",
file=sys.stderr,
)
return 2
@ -723,6 +549,38 @@ def main() -> int:
print(f" idx={item_index} serial={item_serial}", file=sys.stderr)
return 4
probe = HackRfWideProbe(
serial=serial,
center_freq_hz=windows[0].center_mhz * 1e6,
sample_rate=args.sample_rate,
vec_len=args.vec_len,
gain=args.gain,
if_gain=args.if_gain,
bb_gain=args.bb_gain,
)
effective_sample_rate = float(args.sample_rate)
device_ref = serial
index_ref = str(index)
else:
probe = IioWideProbe(args)
try:
probe.ensure_configured()
effective_sample_rate = probe.get_effective_sample_rate()
except Exception as exc:
print(f"iio setup failed: {exc}", file=sys.stderr)
return 3
step_hz = args.step * 1e6
if effective_sample_rate < step_hz:
print(
f"effective sample-rate {effective_sample_rate:.0f} Hz is smaller than step window {step_hz:.0f} Hz; this would leave gaps in the scan",
file=sys.stderr,
)
return 2
device_ref = args.uri
index_ref = "iio"
stop_requested = False
def on_signal(signum, frame):
@ -732,21 +590,12 @@ def main() -> int:
signal.signal(signal.SIGINT, on_signal)
signal.signal(signal.SIGTERM, on_signal)
probe: Optional[WideProbeTop] = None
started_at = time.time()
pass_no = 0
current_seq = windows[0].seq
try:
probe = WideProbeTop(
index=index,
center_freq_hz=windows[0].center_mhz * 1e6,
sample_rate=args.sample_rate,
vec_len=args.vec_len,
gain=args.gain,
if_gain=args.if_gain,
bb_gain=args.bb_gain,
)
if isinstance(probe, HackRfWideProbe):
probe.start()
time.sleep(max(args.settle, 0.12))
@ -758,10 +607,18 @@ def main() -> int:
current_seq = row.seq
try:
probe.tune(row.center_mhz * 1e6)
if isinstance(probe, HackRfWideProbe):
rms, power_lin, dbfs, samples = probe.read_window(
settle=args.settle,
avg_reads=args.avg_reads,
pause_between_reads=args.pause_between_reads,
)
else:
rms, power_lin, dbfs, samples = probe.read_window(
settle=args.settle,
avg_reads=args.avg_reads,
pause_between_reads=args.pause_between_reads,
samples_per_read=args.vec_len,
)
row.status = "OK"
row.rms = rms
@ -775,11 +632,13 @@ def main() -> int:
row.status = "ERR"
row.error = str(exc)
row.updated_at = time.time()
render(
windows=windows,
serial=serial,
index=index,
sample_rate=args.sample_rate,
backend=args.backend,
device_ref=device_ref,
index_ref=index_ref,
sample_rate=effective_sample_rate,
base_mhz=args.base,
roof_mhz=args.roof,
step_mhz=args.step,
@ -787,13 +646,12 @@ def main() -> int:
pass_no=pass_no,
current_seq=current_seq,
)
if args.passes > 0 and pass_no >= args.passes:
break
except Exception as exc:
print(f"scanner failed: {exc}", file=sys.stderr)
return 5
finally:
if probe is not None:
if isinstance(probe, HackRfWideProbe):
try:
probe.stop()
probe.wait()

Loading…
Cancel
Save