You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
DroneDetector/NN_server/Model.py

273 lines
12 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from tqdm import tqdm
import numpy as np
import random
import os
import re
import gc
class Model(object):
    """Wrap one neural-network model together with its processing callbacks.

    An instance stores four user-supplied callables (build, pre-process,
    inference, post-process) and drives them in order.  Three class-level
    attributes are shared by every instance: a counter of created models, the
    index of the current inference round and a table of collected results.

    Errors raised by the callbacks are caught and printed (best effort), so
    most methods report failure by returning ``None`` instead of raising.
    """

    # Id handed to the most recently created model (0 while none exists).
    _model_id = 0
    # Index of the current inference round, shared by all models.
    _ind_inference = 1
    # {model type: {inference index: [prediction, probability] or []}}
    _result_list = dict()

    @staticmethod
    def get_model_id():
        """Return the id given to the most recently created model."""
        return Model._model_id

    @staticmethod
    def _get_inc_model_id():
        """Increment the shared model counter and return the new value."""
        Model._model_id += 1
        return Model._model_id

    @staticmethod
    def get_ind_inference():
        """Return the index of the current inference round."""
        return Model._ind_inference

    @staticmethod
    def get_inc_ind_inference():
        """Advance the shared inference index and return the new value.

        The new index is returned for symmetry with ``_get_inc_model_id``;
        callers that ignore the result are unaffected.
        """
        Model._ind_inference += 1
        return Model._ind_inference

    @staticmethod
    def _init_result_list(type_model=''):
        """Create (or reset) the result table entry for ``type_model``."""
        try:
            Model._result_list[type_model] = {}
        except Exception as exc:
            print(str(exc))

    @staticmethod
    def _max_label_length(inferences):
        """Return the longest prediction label stored for one model type."""
        return max((len(entry[0]) for entry in inferences.values() if entry), default=0)

    @staticmethod
    def _format_result_cell(entry, label_width, type_width):
        """Render one table cell from a ``[prediction, probability]`` entry.

        A missing or empty entry becomes a run of blanks as wide as the longest
        model type name.
        """
        if not entry:
            return ' ' * type_width
        label = entry[0]
        probability = str(entry[1])
        if len(probability) == 3:
            # Three-character values such as '0.9' get one trailing blank.
            probability += ' '
        spare = label_width - len(label)
        return ' ' * (spare // 2 + spare % 2) + label + ' ' * (spare // 2 + 1) + probability

    @staticmethod
    def get_result_list():
        """Print the collected results as a text table.

        Columns are model types, rows are inference indices starting at 1.
        The number of rows is the highest index stored for *any* model, and a
        model without an entry for some index gets a blank cell.  Nothing is
        printed when no result has been stored yet.
        """
        try:
            results = Model._result_list
            if not any(results.values()):
                return
            label_width = max(Model._max_label_length(inferences) for inferences in results.values())
            num_inf = max(int(ind) for inferences in results.values() for ind in inferences)
            type_width = max(len(type_model) for type_model in results)
            num_gaps = (type_width + 4) * (len(results) + 1) + 2 * (len(results) + 2)
            rule = '_' * num_gaps

            print(rule)
            header = '||' + ' ' * (type_width + 4) + '|'
            for type_model in results:
                padding = ' ' * ((type_width - len(type_model)) // 2 + 2)
                header += '|' + padding + type_model + padding + '|'
            print(header + '|')

            for ind_inf in range(1, num_inf + 1):
                index_text = str(ind_inf)
                # Padding is derived from the bare number, before the filler blank is added.
                padding = ' ' * ((type_width - len(index_text)) // 2 + 2)
                if len(index_text) % 2 != 0:
                    index_text += ' '
                row = '||' + padding + index_text + padding + '|'
                for inferences in results.values():
                    cell = Model._format_result_cell(inferences.get(ind_inf), label_width, type_width)
                    # A cell wider than the column yields a negative count, i.e. no padding.
                    padding = ' ' * ((type_width - len(cell)) // 2 + 2)
                    row += '|' + padding + cell + padding + '|'
                print(row + '|')
            print(rule)
        except Exception as exc:
            print(str(exc))

    @staticmethod
    def _add_in_result_list(type_model='', ind_inference=0, list_to_add=None):
        """Store ``list_to_add`` for ``type_model`` at ``ind_inference``.

        The model type is registered on the fly if it has no table entry yet,
        so a result is never dropped because of a missing key.
        """
        try:
            Model._result_list.setdefault(type_model, {})[ind_inference] = list_to_add
        except Exception as exc:
            print(str(exc))

    def __init__(self, file_model='', file_config='', src_example='', src_result='', type_model='',
                 build_model_func=None, pre_func=None, inference_func=None, post_func=None, classes=None,
                 number_synthetic_examples=0, number_src_data_for_one_synthetic_example=0, path_to_src_dataset=''):
        """Store the configuration and build the underlying model.

        Args:
            file_model: passed to ``build_model_func`` as ``file_model``.
            file_config: passed to ``build_model_func`` as ``file_config``.
            src_example: directory under which synthetic examples are stored.
            src_result: passed to ``pre_func`` / ``post_func`` as ``src``.
            type_model: model type name; also the key in the result table.
            build_model_func: callable returning the model object.
            pre_func: callable preparing raw data for inference.
            inference_func: callable returning ``(prediction, probability)``.
            post_func: callable run after each inference.
            classes: mapping of class index to label; ``None`` means no classes.
            number_synthetic_examples: how many synthetic examples to create.
            number_src_data_for_one_synthetic_example: number of source files
                averaged into one synthetic example.
            path_to_src_dataset: directory holding one sub-directory per label.
        """
        # Plain assignments come first so the instance is always complete
        # (and printable) even when building the model fails.
        self._file_model = file_model
        self._file_config = file_config
        self._src_example = src_example
        self._src_result = src_result
        self._type_model = type_model
        self._build_model_func = build_model_func
        self._pre_func = pre_func
        self._inference_func = inference_func
        self._post_func = post_func
        self._classes = {} if classes is None else classes
        self._num_outputs = 0
        self._number_synthetic_examples = number_synthetic_examples
        self._number_src_data_for_one_synthetic_example = number_src_data_for_one_synthetic_example
        self._path_to_src_dataset = path_to_src_dataset
        self._data = None
        self._model = None
        # self._model_id still resolves to the shared counter at this point.
        self._shablon = ' Модель ' + str(self._model_id+1) + ' с типом ' + str(self._type_model)
        try:
            self._num_outputs = len(self._classes)
            self._model = self._build_model()
            self._model_id = Model._get_inc_model_id()
            self._init_result_list(type_model=self._type_model)
        except Exception as exc:
            print(str(exc))

    def __str__(self):
        """Return the model caption followed by its working / not working status."""
        caption = getattr(self, '_shablon', '')
        if getattr(self, '_model', None) is None:
            return caption + ' не работает!' + '\n'
        return caption + ' работает!' + '\n'

    def get_mapping(self):
        """Return the class labels as a list."""
        return list(self._classes.values())

    def get_model_name(self):
        """Return the model type name."""
        return self._type_model

    def get_shablon(self):
        """Return the caption used as a suffix in this model's log messages."""
        return self._shablon

    def get_model(self):
        """Return the built model object, or ``None`` if building failed."""
        return self._model

    def _build_model(self):
        """Call ``build_model_func``; print the error and return ``None`` on failure."""
        try:
            print('Инициализация' + self._shablon)
            return self._build_model_func(file_model=self._file_model, file_config=self._file_config,
                                          num_classes=len(self._classes))
        except Exception as exc:
            print(str(exc))
            return None

    def _prepare_data(self, data=None):
        """Run ``pre_func`` on ``data`` and keep its result in ``self._data``."""
        try:
            print('Подготовка данных' + self._shablon)
            self._data = self._pre_func(data, src=self._src_result, ind_inference=Model.get_ind_inference())
        except Exception as exc:
            print(str(exc))

    def _post_data(self, prediction=None):
        """Run ``post_func`` for the current inference round.

        Exceptions from ``post_func`` propagate to the caller.
        """
        print('Постобработка данных' + self._shablon)
        # NOTE(review): this binds an instance attribute and leaves the shared
        # Model._ind_inference untouched; the value passed below is the shared
        # one. Kept as is - confirm whether the shared counter was meant.
        self._ind_inference += 1
        self._post_func(src=self._src_result, data=self._data, model_id=self._model_id, model_type=self._type_model,
                        ind_inference=Model.get_ind_inference(), prediction=prediction)

    def get_test_inference(self):
        """Run the self-test on synthetic examples; errors are printed."""
        try:
            self._test_inference()
        except Exception as exc:
            print(str(exc))

    def _create_synthetic_examples(self):
        """Create synthetic examples by averaging randomly chosen source files.

        For every example a random class label is drawn, the configured number
        of files is picked from that label's source directory, summed and
        divided by that number.  The target directory is created if needed and
        reused if it already exists, so repeated runs keep working.

        NOTE(review): ``np.save`` writes the ``.npy`` container (header plus
        payload; the accumulator created by ``np.zeros`` is float64), whereas
        ``_test_inference`` reads these files back as raw float32 bytes.
        Confirm that ``pre_func`` expects this before changing either side.
        """
        try:
            print('#' * 100)
            print('Создание синтетических примеров: ' + self._shablon)
            path_to_example_directory = os.path.join(self._src_example, self._type_model)
            os.makedirs(path_to_example_directory, exist_ok=True)
            sources_per_example = self._number_src_data_for_one_synthetic_example
            for ind in tqdm(range(self._number_synthetic_examples)):
                try:
                    label = self._classes[random.randint(0, self._num_outputs-1)]
                    path_to_src_directory = os.path.join(self._path_to_src_dataset, label)
                    source_files = os.listdir(path_to_src_directory)
                    # The first file only provides the shape of the accumulator.
                    with open(os.path.join(path_to_src_directory, source_files[0]), 'rb') as data_file:
                        data = np.frombuffer(data_file.read(), dtype=np.float32)
                    array_example = np.zeros(np.shape(data))
                    for _ in range(sources_per_example):
                        with open(os.path.join(path_to_src_directory, random.choice(source_files)), 'rb') as data_file:
                            data = np.frombuffer(data_file.read(), dtype=np.float32)
                        array_example = np.add(array_example, data)
                    np.save(os.path.join(path_to_example_directory, label + '_' + str(ind+1)),
                            array_example / sources_per_example)
                except Exception as exc:
                    # One broken example must not stop the remaining ones.
                    print(str(exc))
            print('Создание синтетических примеров завершено!')
            print()
        except Exception as exc:
            print(str(exc))
            print()

    def _test_inference(self):
        """Run inference on every synthetic example and print the success rate.

        A file counts as an attempt when one of the class labels appears among
        the parts of its name split on '.', '_' and '/'; the attempt passes
        when that label equals the prediction.  With no such file the rate is
        reported as 0.0 instead of failing on a division by zero.
        """
        try:
            self._create_synthetic_examples()
            passed = 0
            attempts = 0
            path_to_example = os.path.join(self._src_example, self._type_model)
            # A missing directory yields no walk entries; treat it as "no files".
            _, _, files = next(os.walk(path_to_example), (path_to_example, [], []))
            if not files:
                print('\nНет данных для тестового инференса')
                print()
                return
            for ind_inference, file in enumerate(files, start=1):
                with open(os.path.join(path_to_example, file), 'rb') as data_file:
                    self._data = np.frombuffer(data_file.read(), dtype=np.float32)
                print()
                self._prepare_data(data=self._data)
                print('Тестовый инференс' + self._shablon + ' попытка ' + str(attempts + 1))
                prediction, _probability = self._inference_func(data=self._data, model=self._model,
                                                                mapping=self._classes, shablon=self._shablon)
                name_parts = re.split('[._/]', file)
                for value in self._classes.values():
                    if value in name_parts:
                        if value == prediction:
                            print('Тест ' + str(attempts + 1) + ' пройден!\n')
                            passed += 1
                        else:
                            print('Тест ' + str(attempts + 1) + ' провален!\n')
                        attempts += 1
                        break
                print()
                print('Постобработка данных' + self._shablon)
                self._post_func(src=path_to_example+'/', data=self._data, ind_inference=ind_inference,
                                model_id=self._model_id, model_type=self._type_model, prediction=prediction)
            success_rate = 100 * passed / attempts if attempts else 0.0
            print('\nТестовый инференс' + self._shablon + ' пройден с результатом ' + str(success_rate) + ' %')
            print('#' * 100)
            print()
        except Exception as exc:
            print(str(exc))

    def get_inference(self, data=None):
        """Run one inference; return ``(prediction, probability)`` or ``None`` on failure."""
        try:
            return self._inference(data=data)
        except Exception as exc:
            print(str(exc))
            return None

    def _inference(self, data=None):
        """Prepare ``data``, run the model, record the result and post-process it.

        An empty placeholder is stored for the current round first, so a failed
        inference still leaves an (empty) cell in the result table.

        Returns:
            ``(prediction, probability)``, or ``None`` if any step raised.
        """
        try:
            Model._add_in_result_list(type_model=self._type_model, ind_inference=self.get_ind_inference(),
                                      list_to_add=[])
            self._prepare_data(data=data)
            print('Инференс' + self._shablon)
            prediction, probability = self._inference_func(data=self._data, model=self._model,
                                                           mapping=self._classes, shablon=self._shablon)
            Model._add_in_result_list(type_model=self._type_model, ind_inference=self.get_ind_inference(),
                                      list_to_add=[prediction, probability])
            self._post_data(prediction=prediction)
            gc.collect()
            return prediction, probability
        except Exception as exc:
            print(str(exc))
            return None