добавил подсчет по dbfs

fft
Sergey Revyakin 3 weeks ago
parent 3a936fac9c
commit 6911d8ab25

@ -10,11 +10,10 @@ import os
from common.runtime import load_root_env, resolve_hackrf_index
load_root_env(__file__)
load_root_env(__file__)
def get_hack_id():
return resolve_hackrf_index('HACKID_1200', 'orange_scripts/main_1200.py')
return resolve_hackrf_index('HACKID_1200', 'orange_scripts/main_1200.py')
serial_number = os.getenv('HACKID_1200')
pos = None
output = []

@ -10,11 +10,10 @@ import os
from common.runtime import load_root_env, resolve_hackrf_index
load_root_env(__file__)
load_root_env(__file__)
def get_hack_id():
return resolve_hackrf_index('HACKID_2400', 'orange_scripts/main_2400.py')
return resolve_hackrf_index('HACKID_2400', 'orange_scripts/main_2400.py')
serial_number = os.getenv('HACKID_2400')
pos = None
output = []

@ -10,11 +10,10 @@ import os
from common.runtime import load_root_env, resolve_hackrf_index
load_root_env(__file__)
load_root_env(__file__)
def get_hack_id():
return resolve_hackrf_index('HACKID_915', 'orange_scripts/main_915.py')
return resolve_hackrf_index('HACKID_915', 'orange_scripts/main_915.py')
serial_number = os.getenv('HACKID_915')
pos = None
output = []

@ -1,170 +1,182 @@
import statistics
# Более лучшая версия кода есть в FRScanner
class DataBuffer:
"""
Класс с реализацией циклического буффера.
Атрибуты:
current_column: Указатель на текущий столбец буфера, который обновляем.
thinning_counter: Прореживающий множитель на текующей итерации.
current_counter: Указатель на количество чтений между последним обновлением столбца и предыдущим атрибутом.
num_of_thinning_iter: Прореживающий множитель. Раз в это количечество раз будет обнволяться столбец буфера.
line_size: Количество строк буфера = количеству каналов.
columns_size: Количество столбцов = фиксированное число.
multiply_factor: Процентный показатель превышения сигналом уровня шума. ex m_p = 1.1 => триггер, если
сигнал превышает шум на 10%.
num_for_alarm: Количество раз, превышающих шум, при которых триггеримся = фиксированное число.
is_init: Флаг инициализации буфера. = True, если инициализирован.
buffer: Массив для буфера.
buffer_medians: Массив для медиан столбцов букера.
buffer_alarms: Массив для количества тревог по столбца буфера.
"""
def __init__(self, columns_size, num_of_thinning_iter, num_of_channels, multiply_factor, num_for_alarm):
"""
Инициализируем класс.
:param columns_size:
:param num_of_thinning_iter:
:param num_of_channels:
:param multiply_factor:
:param num_for_alarm:
"""
self.current_column = 0
self.thinning_counter = 1
self.current_counter = 1
self.num_of_thinning_iter = num_of_thinning_iter
self.line_size = num_of_channels
self.columns_size = columns_size
self.multiply_factor = multiply_factor
self.num_for_alarm = num_for_alarm
self.is_init = False
self.buffer = [[0 for _ in range(self.columns_size)] for _ in range(self.line_size)]
self.buffer_medians = [0] * self.line_size
self.buffer_alarms = [0] * self.line_size
def get_buffer(self):
return self.buffer
def get_medians(self):
return self.buffer_medians
def get_alarms(self):
return self.buffer_alarms
def check_init(self):
return self.is_init
def print(self):
print('buffer is: ')
for i in range(self.line_size):
print(self.buffer[i], end=' ')
print()
def medians(self):
"""
Вычислить медиану по строке буфера.
:return: None
"""
if self.check_init():
for i in range(self.line_size):
self.buffer_medians[i] = statistics.median(self.buffer[i])
# print('medians is: ', self.buffer_medians)
# return self.buffer_medians
def alarms_fill_zeros(self):
self.buffer_alarms = [0] * self.line_size
def update(self, data):
"""
Обновление буфера.
Если номер текущего чтения совпадает с количеством прореживающего множителя на текущем обновлении буфера, то
1. Обновляем буфер.
2. Двигаем курсор на след столбец. Если был последний столбец, то двигаем курсор в начало.
3. Берем медианы по буферу, если он уже проиницализирован.
4. Сбрасываем счетчик текущих чтений.
5. Если был последний столбец (и мы уже переключились на первый), то
Если прореживающий множитель на текующей итерации был единица, то мы иницилизировались
До тех пор, пока множитель на итерации меньше фиксированного, увеличиваем в два раза.
В противном случае - увеличиваем номер чтения.
:param data: Массив с метриками сигнала по каналам.
:return: None
"""
# TODO: Добавить время релаксации - если система затриггерилась, то перестать обновлять буфер на N чтений,
# где N задается в .env-template. Сейчас есть бага, что буфер перестает обновляться только когда система
# триггерится. Между тем, когда приходит аларм и num_for_alarm, когда сигнал алармовский, но система еще не
# триггерится, буфер продолжает обновляться. В таких условиях буфер может набрать в себя алармовских сигналов
# и повысить пороги. Пример такой ситуации: дрон висит на 1км, система его видит, но сигнал превышает порог раз
# через раз и аларм срабатывает не всегда. В таких условиях наберется высокий сигнал, повысятся пороги и когда
# дрон начнет движение вперед, он будет заметен на более низкой дистанции, чем обычно, так как пороги повышены.
if self.current_counter == self.thinning_counter:
for i in range(self.line_size):
self.buffer[i][self.current_column] = data[i]
self.current_column = (self.current_column + 1) % self.columns_size
#print('Столбец {0} обновлен. Перешли к столбцу {1}: '.format(self.current_column - 1, self.current_column))
self.medians()
self.current_counter = 1
if self.current_column == 0:
if self.thinning_counter == 1:
self.is_init = True
self.medians()
print('Начальная калибровка завершена.')
if self.thinning_counter < self.num_of_thinning_iter:
self.thinning_counter *= 2
# print('thinning counter обновлен: ', self.thinning_counter)
else:
self.current_counter += 1
# print('curr counter обновлен: ', self.current_counter)
def check_alarm(self, data):
"""
Проверка триггера системы.
Если значение по каналу превышает медиану (порог) шума на какой-то процент, то инкремент буфер аларма по каналу.
Превышение num_for_alarm подряд - триггер. Если после n превышений, где n<num_for_alarm приходит сигнал не
первышающий порог, то сбрасываем буфер алармов.
:param data:
:return: Да/нет.
"""
if self.check_init():
ratios=[]
print("="*50)
for i in range(len(data)):
exceeding = data[i] > self.multiply_factor * self.buffer_medians[i]
ratios.append(data[i]/self.buffer_medians[i])
if exceeding:
self.buffer_alarms[i] += 1
# print('Инкремент буффер алармов по каналу {0}, текущее число по этому каналу: {1}'.format(i,self.buffer_alarms[i]))
else:
self.buffer_alarms[i] = 0
# print('Обнулили буффер алармов по каналу {0}, текущее число по этому каналу: {1}'.format(i,self.buffer_alarms[i]))
if self.buffer_alarms[i] >= self.num_for_alarm:
# print('Сработала тревога по каналу {0}, текущее число по этому каналу: {1}'.format(i,self.buffer_alarms[i]))
self.buffer_alarms = [0] * self.line_size
print("Отношения:", [f"{r:.3f}" for r in ratios])
print("!"*50)
return True
print("Отношения:", [f"{r:.3f}" for r in ratios])
print("="*50)
return False
def check_single_alarm(self, median, cur_channel):
"""
Проверка, является ли текущая медиана по каналу превышающей порог.
:param median: меди (хар-ка) по каналу.
:param cur_channel: индекс канала внутри частоты.
:return: Да/нет.
"""
if self.check_init():
exceeding = median > self.multiply_factor * self.buffer_medians[cur_channel]
print(median/self.buffer_medians[cur_channel])
if exceeding:
return True
else:
return False
import os
import math
import statistics
# Более лучшая версия кода есть в FRScanner
class DataBuffer:
"""
Класс с реализацией циклического буффера.
Атрибуты:
current_column: Указатель на текущий столбец буфера, который обновляем.
thinning_counter: Прореживающий множитель на текующей итерации.
current_counter: Указатель на количество чтений между последним обновлением столбца и предыдущим атрибутом.
num_of_thinning_iter: Прореживающий множитель. Раз в это количечество раз будет обнволяться столбец буфера.
line_size: Количество строк буфера = количеству каналов.
columns_size: Количество столбцов = фиксированное число.
multiply_factor: Процентный показатель превышения сигналом уровня шума.
num_for_alarm: Количество раз, превышающих шум, при которых триггеримся.
is_init: Флаг инициализации буфера. = True, если инициализирован.
buffer: Массив для буфера.
buffer_medians: Массив для медиан столбцов букера.
buffer_alarms: Массив для количества тревог по столбца буфера.
"""
def __init__(self, columns_size, num_of_thinning_iter, num_of_channels, multiply_factor, num_for_alarm):
"""
Инициализируем класс.
:param columns_size:
:param num_of_thinning_iter:
:param num_of_channels:
:param multiply_factor:
:param num_for_alarm:
"""
self.current_column = 0
self.thinning_counter = 1
self.current_counter = 1
self.num_of_thinning_iter = num_of_thinning_iter
self.line_size = num_of_channels
self.columns_size = columns_size
self.multiply_factor = multiply_factor
self.num_for_alarm = num_for_alarm
self.is_init = False
self.buffer = [[0 for _ in range(self.columns_size)] for _ in range(self.line_size)]
self.buffer_medians = [0] * self.line_size
self.buffer_alarms = [0] * self.line_size
self.prev_values = [None] * self.line_size
self.trend_streak = [0] * self.line_size
# Рост в 15% по линейной мощности относительно фоновой медианы в dBFS.
self.dbfs_delta_ratio = float(os.getenv('dbfs_delta_percent', 15)) / 100.0
# Допускаем небольшой обратный ход, чтобы не сбрасываться от микрошума.
self.dbfs_max_backstep_db = float(os.getenv('dbfs_max_backstep_db', 0.25))
# Минимум подряд "плавных" шагов перед учетом как устойчивого роста.
self.dbfs_min_trend_steps = int(os.getenv('dbfs_min_trend_steps', max(1, self.num_for_alarm)))
def get_buffer(self):
return self.buffer
def get_medians(self):
return self.buffer_medians
def get_alarms(self):
return self.buffer_alarms
def check_init(self):
return self.is_init
def print(self):
print('buffer is: ')
for i in range(self.line_size):
print(self.buffer[i], end=' ')
print()
def medians(self):
"""
Вычислить медиану по строке буфера.
:return: None
"""
if self.check_init():
for i in range(self.line_size):
self.buffer_medians[i] = statistics.median(self.buffer[i])
def alarms_fill_zeros(self):
self.buffer_alarms = [0] * self.line_size
self.trend_streak = [0] * self.line_size
self.prev_values = [None] * self.line_size
@staticmethod
def _dbfs_growth_ratio(current_db, baseline_db):
return math.pow(10.0, (current_db - baseline_db) / 10.0) - 1.0
def update(self, data):
"""
Обновление буфера.
Если номер текущего чтения совпадает с количеством прореживающего множителя на текущем обновлении буфера, то
1. Обновляем буфер.
2. Двигаем курсор на след столбец. Если был последний столбец, то двигаем курсор в начало.
3. Берем медианы по буферу, если он уже проиницализирован.
4. Сбрасываем счетчик текущих чтений.
5. Если был последний столбец (и мы уже переключились на первый), то
Если прореживающий множитель на текующей итерации был единица, то мы иницилизировались.
До тех пор, пока множитель на итерации меньше фиксированного, увеличиваем в два раза.
В противном случае - увеличиваем номер чтения.
:param data: Массив с метриками сигнала по каналам.
:return: None
"""
if self.current_counter == self.thinning_counter:
for i in range(self.line_size):
self.buffer[i][self.current_column] = data[i]
self.current_column = (self.current_column + 1) % self.columns_size
self.medians()
self.current_counter = 1
if self.current_column == 0:
if self.thinning_counter == 1:
self.is_init = True
self.medians()
print('Начальная калибровка завершена.')
if self.thinning_counter < self.num_of_thinning_iter:
self.thinning_counter *= 2
else:
self.current_counter += 1
def check_alarm(self, data):
"""
Проверка триггера системы по dBFS во времени.
Триггер: устойчивый рост относительно фоновой медианы не меньше dbfs_delta_percent,
подтвержденный несколькими последовательными чтениями.
"""
if self.check_init():
for i in range(len(data)):
baseline = self.buffer_medians[i]
current = data[i]
growth_ratio = self._dbfs_growth_ratio(current, baseline)
prev = self.prev_values[i]
delta_db = 0.0 if prev is None else current - prev
monotonic_or_stable = (prev is None) or (delta_db >= -self.dbfs_max_backstep_db)
if monotonic_or_stable:
self.trend_streak[i] += 1
else:
self.trend_streak[i] = 0
exceeding = (
growth_ratio >= self.dbfs_delta_ratio
and self.trend_streak[i] >= self.dbfs_min_trend_steps
)
if exceeding:
self.buffer_alarms[i] += 1
else:
self.buffer_alarms[i] = 0
self.prev_values[i] = current
if self.buffer_alarms[i] >= self.num_for_alarm:
self.buffer_alarms = [0] * self.line_size
self.trend_streak = [0] * self.line_size
return True
return False
def check_single_alarm(self, median, cur_channel):
"""
Проверка, является ли текущая метрика по каналу превышающей порог роста.
:param median: текущая метрика в dBFS.
:param cur_channel: индекс канала внутри частоты.
:return: Да/нет.
"""
if self.check_init():
baseline = self.buffer_medians[cur_channel]
exceeding = self._dbfs_growth_ratio(median, baseline) >= self.dbfs_delta_ratio
if exceeding:
return True
else:
return False

@ -23,7 +23,6 @@ from common.runtime import load_root_env, resolve_hackrf_index
load_root_env(__file__)
def get_hack_id():
return resolve_hackrf_index('hack_3300', 'src/main_3300.py')
serial_number = os.getenv('hack_3300')

@ -21,11 +21,10 @@ import os
from common.runtime import load_root_env, resolve_hackrf_index
load_root_env(__file__)
load_root_env(__file__)
def get_hack_id():
return resolve_hackrf_index('hack_433', 'src/main_433.py')
return resolve_hackrf_index('hack_433', 'src/main_433.py')
serial_number = os.getenv('hack_433')
pos = None
output = []

@ -23,7 +23,6 @@ from common.runtime import load_root_env, resolve_hackrf_index
load_root_env(__file__)
def get_hack_id():
return resolve_hackrf_index('hack_4500', 'src/main_4500.py')
serial_number = os.getenv('hack_4500')

@ -18,14 +18,12 @@ import time
import threading
import subprocess
import os
from common.runtime import load_root_env, resolve_hackrf_index
load_root_env(__file__)
load_root_env(__file__)
def get_hack_id():
return resolve_hackrf_index('hack_5200', 'src/main_5200.py')
return resolve_hackrf_index('hack_5200', 'src/main_5200.py')
serial_number = os.getenv('hack_5200')
pos = None
output = []

@ -21,11 +21,10 @@ import os
from common.runtime import load_root_env, resolve_hackrf_index
load_root_env(__file__)
load_root_env(__file__)
def get_hack_id():
return resolve_hackrf_index('hack_5800', 'src/main_5800.py')
return resolve_hackrf_index('hack_5800', 'src/main_5800.py')
serial_number = os.getenv('hack_5800')
pos = None
output = []

@ -21,11 +21,10 @@ import os
from common.runtime import load_root_env, resolve_hackrf_index
load_root_env(__file__)
load_root_env(__file__)
def get_hack_id():
return resolve_hackrf_index('hack_750', 'src/main_750.py')
return resolve_hackrf_index('hack_750', 'src/main_750.py')
serial_number = os.getenv('hack_750')
pos = None
output = []

@ -21,11 +21,10 @@ import os
from common.runtime import load_root_env, resolve_hackrf_index
load_root_env(__file__)
load_root_env(__file__)
def get_hack_id():
return resolve_hackrf_index('hack_868', 'src/main_868.py')
return resolve_hackrf_index('hack_868', 'src/main_868.py')
serial_number = os.getenv('hack_868')
pos = None
output = []

Loading…
Cancel
Save