#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Headless IQ recorder for HackRF (GNU Radio / gr-osmosdr) or IIO SDR backends.

Samples are written as raw complex64 into numbered files inside a save
directory.  While a file is being filled, a ``reading_in_progress`` marker
file exists next to it; the marker is removed once the file is complete.
"""

import argparse
import contextlib
import os
import signal
import subprocess
import sys
import threading
import time

import numpy as np

# GNU Radio and gr-osmosdr are only needed by the HackRF backend.  The IIO
# backend shells out to iio_attr / iio_readdev, so a missing GNU Radio install
# must not prevent the module from loading.
try:
    from gnuradio import gr
    import osmosdr
except ImportError as _exc:
    gr = None
    osmosdr = None
    _GNURADIO_IMPORT_ERROR = _exc
else:
    _GNURADIO_IMPORT_ERROR = None

# Sample rates below this value are raised to it before being sent to the
# IIO device (see IioCapture.ensure_configured).
IIO_MIN_SAMPLE_RATE = 2_083_333


class RotatingIqWriter:
    """Write complex64 samples into consecutive files of fixed sample count.

    Files are named ``<file_tag><index>`` inside ``save_dir``.  A marker file
    named ``reading_in_progress`` is created when a data file is opened and
    removed when that data file is closed.

    Args:
        save_dir: Output directory; created if missing.
        file_tag: File name prefix.
        split_size: Number of complex samples per file; must be positive.
        delay: Seconds to sleep between closing one file and opening the next.

    Raises:
        ValueError: If ``split_size`` is not positive.
    """

    def __init__(self, save_dir="./signal", file_tag="fragment_", split_size=1_000_000, delay=0.0):
        self.save_dir = str(save_dir)
        self.file_tag = str(file_tag)
        self.split_size = int(split_size)
        self.delay = float(delay)
        if self.split_size <= 0:
            # A non-positive size would make write_samples() rotate forever
            # without consuming any input.
            raise ValueError(f"split_size must be positive, got {self.split_size}")

        os.makedirs(self.save_dir, exist_ok=True)

        self.file_index = 0
        self.current_len = 0
        self.current_fd = None
        self.in_progress_path = os.path.join(self.save_dir, "reading_in_progress")
        self.in_progress_fd = None
        self._open_next_file()

    def _close_in_progress_fd(self):
        """Close the marker file handle, ignoring OS-level close errors."""
        if self.in_progress_fd is not None:
            with contextlib.suppress(OSError):
                self.in_progress_fd.close()
            self.in_progress_fd = None

    def _touch_in_progress(self):
        """(Re)create the marker file and keep its handle open."""
        self._close_in_progress_fd()
        self.in_progress_fd = open(self.in_progress_path, "wb")

    def _remove_in_progress(self):
        """Close and delete the marker file; a missing marker is not an error."""
        self._close_in_progress_fd()
        with contextlib.suppress(OSError):
            os.remove(self.in_progress_path)

    def _current_file_path(self):
        """Return the path of the data file for the current index."""
        return os.path.join(self.save_dir, f"{self.file_tag}{self.file_index}")

    def _open_next_file(self):
        """Create the marker and open the data file for the current index."""
        self._touch_in_progress()
        path = self._current_file_path()
        self.current_fd = open(path, "wb")
        self.current_len = 0
        print(f"Opened file: {path}", flush=True)

    def _rotate_file(self):
        """Finish the current file, wait ``delay`` seconds, open the next one."""
        path = self._current_file_path()
        print(f"Saving file: {path}", flush=True)
        if self.current_fd is not None:
            self.current_fd.close()
            self.current_fd = None
        self._remove_in_progress()
        if self.delay > 0:
            time.sleep(self.delay)
        self.file_index += 1
        self._open_next_file()

    def write_samples(self, samples):
        """Append ``samples`` (converted to complex64), rotating files as needed.

        A file is rotated as soon as it holds ``split_size`` samples, so a new
        (possibly still empty) file is always open after this call returns.
        """
        data = np.ascontiguousarray(samples, dtype=np.complex64).reshape(-1)
        total = int(data.size)
        offset = 0
        while offset < total:
            room = self.split_size - self.current_len
            count = min(room, total - offset)
            self.current_fd.write(data[offset:offset + count].tobytes())
            self.current_len += count
            offset += count
            if self.current_len >= self.split_size:
                self._rotate_file()

    def close(self):
        """Close the current data file and remove the marker file."""
        try:
            if self.current_fd is not None:
                self.current_fd.close()
                self.current_fd = None
        finally:
            self._remove_in_progress()


if gr is not None and osmosdr is not None:

    class SimsiSink(gr.sync_block):
        """GNU Radio sink block that forwards complex64 input to a RotatingIqWriter."""

        def __init__(self, save_dir="./signal", file_tag="fragment_", split_size=1_000_000, delay=0.0):
            gr.sync_block.__init__(
                self,
                name="Simsi_Sink",
                in_sig=[np.complex64],
                out_sig=None,
            )
            self.writer = RotatingIqWriter(
                save_dir=save_dir,
                file_tag=file_tag,
                split_size=split_size,
                delay=delay,
            )

        def work(self, input_items, output_items):
            """Write all items of the single input port; report them consumed."""
            items = input_items[0]
            self.writer.write_samples(items)
            return len(items)

        def stop(self):
            """Close the writer when the block is stopped."""
            self.writer.close()
            return True

    class HackRfDataSaver(gr.top_block):
        """Flowgraph connecting an osmosdr HackRF source to a SimsiSink."""

        def __init__(
            self,
            serial: str,
            freq: float,
            save_dir: str,
            file_tag: str,
            samp_rate: float,
            split_size: int,
            delay: float,
            rf_gain: float,
            if_gain: float,
            bb_gain: float,
        ):
            super().__init__("data_saver_headless", catch_exceptions=True)

            dev_args = f"numchan=1 hackrf={serial}"
            self.source = osmosdr.source(args=dev_args)
            self.source.set_time_unknown_pps(osmosdr.time_spec_t())
            self.source.set_sample_rate(float(samp_rate))
            self.source.set_center_freq(float(freq), 0)
            self.source.set_freq_corr(0, 0)
            self.source.set_dc_offset_mode(0, 0)
            self.source.set_iq_balance_mode(0, 0)
            self.source.set_gain_mode(False, 0)
            self.source.set_gain(float(rf_gain), 0)
            self.source.set_if_gain(float(if_gain), 0)
            self.source.set_bb_gain(float(bb_gain), 0)
            self.source.set_antenna("", 0)
            self.source.set_bandwidth(0, 0)

            self.sink = SimsiSink(
                save_dir=save_dir,
                file_tag=file_tag,
                split_size=split_size,
                delay=delay,
            )
            self.connect((self.source, 0), (self.sink, 0))


class IioCapture:
    """Capture IQ from an IIO device via the ``iio_attr`` / ``iio_readdev`` tools.

    Args:
        args: Parsed command line namespace as produced by ``parse_args``.
    """

    def __init__(self, args: argparse.Namespace):
        self.args = args
        self.writer = RotatingIqWriter(
            save_dir=args.save_dir,
            file_tag=args.file_tag,
            split_size=args.split_size,
            delay=args.delay,
        )
        # Cache of the last applied settings so unchanged values are not re-sent.
        self._static_signature = None
        self._last_freq_hz = None
        # I and Q channel names, de-duplicated in case both options name the
        # same channel.
        self._input_channels = [args.iio_i_channel]
        if args.iio_q_channel not in self._input_channels:
            self._input_channels.append(args.iio_q_channel)

    def _run(self, cmd):
        """Run ``cmd`` to completion and return its stdout as text.

        Raises:
            RuntimeError: On non-zero exit; the message is stderr, else
                stdout, else the exit code.
        """
        proc = subprocess.run(list(cmd), capture_output=True, text=True)
        if proc.returncode != 0:
            details = (
                (proc.stderr or "").strip()
                or (proc.stdout or "").strip()
                or f"exit code {proc.returncode}"
            )
            raise RuntimeError(details)
        return proc.stdout

    def _run_binary(self, cmd, stop_event: threading.Event):
        """Run ``cmd`` and return its raw stdout bytes, polling ``stop_event``.

        Raises:
            InterruptedError: If ``stop_event`` is set while the command is
                still running; the child is terminated (then killed) first.
            RuntimeError: On non-zero exit.
        """
        proc = subprocess.Popen(list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        try:
            while True:
                try:
                    stdout, stderr = proc.communicate(timeout=0.2)
                    break
                except subprocess.TimeoutExpired:
                    if not stop_event.is_set():
                        continue
                    proc.terminate()
                    try:
                        proc.communicate(timeout=1.0)
                    except subprocess.TimeoutExpired:
                        proc.kill()
                        proc.communicate()
                    raise InterruptedError("stop requested")
            if proc.returncode != 0:
                details = (
                    stderr.decode("utf-8", errors="ignore").strip()
                    or stdout.decode("utf-8", errors="ignore").strip()
                )
                raise RuntimeError(details or f"exit code {proc.returncode}")
            return stdout
        finally:
            # Never leave a child behind, whatever path got us here.
            if proc.poll() is None:
                proc.kill()
                proc.communicate()

    def _set_attr(self, direction_flag: str, channel: str, attr: str, value: str):
        """Write one channel attribute on the PHY device through ``iio_attr``."""
        self._run(
            [
                "iio_attr",
                "-u",
                self.args.iio_uri,
                "-q",
                direction_flag,
                "-c",
                self.args.iio_phy_device,
                channel,
                attr,
                value,
            ]
        )

    def _set_input_attr(self, channel: str, attr: str, value: str):
        """Write an attribute of an input channel (``iio_attr -i``)."""
        self._set_attr("-i", channel, attr, value)

    def _set_output_attr(self, channel: str, attr: str, value: str):
        """Write an attribute of an output channel (``iio_attr -o``)."""
        self._set_attr("-o", channel, attr, value)

    def ensure_configured(self):
        """Apply rate, bandwidth, port and gain settings if they changed.

        The sample rate is raised to ``IIO_MIN_SAMPLE_RATE`` and an explicit
        bandwidth to at least 200 kHz before being written.
        """
        sample_rate = int(round(max(float(self.args.samp_rate), IIO_MIN_SAMPLE_RATE)))
        bandwidth = None
        if self.args.bandwidth is not None:
            bandwidth = int(round(max(float(self.args.bandwidth), 200_000.0)))

        signature = (
            float(sample_rate),
            None if bandwidth is None else float(bandwidth),
            self.args.iio_gain_mode,
            self.args.iio_hardwaregain,
            self.args.iio_port_select,
        )
        if signature == self._static_signature:
            return

        manual_gain = (
            self.args.iio_gain_mode == "manual"
            and self.args.iio_hardwaregain is not None
        )
        for channel in self._input_channels:
            self._set_input_attr(channel, "sampling_frequency", str(sample_rate))
            if bandwidth is not None:
                self._set_input_attr(channel, "rf_bandwidth", str(bandwidth))
            if self.args.iio_port_select:
                self._set_input_attr(channel, "rf_port_select", self.args.iio_port_select)
            if self.args.iio_gain_mode:
                self._set_input_attr(channel, "gain_control_mode", self.args.iio_gain_mode)
            if manual_gain:
                self._set_input_attr(channel, "hardwaregain", f"{self.args.iio_hardwaregain:.6f}")
        self._static_signature = signature

    def tune(self):
        """Set the LO channel frequency unless it already holds the target."""
        target = int(round(float(self.args.freq)))
        if self._last_freq_hz == target:
            return
        self._set_output_attr(self.args.iio_lo_channel, "frequency", str(target))
        self._last_freq_hz = target

    @staticmethod
    def _decode_iq(raw):
        """Convert interleaved little-endian int16 I/Q bytes to complex64.

        Trailing bytes that do not form a complete I/Q pair are dropped.

        NOTE(review): the original decoding code was lost from the source
        (only ``np.frombuffer(raw, dtype="`` survived).  Little-endian int16
        and the absence of any amplitude scaling are assumptions -- confirm
        against the consumer of the recorded files.
        """
        usable_bytes = len(raw) - (len(raw) % 4)  # 2 x int16 per complex sample
        values = np.frombuffer(raw, dtype="<i2", count=usable_bytes // 2)
        pairs = values.astype(np.float32).reshape(-1, 2)
        samples = np.empty(pairs.shape[0], dtype=np.complex64)
        samples.real = pairs[:, 0]
        samples.imag = pairs[:, 1]
        return samples

    def read_chunk(self, stop_event: threading.Event):
        """Read one buffer of samples from the device via ``iio_readdev``.

        Raises:
            InterruptedError: If ``stop_event`` is set during the read.
            RuntimeError: If the tool fails or returns no complete sample.
        """
        samples_per_read = self.args.samples_per_read
        if samples_per_read is None:
            samples_per_read = max(4096, min(int(self.args.split_size), 262_144))

        raw = self._run_binary(
            [
                "iio_readdev",
                "-u",
                self.args.iio_uri,
                "-T",
                str(int(self.args.timeout_ms)),
                "-b",
                str(int(samples_per_read)),
                "-s",
                str(int(samples_per_read)),
                self.args.iio_device,
                self.args.iio_i_channel,
                self.args.iio_q_channel,
            ],
            stop_event,
        )
        samples = self._decode_iq(raw)
        if samples.size == 0:
            # NOTE(review): reconstructed behaviour -- an empty read is
            # treated as an error so the capture loop cannot spin on a
            # device that delivers nothing.
            raise RuntimeError("iio_readdev returned no IQ data")
        return samples

    def run(self, stop_event: threading.Event):
        """Configure and tune the device, then record until ``stop_event`` is set.

        NOTE(review): the configure/tune calls are reconstructed; the start of
        this method was lost from the source.
        """
        self.ensure_configured()
        self.tune()
        if self.args.settle > 0:
            time.sleep(self.args.settle)

        while not stop_event.is_set():
            try:
                samples = self.read_chunk(stop_event)
            except InterruptedError:
                break
            self.writer.write_samples(samples)

    def close(self):
        """Close the underlying writer."""
        self.writer.close()


def parse_args(argv=None):
    """Parse command line options.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="Headless IQ saver for HackRF or IIO SDR backends")
    parser.add_argument("--backend", choices=("hackrf", "iio"), default="hackrf", help="SDR backend")
    parser.add_argument("--serial", help="HackRF serial number")
    parser.add_argument("--freq", type=float, required=True, help="Center frequency in Hz")
    parser.add_argument("--save-dir", required=True, help="Directory for output IQ files")
    parser.add_argument("--file-tag", default="fragment_", help="Prefix for output files")
    parser.add_argument("--samp-rate", type=float, default=20e6, help="Sample rate in S/s")
    parser.add_argument("--split-size", type=int, default=400000, help="Max complex samples per file")
    parser.add_argument("--delay", type=float, default=0.1, help="Delay between files in seconds")
    parser.add_argument("--rf-gain", type=float, default=12, help="HackRF RF gain")
    parser.add_argument("--if-gain", type=float, default=30, help="HackRF IF gain")
    parser.add_argument("--bb-gain", type=float, default=36, help="HackRF BB gain")
    parser.add_argument("--bandwidth", type=float, default=None, help="Optional IIO RF bandwidth in Hz")
    parser.add_argument("--iio-uri", default="ip:192.168.2.1", help="IIO URI")
    parser.add_argument("--iio-device", default="cf-ad9361-lpc", help="IIO RX buffer device")
    parser.add_argument("--iio-phy-device", default="ad9361-phy", help="IIO PHY device")
    parser.add_argument("--iio-i-channel", default="voltage0", help="IIO I channel")
    parser.add_argument("--iio-q-channel", default="voltage1", help="IIO Q channel")
    parser.add_argument("--iio-lo-channel", default="altvoltage0", help="IIO LO channel for RX frequency")
    parser.add_argument("--iio-port-select", default="A_BALANCED", help="IIO rf_port_select value")
    parser.add_argument("--iio-gain-mode", default="slow_attack", help="IIO gain_control_mode value")
    parser.add_argument("--iio-hardwaregain", type=float, default=None, help="IIO hardware gain in dB for manual mode")
    parser.add_argument("--timeout-ms", type=int, default=4000, help="IIO read timeout in milliseconds")
    parser.add_argument("--settle", type=float, default=0.12, help="Delay after tuning for IIO backend")
    parser.add_argument("--samples-per-read", type=int, default=None, help="IIO complex samples per read call")
    return parser.parse_args(argv)


def main(argv=None):
    """Run a capture with the selected backend until SIGINT/SIGTERM.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    args = parse_args(argv)
    stop_event = threading.Event()

    if args.backend == "hackrf":
        if not args.serial:
            print("--serial is required for --backend hackrf", file=sys.stderr)
            sys.exit(2)
        if gr is None or osmosdr is None:
            print(
                f"--backend hackrf requires GNU Radio and gr-osmosdr: {_GNURADIO_IMPORT_ERROR}",
                file=sys.stderr,
            )
            sys.exit(2)

    capture = None
    tb = None

    def handle_signal(sig, frame):
        # Only flag the request here; the flowgraph is torn down once, in the
        # ``finally`` below, instead of blocking inside the signal handler.
        print(f"Received signal {sig}, stopping...", flush=True)
        stop_event.set()

    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    print("Starting capture...", flush=True)
    print(f"  backend: {args.backend}", flush=True)
    print(f"  freq: {args.freq}", flush=True)
    print(f"  save_dir: {args.save_dir}", flush=True)
    print(f"  file_tag: {args.file_tag}", flush=True)
    print(f"  samp_rate: {args.samp_rate}", flush=True)
    print(f"  split_size: {args.split_size}", flush=True)

    try:
        if args.backend == "hackrf":
            print(f"  serial: {args.serial}", flush=True)
            print(f"  gains: rf={args.rf_gain} if={args.if_gain} bb={args.bb_gain}", flush=True)
            tb = HackRfDataSaver(
                serial=args.serial,
                freq=args.freq,
                save_dir=args.save_dir,
                file_tag=args.file_tag,
                samp_rate=args.samp_rate,
                split_size=args.split_size,
                delay=args.delay,
                rf_gain=args.rf_gain,
                if_gain=args.if_gain,
                bb_gain=args.bb_gain,
            )
            tb.start()
            # Event.wait() returns as soon as the handler sets the event.
            while not stop_event.wait(0.5):
                pass
        else:
            print(f"  uri: {args.iio_uri}", flush=True)
            print(
                f"  iio: device={args.iio_device} phy={args.iio_phy_device} "
                f"port={args.iio_port_select} gain_mode={args.iio_gain_mode}",
                flush=True,
            )
            capture = IioCapture(args)
            capture.run(stop_event)
    except KeyboardInterrupt:
        handle_signal(signal.SIGINT, None)
    finally:
        if capture is not None:
            capture.close()
        if tb is not None:
            tb.stop()
            tb.wait()
        print("Stopped.", flush=True)


if __name__ == "__main__":
    main()