подсчет типа dbfs

fft
Sergey Revyakin 2 weeks ago
parent 93072ea0fc
commit b53f6022b2

@ -1,107 +1,129 @@
import os import os
import numpy as np import math
from typing import Union import numpy as np
from common.runtime import load_root_env from typing import Union
from common.runtime import load_root_env
load_root_env(__file__) load_root_env(__file__)
def get_signal_length(freq): def get_signal_length(freq):
length = int(os.getenv('signal_length_' + str(freq))) length = int(os.getenv('signal_length_' + str(freq)))
return length return length
class Signal: class Signal:
""" """
Класс сбора и предобработки сигнала. Класс сбора и предобработки сигнала.
Атрибуты: Атрибуты:
length: Длина сигнала. length: Длина сигнала.
signal: Массив, в который собираем сигнал. signal: Массив, в который собираем сигнал.
""" """
def __init__(self, conv_method='average'): def __init__(self, conv_method='average'):
self.conv_method = conv_method self.conv_method = conv_method
self.signal = [] self.signal = []
self.signal_abs = [] self.signal_abs = []
def get_signal(self): def get_signal(self):
""" """
Возвращает собранный сигнал. Возвращает собранный сигнал.
:return: Массив с сигналом. :return: Массив с сигналом.
""" """
return self.signal, self.signal_abs return self.signal, self.signal_abs
def clear(self) -> None: def clear(self) -> None:
""" """
Очистить массив с сигналом после предобработки? Очистить массив с сигналом после предобработки?
:return: None :return: None
""" """
self.signal = [] self.signal = []
self.signal_abs = [] self.signal_abs = []
def signal_preprocessing(self, length) -> float: def signal_preprocessing(self, length) -> float:
""" """
Предобработка сигнала. Предобработка сигнала.
:return: Число типа float - "характеристика сигнала". :return: Число типа float - "характеристика сигнала".
""" """
signal = np.array([self.signal.real[0:length], self.signal.imag[0:length]], dtype=np.float32) samples = np.asarray(self.signal).ravel()[0:length]
signal_abs = np.linalg.norm(signal, axis=0) # Поэлементный модуль комплексного числа. shape.result if samples.size == 0:
# (1, self.length) return 0.0
if self.conv_method == 'max':
result = np.max(signal_abs) # Основной режим: считаем dBFS из IQ-вектора.
else: if np.iscomplexobj(samples):
result = np.median(signal_abs) i = samples.real.astype(np.float32, copy=False)
self.signal = signal q = samples.imag.astype(np.float32, copy=False)
self.signal_abs = signal_abs signal = np.array([i, q], dtype=np.float32)
return result signal_abs = np.sqrt(i * i + q * q).astype(np.float32, copy=False)
def fill_signal(self, lvl, length) -> Union[int, float]: if self.conv_method == 'max':
""" power = float(np.max(signal_abs * signal_abs))
Сбор сигнала в соответствующий массив. Если уже собран, то предобработка. else:
:param lvl: Массив, без ограничения общности, с неизвестной длиной, содержащий сигнал. power = float(np.mean(signal_abs * signal_abs))
:param length:
:return: 0 - если еще нет нужного количества сигнала, "характеристика" иначе. result = 10.0 * math.log10(max(power, 1e-20))
""" self.signal = signal
if len(self.signal) <= length: self.signal_abs = signal_abs
y = np.array(lvl).ravel() return result
self.signal = np.concatenate((self.signal, y), axis=None)
return 0 # Fallback: если на вход уже подали скалярную метрику, агрегируем как есть.
else: scalar_samples = samples.astype(np.float32, copy=False)
preproc_signal = self.signal_preprocessing(length) if self.conv_method == 'max':
#self.clear() result = float(np.max(scalar_samples))
return preproc_signal else:
result = float(np.median(scalar_samples))
class SignalsArray: self.signal = scalar_samples
""" self.signal_abs = np.abs(scalar_samples)
Класс для сохранения медиан сигналов на частотах. return result
Атрибуты:
sig_array: Список для сохранения медиан. def fill_signal(self, lvl, length) -> Union[int, float]:
counter: Индикатор наполненности массива. """
""" Сбор сигнала в соответствующий массив. Если уже собран, то предобработка.
def __init__(self): :param lvl: Массив, без ограничения общности, с неизвестной длиной, содержащий сигнал.
self.sig_array = [] :param length:
self.counter = 0 :return: 0 - если еще нет нужного количества сигнала, "характеристика" иначе.
"""
def fill_sig_arr(self, metrica, num_chs=3): if len(self.signal) <= length:
""" y = np.array(lvl).ravel()
Аппендим характеристику сигнала (метрику) в массив длиной num_chs. self.signal = np.concatenate((self.signal, y), axis=None)
:param metrica: Характеристика сигнала (метрика). return 0
:param num_chs: Количество каналов на частоте. else:
:return: Индекс канала внутри частоты и массив с характеристиками, если заполнен, иначе - пустой. preproc_signal = self.signal_preprocessing(length)
""" return preproc_signal
if num_chs:
if self.counter < num_chs:
self.sig_array.append(metrica) class SignalsArray:
self.counter += 1 """
if self.counter == num_chs: Класс для сохранения медиан сигналов на частотах.
arr = self.sig_array Атрибуты:
self.sig_array = [] sig_array: Список для сохранения медиан.
self.counter = 0 counter: Индикатор наполненности массива.
return num_chs - 1, arr """
else:
return self.counter - 1, [] def __init__(self):
else: self.sig_array = []
return 0, [] self.counter = 0
def fill_sig_arr(self, metrica, num_chs=3):
"""
Аппендим характеристику сигнала (метрику) в массив длиной num_chs.
:param metrica: Характеристика сигнала (метрика).
:param num_chs: Количество каналов на частоте.
:return: Индекс канала внутри частоты и массив с характеристиками, если заполнен, иначе - пустой.
"""
if num_chs:
if self.counter < num_chs:
self.sig_array.append(metrica)
self.counter += 1
if self.counter == num_chs:
arr = self.sig_array
self.sig_array = []
self.counter = 0
return num_chs - 1, arr
else:
return self.counter - 1, []
else:
return 0, []

Loading…
Cancel
Save