from tqdm import tqdm import numpy as np import random import os import re import gc class Model(object): _model_id = 0 _ind_inference = 1 _result_list = dict() @staticmethod def get_model_id(): try: return Model._model_id except Exception as exc: print(str(exc)) @staticmethod def _get_inc_model_id(): try: Model._model_id += 1 return Model._model_id except Exception as exc: print(str(exc)) @staticmethod def get_ind_inference(): try: return Model._ind_inference except Exception as exc: print(str(exc)) @staticmethod def get_inc_ind_inference(): try: Model._ind_inference += 1 except Exception as exc: print(str(exc)) @staticmethod def _init_result_list(type_model=''): try: Model._result_list[type_model] = {} except Exception as exc: print(str(exc)) @staticmethod def get_result_list(): try: def get_max(dict_inf=None): try: return max([(len(i[0]) if len(i) else 0) for i in dict_inf.values()]) except Exception as error: print(str(error)) return 0 max_length_label = max([get_max(i) for i in Model._result_list.values()]) num_inf = max(list(map(int, list(Model._result_list.values())[0].keys()))) max_length_type_model = max([len(i) for i in Model._result_list.keys()]) num_gaps = (max_length_type_model + 4) * (len(Model._result_list) + 1) + 2 * (len(Model._result_list) + 2) print('_' * num_gaps) print('||' + ' ' * (max_length_type_model + 4) + '|', end='') for type_model in Model._result_list.keys(): print('|' + ' ' * ((max_length_type_model - len(type_model)) // 2 + 2), end='') print(type_model, end='') print(' ' * ((max_length_type_model - len(type_model)) // 2 + 2) + '|', end='') print('|') for ind_inf in range(1, num_inf+1): print('||' + ' ' * ((max_length_type_model - len(str(ind_inf))) // 2 + 2), end='') print(str(ind_inf) if len(str(ind_inf)) % 2 == 0 else str(ind_inf) + ' ', end='') print(' ' * ((max_length_type_model - len(str(ind_inf))) // 2 + 2) + '|', end='') for info_inference in Model._result_list.values(): if len(info_inference[ind_inf]) != 0: length_gap_left = (max_length_label - len(info_inference[ind_inf][0])) // 2 + (max_length_label - len(info_inference[ind_inf][0])) % 2 length_gap_right = (max_length_label - len(info_inference[ind_inf][0])) // 2 + 1 to_print = (' ' * length_gap_left + info_inference[ind_inf][0] + ' ' * length_gap_right) + (str(info_inference[ind_inf][1]) if len(str(info_inference[ind_inf][1])) != 3 else str(info_inference[ind_inf][1]) + ' ') else: to_print = ' ' * max_length_type_model print('|' + ' ' * ((max_length_type_model - len(to_print)) // 2 + 2), end='') print(to_print, end='') print(' ' * ((max_length_type_model - len(to_print)) // 2 + 2) + '|', end='') print('|') print('_' * num_gaps) except Exception as exc: print(str(exc)) @staticmethod def _add_in_result_list(type_model='', ind_inference=0, list_to_add=None): try: Model._result_list[type_model][ind_inference] = list_to_add except Exception as exc: print(str(exc)) def __init__(self, file_model='', file_config='', src_example='', src_result='', type_model='', build_model_func=None, pre_func=None, inference_func=None, post_func=None, classes=None, number_synthetic_examples=0, number_src_data_for_one_synthetic_example=0, path_to_src_dataset=''): try: self._file_model = file_model self._file_config = file_config self._src_example = src_example self._src_result = src_result self._type_model = type_model self._build_model_func = build_model_func self._pre_func = pre_func self._inference_func = inference_func self._post_func = post_func self._classes = classes self._num_outputs = len(self._classes.keys()) self._number_synthetic_examples = number_synthetic_examples self._number_src_data_for_one_synthetic_example = number_src_data_for_one_synthetic_example self._path_to_src_dataset = path_to_src_dataset self._data = None self._shablon = ' Модель ' + str(self._model_id+1) + ' с типом ' + str(self._type_model) self._model = self._build_model() self._model_id = Model._get_inc_model_id() self._init_result_list(type_model=self._type_model) except Exception as exc: print(str(exc)) def __str__(self): try: if self._model is None: return self._shablon + ' не работает!' + '\n' else: return self._shablon + ' работает!' + '\n' except Exception as exc: print(str(exc)) def get_mapping(self): return list(self._classes.values()) def get_model_name(self): return self._type_model def get_shablon(self): return self._shablon def get_model(self): return self._model def _build_model(self): try: print('Инициализация' + self._shablon) return self._build_model_func(file_model=self._file_model, file_config=self._file_config, num_classes=len(self._classes)) except Exception as exc: print(str(exc)) def _prepare_data(self, data=None): try: print('Подготовка данных' + self._shablon) self._data = self._pre_func(data, src=self._src_result, ind_inference=Model.get_ind_inference()) except Exception as exc: print(str(exc)) def _post_data(self, prediction=None): print('Постобработка данных' + self._shablon) self._ind_inference += 1 self._post_func(src=self._src_result, data=self._data, model_id=self._model_id, model_type=self._type_model, ind_inference=Model.get_ind_inference(), prediction=prediction) def get_test_inference(self): try: self._test_inference() except Exception as exc: print(str(exc)) def _create_synthetic_examples(self): try: print('#' * 100) print('Создание синтетических примеров: ' + self._shablon) path_to_example_directory = os.path.join(self._src_example, self._type_model) os.mkdir(path_to_example_directory) for ind in tqdm(range(self._number_synthetic_examples)): try: label = self._classes[random.randint(0, self._num_outputs-1)] path_to_src_directory = os.path.join(self._path_to_src_dataset, label) with open(path_to_src_directory + '/' + os.listdir(path_to_src_directory)[0], 'rb') as data_file: data = np.frombuffer(data_file.read(), dtype=np.float32) array_example = np.zeros(np.shape(data)) for _ in range(self._number_src_data_for_one_synthetic_example): with open(path_to_src_directory + '/' + random.choice(os.listdir(path_to_src_directory)), 'rb') as data_file: data = np.frombuffer(data_file.read(), dtype=np.float32) array_example = np.add(array_example, data) np.save(path_to_example_directory + '/' + label + '_' + str(ind+1), array_example / self._number_src_data_for_one_synthetic_example) except Exception as exc: print(str(exc)) print('Создание синтетических примеров завершено!') print() except Exception as exc: print(str(exc)) print() def _test_inference(self): try: self._create_synthetic_examples() count_access = 1 count_attempt = 1 path_to_example = os.path.join(self._src_example, self._type_model) _, _, files = next(os.walk(path_to_example)) if files: ind_inference = 0 for file in files: with open(path_to_example + '/' + file, 'rb') as data_file: self._data = np.frombuffer(data_file.read(), dtype=np.float32) print() self._prepare_data(data=self._data) print('Тестовый инференс' + self._shablon + ' попытка ' + str(count_attempt)) prediction, probability = self._inference_func(data=self._data, model=self._model, mapping=self._classes, shablon=self._shablon) for value in self._classes.values(): if value in re.split('[._/]', file): if value == prediction: print('Тест ' + str(count_attempt) + ' пройден!\n') count_access += 1 else: print('Тест ' + str(count_attempt) + ' провален!\n') count_attempt += 1 break print() print('Постобработка данных' + self._shablon) ind_inference += 1 self._post_func(src=path_to_example+'/', data=self._data, ind_inference=ind_inference, model_id=self._model_id, model_type=self._type_model, prediction=prediction) print('\nТестовый инференс' + self._shablon + ' пройден с результатом ' + str(100 * (count_access - 1) / (count_attempt - 1)) + ' %') print('#' * 100) print() else: print('\nНет данных для тестового инференса') print() except Exception as exc: print(str(exc)) def get_inference(self, data=None): try: return self._inference(data=data) except Exception as exc: print(str(exc)) return None def _inference(self, data=None): try: Model._add_in_result_list(type_model=self._type_model, ind_inference=self.get_ind_inference(), list_to_add=[]) self._prepare_data(data=data) print('Инференс' + self._shablon) prediction, probability = self._inference_func(data=self._data, model=self._model, mapping=self._classes, shablon=self._shablon) Model._add_in_result_list(type_model=self._type_model, ind_inference=self.get_ind_inference(), list_to_add=[prediction, probability]) self._post_data(prediction=prediction) gc.collect() return prediction, probability except Exception as exc: print(str(exc)) return None