diff --git a/src/core/sig_n_medi_collect.py b/src/core/sig_n_medi_collect.py index dddcb3d..bd196f4 100644 --- a/src/core/sig_n_medi_collect.py +++ b/src/core/sig_n_medi_collect.py @@ -1,107 +1,129 @@ -import os -import numpy as np -from typing import Union -from common.runtime import load_root_env - +import os +import math +import numpy as np +from typing import Union +from common.runtime import load_root_env + load_root_env(__file__) - - -def get_signal_length(freq): - length = int(os.getenv('signal_length_' + str(freq))) - return length - - -class Signal: - """ - Класс сбора и предобработки сигнала. - - Атрибуты: - length: Длина сигнала. - signal: Массив, в который собираем сигнал. - """ - - def __init__(self, conv_method='average'): - self.conv_method = conv_method - self.signal = [] - self.signal_abs = [] - - def get_signal(self): - """ - Возвращает собранный сигнал. - :return: Массив с сигналом. - """ - return self.signal, self.signal_abs - - def clear(self) -> None: - """ - Очистить массив с сигналом после предобработки? - :return: None - """ - self.signal = [] - self.signal_abs = [] - - def signal_preprocessing(self, length) -> float: - """ - Предобработка сигнала. - - :return: Число типа float - "характеристика сигнала". - """ - signal = np.array([self.signal.real[0:length], self.signal.imag[0:length]], dtype=np.float32) - signal_abs = np.linalg.norm(signal, axis=0) # Поэлементный модуль комплексного числа. shape.result - # (1, self.length) - if self.conv_method == 'max': - result = np.max(signal_abs) - else: - result = np.median(signal_abs) - self.signal = signal - self.signal_abs = signal_abs - return result - - def fill_signal(self, lvl, length) -> Union[int, float]: - """ - Сбор сигнала в соответствующий массив. Если уже собран, то предобработка. - :param lvl: Массив, без ограничения общности, с неизвестной длиной, содержащий сигнал. - :param length: - :return: 0 - если еще нет нужного количества сигнала, "характеристика" иначе. - """ - if len(self.signal) <= length: - y = np.array(lvl).ravel() - self.signal = np.concatenate((self.signal, y), axis=None) - return 0 - else: - preproc_signal = self.signal_preprocessing(length) - #self.clear() - return preproc_signal - - -class SignalsArray: - """ - Класс для сохранения медиан сигналов на частотах. - Атрибуты: - sig_array: Список для сохранения медиан. - counter: Индикатор наполненности массива. - """ - def __init__(self): - self.sig_array = [] - self.counter = 0 - - def fill_sig_arr(self, metrica, num_chs=3): - """ - Аппендим характеристику сигнала (метрику) в массив длиной num_chs. - :param metrica: Характеристика сигнала (метрика). - :param num_chs: Количество каналов на частоте. - :return: Индекс канала внутри частоты и массив с характеристиками, если заполнен, иначе - пустой. - """ - if num_chs: - if self.counter < num_chs: - self.sig_array.append(metrica) - self.counter += 1 - if self.counter == num_chs: - arr = self.sig_array - self.sig_array = [] - self.counter = 0 - return num_chs - 1, arr - else: - return self.counter - 1, [] - else: - return 0, [] + + +def get_signal_length(freq): + length = int(os.getenv('signal_length_' + str(freq))) + return length + + +class Signal: + """ + Класс сбора и предобработки сигнала. + + Атрибуты: + length: Длина сигнала. + signal: Массив, в который собираем сигнал. + """ + + def __init__(self, conv_method='average'): + self.conv_method = conv_method + self.signal = [] + self.signal_abs = [] + + def get_signal(self): + """ + Возвращает собранный сигнал. + :return: Массив с сигналом. + """ + return self.signal, self.signal_abs + + def clear(self) -> None: + """ + Очистить массив с сигналом после предобработки? + :return: None + """ + self.signal = [] + self.signal_abs = [] + + def signal_preprocessing(self, length) -> float: + """ + Предобработка сигнала. + + :return: Число типа float - "характеристика сигнала". + """ + samples = np.asarray(self.signal).ravel()[0:length] + if samples.size == 0: + return 0.0 + + # Основной режим: считаем dBFS из IQ-вектора. + if np.iscomplexobj(samples): + i = samples.real.astype(np.float32, copy=False) + q = samples.imag.astype(np.float32, copy=False) + signal = np.array([i, q], dtype=np.float32) + signal_abs = np.sqrt(i * i + q * q).astype(np.float32, copy=False) + + if self.conv_method == 'max': + power = float(np.max(signal_abs * signal_abs)) + else: + power = float(np.mean(signal_abs * signal_abs)) + + result = 10.0 * math.log10(max(power, 1e-20)) + self.signal = signal + self.signal_abs = signal_abs + return result + + # Fallback: если на вход уже подали скалярную метрику, агрегируем как есть. + scalar_samples = samples.astype(np.float32, copy=False) + if self.conv_method == 'max': + result = float(np.max(scalar_samples)) + else: + result = float(np.median(scalar_samples)) + + self.signal = scalar_samples + self.signal_abs = np.abs(scalar_samples) + return result + + def fill_signal(self, lvl, length) -> Union[int, float]: + """ + Сбор сигнала в соответствующий массив. Если уже собран, то предобработка. + :param lvl: Массив, без ограничения общности, с неизвестной длиной, содержащий сигнал. + :param length: + :return: 0 - если еще нет нужного количества сигнала, "характеристика" иначе. + """ + if len(self.signal) <= length: + y = np.array(lvl).ravel() + self.signal = np.concatenate((self.signal, y), axis=None) + return 0 + else: + preproc_signal = self.signal_preprocessing(length) + return preproc_signal + + +class SignalsArray: + """ + Класс для сохранения медиан сигналов на частотах. + Атрибуты: + sig_array: Список для сохранения медиан. + counter: Индикатор наполненности массива. + """ + + def __init__(self): + self.sig_array = [] + self.counter = 0 + + def fill_sig_arr(self, metrica, num_chs=3): + """ + Аппендим характеристику сигнала (метрику) в массив длиной num_chs. + :param metrica: Характеристика сигнала (метрика). + :param num_chs: Количество каналов на частоте. + :return: Индекс канала внутри частоты и массив с характеристиками, если заполнен, иначе - пустой. + """ + if num_chs: + if self.counter < num_chs: + self.sig_array.append(metrica) + self.counter += 1 + if self.counter == num_chs: + arr = self.sig_array + self.sig_array = [] + self.counter = 0 + return num_chs - 1, arr + else: + return self.counter - 1, [] + else: + return 0, []