diff --git a/src/core/sig_n_medi_collect.py b/src/core/sig_n_medi_collect.py index be703ee..4c0d928 100644 --- a/src/core/sig_n_medi_collect.py +++ b/src/core/sig_n_medi_collect.py @@ -27,6 +27,9 @@ class Signal: self.signal = [] self.signal_abs = [] self.last_packet_ts = None + self.metric_mode = os.getenv('signal_metric_mode', 'fft_top_bins').strip().lower() + self.fft_top_bins = max(1, int(os.getenv('signal_fft_top_bins', '2048'))) + self.fft_window = os.getenv('signal_fft_window', 'hann').strip().lower() def get_signal(self): """ @@ -47,6 +50,30 @@ class Signal: self.signal_abs = [] self.last_packet_ts = None + def _build_window(self, size: int) -> np.ndarray: + if self.fft_window in {'', 'none', 'rect', 'rectangular'}: + return np.ones(size, dtype=np.float32) + if self.fft_window == 'hann': + return np.hanning(size).astype(np.float32, copy=False) + raise ValueError(f'unsupported fft window: {self.fft_window}') + + def _compute_iq_power(self, samples: np.ndarray, signal_abs: np.ndarray) -> float: + if self.conv_method == 'max': + return float(np.max(signal_abs * signal_abs)) + + if self.metric_mode in {'fft', 'fft_top_bins', 'top_bins'}: + window = self._build_window(samples.size) + windowed = samples.astype(np.complex64, copy=False) * window + spectrum = np.fft.fft(windowed) + power_bins = (np.abs(spectrum) ** 2).astype(np.float32, copy=False) + power_bins /= max(float(np.sum(window * window)), 1.0) + + bins_to_keep = min(self.fft_top_bins, power_bins.size) + top_bins = np.partition(power_bins, power_bins.size - bins_to_keep)[-bins_to_keep:] + return float(np.mean(top_bins)) + + return float(np.mean(signal_abs * signal_abs)) + def signal_preprocessing(self, length) -> float: """ Предобработка сигнала. @@ -64,10 +91,7 @@ class Signal: signal = np.array([i, q], dtype=np.float32) signal_abs = np.sqrt(i * i + q * q).astype(np.float32, copy=False) - if self.conv_method == 'max': - power = float(np.max(signal_abs * signal_abs)) - else: - power = float(np.mean(signal_abs * signal_abs)) + power = self._compute_iq_power(samples, signal_abs) result = 10.0 * math.log10(max(power, 1e-20)) self.signal = signal