#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Headless GNU Radio flowgraph that streams HackRF IQ samples into files.

Samples from an ``osmosdr`` source are written as raw complex64 data into
numbered fragment files, each holding at most ``--split-size`` samples.
"""

import os
import sys
import time
import signal
import argparse
import threading

import numpy as np
from gnuradio import gr
import osmosdr


class SimsiSink(gr.sync_block):
    """Sink block that writes incoming complex64 samples to numbered files.

    Samples are appended to ``<save_dir>/<file_tag><index>``. Once a file
    holds ``split_size`` samples it is closed and the next index is opened.
    While a fragment is open, an empty marker file named
    ``reading_in_progress`` exists in ``save_dir``; the marker is removed
    whenever the fragment is closed.
    """

    def __init__(self, save_dir="./signal", file_tag="fragment_", split_size=1000000, delay=0.0):
        """Create the output directory and open the first fragment.

        Args:
            save_dir: Directory that receives the fragment files.
            file_tag: File name prefix placed before the fragment index.
            split_size: Maximum number of complex samples per file (> 0).
            delay: Seconds to sleep between closing one fragment and
                opening the next; values <= 0 disable the pause.

        Raises:
            ValueError: If ``split_size`` is not a positive integer.
        """
        split_size = int(split_size)
        if split_size <= 0:
            # A non-positive size would make work() rotate files forever
            # without ever consuming a sample.
            raise ValueError(f"split_size must be a positive integer, got {split_size}")

        gr.sync_block.__init__(
            self,
            name="Simsi_Sink",
            in_sig=[np.complex64],
            out_sig=None,
        )
        self.save_dir = str(save_dir)
        self.file_tag = str(file_tag)
        self.split_size = split_size
        self.delay = float(delay)

        os.makedirs(self.save_dir, exist_ok=True)

        self.file_index = 0
        self.current_len = 0  # samples written to the currently open fragment
        self.current_fd = None
        self.in_progress_path = os.path.join(self.save_dir, "reading_in_progress")
        self.in_progress_fd = None

        self._open_next_file()

    def _close_in_progress_handle(self):
        """Close the marker file handle, ignoring I/O errors (best effort)."""
        if self.in_progress_fd is None:
            return
        try:
            self.in_progress_fd.close()
        except OSError:
            # The marker carries no data; a failed close is not actionable.
            pass
        self.in_progress_fd = None

    def _touch_in_progress(self):
        """Create (or truncate) the marker file and keep it open."""
        self._close_in_progress_handle()
        self.in_progress_fd = open(self.in_progress_path, "wb")

    def _remove_in_progress(self):
        """Close and delete the marker file; a missing marker is not an error."""
        self._close_in_progress_handle()
        try:
            os.remove(self.in_progress_path)
        except OSError:
            # Best effort: the marker may already be gone or be undeletable.
            pass

    def _current_file_path(self):
        """Return the path of the fragment for the current index."""
        return os.path.join(self.save_dir, f"{self.file_tag}{self.file_index}")

    def _open_next_file(self):
        """Create the marker, then open the fragment for the current index."""
        self._touch_in_progress()
        path = self._current_file_path()
        self.current_fd = open(path, "wb")
        self.current_len = 0
        print(f"Opened file: {path}", flush=True)

    def _rotate_file(self):
        """Close the full fragment, pause for ``delay``, and open the next one."""
        path = self._current_file_path()
        print(f"Saving file: {path}", flush=True)

        if self.current_fd is not None:
            self.current_fd.close()
            self.current_fd = None

        # The marker is removed before the pause, so it is absent while no
        # fragment is open.
        self._remove_in_progress()

        if self.delay > 0:
            time.sleep(self.delay)

        self.file_index += 1
        self._open_next_file()

    def work(self, input_items, output_items):
        """Write all input samples, rotating files at ``split_size`` samples.

        Returns:
            The number of input samples consumed (always all of them).
        """
        data = input_items[0]
        total = len(data)
        offset = 0

        while offset < total:
            remaining = self.split_size - self.current_len
            chunk = min(remaining, total - offset)

            # ascontiguousarray copies only when the slice is not already
            # contiguous, so the common case is written without a copy.
            self.current_fd.write(np.ascontiguousarray(data[offset:offset + chunk]))
            self.current_len += chunk
            offset += chunk

            if self.current_len >= self.split_size:
                self._rotate_file()

        return total

    def stop(self):
        """Close the open fragment and always remove the marker file."""
        try:
            if self.current_fd is not None:
                self.current_fd.close()
                self.current_fd = None
        finally:
            self._remove_in_progress()
        return True


class DataSaver(gr.top_block):
    """Top block connecting one osmosdr HackRF source to a ``SimsiSink``."""

    def __init__(
        self,
        serial: str,
        freq: float,
        save_dir: str,
        file_tag: str,
        samp_rate: float,
        split_size: int,
        delay: float,
        rf_gain: float,
        if_gain: float,
        bb_gain: float,
    ):
        """Configure the source and sink and connect them.

        Args:
            serial: HackRF serial, passed as ``hackrf=<serial>`` device arg.
            freq: Center frequency in Hz.
            save_dir: Directory for the fragment files.
            file_tag: Prefix for the fragment file names.
            samp_rate: Sample rate in samples per second.
            split_size: Maximum complex samples per fragment file.
            delay: Pause in seconds between fragment files.
            rf_gain: Value passed to the source's ``set_gain``.
            if_gain: Value passed to the source's ``set_if_gain``.
            bb_gain: Value passed to the source's ``set_bb_gain``.
        """
        super().__init__("data_saver_headless", catch_exceptions=True)

        self.serial = serial
        self.freq = float(freq)
        self.save_dir = save_dir
        self.file_tag = file_tag
        self.samp_rate = float(samp_rate)
        self.split_size = int(split_size)
        self.delay = float(delay)
        self.rf_gain = float(rf_gain)
        self.if_gain = float(if_gain)
        self.bb_gain = float(bb_gain)

        dev_args = f"numchan=1 hackrf={self.serial}"
        self.source = osmosdr.source(args=dev_args)

        self.source.set_time_unknown_pps(osmosdr.time_spec_t())
        self.source.set_sample_rate(self.samp_rate)
        self.source.set_center_freq(self.freq, 0)
        self.source.set_freq_corr(0, 0)
        self.source.set_dc_offset_mode(0, 0)
        self.source.set_iq_balance_mode(0, 0)
        self.source.set_gain_mode(False, 0)
        self.source.set_gain(self.rf_gain, 0)
        self.source.set_if_gain(self.if_gain, 0)
        self.source.set_bb_gain(self.bb_gain, 0)
        self.source.set_antenna("", 0)
        self.source.set_bandwidth(0, 0)

        self.sink = SimsiSink(
            save_dir=self.save_dir,
            file_tag=self.file_tag,
            split_size=self.split_size,
            delay=self.delay,
        )

        self.connect((self.source, 0), (self.sink, 0))


def _positive_int(text):
    """Argparse type: parse *text* as an integer strictly greater than zero."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid integer value: {text!r}") from None
    if value <= 0:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
    return value


def parse_args(argv=None):
    """Parse command-line options.

    Args:
        argv: Optional argument list; ``None`` makes argparse read
            ``sys.argv[1:]``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(description="Headless GNU Radio IQ saver for HackRF")
    parser.add_argument("--serial", required=True, help="HackRF serial number")
    parser.add_argument("--freq", type=float, required=True, help="Center frequency in Hz")
    parser.add_argument("--save-dir", required=True, help="Directory for output IQ files")
    parser.add_argument("--file-tag", default="fragment_", help="Prefix for output files")
    parser.add_argument("--samp-rate", type=float, default=20e6, help="Sample rate in S/s")
    # Rejecting non-positive sizes here gives a usage error instead of a
    # sink that rotates files forever.
    parser.add_argument("--split-size", type=_positive_int, default=400000, help="Max complex samples per file")
    parser.add_argument("--delay", type=float, default=0.1, help="Delay between files in seconds")
    parser.add_argument("--rf-gain", type=float, default=12, help="HackRF RF gain")
    parser.add_argument("--if-gain", type=float, default=30, help="HackRF IF gain")
    parser.add_argument("--bb-gain", type=float, default=36, help="HackRF BB gain")
    return parser.parse_args(argv)


def main():
    """Build the flowgraph, run it until SIGINT/SIGTERM, then shut it down."""
    args = parse_args()

    tb = DataSaver(
        serial=args.serial,
        freq=args.freq,
        save_dir=args.save_dir,
        file_tag=args.file_tag,
        samp_rate=args.samp_rate,
        split_size=args.split_size,
        delay=args.delay,
        rf_gain=args.rf_gain,
        if_gain=args.if_gain,
        bb_gain=args.bb_gain,
    )

    stop_event = threading.Event()

    def handle_signal(sig, frame):
        # Only request shutdown here; the blocking stop()/wait() calls run
        # in the main flow below rather than inside the signal handler.
        print(f"Received signal {sig}, stopping...", flush=True)
        stop_event.set()

    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    print("Starting flowgraph...", flush=True)
    print(f" serial: {args.serial}", flush=True)
    print(f" freq: {args.freq}", flush=True)
    print(f" save_dir: {args.save_dir}", flush=True)
    print(f" file_tag: {args.file_tag}", flush=True)
    print(f" samp_rate: {args.samp_rate}", flush=True)
    print(f" split_size: {args.split_size}", flush=True)
    print(f" gains: rf={args.rf_gain} if={args.if_gain} bb={args.bb_gain}", flush=True)

    tb.start()
    try:
        # A timed wait keeps the main thread responsive to signals.
        while not stop_event.wait(0.5):
            pass
    finally:
        # Runs on a normal stop request and on any unexpected exception, so
        # the flowgraph is never left running.
        tb.stop()
        tb.wait()

    print("Stopped.", flush=True)


if __name__ == "__main__":
    main()