Compare commits

..

1 Commits

@ -228,21 +228,3 @@ curl -X POST 'http://127.0.0.1:5010/process_data' \
``` bash
.venv-sdr/bin/python read_energy.py
```
### Read_energy Neptune
``` bash
./.venv-sdr/bin/python read_energy.py --backend iio --uri ip:192.168.2.1 --only 2400 --refresh 0.5 --interval 0.2 --vec-len 1024
```
### Capture_hourly Neptune
``` bash
CAPTURE_ORDER=2400 \
CAPTURE_BACKEND_2400=iio \
CAPTURE_IIO_URI_2400=ip:192.168.2.1 \
RUN_ONCE=1 \
./capture_hourly.sh
```
Дополнительные параметры `Neptune` для `capture_hourly` читаются из `.env` или окружения в формате
`CAPTURE_IIO_<KEY>_2400`, например `CAPTURE_IIO_DEVICE_2400`, `CAPTURE_IIO_PHY_DEVICE_2400`,
`CAPTURE_IIO_GAIN_MODE_2400`, `CAPTURE_IIO_HARDWAREGAIN_2400`, `CAPTURE_IIO_BANDWIDTH_2400`.

@ -6,114 +6,91 @@ SCRIPT_OWNER="${CAPTURE_USER:-$(stat -c %U "$SCRIPT_DIR")}"
SCRIPT_OWNER_HOME="$(getent passwd "$SCRIPT_OWNER" | cut -d: -f6)"
cd "$SCRIPT_DIR"
ENV_FILE="$SCRIPT_DIR/.env"
declare -A DOTENV_VALUES
trim_whitespace() {
local value="$1"
value="${value#"${value%%[![:space:]]*}"}"
value="${value%"${value##*[![:space:]]}"}"
printf '%s' "$value"
}
source "$SCRIPT_DIR/.env"
############################
# НАСТРОЙКИ
############################
load_env_file() {
local line key value quote_char
BASE_DIR="${CAPTURE_BASE_DIR:-${SCRIPT_OWNER_HOME}/dataset/noise}"
if [[ ! -f "$ENV_FILE" ]]; then
echo "Не найден .env: $ENV_FILE" >&2
exit 1
fi
# Путь к python из venv
PYTHON_BIN="${PYTHON_BIN:-$SCRIPT_DIR/.venv-sdr/bin/python}"
while IFS= read -r line || [[ -n "$line" ]]; do
line="${line%$'\r'}"
line="$(trim_whitespace "$line")"
# Путь к headless скрипту
SCRIPT_PATH="${SCRIPT_PATH:-$SCRIPT_DIR/scripts_nn/data_saver_headless.py}"
[[ -z "$line" || "$line" == \#* ]] && continue
RUN_ONCE="${RUN_ONCE:-0}"
CAPTURE_LOG_FILE="${CAPTURE_LOG_FILE:-$BASE_DIR/capture_hourly.log}"
if [[ "$line" == export\ * ]]; then
line="${line#export }"
fi
SYSTEMCTL_BIN=(systemctl)
CURRENT_CAPTURE_PID=""
CURRENT_SERVICE_UNIT=""
[[ "$line" == *=* ]] || continue
key="$(trim_whitespace "${line%%=*}")"
value="$(trim_whitespace "${line#*=}")"
if [[ ${#value} -ge 2 ]]; then
quote_char="${value:0:1}"
if [[ ( "$quote_char" == "'" || "$quote_char" == '"' ) && "${value: -1}" == "$quote_char" ]]; then
value="${value:1:${#value}-2}"
fi
fi
# Лимиты
DOTENV_VALUES["$key"]="$value"
done < "$ENV_FILE"
}
lim_all=10
PER_FREQ_LIMIT_BYTES=$((1 * 1024 * 1024 * 1024)) # GiB на частоту за запуск
TOTAL_LIMIT_BYTES=$((lim_all * 1024 * 1024 * 1024)) # общий лимит GiB
CYCLE_SECONDS=1 # один цикл в час
# Параметры SDR
SAMP_RATE="20e6"
SPLIT_SIZE="400000"
DELAY="0.01"
RF_GAIN="12"
IF_GAIN="30"
BB_GAIN="36"
get_env_setting() {
local key="$1"
local default="${2-}"
############################
# ЧАСТОТЫ И SERIAL ИЗ ENV
############################
if [[ -n "${!key+x}" ]]; then
printf '%s' "${!key}"
elif [[ -n "${DOTENV_VALUES[$key]+x}" ]]; then
printf '%s' "${DOTENV_VALUES[$key]}"
else
printf '%s' "$default"
fi
}
ORDER=(433)
get_band_setting() {
local prefix="$1"
local band="$2"
local default="${3-}"
get_env_setting "${prefix}_${band}" "$(get_env_setting "$prefix" "$default")"
}
declare -A SERIAL
declare -A FREQ_HZ
declare -A SERVICE_UNIT
load_env_file
SERIAL[433]="$hack_433"
FREQ_HZ[433]="433000000"
############################
# НАСТРОЙКИ
############################
SERIAL[750]="$hack_750"
FREQ_HZ[750]="750000000"
BASE_DIR="$(get_env_setting CAPTURE_BASE_DIR "${SCRIPT_OWNER_HOME}/dataset/noise")"
PYTHON_BIN="$(get_env_setting PYTHON_BIN "$SCRIPT_DIR/.venv-sdr/bin/python")"
SCRIPT_PATH="$(get_env_setting SCRIPT_PATH "$SCRIPT_DIR/scripts_nn/data_saver_headless.py")"
RUN_ONCE="$(get_env_setting RUN_ONCE "0")"
CAPTURE_LOG_FILE="$(get_env_setting CAPTURE_LOG_FILE "$BASE_DIR/capture_hourly.log")"
SERIAL[915]="$hack_915"
FREQ_HZ[915]="915000000"
SYSTEMCTL_BIN=(systemctl)
CURRENT_CAPTURE_PID=""
CURRENT_SERVICE_UNITS=()
SERIAL[1200]="$hack_1200"
FREQ_HZ[1200]="1200000000"
lim_all=10
PER_FREQ_LIMIT_BYTES=$((1 * 1024 * 1024 * 1024))
TOTAL_LIMIT_BYTES=$((lim_all * 1024 * 1024 * 1024))
CYCLE_SECONDS="$(get_env_setting CAPTURE_CYCLE_SECONDS "1")"
SERIAL[2400]="$hack_2400"
FREQ_HZ[2400]="2400000000"
DEFAULT_SAMP_RATE="$(get_env_setting CAPTURE_SAMP_RATE "20e6")"
DEFAULT_SPLIT_SIZE="$(get_env_setting CAPTURE_SPLIT_SIZE "400000")"
DEFAULT_DELAY="$(get_env_setting CAPTURE_DELAY "0.01")"
DEFAULT_RF_GAIN="$(get_env_setting CAPTURE_RF_GAIN "12")"
DEFAULT_IF_GAIN="$(get_env_setting CAPTURE_IF_GAIN "30")"
DEFAULT_BB_GAIN="$(get_env_setting CAPTURE_BB_GAIN "36")"
SERIAL[3300]="$hack_3300"
FREQ_HZ[3300]="3300000000"
ORDER=(433)
SUPPORTED_BANDS=(433 750 915 1200 1500 2400 3300 4500 5200 5800)
SERIAL[4500]="$hack_4500"
FREQ_HZ[4500]="4500000000"
declare -A SERIAL
declare -A FREQ_HZ
declare -A SERVICE_UNIT
declare -A BACKEND
SERIAL[5200]="$hack_5200"
FREQ_HZ[5200]="5200000000"
for band in "${SUPPORTED_BANDS[@]}"; do
SERIAL["$band"]="$(get_env_setting "hack_${band}" "$(get_env_setting "HACKID_${band}")")"
FREQ_HZ["$band"]="$(get_env_setting "c_freq_${band}" "$band")e6"
SERVICE_UNIT["$band"]="dronedetector-sdr-${band}.service"
BACKEND["$band"]="$(tr '[:upper:]' '[:lower:]' <<<"$(get_band_setting CAPTURE_BACKEND "$band" "hackrf")")"
done
SERIAL[5800]="$hack_5800"
FREQ_HZ[5800]="5800000000"
SERVICE_UNIT[433]="dronedetector-sdr-433.service"
SERVICE_UNIT[750]="dronedetector-sdr-750.service"
SERVICE_UNIT[915]="dronedetector-sdr-915.service"
SERVICE_UNIT[1200]="dronedetector-sdr-1200.service"
SERVICE_UNIT[2400]="dronedetector-sdr-2400.service"
SERVICE_UNIT[3300]="dronedetector-sdr-3300.service"
SERVICE_UNIT[4500]="dronedetector-sdr-4500.service"
SERVICE_UNIT[5200]="dronedetector-sdr-5200.service"
SERVICE_UNIT[5800]="dronedetector-sdr-5800.service"
############################
# ВСПОМОГАТЕЛЬНОЕ
@ -132,117 +109,36 @@ service_exists() {
systemctl_run show "$unit" >/dev/null 2>&1
}
service_is_active() {
local unit="$1"
systemctl_run is-active --quiet "$unit"
}
remember_service_unit() {
local unit="$1"
local existing
for existing in "${CURRENT_SERVICE_UNITS[@]}"; do
if [[ "$existing" == "$unit" ]]; then
return 0
fi
done
CURRENT_SERVICE_UNITS+=("$unit")
}
stop_iio_band_service() {
stop_band_service() {
local band="$1"
local unit="${SERVICE_UNIT[$band]:-}"
CURRENT_SERVICE_UNITS=()
if [[ -z "$unit" ]]; then
log "Для band=$band не найден service unit"
return 0
fi
if ! service_exists "$unit"; then
log "Service unit $unit не установлен, пропускаю"
log "Service unit $unit не установлен, пропускаю stop/start"
return 0
fi
if service_is_active "$unit"; then
remember_service_unit "$unit"
fi
log "Останавливаю service $unit перед записью band=$band backend=iio"
systemctl_run stop "$unit"
}
stop_hackrf_band_services() {
local band="$1"
local serial="${SERIAL[$band]:-}"
local other_band unit other_backend other_serial
if [[ -z "$serial" ]]; then
log "Для band=$band пустой serial, пропускаю stop/start сервисов"
return 0
fi
CURRENT_SERVICE_UNITS=()
for other_band in "${!SERVICE_UNIT[@]}"; do
unit="${SERVICE_UNIT[$other_band]:-}"
other_backend="${BACKEND[$other_band]:-hackrf}"
other_serial="${SERIAL[$other_band]:-}"
if [[ "$other_backend" != "hackrf" || -z "$unit" || -z "$other_serial" || "$other_serial" != "$serial" ]]; then
continue
fi
if ! service_exists "$unit"; then
log "Service unit $unit не установлен, пропускаю"
continue
fi
if service_is_active "$unit"; then
remember_service_unit "$unit"
fi
log "Останавливаю service $unit перед записью band=$band"
systemctl_run stop "$unit"
done
}
stop_band_service() {
local band="$1"
local backend="${BACKEND[$band]:-hackrf}"
case "$backend" in
hackrf)
stop_hackrf_band_services "$band"
;;
iio)
stop_iio_band_service "$band"
;;
*)
log "Неизвестный backend=$backend для band=$band"
return 1
;;
esac
CURRENT_SERVICE_UNIT="$unit"
}
start_current_service() {
local unit
if [[ "${#CURRENT_SERVICE_UNITS[@]}" -eq 0 ]]; then
if [[ -z "$CURRENT_SERVICE_UNIT" ]]; then
return 0
fi
for unit in "${CURRENT_SERVICE_UNITS[@]}"; do
log "Запускаю service $unit после записи"
systemctl_run start "$unit"
done
CURRENT_SERVICE_UNITS=()
log "Запускаю service $CURRENT_SERVICE_UNIT после записи"
systemctl_run start "$CURRENT_SERVICE_UNIT"
CURRENT_SERVICE_UNIT=""
}
cleanup_capture() {
local unit
if [[ -n "$CURRENT_CAPTURE_PID" ]] && kill -0 "$CURRENT_CAPTURE_PID" 2>/dev/null; then
log "Останавливаю текущий PID=$CURRENT_CAPTURE_PID при завершении скрипта"
kill -TERM "$CURRENT_CAPTURE_PID" 2>/dev/null || true
@ -250,12 +146,10 @@ cleanup_capture() {
fi
CURRENT_CAPTURE_PID=""
if [[ "${#CURRENT_SERVICE_UNITS[@]}" -gt 0 ]]; then
for unit in "${CURRENT_SERVICE_UNITS[@]}"; do
log "Восстанавливаю service $unit при завершении скрипта"
systemctl_run start "$unit" || true
done
CURRENT_SERVICE_UNITS=()
if [[ -n "$CURRENT_SERVICE_UNIT" ]]; then
log "Восстанавливаю service $CURRENT_SERVICE_UNIT при завершении скрипта"
systemctl_run start "$CURRENT_SERVICE_UNIT" || true
CURRENT_SERVICE_UNIT=""
fi
}
@ -279,8 +173,6 @@ total_size_bytes() {
}
ensure_requirements() {
local capture_order band backend
if [[ ! -x "$PYTHON_BIN" ]]; then
echo "Не найден python: $PYTHON_BIN" >&2
exit 1
@ -308,10 +200,10 @@ ensure_requirements() {
mkdir -p "$BASE_DIR"
mkdir -p "$(dirname "$CAPTURE_LOG_FILE")"
capture_order="$(get_env_setting CAPTURE_ORDER)"
if [[ -n "$capture_order" ]]; then
if [[ -n "${CAPTURE_ORDER:-}" ]]; then
ORDER=()
for band in ${capture_order//,/ }; do
local band
for band in ${CAPTURE_ORDER//,/ }; do
ORDER+=("$band")
done
fi
@ -321,132 +213,29 @@ ensure_requirements() {
exit 1
fi
for band in "${ORDER[@]}"; do
backend="${BACKEND[$band]:-}"
if [[ -z "$backend" ]]; then
echo "Band $band не поддерживается capture_hourly." >&2
exit 1
fi
case "$backend" in
hackrf|iio)
;;
*)
echo "Неизвестный backend=$backend для band=$band" >&2
exit 1
;;
esac
if [[ "$backend" == "iio" ]]; then
if ! command -v iio_attr >/dev/null 2>&1; then
echo "Не найден iio_attr для band=$band" >&2
exit 1
fi
if ! command -v iio_readdev >/dev/null 2>&1; then
echo "Не найден iio_readdev для band=$band" >&2
exit 1
fi
fi
done
log "Рабочая директория: $SCRIPT_DIR"
log "BASE_DIR: $BASE_DIR"
log "ORDER: ${ORDER[*]}"
log "RUN_ONCE: $RUN_ONCE"
}
run_one_freq() {
local band="$1"
local backend="${BACKEND[$band]}"
local serial="${SERIAL[$band]}"
local freq="${FREQ_HZ[$band]}"
local ts out_dir log_file pid
local samp_rate split_size delay
local rf_gain if_gain bb_gain
local iio_uri iio_device iio_phy_device iio_i_channel iio_q_channel
local iio_lo_channel iio_port_select iio_gain_mode iio_hardwaregain
local iio_timeout_ms iio_settle iio_bandwidth iio_samples_per_read
local backend_desc
local -a cmd
if [[ "$backend" == "hackrf" && -z "$serial" ]]; then
log "Для band=$band backend=hackrf, но serial пустой, пропускаю"
if [[ -z "$serial" ]]; then
log "Для band=$band пустой serial, пропускаю"
return 0
fi
local ts out_dir log_file
ts="$(date '+%F_%H-%M-%S')"
out_dir="$BASE_DIR/$band/$ts"
log_file="$out_dir/run.log"
samp_rate="$(get_band_setting CAPTURE_SAMP_RATE "$band" "$DEFAULT_SAMP_RATE")"
split_size="$(get_band_setting CAPTURE_SPLIT_SIZE "$band" "$DEFAULT_SPLIT_SIZE")"
delay="$(get_band_setting CAPTURE_DELAY "$band" "$DEFAULT_DELAY")"
cmd=(
"$PYTHON_BIN"
"$SCRIPT_PATH"
"--backend" "$backend"
"--freq" "$freq"
"--save-dir" "$out_dir"
"--file-tag" "${band}_"
"--samp-rate" "$samp_rate"
"--split-size" "$split_size"
"--delay" "$delay"
)
if [[ "$backend" == "hackrf" ]]; then
rf_gain="$(get_band_setting CAPTURE_RF_GAIN "$band" "$DEFAULT_RF_GAIN")"
if_gain="$(get_band_setting CAPTURE_IF_GAIN "$band" "$DEFAULT_IF_GAIN")"
bb_gain="$(get_band_setting CAPTURE_BB_GAIN "$band" "$DEFAULT_BB_GAIN")"
cmd+=(
"--serial" "$serial"
"--rf-gain" "$rf_gain"
"--if-gain" "$if_gain"
"--bb-gain" "$bb_gain"
)
backend_desc="serial=$serial"
else
iio_uri="$(get_band_setting CAPTURE_IIO_URI "$band" "ip:192.168.2.1")"
iio_device="$(get_band_setting CAPTURE_IIO_DEVICE "$band" "cf-ad9361-lpc")"
iio_phy_device="$(get_band_setting CAPTURE_IIO_PHY_DEVICE "$band" "ad9361-phy")"
iio_i_channel="$(get_band_setting CAPTURE_IIO_I_CHANNEL "$band" "voltage0")"
iio_q_channel="$(get_band_setting CAPTURE_IIO_Q_CHANNEL "$band" "voltage1")"
iio_lo_channel="$(get_band_setting CAPTURE_IIO_LO_CHANNEL "$band" "altvoltage0")"
iio_port_select="$(get_band_setting CAPTURE_IIO_PORT_SELECT "$band" "A_BALANCED")"
iio_gain_mode="$(get_band_setting CAPTURE_IIO_GAIN_MODE "$band" "slow_attack")"
iio_hardwaregain="$(get_band_setting CAPTURE_IIO_HARDWAREGAIN "$band")"
iio_timeout_ms="$(get_band_setting CAPTURE_IIO_TIMEOUT_MS "$band" "4000")"
iio_settle="$(get_band_setting CAPTURE_SETTLE "$band" "0.12")"
iio_bandwidth="$(get_band_setting CAPTURE_IIO_BANDWIDTH "$band")"
iio_samples_per_read="$(get_band_setting CAPTURE_IIO_SAMPLES_PER_READ "$band")"
cmd+=(
"--iio-uri" "$iio_uri"
"--iio-device" "$iio_device"
"--iio-phy-device" "$iio_phy_device"
"--iio-i-channel" "$iio_i_channel"
"--iio-q-channel" "$iio_q_channel"
"--iio-lo-channel" "$iio_lo_channel"
"--iio-port-select" "$iio_port_select"
"--iio-gain-mode" "$iio_gain_mode"
"--timeout-ms" "$iio_timeout_ms"
"--settle" "$iio_settle"
)
if [[ -n "$iio_hardwaregain" ]]; then
cmd+=("--iio-hardwaregain" "$iio_hardwaregain")
fi
if [[ -n "$iio_bandwidth" ]]; then
cmd+=("--bandwidth" "$iio_bandwidth")
fi
if [[ -n "$iio_samples_per_read" ]]; then
cmd+=("--samples-per-read" "$iio_samples_per_read")
fi
backend_desc="uri=$iio_uri"
fi
log "Старт band=$band backend=$backend $backend_desc freq=$freq dir=$out_dir"
log "Старт band=$band serial=$serial freq=$freq dir=$out_dir"
if ! mkdir -p "$out_dir"; then
log "Не удалось создать каталог $out_dir"
return 1
@ -454,8 +243,20 @@ run_one_freq() {
stop_band_service "$band"
"${cmd[@]}" >"$log_file" 2>&1 &
pid=$!
"$PYTHON_BIN" "$SCRIPT_PATH" \
--serial "$serial" \
--freq "$freq" \
--save-dir "$out_dir" \
--file-tag "${band}_" \
--samp-rate "$SAMP_RATE" \
--split-size "$SPLIT_SIZE" \
--delay "$DELAY" \
--rf-gain "$RF_GAIN" \
--if-gain "$IF_GAIN" \
--bb-gain "$BB_GAIN" \
>"$log_file" 2>&1 &
local pid=$!
CURRENT_CAPTURE_PID="$pid"
log "PID=$pid"
@ -465,7 +266,7 @@ run_one_freq() {
cur_total_size="$(total_size_bytes)"
if (( cur_total_size >= TOTAL_LIMIT_BYTES )); then
log "Достигнут общий лимит ${lim_all} GiB. Останавливаю PID=$pid"
log "Достигнут общий лимит $lim_all GiB. Останавливаю PID=$pid"
kill -TERM "$pid" 2>/dev/null || true
wait "$pid" 2>/dev/null || true
CURRENT_CAPTURE_PID=""
@ -474,7 +275,7 @@ run_one_freq() {
fi
if (( cur_dir_size >= PER_FREQ_LIMIT_BYTES )); then
log "Для band=$band достигнут лимит 1 GiB. Останавливаю PID=$pid"
log "Для band=$band достигнут лимит 3 GiB. Останавливаю PID=$pid"
kill -TERM "$pid" 2>/dev/null || true
for _ in {1..10}; do
@ -498,18 +299,17 @@ run_one_freq() {
CURRENT_CAPTURE_PID=""
start_current_service
log "Завершен band=$band backend=$backend, размер=$(du -sh "$out_dir" | awk '{print $1}')"
log "Завершен band=$band, размер=$(du -sh "$out_dir" | awk '{print $1}')"
return 0
}
main_loop() {
local total_before cycle_start elapsed sleep_left rc
while true; do
local total_before cycle_start elapsed sleep_left
total_before="$(total_size_bytes)"
if (( total_before >= TOTAL_LIMIT_BYTES )); then
log "Общий размер уже >= ${lim_all} GiB, выхожу"
log "Общий размер уже >= GiB, выхожу"
break
fi
@ -527,9 +327,10 @@ main_loop() {
if [[ $rc -eq 2 ]]; then
log "Остановка по общему лимиту"
return 0
fi
else
log "Ошибка записи band=$band rc=$rc"
return "$rc"
fi
}
done
@ -542,7 +343,7 @@ main_loop() {
sleep_left=$(( CYCLE_SECONDS - elapsed ))
if (( sleep_left > 0 )); then
log "Цикл занял ${elapsed} сек, жду ${sleep_left} сек до следующего запуска"
log "Цикл занял ${elapsed} сек, жду ${sleep_left} сек до следующего часа"
sleep "$sleep_left"
else
log "Цикл занял ${elapsed} сек, паузы нет"

@ -25,11 +25,6 @@ SDR_UNITS=(
)
CONFIGURED_SDR_UNITS=()
IIO_TOOL_BINS=(
/usr/bin/iio_attr
/usr/bin/iio_readdev
/usr/bin/iio_info
)
log() {
printf '[install_all] %s\n' "$*"
@ -182,26 +177,10 @@ install_host_non_python_deps() {
libusb-1.0-0 \
libusb-1.0-0-dev \
hackrf \
libiio-utils \
gnuradio \
gr-osmosdr
}
link_host_sdr_tools_into_venv() {
local venv_path="$1"
local tool_path tool_name
for tool_path in "${IIO_TOOL_BINS[@]}"; do
if [[ ! -x "$tool_path" ]]; then
log "Skipping missing SDR host tool: ${tool_path}"
continue
fi
tool_name="$(basename "$tool_path")"
ln -sf "$tool_path" "$venv_path/bin/$tool_name"
done
}
setup_sdr_python_env() {
log "Setting up SDR python environment"
local venv_path="${PROJECT_ROOT}/.venv-sdr"
@ -212,7 +191,6 @@ setup_sdr_python_env() {
"$venv_path/bin/pip" install --upgrade pip
"$venv_path/bin/pip" install -r "${PROJECT_ROOT}/deploy/requirements/sdr_host.txt"
link_host_sdr_tools_into_venv "$venv_path"
chown -R "${RUN_USER}:${RUN_GROUP}" "$venv_path"
}

@ -28,7 +28,7 @@ run_as_root() {
preflight() {
[[ -f "${PROJECT_ROOT}/deploy/requirements/nn_gpu_pinned.txt" ]] || die "Missing deploy/requirements/nn_gpu_pinned.txt"
[[ -f "${PROJECT_ROOT}/train_scripts/requirements-train.txt" ]] || die "Missing train_scripts/requirements-train.txt"
[[ -f "${PROJECT_ROOT}/requirements-train.txt" ]] || die "Missing train_scripts/requirements-train.txt"
[[ -f "${PROJECT_ROOT}/torchsig/pyproject.toml" ]] || die "Missing local torchsig package"
command -v "${PYTHON_BIN}" >/dev/null 2>&1 || die "${PYTHON_BIN} not found"
}

@ -9,7 +9,7 @@ import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple
from typing import Dict, List, Optional, Tuple
try:
import numpy as np
@ -26,13 +26,12 @@ except Exception as exc:
sys.exit(1)
EPS = 1e-20
IIO_MIN_SAMPLE_RATE = 2083333
@dataclass
class Target:
label: str
device: str
serial: str
freq_hz: float
source: str
@ -40,8 +39,8 @@ class Target:
@dataclass
class Row:
label: str
device: str
index: Optional[str] = None
serial: str
index: Optional[int] = None
freq_hz: float = 0.0
status: str = "INIT"
rms: Optional[float] = None
@ -52,28 +51,13 @@ class Row:
error: str = ""
def label_sort_key(label: str) -> Tuple[int, float | str]:
try:
return (0, float(label))
except ValueError:
return (1, label)
class HackRfProbeTop(gr.top_block):
def __init__(
self,
serial: str,
freq_hz: float,
sample_rate: float,
vec_len: int,
gain: float,
if_gain: float,
bb_gain: float,
):
class ProbeTop(gr.top_block):
def __init__(self, index: int, freq_hz: float, sample_rate: float, vec_len: int,
gain: float, if_gain: float, bb_gain: float):
super().__init__("hackrf_energy_probe")
self.probe = blocks.probe_signal_vc(vec_len)
self.stream_to_vec = blocks.stream_to_vector(gr.sizeof_gr_complex * 1, vec_len)
self.src = osmosdr.source(args=f"numchan=1 hackrf={serial}")
self.src = osmosdr.source(args=f"numchan=1 hackrf={index}")
self.src.set_time_unknown_pps(osmosdr.time_spec_t())
self.src.set_sample_rate(sample_rate)
self.src.set_center_freq(freq_hz, 0)
@ -84,14 +68,14 @@ class HackRfProbeTop(gr.top_block):
for fn, val in (("set_gain", gain), ("set_if_gain", if_gain), ("set_bb_gain", bb_gain)):
try:
getattr(self.src, fn)(val, 0)
except Exception as exc:
raise RuntimeError(f"failed to set {fn}={val}: {exc}") from exc
except Exception:
raise Exception("не ставится усиление")
try:
self.src.set_bandwidth(0, 0)
except Exception:
pass
try:
self.src.set_antenna("", 0)
self.src.set_antenna('', 0)
except Exception:
pass
self.connect((self.src, 0), (self.stream_to_vec, 0))
@ -101,22 +85,15 @@ class HackRfProbeTop(gr.top_block):
arr = np.asarray(self.probe.level(), dtype=np.complex64)
if arr.size == 0:
raise RuntimeError("no samples")
power_lin = float(np.mean(arr.real * arr.real + arr.imag * arr.imag))
rms = math.sqrt(max(power_lin, 0.0))
dbfs = 10.0 * math.log10(max(power_lin, EPS))
return rms, power_lin, dbfs, int(arr.size)
class HackRfWorker(threading.Thread):
def __init__(
self,
target: Target,
serial_to_index: Dict[str, int],
rows: Dict[str, Row],
lock: threading.Lock,
stop_event: threading.Event,
args: argparse.Namespace,
):
p = float(np.mean(arr.real * arr.real + arr.imag * arr.imag))
rms = math.sqrt(max(p, 0.0))
dbfs = 10.0 * math.log10(max(p, EPS))
return rms, p, dbfs, int(arr.size)
class Worker(threading.Thread):
def __init__(self, target: Target, serial_to_index: Dict[str, int], rows: Dict[str, Row],
lock: threading.Lock, stop_event: threading.Event, args: argparse.Namespace):
super().__init__(daemon=True)
self.target = target
self.serial_to_index = serial_to_index
@ -124,46 +101,39 @@ class HackRfWorker(threading.Thread):
self.lock = lock
self.stop_event = stop_event
self.args = args
self.tb: Optional[HackRfProbeTop] = None
self.tb: Optional[ProbeTop] = None
def _set_row(self, **kwargs):
with self.lock:
row = self.rows[self.target.label]
for key, value in kwargs.items():
setattr(row, key, value)
for k, v in kwargs.items():
setattr(row, k, v)
row.updated_at = time.time()
def _open(self) -> bool:
if self.target.device not in self.serial_to_index:
idx = self.serial_to_index.get(self.target.serial)
if idx is None:
self._set_row(status="NOT_FOUND", error="serial not in hackrf_info", index=None)
return False
idx = self.serial_to_index[self.target.device]
self._set_row(index=str(idx), status="OPENING", error="", freq_hz=self.target.freq_hz)
self._set_row(index=idx, status="OPENING", error="", freq_hz=self.target.freq_hz)
try:
self.tb = HackRfProbeTop(
self.target.device,
self.target.freq_hz,
self.args.sample_rate,
self.args.vec_len,
self.args.gain,
self.args.if_gain,
self.args.bb_gain,
self.tb = ProbeTop(
idx, self.target.freq_hz, self.args.sample_rate, self.args.vec_len,
self.args.gain, self.args.if_gain, self.args.bb_gain
)
self.tb.start()
time.sleep(0.15)
self._set_row(status="OK", error="")
return True
except Exception as exc:
message = str(exc)
status = "BUSY" if ("Resource busy" in message or "-1000" in message) else "ERR"
self._set_row(status=status, error=message)
msg = str(exc)
status = "BUSY" if ("Resource busy" in msg or "-1000" in msg) else "ERR"
self._set_row(status=status, error=msg)
self.tb = None
return False
def _close(self):
if self.tb is None:
return
if self.tb is not None:
try:
self.tb.stop()
self.tb.wait()
@ -179,15 +149,8 @@ class HackRfWorker(threading.Thread):
break
continue
try:
rms, power_lin, dbfs, samples = self.tb.read_metrics()
self._set_row(
status="OK",
rms=rms,
power_lin=power_lin,
dbfs=dbfs,
samples=samples,
error="",
)
rms, p, dbfs, n = self.tb.read_metrics()
self._set_row(status="OK", rms=rms, power_lin=p, dbfs=dbfs, samples=n, error="")
except Exception as exc:
self._set_row(status="ERR", error=str(exc))
self._close()
@ -199,190 +162,6 @@ class HackRfWorker(threading.Thread):
self._close()
class IioProbe:
def __init__(self, args: argparse.Namespace):
self.args = args
self._static_signature: Optional[Tuple[float, float, str, Optional[float], str]] = None
self._last_freq_hz: Optional[int] = None
self._input_channels = [args.iio_i_channel]
if args.iio_q_channel not in self._input_channels:
self._input_channels.append(args.iio_q_channel)
def _run(self, cmd: Sequence[str], binary: bool = False) -> str | bytes:
proc = subprocess.run(
list(cmd),
capture_output=True,
text=not binary,
)
if proc.returncode != 0:
stderr = proc.stderr if binary else (proc.stderr or "")
stdout = "" if binary else (proc.stdout or "")
details = stderr.strip() or stdout.strip() or f"exit code {proc.returncode}"
raise RuntimeError(details)
return proc.stdout
def _set_input_attr(self, channel: str, attr: str, value: str):
self._run(
[
"iio_attr",
"-u",
self.args.uri,
"-q",
"-i",
"-c",
self.args.iio_phy_device,
channel,
attr,
value,
]
)
def _set_output_attr(self, channel: str, attr: str, value: str):
self._run(
[
"iio_attr",
"-u",
self.args.uri,
"-q",
"-o",
"-c",
self.args.iio_phy_device,
channel,
attr,
value,
]
)
def ensure_configured(self):
sample_rate = None
if self.args.sample_rate is not None:
sample_rate = int(round(max(float(self.args.sample_rate), IIO_MIN_SAMPLE_RATE)))
bandwidth = None
if self.args.bandwidth is not None:
bandwidth = int(round(max(float(self.args.bandwidth), 200000.0)))
signature = (
None if sample_rate is None else float(sample_rate),
None if bandwidth is None else float(bandwidth),
self.args.iio_gain_mode,
self.args.iio_hardwaregain,
self.args.iio_port_select,
)
if signature == self._static_signature:
return
for channel in self._input_channels:
if sample_rate is not None:
self._set_input_attr(channel, "sampling_frequency", str(sample_rate))
if bandwidth is not None:
self._set_input_attr(channel, "rf_bandwidth", str(bandwidth))
if self.args.iio_port_select:
self._set_input_attr(channel, "rf_port_select", self.args.iio_port_select)
if self.args.iio_gain_mode:
self._set_input_attr(channel, "gain_control_mode", self.args.iio_gain_mode)
if self.args.iio_gain_mode == "manual" and self.args.iio_hardwaregain is not None:
self._set_input_attr(channel, "hardwaregain", f"{self.args.iio_hardwaregain:.6f}")
self._static_signature = signature
def tune(self, freq_hz: float):
target = int(round(freq_hz))
if self._last_freq_hz == target:
return
self._set_output_attr(self.args.iio_lo_channel, "frequency", str(target))
self._last_freq_hz = target
if self.args.settle > 0:
time.sleep(self.args.settle)
def read_metrics(self) -> Tuple[float, float, float, int]:
cmd = [
"iio_readdev",
"-u",
self.args.uri,
"-T",
str(int(self.args.timeout_ms)),
"-b",
str(max(4, int(self.args.vec_len))),
"-s",
str(max(4, int(self.args.vec_len))),
self.args.iio_device,
self.args.iio_i_channel,
self.args.iio_q_channel,
]
raw = self._run(cmd, binary=True)
values = np.frombuffer(raw, dtype="<i2")
if values.size == 0:
raise RuntimeError("no samples")
if values.size % 2 != 0:
raise RuntimeError(f"unexpected IQ sample payload: {values.size} int16 values")
iq = values.reshape(-1, 2).astype(np.float32, copy=False)
i = iq[:, 0]
q = iq[:, 1]
power_lin = float(np.mean(i * i + q * q))
rms = math.sqrt(max(power_lin, 0.0))
dbfs = 10.0 * math.log10(max(power_lin, EPS))
return rms, power_lin, dbfs, int(iq.shape[0])
class IioWorker(threading.Thread):
def __init__(
self,
targets: List[Target],
rows: Dict[str, Row],
lock: threading.Lock,
stop_event: threading.Event,
args: argparse.Namespace,
):
super().__init__(daemon=True)
self.targets = targets
self.rows = rows
self.lock = lock
self.stop_event = stop_event
self.args = args
self.probe = IioProbe(args)
def _set_row(self, label: str, **kwargs):
with self.lock:
row = self.rows[label]
for key, value in kwargs.items():
setattr(row, key, value)
row.updated_at = time.time()
def run(self):
while not self.stop_event.is_set():
for target in self.targets:
if self.stop_event.is_set():
break
self._set_row(
target.label,
index="iio",
status="OPENING",
error="",
freq_hz=target.freq_hz,
)
try:
self.probe.ensure_configured()
self.probe.tune(target.freq_hz)
rms, power_lin, dbfs, samples = self.probe.read_metrics()
self._set_row(
target.label,
status="OK",
rms=rms,
power_lin=power_lin,
dbfs=dbfs,
samples=samples,
error="",
)
except Exception as exc:
self._set_row(target.label, status="ERR", error=str(exc))
if self.stop_event.wait(self.args.reopen_delay):
return
continue
if self.stop_event.wait(self.args.interval):
return
def parse_env(path: Path) -> Dict[str, str]:
out: Dict[str, str] = {}
if not path.exists():
@ -391,198 +170,122 @@ def parse_env(path: Path) -> Dict[str, str]:
line = raw.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
out[key.strip()] = value.strip().strip('"').strip("'")
k, v = line.split("=", 1)
out[k.strip()] = v.strip().strip('"').strip("'")
return out
def collect_hackrf_targets(
env: Dict[str, str],
only: Optional[set],
override_freq_mhz: Optional[float],
) -> List[Target]:
def collect_targets(env: Dict[str, str], only: Optional[set], override_freq_mhz: Optional[float]) -> List[Target]:
targets: List[Target] = []
for key, value in env.items():
match = re.fullmatch(r"hack_(\d+)", key)
if match:
label = match.group(1)
for k, v in env.items():
m = re.fullmatch(r"hack_(\d+)", k)
if m:
label = m.group(1)
else:
match = re.fullmatch(r"HACKID_(\d+)", key)
if not match:
m = re.fullmatch(r"HACKID_(\d+)", k)
if not m:
continue
label = match.group(1)
label = m.group(1)
if only and label not in only:
continue
mhz = override_freq_mhz if override_freq_mhz is not None else float(label)
targets.append(Target(label=label, device=value.lower(), freq_hz=mhz * 1e6, source=key))
unique: Dict[str, Target] = {}
for target in sorted(targets, key=lambda item: (label_sort_key(item.label), 0 if item.source.startswith("hack_") else 1)):
unique.setdefault(target.label, target)
return [unique[key] for key in sorted(unique, key=label_sort_key)]
def collect_iio_targets(
env: Dict[str, str],
only: Optional[set],
override_freq_mhz: Optional[float],
uri: str,
) -> List[Target]:
if only:
labels = set(only)
else:
labels = set()
for key in env:
for pattern in (r"c_freq_(\d+)", r"hack_(\d+)", r"HACKID_(\d+)"):
match = re.fullmatch(pattern, key)
if match:
labels.add(match.group(1))
break
targets.append(Target(label=label, serial=v.lower(), freq_hz=mhz * 1e6, source=k))
if not labels and override_freq_mhz is not None:
labels.add(str(int(override_freq_mhz) if float(override_freq_mhz).is_integer() else override_freq_mhz))
targets: List[Target] = []
for label in sorted(labels, key=label_sort_key):
raw_freq = env.get(f"c_freq_{label}", label)
try:
mhz = override_freq_mhz if override_freq_mhz is not None else float(raw_freq)
except ValueError as exc:
raise ValueError(f"cannot resolve frequency for target {label!r}") from exc
targets.append(Target(label=label, device=uri, freq_hz=mhz * 1e6, source="iio"))
return targets
uniq: Dict[str, Target] = {}
for t in sorted(targets, key=lambda x: (int(x.label), 0 if x.source.startswith("hack_") else 1)):
uniq.setdefault(t.label, t)
return [uniq[k] for k in sorted(uniq, key=lambda x: int(x))]
def parse_hackrf_info() -> Dict[str, int]:
try:
proc = subprocess.run(["hackrf_info"], capture_output=True, text=True, timeout=15)
except FileNotFoundError as exc:
raise RuntimeError("hackrf_info not found") from exc
except subprocess.TimeoutExpired as exc:
raise RuntimeError("hackrf_info timeout") from exc
except FileNotFoundError:
raise RuntimeError("hackrf_info not found")
except subprocess.TimeoutExpired:
raise RuntimeError("hackrf_info timeout")
text = (proc.stdout or "") + "\n" + (proc.stderr or "")
out: Dict[str, int] = {}
cur_idx: Optional[int] = None
for line in text.splitlines():
match = re.search(r"^Index:\s*(\d+)", line)
if match:
cur_idx = int(match.group(1))
m = re.search(r"^Index:\s*(\d+)", line)
if m:
cur_idx = int(m.group(1))
continue
match = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line)
if match and cur_idx is not None:
out[match.group(1).lower()] = cur_idx
m = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line)
if m and cur_idx is not None:
out[m.group(1).lower()] = cur_idx
if not out:
raise RuntimeError("no devices parsed from hackrf_info")
return out
def fmt(value: Optional[float], spec: str) -> str:
return "-" if value is None else format(value, spec)
def fmt(v: Optional[float], spec: str) -> str:
return "-" if v is None else format(v, spec)
def render(rows: Dict[str, Row], started_at: float, env_path: Path, summary: str):
def render(rows: Dict[str, Row], started_at: float, env_path: Path, serial_to_index: Dict[str, int]):
now = time.time()
print("\x1b[2J\x1b[H", end="")
print("Read Energy Monitor (relative power: RMS / linear / dBFS, not calibrated dBm)")
print(f"env: {env_path} | {summary} | uptime: {int(now - started_at)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}")
print("HackRF Energy Monitor (relative power: RMS / linear / dBFS, not calibrated dBm)")
print(f"env: {env_path} | discovered: {len(serial_to_index)} | uptime: {int(now-started_at)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}")
print()
header = f"{'band':>5} {'idx':>4} {'freq':>8} {'status':>9} {'rms':>10} {'power':>12} {'dBFS':>9} {'N':>6} {'age':>5} {'device':>18} error"
header = f"{'band':>5} {'idx':>3} {'freq':>7} {'status':>9} {'rms':>10} {'power':>12} {'dBFS':>9} {'N':>5} {'age':>5} {'serial':>12} error"
print(header)
print("-" * len(header))
for label in sorted(rows, key=label_sort_key):
row = rows[label]
idx = "-" if row.index is None else str(row.index)
age = "-" if row.updated_at <= 0 else f"{(now - row.updated_at):.1f}"
err = row.error or ""
print('-' * len(header))
for label in sorted(rows, key=lambda x: int(x)):
r = rows[label]
idx = '-' if r.index is None else str(r.index)
age = '-' if r.updated_at <= 0 else f"{(now-r.updated_at):.1f}"
err = (r.error or "")
if len(err) > 64:
err = err[:61] + "..."
device = row.device[-18:]
err = err[:61] + '...'
print(
f"{row.label:>5} {idx:>4} {row.freq_hz / 1e6:>8.1f} {row.status:>9} "
f"{fmt(row.rms, '.6f'):>10} {fmt(row.power_lin, '.8f'):>12} {fmt(row.dbfs, '.2f'):>9} "
f"{row.samples:>6} {age:>5} {device:>18} {err}"
f"{r.label:>5} {idx:>3} {r.freq_hz/1e6:>7.1f} {r.status:>9} "
f"{fmt(r.rms, '.6f'):>10} {fmt(r.power_lin, '.8f'):>12} {fmt(r.dbfs, '.2f'):>9} "
f"{r.samples:>5} {age:>5} {r.serial[-12:]:>12} {err}"
)
print()
print("Ctrl+C to stop. Use --backend iio --uri ip:192.168.2.1 --only 2400 for Neptune.")
print("Ctrl+C to stop. Use --only 2400,5200 or --freq-mhz 2450 to limit/override tuning.")
sys.stdout.flush()
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Realtime SDR relative energy monitor")
parser.add_argument("--backend", choices=("hackrf", "iio"), default="hackrf", help="SDR backend")
parser.add_argument("--env", default=".env", help="Path to .env (default: repo_root/.env)")
parser.add_argument("--only", default="", help="Comma-separated labels (e.g. 2400,5200)")
parser.add_argument("--freq-mhz", type=float, default=None, help="Override tuning frequency for all targets (MHz)")
parser.add_argument(
"--sample-rate",
type=float,
default=None,
help="Sample rate in Hz (HackRF default: 2e6, IIO default: keep current device setting)",
)
parser.add_argument("--bandwidth", type=float, default=None, help="RF bandwidth in Hz (default: keep device setting)")
parser.add_argument("--vec-len", type=int, default=4096, help="Probe vector length / capture size")
parser.add_argument("--interval", type=float, default=0.5, help="Per-target read interval (s)")
parser.add_argument("--refresh", type=float, default=0.5, help="Console refresh interval (s)")
parser.add_argument("--reopen-delay", type=float, default=1.0, help="Retry delay after ERR/BUSY (s)")
parser.add_argument("--gain", type=float, default=0.0, help="General gain for HackRF")
parser.add_argument("--if-gain", type=float, default=16.0, help="IF gain for HackRF")
parser.add_argument("--bb-gain", type=float, default=16.0, help="BB gain for HackRF")
parser.add_argument("--uri", default="ip:192.168.2.1", help="IIO URI, e.g. ip:192.168.2.1")
parser.add_argument("--iio-device", default="cf-ad9361-lpc", help="IIO RX buffer device")
parser.add_argument("--iio-phy-device", default="ad9361-phy", help="IIO PHY device")
parser.add_argument("--iio-i-channel", default="voltage0", help="IIO I channel")
parser.add_argument("--iio-q-channel", default="voltage1", help="IIO Q channel")
parser.add_argument("--iio-lo-channel", default="altvoltage0", help="IIO LO channel for RX frequency")
parser.add_argument("--iio-port-select", default="A_BALANCED", help="IIO rf_port_select value")
parser.add_argument(
"--iio-gain-mode",
choices=("manual", "fast_attack", "slow_attack", "hybrid"),
default="slow_attack",
help="IIO gain control mode",
)
parser.add_argument("--iio-hardwaregain", type=float, default=None, help="IIO hardware gain in dB for manual mode")
parser.add_argument("--timeout-ms", type=int, default=4000, help="IIO read timeout in milliseconds")
parser.add_argument("--settle", type=float, default=0.12, help="Wait after retune before reading in IIO mode (s)")
return parser
p = argparse.ArgumentParser(description="Realtime HackRF relative energy monitor")
p.add_argument("--env", default=".env", help="Path to .env (default: repo_root/.env)")
p.add_argument("--only", default="", help="Comma-separated labels (e.g. 2400,5200)")
p.add_argument("--freq-mhz", type=float, default=None, help="Override tuning frequency for all devices (MHz)")
p.add_argument("--sample-rate", type=float, default=2e6, help="Sample rate in Hz (HackRF min ~2e6)")
p.add_argument("--vec-len", type=int, default=4096, help="Probe vector length")
p.add_argument("--interval", type=float, default=0.5, help="Per-device read interval (s)")
p.add_argument("--refresh", type=float, default=0.5, help="Console refresh interval (s)")
p.add_argument("--reopen-delay", type=float, default=1.0, help="Retry delay after BUSY/ERR (s)")
p.add_argument("--gain", type=float, default=0.0, help="General gain")
p.add_argument("--if-gain", type=float, default=16.0, help="IF gain")
p.add_argument("--bb-gain", type=float, default=16.0, help="BB gain")
return p
def main() -> int:
args = build_parser().parse_args()
only = {item.strip() for item in args.only.split(",") if item.strip()} or None
only = {x.strip() for x in args.only.split(',') if x.strip()} or None
env_path = Path(args.env)
if not env_path.is_absolute():
env_path = (Path(__file__).resolve().parent / env_path).resolve()
env = parse_env(env_path)
if args.backend == "hackrf":
targets = collect_hackrf_targets(env, only=only, override_freq_mhz=args.freq_mhz)
targets = collect_targets(env, only=only, override_freq_mhz=args.freq_mhz)
if not targets:
print(f"No hack_/HACKID_ entries found in {env_path}", file=sys.stderr)
return 2
try:
serial_to_index = parse_hackrf_info()
except Exception as exc:
print(f"hackrf discovery failed: {exc}", file=sys.stderr)
return 3
summary = f"backend=hackrf | discovered={len(serial_to_index)}"
if args.sample_rate is None:
args.sample_rate = 2e6
else:
try:
targets = collect_iio_targets(env, only=only, override_freq_mhz=args.freq_mhz, uri=args.uri)
except ValueError as exc:
print(str(exc), file=sys.stderr)
return 2
if not targets:
print(
f"No IIO targets resolved from {env_path}. Use c_freq_* in .env or pass --only / --freq-mhz.",
file=sys.stderr,
)
return 2
summary = f"backend=iio | uri={args.uri} | device={args.iio_device} | targets={len(targets)}"
rows = {target.label: Row(label=target.label, device=target.device, freq_hz=target.freq_hz) for target in targets}
rows = {t.label: Row(label=t.label, serial=t.serial, freq_hz=t.freq_hz) for t in targets}
lock = threading.Lock()
stop_event = threading.Event()
@ -592,32 +295,27 @@ def main() -> int:
signal.signal(signal.SIGINT, on_signal)
signal.signal(signal.SIGTERM, on_signal)
workers: List[threading.Thread] = []
if args.backend == "hackrf":
for idx, target in enumerate(targets):
worker_args = argparse.Namespace(**vars(args))
worker_args.stagger = idx * 0.15
worker = HackRfWorker(target, serial_to_index, rows, lock, stop_event, worker_args)
workers.append(worker)
worker.start()
else:
worker = IioWorker(targets, rows, lock, stop_event, args)
workers.append(worker)
worker.start()
workers: List[Worker] = []
for i, t in enumerate(targets):
wa = argparse.Namespace(**vars(args))
wa.stagger = i * 0.15
w = Worker(t, serial_to_index, rows, lock, stop_event, wa)
workers.append(w)
w.start()
started = time.time()
try:
while not stop_event.is_set():
with lock:
snapshot = {key: Row(**vars(value)) for key, value in rows.items()}
render(snapshot, started, env_path, summary)
snap = {k: Row(**vars(v)) for k, v in rows.items()}
render(snap, started, env_path, serial_to_index)
stop_event.wait(args.refresh)
finally:
stop_event.set()
for worker in workers:
worker.join(timeout=2)
for w in workers:
w.join(timeout=2)
return 0
if __name__ == "__main__":
if __name__ == '__main__':
raise SystemExit(main())

@ -1,25 +1,13 @@
#!/usr/bin/env python3
"""
Examples:
HackRF:
./.venv-sdr/bin/python read_energy_wide.py \
--backend hackrf \
--serial 0000000000000000a18c63dc2a83b813 \
--sample-rate 20000000 \
--base 6000 \
--roof 5700 \
--step 20
Neptune / Pluto over IIO:
./.venv-sdr/bin/python read_energy_wide.py \
--backend iio \
--uri ip:192.168.2.1 \
--base 2402 \
--roof 2398 \
--step 1
"""
#!/usr/bin/env python3
import argparse
import math
import re
@ -28,7 +16,7 @@ import subprocess
import sys
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence, Tuple
from typing import Dict, List, Optional, Tuple
try:
import numpy as np
@ -45,7 +33,6 @@ except Exception as exc:
sys.exit(1)
EPS = 1e-20
IIO_MIN_SAMPLE_RATE = 2083333
@dataclass
@ -66,10 +53,10 @@ class ScanWindow:
pass_no: int = 0
class HackRfWideProbe(gr.top_block):
class WideProbeTop(gr.top_block):
def __init__(
self,
serial: str,
index: int,
center_freq_hz: float,
sample_rate: float,
vec_len: int,
@ -80,7 +67,7 @@ class HackRfWideProbe(gr.top_block):
super().__init__("hackrf_energy_wide_probe")
self.probe = blocks.probe_signal_vc(vec_len)
self.stream_to_vec = blocks.stream_to_vector(gr.sizeof_gr_complex * 1, vec_len)
self.src = osmosdr.source(args=f"numchan=1 hackrf={serial}")
self.src = osmosdr.source(args=f"numchan=1 hackrf={index}")
self.src.set_time_unknown_pps(osmosdr.time_spec_t())
self.src.set_sample_rate(sample_rate)
self.src.set_center_freq(center_freq_hz, 0)
@ -95,8 +82,8 @@ class HackRfWideProbe(gr.top_block):
for fn, val in (("set_gain", gain), ("set_if_gain", if_gain), ("set_bb_gain", bb_gain)):
try:
getattr(self.src, fn)(val, 0)
except Exception as exc:
raise RuntimeError(f"failed to set {fn}={val}: {exc}") from exc
except Exception:
pass
try:
self.src.set_bandwidth(0, 0)
except Exception:
@ -108,7 +95,7 @@ class HackRfWideProbe(gr.top_block):
self.connect((self.src, 0), (self.stream_to_vec, 0))
self.connect((self.stream_to_vec, 0), (self.probe, 0))
def tune(self, freq_hz: float):
def tune(self, freq_hz: float) -> None:
self.src.set_center_freq(freq_hz, 0)
def read_metrics(self) -> Tuple[float, float, float, int]:
@ -152,154 +139,374 @@ class HackRfWideProbe(gr.top_block):
return rms, power_lin, dbfs, samples
class IioWideProbe:
def __init__(self, args: argparse.Namespace):
self.args = args
self._static_signature: Optional[Tuple[Optional[float], Optional[float], str, Optional[float], str]] = None
self._last_freq_hz: Optional[int] = None
self._input_channels = [args.iio_i_channel]
if args.iio_q_channel not in self._input_channels:
self._input_channels.append(args.iio_q_channel)
def _run(self, cmd: Sequence[str], binary: bool = False) -> str | bytes:
proc = subprocess.run(list(cmd), capture_output=True, text=not binary)
if proc.returncode != 0:
stderr = proc.stderr if binary else (proc.stderr or "")
stdout = "" if binary else (proc.stdout or "")
details = stderr.strip() or stdout.strip() or f"exit code {proc.returncode}"
raise RuntimeError(details)
return proc.stdout
def _run_binary(self, cmd: Sequence[str]) -> bytes:
proc = subprocess.run(list(cmd), capture_output=True)
if proc.returncode != 0:
details = proc.stderr.decode("utf-8", errors="ignore").strip() or proc.stdout.decode("utf-8", errors="ignore").strip()
raise RuntimeError(details or f"exit code {proc.returncode}")
return proc.stdout
def _read_input_attr(self, channel: str, attr: str) -> str:
out = self._run(
[
"iio_attr",
"-u",
self.args.uri,
"-i",
"-c",
self.args.iio_phy_device,
channel,
attr,
]
def parse_hackrf_info() -> Dict[str, int]:
try:
proc = subprocess.run(["hackrf_info"], capture_output=True, text=True, timeout=15)
except FileNotFoundError:
raise RuntimeError("hackrf_info not found")
except subprocess.TimeoutExpired:
raise RuntimeError("hackrf_info timeout")
text = (proc.stdout or "") + "\n" + (proc.stderr or "")
out: Dict[str, int] = {}
cur_idx: Optional[int] = None
for line in text.splitlines():
m = re.search(r"^Index:\s*(\d+)", line)
if m:
cur_idx = int(m.group(1))
continue
m = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line)
if m and cur_idx is not None:
out[m.group(1).lower()] = cur_idx
if not out:
raise RuntimeError("no devices parsed from hackrf_info")
return out
def fmt(value: Optional[float], spec: str) -> str:
return "-" if value is None else format(value, spec)
def build_windows(base_mhz: float, roof_mhz: float, step_mhz: float) -> List[ScanWindow]:
if step_mhz <= 0:
raise ValueError("step must be > 0")
if base_mhz == roof_mhz:
raise ValueError("base and roof must be different")
direction = -1.0 if roof_mhz < base_mhz else 1.0
edge = base_mhz
seq = 1
windows: List[ScanWindow] = []
while True:
next_edge = edge + direction * step_mhz
if direction < 0 and next_edge < roof_mhz:
next_edge = roof_mhz
if direction > 0 and next_edge > roof_mhz:
next_edge = roof_mhz
low_mhz = min(edge, next_edge)
high_mhz = max(edge, next_edge)
center_mhz = (low_mhz + high_mhz) / 2.0
windows.append(
ScanWindow(
seq=seq,
start_mhz=edge,
end_mhz=next_edge,
low_mhz=low_mhz,
high_mhz=high_mhz,
center_mhz=center_mhz,
)
return str(out).strip()
def _set_input_attr(self, channel: str, attr: str, value: str):
self._run(
[
"iio_attr",
"-u",
self.args.uri,
"-q",
"-i",
"-c",
self.args.iio_phy_device,
channel,
attr,
value,
]
)
def _set_output_attr(self, channel: str, attr: str, value: str):
self._run(
[
"iio_attr",
"-u",
self.args.uri,
"-q",
"-o",
"-c",
self.args.iio_phy_device,
channel,
attr,
value,
]
if next_edge == roof_mhz:
break
edge = next_edge
seq += 1
return windows
def render(
windows: List[ScanWindow],
serial: str,
index: int,
sample_rate: float,
base_mhz: float,
roof_mhz: float,
step_mhz: float,
started_at: float,
pass_no: int,
current_seq: int,
) -> None:
now = time.time()
capture_bw_mhz = sample_rate / 1e6
current_row = next((row for row in windows if row.seq == current_seq), None)
best_row = max(
(row for row in windows if row.status == "OK" and row.dbfs is not None),
key=lambda row: row.dbfs if row.dbfs is not None else float("-inf"),
default=None,
)
print("\x1b[2J\x1b[H", end="")
print("HackRF Wide Energy Monitor (relative power: RMS / linear / dBFS)")
print(
f"serial: {serial} | idx: {index} | sample-rate: {capture_bw_mhz:.3f} MHz | "
f"scan: {base_mhz:.3f}->{roof_mhz:.3f} MHz step {step_mhz:.3f} MHz | "
f"pass: {pass_no} | uptime: {int(now-started_at)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}"
)
print()
header = (
f"{'cur':>3} {'seq':>3} {'window MHz':>23} {'center':>9} {'status':>8} "
f"{'dBFS':>9} {'rms':>10} {'power':>12} {'N':>5} {'age':>5} error"
)
print(header)
print("-" * len(header))
for row in windows:
age = "-" if row.updated_at <= 0 else f"{(now-row.updated_at):.1f}"
err = row.error
if len(err) > 50:
err = err[:47] + "..."
marker = ">>>" if row.seq == current_seq else ""
print(
f"{marker:>3} {row.seq:>3} "
f"{f'{row.high_mhz:.3f}-{row.low_mhz:.3f}':>23} {row.center_mhz:>9.3f} {row.status:>8} "
f"{fmt(row.dbfs, '.2f'):>9} {fmt(row.rms, '.6f'):>10} {fmt(row.power_lin, '.8f'):>12} "
f"{row.samples:>5} {age:>5} {err}"
)
print()
if best_row is not None:
best_age = "-" if best_row.updated_at <= 0 else f"{(now-best_row.updated_at):.1f}"
print(
f"{'':>3} {'MAX':>3} "
f"{f'{best_row.high_mhz:.3f}-{best_row.low_mhz:.3f}':>23} {best_row.center_mhz:>9.3f} {best_row.status:>8} "
f"{fmt(best_row.dbfs, '.2f'):>9} {fmt(best_row.rms, '.6f'):>10} {fmt(best_row.power_lin, '.8f'):>12} "
f"{best_row.samples:>5} {best_age:>5} pass={best_row.pass_no}"
)
elif current_row is not None:
current_age = "-" if current_row.updated_at <= 0 else f"{(now-current_row.updated_at):.1f}"
print(
f"{'':>3} {'MAX':>3} "
f"{f'{current_row.high_mhz:.3f}-{current_row.low_mhz:.3f}':>23} {current_row.center_mhz:>9.3f} {'INIT':>8} "
f"{fmt(None, '.2f'):>9} {fmt(None, '.6f'):>10} {fmt(None, '.8f'):>12} "
f"{0:>5} {current_age:>5} no successful windows yet"
)
print("Ctrl+C to stop. Window width equals step; sample-rate must be >= step to cover each window.")
sys.stdout.flush()
def get_effective_sample_rate(self) -> float:
if self.args.sample_rate is not None:
return float(self.args.sample_rate)
raw = self._read_input_attr(self.args.iio_i_channel, "sampling_frequency")
return float(raw)
def ensure_configured(self):
sample_rate = None
if self.args.sample_rate is not None:
sample_rate = int(round(max(float(self.args.sample_rate), IIO_MIN_SAMPLE_RATE)))
bandwidth = None
if self.args.bandwidth is not None:
bandwidth = int(round(max(float(self.args.bandwidth), 200000.0)))
signature = (
None if sample_rate is None else float(sample_rate),
None if bandwidth is None else float(bandwidth),
self.args.iio_gain_mode,
self.args.iio_hardwaregain,
self.args.iio_port_select,
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Retune one HackRF across a wide frequency range and measure energy")
parser.add_argument("--serial", required=True, help="HackRF serial number from hackrf_info")
parser.add_argument("--sample-rate", type=float, required=True, help="Sample rate in Hz")
parser.add_argument("--base", type=float, required=True, help="Scan start edge in MHz")
parser.add_argument("--roof", type=float, required=True, help="Scan end edge in MHz")
parser.add_argument("--step", type=float, required=True, help="Window width / retune step in MHz")
parser.add_argument("--vec-len", type=int, default=4096, help="Probe vector length")
parser.add_argument("--settle", type=float, default=0.12, help="Wait time after retune before reading (s)")
parser.add_argument("--avg-reads", type=int, default=3, help="How many probe reads to average per window")
parser.add_argument("--pause-between-reads", type=float, default=0.02, help="Pause between averaged reads (s)")
parser.add_argument("--passes", type=int, default=0, help="Number of sweep passes, 0 means infinite")
parser.add_argument("--gain", type=float, default=16.0, help="General gain")
parser.add_argument("--if-gain", type=float, default=16.0, help="IF gain")
parser.add_argument("--bb-gain", type=float, default=16.0, help="BB gain")
return parser
def main() -> int:
args = build_parser().parse_args()
serial = args.serial.lower()
try:
windows = build_windows(args.base, args.roof, args.step)
except ValueError as exc:
print(f"invalid scan range: {exc}", file=sys.stderr)
return 2
step_hz = args.step * 1e6
if args.sample_rate < step_hz:
print(
f"sample-rate {args.sample_rate:.0f} Hz is smaller than step window {step_hz:.0f} Hz; "
"this would leave gaps in the scan",
file=sys.stderr,
)
return 2
try:
serial_to_index = parse_hackrf_info()
except Exception as exc:
print(f"hackrf discovery failed: {exc}", file=sys.stderr)
return 3
index = serial_to_index.get(serial)
if index is None:
print(f"serial {serial} not found in hackrf_info", file=sys.stderr)
print("available serials:", file=sys.stderr)
for item_serial, item_index in sorted(serial_to_index.items(), key=lambda item: item[1]):
print(f" idx={item_index} serial={item_serial}", file=sys.stderr)
return 4
stop_requested = False
def on_signal(signum, frame):
nonlocal stop_requested
stop_requested = True
signal.signal(signal.SIGINT, on_signal)
signal.signal(signal.SIGTERM, on_signal)
probe: Optional[WideProbeTop] = None
started_at = time.time()
pass_no = 0
current_seq = windows[0].seq
try:
probe = WideProbeTop(
index=index,
center_freq_hz=windows[0].center_mhz * 1e6,
sample_rate=args.sample_rate,
vec_len=args.vec_len,
gain=args.gain,
if_gain=args.if_gain,
bb_gain=args.bb_gain,
)
probe.start()
time.sleep(max(args.settle, 0.12))
while not stop_requested:
pass_no += 1
for row in windows:
if stop_requested:
break
current_seq = row.seq
try:
probe.tune(row.center_mhz * 1e6)
rms, power_lin, dbfs, samples = probe.read_window(
settle=args.settle,
avg_reads=args.avg_reads,
pause_between_reads=args.pause_between_reads,
)
if signature == self._static_signature:
return
for channel in self._input_channels:
if sample_rate is not None:
self._set_input_attr(channel, "sampling_frequency", str(sample_rate))
if bandwidth is not None:
self._set_input_attr(channel, "rf_bandwidth", str(bandwidth))
if self.args.iio_port_select:
self._set_input_attr(channel, "rf_port_select", self.args.iio_port_select)
if self.args.iio_gain_mode:
self._set_input_attr(channel, "gain_control_mode", self.args.iio_gain_mode)
if self.args.iio_gain_mode == "manual" and self.args.iio_hardwaregain is not None:
self._set_input_attr(channel, "hardwaregain", f"{self.args.iio_hardwaregain:.6f}")
self._static_signature = signature
def tune(self, freq_hz: float):
target = int(round(freq_hz))
if self._last_freq_hz == target:
return
self._set_output_attr(self.args.iio_lo_channel, "frequency", str(target))
self._last_freq_hz = target
def read_metrics(self, samples_per_read: int) -> Tuple[float, float, float, int]:
cmd = [
"iio_readdev",
"-u",
self.args.uri,
"-T",
str(int(self.args.timeout_ms)),
"-b",
str(max(4, int(samples_per_read))),
"-s",
str(max(4, int(samples_per_read))),
self.args.iio_device,
self.args.iio_i_channel,
self.args.iio_q_channel,
]
raw = self._run_binary(cmd)
values = np.frombuffer(raw, dtype="<i2")
if values.size == 0:
row.status = "OK"
row.rms = rms
row.power_lin = power_lin
row.dbfs = dbfs
row.samples = samples
row.error = ""
row.updated_at = time.time()
row.pass_no = pass_no
except Exception as exc:
row.status = "ERR"
row.error = str(exc)
row.updated_at = time.time()
render(
windows=windows,
serial=serial,
index=index,
sample_rate=args.sample_rate,
base_mhz=args.base,
roof_mhz=args.roof,
step_mhz=args.step,
started_at=started_at,
pass_no=pass_no,
current_seq=current_seq,
)
if args.passes > 0 and pass_no >= args.passes:
break
except Exception as exc:
print(f"scanner failed: {exc}", file=sys.stderr)
return 5
finally:
if probe is not None:
try:
probe.stop()
probe.wait()
except Exception:
pass
return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
import argparse
import math
import re
import signal
import subprocess
import sys
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
try:
import numpy as np
except Exception as exc:
print(f"numpy import failed: {exc}", file=sys.stderr)
sys.exit(1)
try:
from gnuradio import blocks, gr
import osmosdr
except Exception as exc:
print(f"gnuradio/osmosdr import failed: {exc}", file=sys.stderr)
print("Run with the SDR venv, e.g. .venv-sdr/bin/python read_energy_wide.py", file=sys.stderr)
sys.exit(1)
EPS = 1e-20
@dataclass
class ScanWindow:
seq: int
start_mhz: float
end_mhz: float
low_mhz: float
high_mhz: float
center_mhz: float
status: str = "INIT"
rms: Optional[float] = None
power_lin: Optional[float] = None
dbfs: Optional[float] = None
samples: int = 0
updated_at: float = 0.0
error: str = ""
pass_no: int = 0
class WideProbeTop(gr.top_block):
def __init__(
self,
index: int,
center_freq_hz: float,
sample_rate: float,
vec_len: int,
gain: float,
if_gain: float,
bb_gain: float,
):
super().__init__("hackrf_energy_wide_probe")
self.probe = blocks.probe_signal_vc(vec_len)
self.stream_to_vec = blocks.stream_to_vector(gr.sizeof_gr_complex * 1, vec_len)
self.src = osmosdr.source(args=f"numchan=1 hackrf={index}")
self.src.set_time_unknown_pps(osmosdr.time_spec_t())
self.src.set_sample_rate(sample_rate)
self.src.set_center_freq(center_freq_hz, 0)
try:
self.src.set_freq_corr(0, 0)
except Exception:
pass
try:
self.src.set_gain_mode(False, 0)
except Exception:
pass
for fn, val in (("set_gain", gain), ("set_if_gain", if_gain), ("set_bb_gain", bb_gain)):
try:
getattr(self.src, fn)(val, 0)
except Exception:
pass
try:
self.src.set_bandwidth(0, 0)
except Exception:
pass
try:
self.src.set_antenna("", 0)
except Exception:
pass
self.connect((self.src, 0), (self.stream_to_vec, 0))
self.connect((self.stream_to_vec, 0), (self.probe, 0))
def tune(self, freq_hz: float) -> None:
self.src.set_center_freq(freq_hz, 0)
def read_metrics(self) -> Tuple[float, float, float, int]:
arr = np.asarray(self.probe.level(), dtype=np.complex64)
if arr.size == 0:
raise RuntimeError("no samples")
if values.size % 2 != 0:
raise RuntimeError(f"unexpected IQ sample payload: {values.size} int16 values")
iq = values.reshape(-1, 2).astype(np.float32, copy=False)
i = iq[:, 0]
q = iq[:, 1]
power_lin = float(np.mean(i * i + q * q))
power_lin = float(np.mean(arr.real * arr.real + arr.imag * arr.imag))
rms = math.sqrt(max(power_lin, 0.0))
dbfs = 10.0 * math.log10(max(power_lin, EPS))
return rms, power_lin, dbfs, int(iq.shape[0])
return rms, power_lin, dbfs, int(arr.size)
def read_window(self, settle: float, avg_reads: int, pause_between_reads: float, samples_per_read: int) -> Tuple[float, float, float, int]:
def read_window(self, settle: float, avg_reads: int, pause_between_reads: float) -> Tuple[float, float, float, int]:
if settle > 0:
time.sleep(settle)
@ -309,10 +516,10 @@ class IioWideProbe:
last_error: Optional[Exception] = None
for idx in range(read_count):
deadline = time.time() + max(1.0, self.args.timeout_ms / 1000.0)
deadline = time.time() + 1.0
while True:
try:
_, power_lin, _, samples = self.read_metrics(samples_per_read)
_, power_lin, _, samples = self.read_metrics()
powers.append(power_lin)
sample_sizes.append(samples)
break
@ -334,21 +541,21 @@ class IioWideProbe:
def parse_hackrf_info() -> Dict[str, int]:
try:
proc = subprocess.run(["hackrf_info"], capture_output=True, text=True, timeout=15)
except FileNotFoundError as exc:
raise RuntimeError("hackrf_info not found") from exc
except subprocess.TimeoutExpired as exc:
raise RuntimeError("hackrf_info timeout") from exc
except FileNotFoundError:
raise RuntimeError("hackrf_info not found")
except subprocess.TimeoutExpired:
raise RuntimeError("hackrf_info timeout")
text = (proc.stdout or "") + "\n" + (proc.stderr or "")
out: Dict[str, int] = {}
cur_idx: Optional[int] = None
for line in text.splitlines():
match = re.search(r"^Index:\s*(\d+)", line)
if match:
cur_idx = int(match.group(1))
m = re.search(r"^Index:\s*(\d+)", line)
if m:
cur_idx = int(m.group(1))
continue
match = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line)
if match and cur_idx is not None:
out[match.group(1).lower()] = cur_idx
m = re.search(r"^Serial number:\s*([0-9a-fA-F]+)", line)
if m and cur_idx is not None:
out[m.group(1).lower()] = cur_idx
if not out:
raise RuntimeError("no devices parsed from hackrf_info")
return out
@ -400,9 +607,8 @@ def build_windows(base_mhz: float, roof_mhz: float, step_mhz: float) -> List[Sca
def render(
windows: List[ScanWindow],
backend: str,
device_ref: str,
index_ref: str,
serial: str,
index: int,
sample_rate: float,
base_mhz: float,
roof_mhz: float,
@ -420,30 +626,30 @@ def render(
default=None,
)
print("\x1b[2J\x1b[H", end="")
print("Wide Energy Monitor (relative power: RMS / linear / dBFS)")
print("HackRF Wide Energy Monitor (relative power: RMS / linear / dBFS)")
print(
f"backend: {backend} | device: {device_ref} | idx: {index_ref} | sample-rate: {capture_bw_mhz:.3f} MHz | "
f"serial: {serial} | idx: {index} | sample-rate: {capture_bw_mhz:.3f} MHz | "
f"scan: {base_mhz:.3f}->{roof_mhz:.3f} MHz step {step_mhz:.3f} MHz | "
f"pass: {pass_no} | uptime: {int(now-started_at)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}"
)
print()
header = (
f"{'cur':>3} {'seq':>3} {'window MHz':>23} {'center':>9} {'status':>8} "
f"{'dBFS':>9} {'rms':>10} {'power':>12} {'N':>6} {'age':>5} error"
f"{'dBFS':>9} {'rms':>10} {'power':>12} {'N':>5} {'age':>5} error"
)
print(header)
print("-" * len(header))
for row in windows:
age = "-" if row.updated_at <= 0 else f"{(now-row.updated_at):.1f}"
err = row.error
if len(err) > 56:
err = err[:53] + "..."
if len(err) > 50:
err = err[:47] + "..."
marker = ">>>" if row.seq == current_seq else ""
print(
f"{marker:>3} {row.seq:>3} "
f"{f'{row.high_mhz:.3f}-{row.low_mhz:.3f}':>23} {row.center_mhz:>9.3f} {row.status:>8} "
f"{fmt(row.dbfs, '.2f'):>9} {fmt(row.rms, '.6f'):>10} {fmt(row.power_lin, '.8f'):>12} "
f"{row.samples:>6} {age:>5} {err}"
f"{row.samples:>5} {age:>5} {err}"
)
print()
if best_row is not None:
@ -452,7 +658,7 @@ def render(
f"{'':>3} {'MAX':>3} "
f"{f'{best_row.high_mhz:.3f}-{best_row.low_mhz:.3f}':>23} {best_row.center_mhz:>9.3f} {best_row.status:>8} "
f"{fmt(best_row.dbfs, '.2f'):>9} {fmt(best_row.rms, '.6f'):>10} {fmt(best_row.power_lin, '.8f'):>12} "
f"{best_row.samples:>6} {best_age:>5} pass={best_row.pass_no}"
f"{best_row.samples:>5} {best_age:>5} pass={best_row.pass_no}"
)
elif current_row is not None:
current_age = "-" if current_row.updated_at <= 0 else f"{(now-current_row.updated_at):.1f}"
@ -460,53 +666,33 @@ def render(
f"{'':>3} {'MAX':>3} "
f"{f'{current_row.high_mhz:.3f}-{current_row.low_mhz:.3f}':>23} {current_row.center_mhz:>9.3f} {'INIT':>8} "
f"{fmt(None, '.2f'):>9} {fmt(None, '.6f'):>10} {fmt(None, '.8f'):>12} "
f"{0:>6} {current_age:>5} no successful windows yet"
f"{0:>5} {current_age:>5} no successful windows yet"
)
print("Ctrl+C to stop. Window width equals step; sample-rate must be >= step to cover each window.")
sys.stdout.flush()
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Retune one SDR across a wide frequency range and measure energy")
parser.add_argument("--backend", choices=("hackrf", "iio"), default="hackrf", help="SDR backend")
parser.add_argument("--serial", help="HackRF serial number from hackrf_info")
parser.add_argument(
"--sample-rate",
type=float,
default=None,
help="Sample rate in Hz (required for hackrf, optional for iio to keep device setting)",
)
parser.add_argument("--bandwidth", type=float, default=None, help="RF bandwidth in Hz (iio only, optional)")
parser = argparse.ArgumentParser(description="Retune one HackRF across a wide frequency range and measure energy")
parser.add_argument("--serial", required=True, help="HackRF serial number from hackrf_info")
parser.add_argument("--sample-rate", type=float, required=True, help="Sample rate in Hz")
parser.add_argument("--base", type=float, required=True, help="Scan start edge in MHz")
parser.add_argument("--roof", type=float, required=True, help="Scan end edge in MHz")
parser.add_argument("--step", type=float, required=True, help="Window width / retune step in MHz")
parser.add_argument("--vec-len", type=int, default=4096, help="Probe vector length / samples per read")
parser.add_argument("--vec-len", type=int, default=4096, help="Probe vector length")
parser.add_argument("--settle", type=float, default=0.12, help="Wait time after retune before reading (s)")
parser.add_argument("--avg-reads", type=int, default=3, help="How many reads to average per window")
parser.add_argument("--avg-reads", type=int, default=3, help="How many probe reads to average per window")
parser.add_argument("--pause-between-reads", type=float, default=0.02, help="Pause between averaged reads (s)")
parser.add_argument("--gain", type=float, default=16.0, help="General gain for HackRF")
parser.add_argument("--if-gain", type=float, default=16.0, help="IF gain for HackRF")
parser.add_argument("--bb-gain", type=float, default=16.0, help="BB gain for HackRF")
parser.add_argument("--uri", default="ip:192.168.2.1", help="IIO URI, e.g. ip:192.168.2.1")
parser.add_argument("--iio-device", default="cf-ad9361-lpc", help="IIO RX buffer device")
parser.add_argument("--iio-phy-device", default="ad9361-phy", help="IIO PHY device")
parser.add_argument("--iio-i-channel", default="voltage0", help="IIO I channel")
parser.add_argument("--iio-q-channel", default="voltage1", help="IIO Q channel")
parser.add_argument("--iio-lo-channel", default="altvoltage0", help="IIO LO channel for RX frequency")
parser.add_argument("--iio-port-select", default="A_BALANCED", help="IIO rf_port_select value")
parser.add_argument(
"--iio-gain-mode",
choices=("manual", "fast_attack", "slow_attack", "hybrid"),
default="slow_attack",
help="IIO gain control mode",
)
parser.add_argument("--iio-hardwaregain", type=float, default=None, help="IIO hardware gain in dB for manual mode")
parser.add_argument("--timeout-ms", type=int, default=4000, help="IIO read timeout in milliseconds")
parser.add_argument("--passes", type=int, default=0, help="Number of sweep passes, 0 means infinite")
parser.add_argument("--gain", type=float, default=16.0, help="General gain")
parser.add_argument("--if-gain", type=float, default=16.0, help="IF gain")
parser.add_argument("--bb-gain", type=float, default=16.0, help="BB gain")
return parser
def main() -> int:
args = build_parser().parse_args()
serial = args.serial.lower()
try:
windows = build_windows(args.base, args.roof, args.step)
@ -514,23 +700,11 @@ def main() -> int:
print(f"invalid scan range: {exc}", file=sys.stderr)
return 2
probe: Optional[HackRfWideProbe | IioWideProbe] = None
device_ref = ""
index_ref = "-"
if args.backend == "hackrf":
if not args.serial:
print("--serial is required for --backend hackrf", file=sys.stderr)
return 2
if args.sample_rate is None:
print("--sample-rate is required for --backend hackrf", file=sys.stderr)
return 2
serial = args.serial.lower()
step_hz = args.step * 1e6
if args.sample_rate < step_hz:
print(
f"sample-rate {args.sample_rate:.0f} Hz is smaller than step window {step_hz:.0f} Hz; this would leave gaps in the scan",
f"sample-rate {args.sample_rate:.0f} Hz is smaller than step window {step_hz:.0f} Hz; "
"this would leave gaps in the scan",
file=sys.stderr,
)
return 2
@ -549,38 +723,6 @@ def main() -> int:
print(f" idx={item_index} serial={item_serial}", file=sys.stderr)
return 4
probe = HackRfWideProbe(
serial=serial,
center_freq_hz=windows[0].center_mhz * 1e6,
sample_rate=args.sample_rate,
vec_len=args.vec_len,
gain=args.gain,
if_gain=args.if_gain,
bb_gain=args.bb_gain,
)
effective_sample_rate = float(args.sample_rate)
device_ref = serial
index_ref = str(index)
else:
probe = IioWideProbe(args)
try:
probe.ensure_configured()
effective_sample_rate = probe.get_effective_sample_rate()
except Exception as exc:
print(f"iio setup failed: {exc}", file=sys.stderr)
return 3
step_hz = args.step * 1e6
if effective_sample_rate < step_hz:
print(
f"effective sample-rate {effective_sample_rate:.0f} Hz is smaller than step window {step_hz:.0f} Hz; this would leave gaps in the scan",
file=sys.stderr,
)
return 2
device_ref = args.uri
index_ref = "iio"
stop_requested = False
def on_signal(signum, frame):
@ -590,12 +732,21 @@ def main() -> int:
signal.signal(signal.SIGINT, on_signal)
signal.signal(signal.SIGTERM, on_signal)
probe: Optional[WideProbeTop] = None
started_at = time.time()
pass_no = 0
current_seq = windows[0].seq
try:
if isinstance(probe, HackRfWideProbe):
probe = WideProbeTop(
index=index,
center_freq_hz=windows[0].center_mhz * 1e6,
sample_rate=args.sample_rate,
vec_len=args.vec_len,
gain=args.gain,
if_gain=args.if_gain,
bb_gain=args.bb_gain,
)
probe.start()
time.sleep(max(args.settle, 0.12))
@ -607,18 +758,10 @@ def main() -> int:
current_seq = row.seq
try:
probe.tune(row.center_mhz * 1e6)
if isinstance(probe, HackRfWideProbe):
rms, power_lin, dbfs, samples = probe.read_window(
settle=args.settle,
avg_reads=args.avg_reads,
pause_between_reads=args.pause_between_reads,
)
else:
rms, power_lin, dbfs, samples = probe.read_window(
settle=args.settle,
avg_reads=args.avg_reads,
pause_between_reads=args.pause_between_reads,
samples_per_read=args.vec_len,
)
row.status = "OK"
row.rms = rms
@ -632,13 +775,11 @@ def main() -> int:
row.status = "ERR"
row.error = str(exc)
row.updated_at = time.time()
render(
windows=windows,
backend=args.backend,
device_ref=device_ref,
index_ref=index_ref,
sample_rate=effective_sample_rate,
serial=serial,
index=index,
sample_rate=args.sample_rate,
base_mhz=args.base,
roof_mhz=args.roof,
step_mhz=args.step,
@ -646,12 +787,13 @@ def main() -> int:
pass_no=pass_no,
current_seq=current_seq,
)
if args.passes > 0 and pass_no >= args.passes:
break
except Exception as exc:
print(f"scanner failed: {exc}", file=sys.stderr)
return 5
finally:
if isinstance(probe, HackRfWideProbe):
if probe is not None:
try:
probe.stop()
probe.wait()

@ -1,23 +1,27 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import os
import signal
import subprocess
import sys
import threading
import time
import signal
import argparse
import threading
import numpy as np
from gnuradio import gr
import osmosdr
IIO_MIN_SAMPLE_RATE = 2_083_333
class SimsiSink(gr.sync_block):
def __init__(self, save_dir="./signal", file_tag="fragment_", split_size=1000000, delay=0.0):
gr.sync_block.__init__(
self,
name="Simsi_Sink",
in_sig=[np.complex64],
out_sig=None,
)
class RotatingIqWriter:
def __init__(self, save_dir="./signal", file_tag="fragment_", split_size=1_000_000, delay=0.0):
self.save_dir = str(save_dir)
self.file_tag = str(file_tag)
self.split_size = int(split_size)
@ -82,56 +86,35 @@ class RotatingIqWriter:
self.file_index += 1
self._open_next_file()
def write_samples(self, samples):
data = np.asarray(samples, dtype=np.complex64).reshape(-1)
def work(self, input_items, output_items):
data = input_items[0]
offset = 0
total = int(data.size)
total = len(data)
while offset < total:
remaining = self.split_size - self.current_len
chunk = min(remaining, total - offset)
block = np.ascontiguousarray(data[offset:offset + chunk], dtype=np.complex64)
self.current_fd.write(block.tobytes())
self.current_fd.write(data[offset:offset + chunk].copy())
self.current_len += chunk
offset += chunk
if self.current_len >= self.split_size:
self._rotate_file()
def close(self):
return len(data)
def stop(self):
try:
if self.current_fd is not None:
self.current_fd.close()
self.current_fd = None
finally:
self._remove_in_progress()
class SimsiSink(gr.sync_block):
def __init__(self, save_dir="./signal", file_tag="fragment_", split_size=1_000_000, delay=0.0):
gr.sync_block.__init__(
self,
name="Simsi_Sink",
in_sig=[np.complex64],
out_sig=None,
)
self.writer = RotatingIqWriter(
save_dir=save_dir,
file_tag=file_tag,
split_size=split_size,
delay=delay,
)
def work(self, input_items, output_items):
self.writer.write_samples(input_items[0])
return len(input_items[0])
def stop(self):
self.writer.close()
return True
class HackRfDataSaver(gr.top_block):
class DataSaver(gr.top_block):
def __init__(
self,
serial: str,
@ -147,204 +130,47 @@ class HackRfDataSaver(gr.top_block):
):
super().__init__("data_saver_headless", catch_exceptions=True)
dev_args = f"numchan=1 hackrf={serial}"
self.serial = serial
self.freq = float(freq)
self.save_dir = save_dir
self.file_tag = file_tag
self.samp_rate = float(samp_rate)
self.split_size = int(split_size)
self.delay = float(delay)
self.rf_gain = float(rf_gain)
self.if_gain = float(if_gain)
self.bb_gain = float(bb_gain)
dev_args = f"numchan=1 hackrf={self.serial}"
self.source = osmosdr.source(args=dev_args)
self.source.set_time_unknown_pps(osmosdr.time_spec_t())
self.source.set_sample_rate(float(samp_rate))
self.source.set_center_freq(float(freq), 0)
self.source.set_sample_rate(self.samp_rate)
self.source.set_center_freq(self.freq, 0)
self.source.set_freq_corr(0, 0)
self.source.set_dc_offset_mode(0, 0)
self.source.set_iq_balance_mode(0, 0)
self.source.set_gain_mode(False, 0)
self.source.set_gain(float(rf_gain), 0)
self.source.set_if_gain(float(if_gain), 0)
self.source.set_bb_gain(float(bb_gain), 0)
self.source.set_gain(self.rf_gain, 0)
self.source.set_if_gain(self.if_gain, 0)
self.source.set_bb_gain(self.bb_gain, 0)
self.source.set_antenna("", 0)
self.source.set_bandwidth(0, 0)
self.sink = SimsiSink(
save_dir=save_dir,
file_tag=file_tag,
split_size=split_size,
delay=delay,
save_dir=self.save_dir,
file_tag=self.file_tag,
split_size=self.split_size,
delay=self.delay,
)
self.connect((self.source, 0), (self.sink, 0))
class IioCapture:
def __init__(self, args: argparse.Namespace):
self.args = args
self.writer = RotatingIqWriter(
save_dir=args.save_dir,
file_tag=args.file_tag,
split_size=args.split_size,
delay=args.delay,
)
self._static_signature = None
self._last_freq_hz = None
self._input_channels = [args.iio_i_channel]
if args.iio_q_channel not in self._input_channels:
self._input_channels.append(args.iio_q_channel)
def _run(self, cmd):
proc = subprocess.run(list(cmd), capture_output=True, text=True)
if proc.returncode != 0:
details = (proc.stderr or "").strip() or (proc.stdout or "").strip() or f"exit code {proc.returncode}"
raise RuntimeError(details)
return proc.stdout
def _run_binary(self, cmd, stop_event: threading.Event):
proc = subprocess.Popen(list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
while True:
try:
stdout, stderr = proc.communicate(timeout=0.2)
break
except subprocess.TimeoutExpired:
if stop_event.is_set():
proc.terminate()
try:
proc.communicate(timeout=1.0)
except subprocess.TimeoutExpired:
proc.kill()
proc.communicate()
raise InterruptedError("stop requested")
if proc.returncode != 0:
details = stderr.decode("utf-8", errors="ignore").strip() or stdout.decode("utf-8", errors="ignore").strip()
raise RuntimeError(details or f"exit code {proc.returncode}")
return stdout
finally:
if proc.poll() is None:
proc.kill()
proc.communicate()
def _set_input_attr(self, channel: str, attr: str, value: str):
self._run(
[
"iio_attr",
"-u",
self.args.iio_uri,
"-q",
"-i",
"-c",
self.args.iio_phy_device,
channel,
attr,
value,
]
)
def _set_output_attr(self, channel: str, attr: str, value: str):
self._run(
[
"iio_attr",
"-u",
self.args.iio_uri,
"-q",
"-o",
"-c",
self.args.iio_phy_device,
channel,
attr,
value,
]
)
def ensure_configured(self):
sample_rate = int(round(max(float(self.args.samp_rate), IIO_MIN_SAMPLE_RATE)))
bandwidth = None
if self.args.bandwidth is not None:
bandwidth = int(round(max(float(self.args.bandwidth), 200_000.0)))
signature = (
float(sample_rate),
None if bandwidth is None else float(bandwidth),
self.args.iio_gain_mode,
self.args.iio_hardwaregain,
self.args.iio_port_select,
)
if signature == self._static_signature:
return
for channel in self._input_channels:
self._set_input_attr(channel, "sampling_frequency", str(sample_rate))
if bandwidth is not None:
self._set_input_attr(channel, "rf_bandwidth", str(bandwidth))
if self.args.iio_port_select:
self._set_input_attr(channel, "rf_port_select", self.args.iio_port_select)
if self.args.iio_gain_mode:
self._set_input_attr(channel, "gain_control_mode", self.args.iio_gain_mode)
if self.args.iio_gain_mode == "manual" and self.args.iio_hardwaregain is not None:
self._set_input_attr(channel, "hardwaregain", f"{self.args.iio_hardwaregain:.6f}")
self._static_signature = signature
def tune(self):
target = int(round(float(self.args.freq)))
if self._last_freq_hz == target:
return
self._set_output_attr(self.args.iio_lo_channel, "frequency", str(target))
self._last_freq_hz = target
def read_chunk(self, stop_event: threading.Event):
samples_per_read = self.args.samples_per_read
if samples_per_read is None:
samples_per_read = max(4096, min(int(self.args.split_size), 262_144))
raw = self._run_binary(
[
"iio_readdev",
"-u",
self.args.iio_uri,
"-T",
str(int(self.args.timeout_ms)),
"-b",
str(int(samples_per_read)),
"-s",
str(int(samples_per_read)),
self.args.iio_device,
self.args.iio_i_channel,
self.args.iio_q_channel,
],
stop_event,
)
values = np.frombuffer(raw, dtype="<i2")
if values.size == 0:
raise RuntimeError("no samples")
if values.size % 2 != 0:
raise RuntimeError(f"unexpected IQ sample payload: {values.size} int16 values")
iq = values.reshape(-1, 2).astype(np.float32, copy=False)
complex_samples = (iq[:, 0] + 1j * iq[:, 1]).astype(np.complex64, copy=False)
complex_samples /= 32768.0
return complex_samples
def run(self, stop_event: threading.Event):
self.ensure_configured()
self.tune()
if self.args.settle > 0:
time.sleep(self.args.settle)
while not stop_event.is_set():
try:
samples = self.read_chunk(stop_event)
except InterruptedError:
break
self.writer.write_samples(samples)
def close(self):
self.writer.close()
def parse_args():
parser = argparse.ArgumentParser(description="Headless IQ saver for HackRF or IIO SDR backends")
parser = argparse.ArgumentParser(description="Headless GNU Radio IQ saver for HackRF")
parser.add_argument("--backend", choices=("hackrf", "iio"), default="hackrf", help="SDR backend")
parser.add_argument("--serial", help="HackRF serial number")
parser.add_argument("--serial", required=True, help="HackRF serial number")
parser.add_argument("--freq", type=float, required=True, help="Center frequency in Hz")
parser.add_argument("--save-dir", required=True, help="Directory for output IQ files")
parser.add_argument("--file-tag", default="fragment_", help="Prefix for output files")
@ -357,88 +183,52 @@ def parse_args():
parser.add_argument("--if-gain", type=float, default=30, help="HackRF IF gain")
parser.add_argument("--bb-gain", type=float, default=36, help="HackRF BB gain")
parser.add_argument("--bandwidth", type=float, default=None, help="Optional IIO RF bandwidth in Hz")
parser.add_argument("--iio-uri", default="ip:192.168.2.1", help="IIO URI")
parser.add_argument("--iio-device", default="cf-ad9361-lpc", help="IIO RX buffer device")
parser.add_argument("--iio-phy-device", default="ad9361-phy", help="IIO PHY device")
parser.add_argument("--iio-i-channel", default="voltage0", help="IIO I channel")
parser.add_argument("--iio-q-channel", default="voltage1", help="IIO Q channel")
parser.add_argument("--iio-lo-channel", default="altvoltage0", help="IIO LO channel for RX frequency")
parser.add_argument("--iio-port-select", default="A_BALANCED", help="IIO rf_port_select value")
parser.add_argument("--iio-gain-mode", default="slow_attack", help="IIO gain_control_mode value")
parser.add_argument("--iio-hardwaregain", type=float, default=None, help="IIO hardware gain in dB for manual mode")
parser.add_argument("--timeout-ms", type=int, default=4000, help="IIO read timeout in milliseconds")
parser.add_argument("--settle", type=float, default=0.12, help="Delay after tuning for IIO backend")
parser.add_argument("--samples-per-read", type=int, default=None, help="IIO complex samples per read call")
return parser.parse_args()
def main():
args = parse_args()
stop_event = threading.Event()
if args.backend == "hackrf" and not args.serial:
print("--serial is required for --backend hackrf", file=sys.stderr)
sys.exit(2)
tb = DataSaver(
serial=args.serial,
freq=args.freq,
save_dir=args.save_dir,
file_tag=args.file_tag,
samp_rate=args.samp_rate,
split_size=args.split_size,
delay=args.delay,
rf_gain=args.rf_gain,
if_gain=args.if_gain,
bb_gain=args.bb_gain,
)
capture = None
tb = None
stop_event = threading.Event()
def handle_signal(sig, frame):
print(f"Received signal {sig}, stopping...", flush=True)
stop_event.set()
if tb is not None:
tb.stop()
tb.wait()
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)
print("Starting capture...", flush=True)
print(f" backend: {args.backend}", flush=True)
print("Starting flowgraph...", flush=True)
print(f" serial: {args.serial}", flush=True)
print(f" freq: {args.freq}", flush=True)
print(f" save_dir: {args.save_dir}", flush=True)
print(f" file_tag: {args.file_tag}", flush=True)
print(f" samp_rate: {args.samp_rate}", flush=True)
print(f" split_size: {args.split_size}", flush=True)
try:
if args.backend == "hackrf":
print(f" serial: {args.serial}", flush=True)
print(f" gains: rf={args.rf_gain} if={args.if_gain} bb={args.bb_gain}", flush=True)
tb = HackRfDataSaver(
serial=args.serial,
freq=args.freq,
save_dir=args.save_dir,
file_tag=args.file_tag,
samp_rate=args.samp_rate,
split_size=args.split_size,
delay=args.delay,
rf_gain=args.rf_gain,
if_gain=args.if_gain,
bb_gain=args.bb_gain,
)
tb.start()
try:
while not stop_event.is_set():
time.sleep(0.5)
else:
print(f" uri: {args.iio_uri}", flush=True)
print(
f" iio: device={args.iio_device} phy={args.iio_phy_device} "
f"port={args.iio_port_select} gain_mode={args.iio_gain_mode}",
flush=True,
)
capture = IioCapture(args)
capture.run(stop_event)
except KeyboardInterrupt:
handle_signal(signal.SIGINT, None)
finally:
if capture is not None:
capture.close()
if tb is not None and not stop_event.is_set():
tb.stop()
tb.wait()
print("Stopped.", flush=True)

Loading…
Cancel
Save