|
|
|
@ -27,6 +27,9 @@ class Signal:
|
|
|
|
self.signal = []
|
|
|
|
self.signal = []
|
|
|
|
self.signal_abs = []
|
|
|
|
self.signal_abs = []
|
|
|
|
self.last_packet_ts = None
|
|
|
|
self.last_packet_ts = None
|
|
|
|
|
|
|
|
self.metric_mode = os.getenv('signal_metric_mode', 'fft_top_bins').strip().lower()
|
|
|
|
|
|
|
|
self.fft_top_bins = max(1, int(os.getenv('signal_fft_top_bins', '2048')))
|
|
|
|
|
|
|
|
self.fft_window = os.getenv('signal_fft_window', 'hann').strip().lower()
|
|
|
|
|
|
|
|
|
|
|
|
def get_signal(self):
|
|
|
|
def get_signal(self):
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
@ -47,6 +50,30 @@ class Signal:
|
|
|
|
self.signal_abs = []
|
|
|
|
self.signal_abs = []
|
|
|
|
self.last_packet_ts = None
|
|
|
|
self.last_packet_ts = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _build_window(self, size: int) -> np.ndarray:
|
|
|
|
|
|
|
|
if self.fft_window in {'', 'none', 'rect', 'rectangular'}:
|
|
|
|
|
|
|
|
return np.ones(size, dtype=np.float32)
|
|
|
|
|
|
|
|
if self.fft_window == 'hann':
|
|
|
|
|
|
|
|
return np.hanning(size).astype(np.float32, copy=False)
|
|
|
|
|
|
|
|
raise ValueError(f'unsupported fft window: {self.fft_window}')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _compute_iq_power(self, samples: np.ndarray, signal_abs: np.ndarray) -> float:
|
|
|
|
|
|
|
|
if self.conv_method == 'max':
|
|
|
|
|
|
|
|
return float(np.max(signal_abs * signal_abs))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if self.metric_mode in {'fft', 'fft_top_bins', 'top_bins'}:
|
|
|
|
|
|
|
|
window = self._build_window(samples.size)
|
|
|
|
|
|
|
|
windowed = samples.astype(np.complex64, copy=False) * window
|
|
|
|
|
|
|
|
spectrum = np.fft.fft(windowed)
|
|
|
|
|
|
|
|
power_bins = (np.abs(spectrum) ** 2).astype(np.float32, copy=False)
|
|
|
|
|
|
|
|
power_bins /= max(float(np.sum(window * window)), 1.0)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
bins_to_keep = min(self.fft_top_bins, power_bins.size)
|
|
|
|
|
|
|
|
top_bins = np.partition(power_bins, power_bins.size - bins_to_keep)[-bins_to_keep:]
|
|
|
|
|
|
|
|
return float(np.mean(top_bins))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return float(np.mean(signal_abs * signal_abs))
|
|
|
|
|
|
|
|
|
|
|
|
def signal_preprocessing(self, length) -> float:
|
|
|
|
def signal_preprocessing(self, length) -> float:
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
Предобработка сигнала.
|
|
|
|
Предобработка сигнала.
|
|
|
|
@ -64,10 +91,7 @@ class Signal:
|
|
|
|
signal = np.array([i, q], dtype=np.float32)
|
|
|
|
signal = np.array([i, q], dtype=np.float32)
|
|
|
|
signal_abs = np.sqrt(i * i + q * q).astype(np.float32, copy=False)
|
|
|
|
signal_abs = np.sqrt(i * i + q * q).astype(np.float32, copy=False)
|
|
|
|
|
|
|
|
|
|
|
|
if self.conv_method == 'max':
|
|
|
|
power = self._compute_iq_power(samples, signal_abs)
|
|
|
|
power = float(np.max(signal_abs * signal_abs))
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
power = float(np.mean(signal_abs * signal_abs))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
result = 10.0 * math.log10(max(power, 1e-20))
|
|
|
|
result = 10.0 * math.log10(max(power, 1e-20))
|
|
|
|
self.signal = signal
|
|
|
|
self.signal = signal
|
|
|
|
|