You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
DroneDetector/src/core/multichannelswitcher.py

177 lines
8.8 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
from core.data_buffer import DataBuffer
def get_centre_freq(freq):
"""
Получить название частоты по ее диапазону.
:param freq: Частота, которую обрабатываем.
:return: Название частоты.
"""
c_freq = 0
if 5.46e9 <= freq <= 6.0e9:
c_freq = 5800
if 5.0e9 <= freq <= 5.4e9:
c_freq = 5200
if 4.5e9 <= freq <= 4.7e9:
c_freq = 4500
if 3.3e9 <= freq <= 3.5e9:
c_freq = 3300
if 2.4e9 <= freq <= 2.5e9:
c_freq = 2400
if 1e9 <= freq <= 1.36e9:
c_freq = 1200
if 0.9e9 <= freq <= 0.960e9:
c_freq = 915
if 0.830e9 <= freq <= 0.890e9:
c_freq = 868
if 0.700e9 <= freq <= 0.780e9:
c_freq = 750
if 0.380e9 <= freq <= 0.500e9:
c_freq = 433
return str(c_freq)
class MultiChannel:
"""
Класс с реализацией переключателя каналов. Присутствует поддержка нескольких частот, а поэтому
Атрибуты:
steps: Массив шагов для разных частот. Ex. steps = [-20e6, -5e6, -3e6], i-ый элемент соответствует i-ой
частоте для обработке, типа 1.2, 915 и 868.
bases: Массив верхних границ диапазонов рассматриваемых частот. Ex bases = [1.36e9, 0.93e9, 0.87e9] для
1.2, 915 и 868.
roofs: То же самое, только нижних границ. Ex roofs = [1e9, 0.9e9, 0.85e9]
cur_channel: Указатель на текущий канал, который обрабатываем.
cur_roof: Указатель на нижнюю границу текущей обрабатываемой частоты.
cur_step: Указатель на шаг текущей обрабатываемой частоты.
num_chs: Массив из каналов по обрабатываемым частотам. Вычисляется автоматически исходя из границ и шага.
init_freq: Чекер на инициализацию частоты перед началом работы скрипта. Нужен из-за особенности
работы графов GNURadio и функции work в embedded Python блоке.
DB: Список из циклических буферов для соответствующих чатсот.
"""
def __init__(self, steps, bases, roofs):
"""
Инициализация класса.
:param steps: Список с шагами для соответствующих частот.
:param bases: Список верхних границ диапазонов частот, с которыми работаем.
:param roofs: Список нижних границ --//--.
"""
self.steps = steps
self.bases = bases
self.roofs = roofs
self.cur_channel = self.bases[0]
self.cur_roof = self.roofs[0]
self.cur_step = self.steps[0]
self.num_chs = []
self.init_freq = False
self.DB = []
def init_f(self):
"""
Инициализация начальной частоты, с которой начинаем обработку.
:return: Верхняя граница первой частоты из набора частот.
"""
self.init_freq = True
return self.bases[0]
def get_cur_channel(self):
"""
Получить текущий обрабатываемый канал.
:return: Канал обработки.
"""
return self.cur_channel
def change_channel(self):
"""
Функция смены канала. Идет от верхней границы диапазона частоты к нижней с шагом step. Если дошли до нижней
границы, то переключаемся на следующую частоту посредством переноса курсора текущего канала на верхнюю границу
новой частоты и указатель нижней границы также двигаем на следующую позицию. Если частота для обработки одна, то
указатель текущего канала возвращается в начало - верхней границы этой же частоты. Указатель нижней границы не
изменяется.
:return: Канал после смены.
"""
if not self.init_freq:
return self.init_f()
if self.cur_channel <= self.cur_roof:
if self.cur_roof == self.roofs[-1]:
self.cur_channel = self.bases[0]
self.cur_roof = self.roofs[0]
self.cur_step = self.steps[0]
else:
next_roofs = self.roofs.index(self.cur_roof) + 1
self.cur_channel = self.bases[next_roofs]
self.cur_roof = self.roofs[next_roofs]
self.cur_step = self.steps[next_roofs]
else:
self.cur_channel += self.cur_step
# print('Канал частоты изменен на ', self.cur_channel / 1000000)
return self.get_cur_channel()
def get_num_chs(self, idx_freq):
"""
Вычисляет количество каналов на частоте исходя из верхнего, нижнего диапазонов и шага.
:param idx_freq: id частоты внутри класса. Т.е. в данный момент обрабатывается несколько частот, то id =
индексу верхней границы в bases для данной частоты, или нижней границы в roofs или шагу в steps.
В примерах из описания атрибутов индекс частоты 915 будет равен единице (т.к. идет вторым элементом в списках).
:return: Количество каналов.
"""
if (idx_freq + 1) > len(self.num_chs):
tmp = self.bases[idx_freq]
counter = 0
while tmp >= self.roofs[idx_freq]:
counter += 1
tmp += self.steps[idx_freq]
self.num_chs.append(counter)
return counter
else:
return self.num_chs[idx_freq]
def check_f(self, freq):
"""
Проверить наличие частоты в классе. Если да, то вернуть количество каналов и циклический буфер этой частоты.
:param freq: Частота.
:return: Количество каналов, циклический буфер выбранной частоты ИЛИ none.
"""
for i in range(len(self.bases)):
if self.roofs[i] <= freq <= self.bases[i]:
return self.get_num_chs(i), self.DB[i]
else:
return None, None
def fill_DB(self):
"""
Инициализировать циклические буферы для всех частот в отдельный список.
:return: N0nE.
"""
for i in range(len(self.bases)):
freq = get_centre_freq(self.bases[i])
buffer_columns_size = int(os.getenv('buffer_columns_size_' + str(freq)))
num_of_thinning_iter = int(os.getenv('num_of_thinning_iter_' + str(freq)))
multiply_factor = float(os.getenv('multiply_factor_' + str(freq)))
num_for_alarm = int(os.getenv('num_for_alarm_' + str(freq)))
num_chs = self.get_num_chs(i)
self.DB.append(
DataBuffer(
buffer_columns_size,
num_of_thinning_iter,
num_chs,
multiply_factor,
num_for_alarm,
freq_tag=str(freq),
)
)
def db_alarms_zeros(self, circle_buffer):
"""
При отработке системы зануляет алармы во всех буферах, кроме текущего, т.к. в текущем уже занулилось.
:param circle_buffer: Циклический буфер текущей обрабатываемой частоты.
:return: None.
"""
for i in range(len(self.DB)):
if self.DB[i] != circle_buffer:
self.DB[i].alarms_fill_zeros()