You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
448 lines
15 KiB
Python
448 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import argparse
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
|
|
import numpy as np
|
|
from gnuradio import gr
|
|
import osmosdr
|
|
|
|
IIO_MIN_SAMPLE_RATE = 2_083_333
|
|
|
|
|
|
class RotatingIqWriter:
|
|
def __init__(self, save_dir="./signal", file_tag="fragment_", split_size=1_000_000, delay=0.0):
|
|
self.save_dir = str(save_dir)
|
|
self.file_tag = str(file_tag)
|
|
self.split_size = int(split_size)
|
|
self.delay = float(delay)
|
|
|
|
os.makedirs(self.save_dir, exist_ok=True)
|
|
|
|
self.file_index = 0
|
|
self.current_len = 0
|
|
self.current_fd = None
|
|
|
|
self.in_progress_path = os.path.join(self.save_dir, "reading_in_progress")
|
|
self.in_progress_fd = None
|
|
|
|
self._open_next_file()
|
|
|
|
def _touch_in_progress(self):
|
|
if self.in_progress_fd is not None:
|
|
try:
|
|
self.in_progress_fd.close()
|
|
except Exception:
|
|
pass
|
|
self.in_progress_fd = open(self.in_progress_path, "wb")
|
|
|
|
def _remove_in_progress(self):
|
|
if self.in_progress_fd is not None:
|
|
try:
|
|
self.in_progress_fd.close()
|
|
except Exception:
|
|
pass
|
|
self.in_progress_fd = None
|
|
|
|
if os.path.exists(self.in_progress_path):
|
|
try:
|
|
os.remove(self.in_progress_path)
|
|
except Exception:
|
|
pass
|
|
|
|
def _current_file_path(self):
|
|
return os.path.join(self.save_dir, f"{self.file_tag}{self.file_index}")
|
|
|
|
def _open_next_file(self):
|
|
self._touch_in_progress()
|
|
path = self._current_file_path()
|
|
self.current_fd = open(path, "wb")
|
|
self.current_len = 0
|
|
print(f"Opened file: {path}", flush=True)
|
|
|
|
def _rotate_file(self):
|
|
path = self._current_file_path()
|
|
print(f"Saving file: {path}", flush=True)
|
|
|
|
if self.current_fd is not None:
|
|
self.current_fd.close()
|
|
self.current_fd = None
|
|
|
|
self._remove_in_progress()
|
|
|
|
if self.delay > 0:
|
|
time.sleep(self.delay)
|
|
|
|
self.file_index += 1
|
|
self._open_next_file()
|
|
|
|
def write_samples(self, samples):
|
|
data = np.asarray(samples, dtype=np.complex64).reshape(-1)
|
|
offset = 0
|
|
total = int(data.size)
|
|
|
|
while offset < total:
|
|
remaining = self.split_size - self.current_len
|
|
chunk = min(remaining, total - offset)
|
|
block = np.ascontiguousarray(data[offset:offset + chunk], dtype=np.complex64)
|
|
self.current_fd.write(block.tobytes())
|
|
self.current_len += chunk
|
|
offset += chunk
|
|
|
|
if self.current_len >= self.split_size:
|
|
self._rotate_file()
|
|
|
|
def close(self):
|
|
try:
|
|
if self.current_fd is not None:
|
|
self.current_fd.close()
|
|
self.current_fd = None
|
|
finally:
|
|
self._remove_in_progress()
|
|
|
|
|
|
class SimsiSink(gr.sync_block):
|
|
def __init__(self, save_dir="./signal", file_tag="fragment_", split_size=1_000_000, delay=0.0):
|
|
gr.sync_block.__init__(
|
|
self,
|
|
name="Simsi_Sink",
|
|
in_sig=[np.complex64],
|
|
out_sig=None,
|
|
)
|
|
self.writer = RotatingIqWriter(
|
|
save_dir=save_dir,
|
|
file_tag=file_tag,
|
|
split_size=split_size,
|
|
delay=delay,
|
|
)
|
|
|
|
def work(self, input_items, output_items):
|
|
self.writer.write_samples(input_items[0])
|
|
return len(input_items[0])
|
|
|
|
def stop(self):
|
|
self.writer.close()
|
|
return True
|
|
|
|
|
|
class HackRfDataSaver(gr.top_block):
|
|
def __init__(
|
|
self,
|
|
serial: str,
|
|
freq: float,
|
|
save_dir: str,
|
|
file_tag: str,
|
|
samp_rate: float,
|
|
split_size: int,
|
|
delay: float,
|
|
rf_gain: float,
|
|
if_gain: float,
|
|
bb_gain: float,
|
|
):
|
|
super().__init__("data_saver_headless", catch_exceptions=True)
|
|
|
|
dev_args = f"numchan=1 hackrf={serial}"
|
|
|
|
self.source = osmosdr.source(args=dev_args)
|
|
self.source.set_time_unknown_pps(osmosdr.time_spec_t())
|
|
self.source.set_sample_rate(float(samp_rate))
|
|
self.source.set_center_freq(float(freq), 0)
|
|
self.source.set_freq_corr(0, 0)
|
|
self.source.set_dc_offset_mode(0, 0)
|
|
self.source.set_iq_balance_mode(0, 0)
|
|
self.source.set_gain_mode(False, 0)
|
|
self.source.set_gain(float(rf_gain), 0)
|
|
self.source.set_if_gain(float(if_gain), 0)
|
|
self.source.set_bb_gain(float(bb_gain), 0)
|
|
self.source.set_antenna("", 0)
|
|
self.source.set_bandwidth(0, 0)
|
|
|
|
self.sink = SimsiSink(
|
|
save_dir=save_dir,
|
|
file_tag=file_tag,
|
|
split_size=split_size,
|
|
delay=delay,
|
|
)
|
|
|
|
self.connect((self.source, 0), (self.sink, 0))
|
|
|
|
|
|
class IioCapture:
|
|
def __init__(self, args: argparse.Namespace):
|
|
self.args = args
|
|
self.writer = RotatingIqWriter(
|
|
save_dir=args.save_dir,
|
|
file_tag=args.file_tag,
|
|
split_size=args.split_size,
|
|
delay=args.delay,
|
|
)
|
|
self._static_signature = None
|
|
self._last_freq_hz = None
|
|
self._input_channels = [args.iio_i_channel]
|
|
if args.iio_q_channel not in self._input_channels:
|
|
self._input_channels.append(args.iio_q_channel)
|
|
|
|
def _run(self, cmd):
|
|
proc = subprocess.run(list(cmd), capture_output=True, text=True)
|
|
if proc.returncode != 0:
|
|
details = (proc.stderr or "").strip() or (proc.stdout or "").strip() or f"exit code {proc.returncode}"
|
|
raise RuntimeError(details)
|
|
return proc.stdout
|
|
|
|
def _run_binary(self, cmd, stop_event: threading.Event):
|
|
proc = subprocess.Popen(list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
try:
|
|
while True:
|
|
try:
|
|
stdout, stderr = proc.communicate(timeout=0.2)
|
|
break
|
|
except subprocess.TimeoutExpired:
|
|
if stop_event.is_set():
|
|
proc.terminate()
|
|
try:
|
|
proc.communicate(timeout=1.0)
|
|
except subprocess.TimeoutExpired:
|
|
proc.kill()
|
|
proc.communicate()
|
|
raise InterruptedError("stop requested")
|
|
if proc.returncode != 0:
|
|
details = stderr.decode("utf-8", errors="ignore").strip() or stdout.decode("utf-8", errors="ignore").strip()
|
|
raise RuntimeError(details or f"exit code {proc.returncode}")
|
|
return stdout
|
|
finally:
|
|
if proc.poll() is None:
|
|
proc.kill()
|
|
proc.communicate()
|
|
|
|
def _set_input_attr(self, channel: str, attr: str, value: str):
|
|
self._run(
|
|
[
|
|
"iio_attr",
|
|
"-u",
|
|
self.args.iio_uri,
|
|
"-q",
|
|
"-i",
|
|
"-c",
|
|
self.args.iio_phy_device,
|
|
channel,
|
|
attr,
|
|
value,
|
|
]
|
|
)
|
|
|
|
def _set_output_attr(self, channel: str, attr: str, value: str):
|
|
self._run(
|
|
[
|
|
"iio_attr",
|
|
"-u",
|
|
self.args.iio_uri,
|
|
"-q",
|
|
"-o",
|
|
"-c",
|
|
self.args.iio_phy_device,
|
|
channel,
|
|
attr,
|
|
value,
|
|
]
|
|
)
|
|
|
|
def ensure_configured(self):
|
|
sample_rate = int(round(max(float(self.args.samp_rate), IIO_MIN_SAMPLE_RATE)))
|
|
bandwidth = None
|
|
if self.args.bandwidth is not None:
|
|
bandwidth = int(round(max(float(self.args.bandwidth), 200_000.0)))
|
|
|
|
signature = (
|
|
float(sample_rate),
|
|
None if bandwidth is None else float(bandwidth),
|
|
self.args.iio_gain_mode,
|
|
self.args.iio_hardwaregain,
|
|
self.args.iio_port_select,
|
|
)
|
|
if signature == self._static_signature:
|
|
return
|
|
|
|
for channel in self._input_channels:
|
|
self._set_input_attr(channel, "sampling_frequency", str(sample_rate))
|
|
if bandwidth is not None:
|
|
self._set_input_attr(channel, "rf_bandwidth", str(bandwidth))
|
|
if self.args.iio_port_select:
|
|
self._set_input_attr(channel, "rf_port_select", self.args.iio_port_select)
|
|
if self.args.iio_gain_mode:
|
|
self._set_input_attr(channel, "gain_control_mode", self.args.iio_gain_mode)
|
|
if self.args.iio_gain_mode == "manual" and self.args.iio_hardwaregain is not None:
|
|
self._set_input_attr(channel, "hardwaregain", f"{self.args.iio_hardwaregain:.6f}")
|
|
|
|
self._static_signature = signature
|
|
|
|
def tune(self):
|
|
target = int(round(float(self.args.freq)))
|
|
if self._last_freq_hz == target:
|
|
return
|
|
self._set_output_attr(self.args.iio_lo_channel, "frequency", str(target))
|
|
self._last_freq_hz = target
|
|
|
|
def read_chunk(self, stop_event: threading.Event):
|
|
samples_per_read = self.args.samples_per_read
|
|
if samples_per_read is None:
|
|
samples_per_read = max(4096, min(int(self.args.split_size), 262_144))
|
|
|
|
raw = self._run_binary(
|
|
[
|
|
"iio_readdev",
|
|
"-u",
|
|
self.args.iio_uri,
|
|
"-T",
|
|
str(int(self.args.timeout_ms)),
|
|
"-b",
|
|
str(int(samples_per_read)),
|
|
"-s",
|
|
str(int(samples_per_read)),
|
|
self.args.iio_device,
|
|
self.args.iio_i_channel,
|
|
self.args.iio_q_channel,
|
|
],
|
|
stop_event,
|
|
)
|
|
|
|
values = np.frombuffer(raw, dtype="<i2")
|
|
if values.size == 0:
|
|
raise RuntimeError("no samples")
|
|
if values.size % 2 != 0:
|
|
raise RuntimeError(f"unexpected IQ sample payload: {values.size} int16 values")
|
|
|
|
iq = values.reshape(-1, 2).astype(np.float32, copy=False)
|
|
complex_samples = (iq[:, 0] + 1j * iq[:, 1]).astype(np.complex64, copy=False)
|
|
complex_samples /= 32768.0
|
|
return complex_samples
|
|
|
|
def run(self, stop_event: threading.Event):
|
|
self.ensure_configured()
|
|
self.tune()
|
|
|
|
if self.args.settle > 0:
|
|
time.sleep(self.args.settle)
|
|
|
|
while not stop_event.is_set():
|
|
try:
|
|
samples = self.read_chunk(stop_event)
|
|
except InterruptedError:
|
|
break
|
|
self.writer.write_samples(samples)
|
|
|
|
def close(self):
|
|
self.writer.close()
|
|
|
|
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser(description="Headless IQ saver for HackRF or IIO SDR backends")
|
|
|
|
parser.add_argument("--backend", choices=("hackrf", "iio"), default="hackrf", help="SDR backend")
|
|
parser.add_argument("--serial", help="HackRF serial number")
|
|
parser.add_argument("--freq", type=float, required=True, help="Center frequency in Hz")
|
|
parser.add_argument("--save-dir", required=True, help="Directory for output IQ files")
|
|
parser.add_argument("--file-tag", default="fragment_", help="Prefix for output files")
|
|
|
|
parser.add_argument("--samp-rate", type=float, default=20e6, help="Sample rate in S/s")
|
|
parser.add_argument("--split-size", type=int, default=400000, help="Max complex samples per file")
|
|
parser.add_argument("--delay", type=float, default=0.1, help="Delay between files in seconds")
|
|
|
|
parser.add_argument("--rf-gain", type=float, default=12, help="HackRF RF gain")
|
|
parser.add_argument("--if-gain", type=float, default=30, help="HackRF IF gain")
|
|
parser.add_argument("--bb-gain", type=float, default=36, help="HackRF BB gain")
|
|
|
|
parser.add_argument("--bandwidth", type=float, default=None, help="Optional IIO RF bandwidth in Hz")
|
|
parser.add_argument("--iio-uri", default="ip:192.168.2.1", help="IIO URI")
|
|
parser.add_argument("--iio-device", default="cf-ad9361-lpc", help="IIO RX buffer device")
|
|
parser.add_argument("--iio-phy-device", default="ad9361-phy", help="IIO PHY device")
|
|
parser.add_argument("--iio-i-channel", default="voltage0", help="IIO I channel")
|
|
parser.add_argument("--iio-q-channel", default="voltage1", help="IIO Q channel")
|
|
parser.add_argument("--iio-lo-channel", default="altvoltage0", help="IIO LO channel for RX frequency")
|
|
parser.add_argument("--iio-port-select", default="A_BALANCED", help="IIO rf_port_select value")
|
|
parser.add_argument("--iio-gain-mode", default="slow_attack", help="IIO gain_control_mode value")
|
|
parser.add_argument("--iio-hardwaregain", type=float, default=None, help="IIO hardware gain in dB for manual mode")
|
|
parser.add_argument("--timeout-ms", type=int, default=4000, help="IIO read timeout in milliseconds")
|
|
parser.add_argument("--settle", type=float, default=0.12, help="Delay after tuning for IIO backend")
|
|
parser.add_argument("--samples-per-read", type=int, default=None, help="IIO complex samples per read call")
|
|
|
|
return parser.parse_args()
|
|
|
|
|
|
def main():
|
|
args = parse_args()
|
|
stop_event = threading.Event()
|
|
|
|
if args.backend == "hackrf" and not args.serial:
|
|
print("--serial is required for --backend hackrf", file=sys.stderr)
|
|
sys.exit(2)
|
|
|
|
capture = None
|
|
tb = None
|
|
|
|
def handle_signal(sig, frame):
|
|
print(f"Received signal {sig}, stopping...", flush=True)
|
|
stop_event.set()
|
|
if tb is not None:
|
|
tb.stop()
|
|
tb.wait()
|
|
|
|
signal.signal(signal.SIGINT, handle_signal)
|
|
signal.signal(signal.SIGTERM, handle_signal)
|
|
|
|
print("Starting capture...", flush=True)
|
|
print(f" backend: {args.backend}", flush=True)
|
|
print(f" freq: {args.freq}", flush=True)
|
|
print(f" save_dir: {args.save_dir}", flush=True)
|
|
print(f" file_tag: {args.file_tag}", flush=True)
|
|
print(f" samp_rate: {args.samp_rate}", flush=True)
|
|
print(f" split_size: {args.split_size}", flush=True)
|
|
|
|
try:
|
|
if args.backend == "hackrf":
|
|
print(f" serial: {args.serial}", flush=True)
|
|
print(f" gains: rf={args.rf_gain} if={args.if_gain} bb={args.bb_gain}", flush=True)
|
|
tb = HackRfDataSaver(
|
|
serial=args.serial,
|
|
freq=args.freq,
|
|
save_dir=args.save_dir,
|
|
file_tag=args.file_tag,
|
|
samp_rate=args.samp_rate,
|
|
split_size=args.split_size,
|
|
delay=args.delay,
|
|
rf_gain=args.rf_gain,
|
|
if_gain=args.if_gain,
|
|
bb_gain=args.bb_gain,
|
|
)
|
|
tb.start()
|
|
while not stop_event.is_set():
|
|
time.sleep(0.5)
|
|
else:
|
|
print(f" uri: {args.iio_uri}", flush=True)
|
|
print(
|
|
f" iio: device={args.iio_device} phy={args.iio_phy_device} "
|
|
f"port={args.iio_port_select} gain_mode={args.iio_gain_mode}",
|
|
flush=True,
|
|
)
|
|
capture = IioCapture(args)
|
|
capture.run(stop_event)
|
|
except KeyboardInterrupt:
|
|
handle_signal(signal.SIGINT, None)
|
|
finally:
|
|
if capture is not None:
|
|
capture.close()
|
|
if tb is not None and not stop_event.is_set():
|
|
tb.stop()
|
|
tb.wait()
|
|
|
|
print("Stopped.", flush=True)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|