You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
DroneDetector/scripts_nn/data_saver_headless.py

448 lines
15 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import os
import signal
import subprocess
import sys
import threading
import time
import numpy as np
from gnuradio import gr
import osmosdr
# Lower bound, in samples per second, for the IIO backend sample rate:
# IioCapture.ensure_configured() raises any smaller requested rate to this value
# before writing it to the device.
# NOTE(review): presumably the transceiver's minimum rate without additional
# FIR decimation -- confirm against the hardware documentation.
IIO_MIN_SAMPLE_RATE = 2_083_333
class RotatingIqWriter:
    """Write complex64 IQ samples into a sequence of size-limited raw files.

    Files are named ``<file_tag><index>`` inside ``save_dir`` (index starts at
    0) and each receives at most ``split_size`` complex samples as raw
    ``numpy.complex64`` bytes.  While a data file is open, an empty marker
    file called ``reading_in_progress`` exists in ``save_dir``; it is removed
    every time the data file is closed.

    Args:
        save_dir: Output directory; created if it does not exist.
        file_tag: Prefix used for every output file name.
        split_size: Maximum number of complex samples per file; must be > 0.
        delay: Seconds to sleep between closing one file and opening the next.

    Raises:
        ValueError: If ``split_size`` is not positive.
        OSError: If the directory, marker or first data file cannot be created.
    """

    def __init__(self, save_dir="./signal", file_tag="fragment_", split_size=1_000_000, delay=0.0):
        self.save_dir = str(save_dir)
        self.file_tag = str(file_tag)
        self.split_size = int(split_size)
        if self.split_size <= 0:
            # A non-positive limit would make write_samples() rotate files
            # forever without ever consuming a sample.
            raise ValueError(f"split_size must be positive, got {split_size!r}")
        self.delay = float(delay)
        os.makedirs(self.save_dir, exist_ok=True)
        self.file_index = 0
        self.current_len = 0  # complex samples written to the current file
        self.current_fd = None
        self.in_progress_path = os.path.join(self.save_dir, "reading_in_progress")
        self.in_progress_fd = None
        self._open_next_file()

    def _touch_in_progress(self):
        """Create (or truncate) the marker file and keep its handle open."""
        if self.in_progress_fd is not None:
            try:
                self.in_progress_fd.close()
            except OSError:
                # Best effort: a failed close must not block re-creating the marker.
                pass
        self.in_progress_fd = open(self.in_progress_path, "wb")

    def _remove_in_progress(self):
        """Close and delete the marker file; missing files and I/O errors are ignored."""
        if self.in_progress_fd is not None:
            try:
                self.in_progress_fd.close()
            except OSError:
                pass
            self.in_progress_fd = None
        try:
            os.remove(self.in_progress_path)
        except OSError:
            # Covers an already-missing marker as well as permission problems;
            # removal is deliberately best effort.
            pass

    def _current_file_path(self):
        """Return the path of the data file for the current ``file_index``."""
        return os.path.join(self.save_dir, f"{self.file_tag}{self.file_index}")

    def _open_next_file(self):
        """Create the marker, then open the data file for ``file_index``."""
        self._touch_in_progress()
        path = self._current_file_path()
        try:
            self.current_fd = open(path, "wb")
        except OSError:
            # No data file is open, so do not leave a stale marker behind.
            self._remove_in_progress()
            raise
        self.current_len = 0
        print(f"Opened file: {path}", flush=True)

    def _rotate_file(self):
        """Close the current file, drop the marker, wait ``delay`` and open the next file."""
        path = self._current_file_path()
        print(f"Saving file: {path}", flush=True)
        if self.current_fd is not None:
            self.current_fd.close()
            self.current_fd = None
        self._remove_in_progress()
        if self.delay > 0:
            time.sleep(self.delay)
        self.file_index += 1
        self._open_next_file()

    def write_samples(self, samples):
        """Append ``samples`` to the output, rotating files at ``split_size``.

        The input is converted to a flat ``complex64`` array.  A file is
        rotated as soon as it is full, so the next (initially empty) file is
        already open when this method returns.
        """
        data = np.asarray(samples, dtype=np.complex64).reshape(-1)
        offset = 0
        total = int(data.size)
        while offset < total:
            remaining = self.split_size - self.current_len
            chunk = min(remaining, total - offset)
            block = np.ascontiguousarray(data[offset:offset + chunk], dtype=np.complex64)
            self.current_fd.write(block.tobytes())
            self.current_len += chunk
            offset += chunk
            if self.current_len >= self.split_size:
                self._rotate_file()

    def close(self):
        """Close the current data file and remove the marker; safe to call repeatedly."""
        try:
            if self.current_fd is not None:
                self.current_fd.close()
                self.current_fd = None
        finally:
            self._remove_in_progress()
class SimsiSink(gr.sync_block):
    """Sink block with one complex64 input that forwards every item to a RotatingIqWriter."""

    def __init__(self, save_dir="./signal", file_tag="fragment_", split_size=1_000_000, delay=0.0):
        gr.sync_block.__init__(self, name="Simsi_Sink", in_sig=[np.complex64], out_sig=None)
        writer_options = {
            "save_dir": save_dir,
            "file_tag": file_tag,
            "split_size": split_size,
            "delay": delay,
        }
        self.writer = RotatingIqWriter(**writer_options)

    def work(self, input_items, output_items):
        """Write the whole first input buffer to disk and report it as consumed."""
        incoming = input_items[0]
        self.writer.write_samples(incoming)
        consumed = len(incoming)
        return consumed

    def stop(self):
        """Close the writer when the block is stopped; always returns True."""
        self.writer.close()
        return True
class HackRfDataSaver(gr.top_block):
    """Top block that connects one osmosdr HackRF source to a SimsiSink.

    The source is opened with ``numchan=1 hackrf=<serial>`` and configured
    from the constructor arguments; ``save_dir``, ``file_tag``, ``split_size``
    and ``delay`` are passed through to the sink unchanged.
    """

    def __init__(
        self,
        serial: str,
        freq: float,
        save_dir: str,
        file_tag: str,
        samp_rate: float,
        split_size: int,
        delay: float,
        rf_gain: float,
        if_gain: float,
        bb_gain: float,
    ):
        super().__init__("data_saver_headless", catch_exceptions=True)

        self.source = osmosdr.source(args=f"numchan=1 hackrf={serial}")
        src = self.source

        # The setter calls below are kept in their original order.
        src.set_time_unknown_pps(osmosdr.time_spec_t())
        src.set_sample_rate(float(samp_rate))
        src.set_center_freq(float(freq), 0)
        src.set_freq_corr(0, 0)
        src.set_dc_offset_mode(0, 0)
        src.set_iq_balance_mode(0, 0)
        src.set_gain_mode(False, 0)
        src.set_gain(float(rf_gain), 0)
        src.set_if_gain(float(if_gain), 0)
        src.set_bb_gain(float(bb_gain), 0)
        src.set_antenna("", 0)
        src.set_bandwidth(0, 0)

        sink_options = {
            "save_dir": save_dir,
            "file_tag": file_tag,
            "split_size": split_size,
            "delay": delay,
        }
        self.sink = SimsiSink(**sink_options)

        # Output port 0 of the source feeds input port 0 of the sink.
        self.connect((self.source, 0), (self.sink, 0))
class IioCapture:
    """Capture IQ samples from an IIO device by invoking the libiio command-line tools.

    Device attributes are written with ``iio_attr`` and sample data is fetched
    with repeated ``iio_readdev`` invocations; every decoded chunk is appended
    to a ``RotatingIqWriter``.

    Args:
        args: Parsed command-line namespace.  The attributes read by this
            class are ``save_dir``, ``file_tag``, ``split_size``, ``delay``,
            ``freq``, ``samp_rate``, ``bandwidth``, ``settle``, ``timeout_ms``,
            ``samples_per_read`` and the ``iio_*`` options.
    """

    def __init__(self, args: argparse.Namespace):
        self.args = args
        self.writer = RotatingIqWriter(
            save_dir=args.save_dir,
            file_tag=args.file_tag,
            split_size=args.split_size,
            delay=args.delay,
        )
        # Last successfully applied static configuration; see ensure_configured().
        self._static_signature = None
        # Last frequency written by tune(), in Hz.
        self._last_freq_hz = None
        # I and Q channel names, de-duplicated so each is configured once.
        self._input_channels = [args.iio_i_channel]
        if args.iio_q_channel not in self._input_channels:
            self._input_channels.append(args.iio_q_channel)

    def _run(self, cmd):
        """Run ``cmd`` to completion and return its stdout as text.

        Raises:
            RuntimeError: On a non-zero exit code; the message is stderr,
                else stdout, else the exit code.
        """
        proc = subprocess.run(list(cmd), capture_output=True, text=True)
        if proc.returncode != 0:
            details = (proc.stderr or "").strip() or (proc.stdout or "").strip() or f"exit code {proc.returncode}"
            raise RuntimeError(details)
        return proc.stdout

    def _run_binary(self, cmd, stop_event: threading.Event):
        """Run ``cmd`` and return its raw stdout bytes, honouring ``stop_event``.

        The child is polled every 0.2 s.  When ``stop_event`` is set while the
        child is still running, it is terminated (and killed if it does not
        exit within 1 s).

        Raises:
            InterruptedError: If a stop was requested before the child finished.
            RuntimeError: If the child exits with a non-zero code.
        """
        proc = subprocess.Popen(list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        try:
            while True:
                try:
                    stdout, stderr = proc.communicate(timeout=0.2)
                    break
                except subprocess.TimeoutExpired:
                    if stop_event.is_set():
                        proc.terminate()
                        try:
                            proc.communicate(timeout=1.0)
                        except subprocess.TimeoutExpired:
                            proc.kill()
                            proc.communicate()
                        raise InterruptedError("stop requested")
            if proc.returncode != 0:
                details = stderr.decode("utf-8", errors="ignore").strip() or stdout.decode("utf-8", errors="ignore").strip()
                raise RuntimeError(details or f"exit code {proc.returncode}")
            return stdout
        finally:
            # Never leave a child behind, whatever exception got us here.
            if proc.poll() is None:
                proc.kill()
                proc.communicate()

    def _set_attr(self, direction_flag: str, channel: str, attr: str, value: str):
        """Write one channel attribute on the PHY device through ``iio_attr``.

        ``direction_flag`` is the ``iio_attr`` switch placed before ``-c``
        (``-i`` or ``-o`` in this class).
        """
        self._run(
            [
                "iio_attr",
                "-u",
                self.args.iio_uri,
                "-q",
                direction_flag,
                "-c",
                self.args.iio_phy_device,
                channel,
                attr,
                value,
            ]
        )

    def _set_input_attr(self, channel: str, attr: str, value: str):
        """Write ``attr`` on ``channel`` of the PHY device using the ``-i`` switch."""
        self._set_attr("-i", channel, attr, value)

    def _set_output_attr(self, channel: str, attr: str, value: str):
        """Write ``attr`` on ``channel`` of the PHY device using the ``-o`` switch."""
        self._set_attr("-o", channel, attr, value)

    def ensure_configured(self):
        """Apply sample rate, bandwidth, port and gain settings if they changed.

        The sample rate is raised to at least ``IIO_MIN_SAMPLE_RATE`` and a
        given bandwidth to at least 200 kHz.  The resulting settings form a
        signature; when it equals the last successfully applied one, nothing
        is written to the device.
        """
        sample_rate = int(round(max(float(self.args.samp_rate), IIO_MIN_SAMPLE_RATE)))
        bandwidth = None
        if self.args.bandwidth is not None:
            bandwidth = int(round(max(float(self.args.bandwidth), 200_000.0)))
        signature = (
            float(sample_rate),
            None if bandwidth is None else float(bandwidth),
            self.args.iio_gain_mode,
            self.args.iio_hardwaregain,
            self.args.iio_port_select,
        )
        if signature == self._static_signature:
            return
        for channel in self._input_channels:
            self._set_input_attr(channel, "sampling_frequency", str(sample_rate))
            if bandwidth is not None:
                self._set_input_attr(channel, "rf_bandwidth", str(bandwidth))
            if self.args.iio_port_select:
                self._set_input_attr(channel, "rf_port_select", self.args.iio_port_select)
            if self.args.iio_gain_mode:
                self._set_input_attr(channel, "gain_control_mode", self.args.iio_gain_mode)
            # A fixed gain is only written when the gain mode is "manual".
            if self.args.iio_gain_mode == "manual" and self.args.iio_hardwaregain is not None:
                self._set_input_attr(channel, "hardwaregain", f"{self.args.iio_hardwaregain:.6f}")
        # Recorded only after every write succeeded, so a failure is retried.
        self._static_signature = signature

    def tune(self):
        """Write ``args.freq`` (rounded to whole Hz) to the LO channel unless already set."""
        target = int(round(float(self.args.freq)))
        if self._last_freq_hz == target:
            return
        self._set_output_attr(self.args.iio_lo_channel, "frequency", str(target))
        self._last_freq_hz = target

    @staticmethod
    def _decode_iq(raw):
        """Convert interleaved little-endian int16 I/Q bytes into complex64 samples.

        Values are divided by 32768.0.

        Raises:
            RuntimeError: If ``raw`` is empty, has an odd number of bytes, or
                holds an odd number of int16 values.
        """
        if len(raw) % 2 != 0:
            # np.frombuffer would otherwise fail with a bare ValueError.
            raise RuntimeError(f"unexpected IQ sample payload: {len(raw)} bytes")
        values = np.frombuffer(raw, dtype="<i2")
        if values.size == 0:
            raise RuntimeError("no samples")
        if values.size % 2 != 0:
            raise RuntimeError(f"unexpected IQ sample payload: {values.size} int16 values")
        iq = values.reshape(-1, 2).astype(np.float32, copy=False)
        complex_samples = (iq[:, 0] + 1j * iq[:, 1]).astype(np.complex64, copy=False)
        complex_samples /= 32768.0
        return complex_samples

    def read_chunk(self, stop_event: threading.Event):
        """Run one ``iio_readdev`` invocation and return its samples as complex64.

        ``args.samples_per_read`` is passed as both ``-b`` and ``-s``; when it
        is ``None``, ``split_size`` clamped to the range 4096..262144 is used.

        Raises:
            InterruptedError: If ``stop_event`` was set during the read.
            RuntimeError: If the tool fails or returns a malformed payload.
        """
        samples_per_read = self.args.samples_per_read
        if samples_per_read is None:
            samples_per_read = max(4096, min(int(self.args.split_size), 262_144))
        raw = self._run_binary(
            [
                "iio_readdev",
                "-u",
                self.args.iio_uri,
                "-T",
                str(int(self.args.timeout_ms)),
                "-b",
                str(int(samples_per_read)),
                "-s",
                str(int(samples_per_read)),
                self.args.iio_device,
                self.args.iio_i_channel,
                self.args.iio_q_channel,
            ],
            stop_event,
        )
        return self._decode_iq(raw)

    def run(self, stop_event: threading.Event):
        """Configure and tune the device, then read and store chunks until stopped.

        Sleeps ``args.settle`` seconds after tuning.  Errors raised by
        ``read_chunk`` other than ``InterruptedError`` propagate to the caller.
        """
        self.ensure_configured()
        self.tune()
        if self.args.settle > 0:
            time.sleep(self.args.settle)
        while not stop_event.is_set():
            try:
                samples = self.read_chunk(stop_event)
            except InterruptedError:
                break
            self.writer.write_samples(samples)

    def close(self):
        """Close the underlying writer."""
        self.writer.close()
def _positive_int(text):
    """argparse ``type`` callable that accepts only integers greater than zero."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
    if value <= 0:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
    return value


def parse_args(argv=None):
    """Parse the command line of the headless IQ saver.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        The populated ``argparse.Namespace``.

    Raises:
        SystemExit: On invalid arguments (argparse prints the reason first).
    """
    parser = argparse.ArgumentParser(description="Headless IQ saver for HackRF or IIO SDR backends")
    parser.add_argument("--backend", choices=("hackrf", "iio"), default="hackrf", help="SDR backend")
    parser.add_argument("--serial", help="HackRF serial number")
    parser.add_argument("--freq", type=float, required=True, help="Center frequency in Hz")
    parser.add_argument("--save-dir", required=True, help="Directory for output IQ files")
    parser.add_argument("--file-tag", default="fragment_", help="Prefix for output files")
    parser.add_argument("--samp-rate", type=float, default=20e6, help="Sample rate in S/s")
    # A non-positive split size would make the file writer rotate files forever.
    parser.add_argument("--split-size", type=_positive_int, default=400000, help="Max complex samples per file")
    parser.add_argument("--delay", type=float, default=0.1, help="Delay between files in seconds")
    parser.add_argument("--rf-gain", type=float, default=12, help="HackRF RF gain")
    parser.add_argument("--if-gain", type=float, default=30, help="HackRF IF gain")
    parser.add_argument("--bb-gain", type=float, default=36, help="HackRF BB gain")
    parser.add_argument("--bandwidth", type=float, default=None, help="Optional IIO RF bandwidth in Hz")
    parser.add_argument("--iio-uri", default="ip:192.168.2.1", help="IIO URI")
    parser.add_argument("--iio-device", default="cf-ad9361-lpc", help="IIO RX buffer device")
    parser.add_argument("--iio-phy-device", default="ad9361-phy", help="IIO PHY device")
    parser.add_argument("--iio-i-channel", default="voltage0", help="IIO I channel")
    parser.add_argument("--iio-q-channel", default="voltage1", help="IIO Q channel")
    parser.add_argument("--iio-lo-channel", default="altvoltage0", help="IIO LO channel for RX frequency")
    parser.add_argument("--iio-port-select", default="A_BALANCED", help="IIO rf_port_select value")
    parser.add_argument("--iio-gain-mode", default="slow_attack", help="IIO gain_control_mode value")
    parser.add_argument("--iio-hardwaregain", type=float, default=None, help="IIO hardware gain in dB for manual mode")
    parser.add_argument("--timeout-ms", type=int, default=4000, help="IIO read timeout in milliseconds")
    parser.add_argument("--settle", type=float, default=0.12, help="Delay after tuning for IIO backend")
    parser.add_argument("--samples-per-read", type=_positive_int, default=None, help="IIO complex samples per read call")
    return parser.parse_args(argv)
def main():
    """Parse the command line and record IQ data until SIGINT or SIGTERM arrives.

    With ``--backend hackrf`` a ``HackRfDataSaver`` flowgraph is started and
    the main thread idles until a stop is requested; otherwise an
    ``IioCapture`` read loop runs in the main thread.  Exits with status 2
    when the HackRF backend is selected without ``--serial``.
    """
    args = parse_args()
    stop_event = threading.Event()
    if args.backend == "hackrf" and not args.serial:
        print("--serial is required for --backend hackrf", file=sys.stderr)
        sys.exit(2)
    capture = None
    tb = None

    def handle_signal(sig, frame):
        # Only record the request here.  Teardown happens in the ``finally``
        # block below, so a signal that arrives while the flowgraph is still
        # being constructed cannot leave it running.
        print(f"Received signal {sig}, stopping...", flush=True)
        stop_event.set()

    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)
    print("Starting capture...", flush=True)
    print(f"  backend: {args.backend}", flush=True)
    print(f"  freq: {args.freq}", flush=True)
    print(f"  save_dir: {args.save_dir}", flush=True)
    print(f"  file_tag: {args.file_tag}", flush=True)
    print(f"  samp_rate: {args.samp_rate}", flush=True)
    print(f"  split_size: {args.split_size}", flush=True)
    try:
        if args.backend == "hackrf":
            print(f"  serial: {args.serial}", flush=True)
            print(f"  gains: rf={args.rf_gain} if={args.if_gain} bb={args.bb_gain}", flush=True)
            tb = HackRfDataSaver(
                serial=args.serial,
                freq=args.freq,
                save_dir=args.save_dir,
                file_tag=args.file_tag,
                samp_rate=args.samp_rate,
                split_size=args.split_size,
                delay=args.delay,
                rf_gain=args.rf_gain,
                if_gain=args.if_gain,
                bb_gain=args.bb_gain,
            )
            tb.start()
            # Waiting on the event (instead of sleeping) returns as soon as
            # the signal handler sets it.
            while not stop_event.is_set():
                stop_event.wait(0.5)
        else:
            print(f"  uri: {args.iio_uri}", flush=True)
            print(
                f"  iio: device={args.iio_device} phy={args.iio_phy_device} "
                f"port={args.iio_port_select} gain_mode={args.iio_gain_mode}",
                flush=True,
            )
            capture = IioCapture(args)
            capture.run(stop_event)
    except KeyboardInterrupt:
        handle_signal(signal.SIGINT, None)
    finally:
        if capture is not None:
            capture.close()
        if tb is not None:
            # Always stop the flowgraph, however the capture ended.
            tb.stop()
            tb.wait()
    print("Stopped.", flush=True)
# Start the capture only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()