From b8ab9b366dcb2e70a2c95f5c5af33b355471d335 Mon Sep 17 00:00:00 2001 From: Sergey Revyakin Date: Fri, 6 Mar 2026 12:03:10 +0700 Subject: [PATCH] =?UTF-8?q?=D0=B4=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D0=BB=20?= =?UTF-8?q?=D0=B1=D0=BB=D0=BE=D0=BA=20=D1=82=D0=B5=D0=BB=D0=B5=D0=BC=D0=B5?= =?UTF-8?q?=D1=82=D1=80=D0=B8=D0=B8=20=D0=B2=20datas=5Fprocessing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/utils/datas_processing.py | 332 +++++++++++++++++++--------------- 1 file changed, 183 insertions(+), 149 deletions(-) diff --git a/src/utils/datas_processing.py b/src/utils/datas_processing.py index 6676fe8..4e51a6d 100644 --- a/src/utils/datas_processing.py +++ b/src/utils/datas_processing.py @@ -1,149 +1,183 @@ -import os -import io -import csv -import itertools -import requests -import numpy as np -from datetime import datetime - - -def pack_elems(names, file_types, *elems): - if len(names) != len(file_types) or len(names) != len(elems): - raise ValueError('Длин массивов имен и типов файлов и не совпадает с количество элементов для сохранения') - return {name: {'file_type': file_type, 'elem': elem} for name, file_type, elem in zip(names, file_types, elems)} - - -def agregator(freq, alarm): - if alarm: - amplitude = 9 - else: - amplitude = 0 - - data = {"freq": freq, - "amplitude": amplitude - } - return data - - -def send_data(data, localhost, localport, endpoint): - """ - Отправка данных по POST на модуль сервер. - :param data: Данные для отправки. - :param localhost: Хост модуль сервера. - :param localport: Порт модуль сервера. - """ - - def _post(port): - url = "http://{0}:{1}/{2}".format(localhost, port, endpoint) - return requests.post(url, json=data), url - - try: - response, url = _post(localport) - if response.status_code == 200: - print("Данные успешно отправлены и приняты!", url) - return - - # Частый кейс: порт 5000 занят локальным registry (DroneDetectPCSoft). - # Пробуем порт модуля сервера из env (например, 5010). - fallback_port = os.getenv('GENERAL_SERVER_PORT') - if response.status_code == 404 and fallback_port and str(localport) != str(fallback_port): - response_fb, url_fb = _post(fallback_port) - if response_fb.status_code == 200: - #print("Данные успешно отправлены и приняты!", url_fb) - return - print("Ошибка при отправке данных:", response_fb.status_code, url_fb) - return - - print("Ошибка при отправке данных:", response.status_code, url) - except Exception as e: - print(str(e)) - - -def save_data(path_to_save, freq, *args): - """ - Сохранение данных в csv файл. Используется для сохранения метрик и медиан сигнала на каналах с датой и временем - - для анализа. - :param path_to_save: Путь для сохранения. - :param freq: Обрабатываемая частота. - :param args: Что сохраняем в файл. - """ - - try: - if not os.path.exists(path_to_save): - print('Folder was created.') - os.makedirs(path_to_save) - - with open(path_to_save + 'data_' + str(freq) + '.csv', 'a', newline='') as f: - writer = csv.writer(f) - args2 = itertools.chain(*(arg if isinstance(arg, list) else [arg] for arg in args)) - writer.writerow(args2) - print('Write csv.') - - except Exception as e: - print(str(e)) - - -def prepare_folders_paths(path): - folders = path.split('/') - folders.pop() - folders = [elem + '/' for elem in folders] - print(folders) - cur_path = '' - print(cur_path) - return folders, cur_path - - -def remote_save_data(conn, data, module_name, freq, share_folder, path_to_save): - """ - Сохранение данных (сигнала) в файл на удаленный диск. - :param conn: - :param data: - :param module_name: - :param freq: - :param share_folder: - :param path_to_save: - :return: - """ - # cur_datetime = datetime.now().strftime('%d_%m_%Y_%H_%M_%S') - # file_name = f'alarm_{module_name}_{freq}_{cur_datetime}.npy' - # path = f"{path_to_save_medians}{module_name}/{str(freq)}/" - # path_to_file = f"{path}{file_name}" - # print(path_to_file) - # - # folders, cur_path = prepare_folders_paths(path) - # - # buffer = io.BytesIO() - # np.save(buffer, data) - # buffer.seek(0) - # - # for i in range(len(folders)): - # cur_path = cur_path + folders[i] - # try: - # conn.listPath(share_folder, cur_path) - # except Exception: - # conn.createDirectory(share_folder, cur_path) - # - # conn.storeFile(share_folder, path_to_file, buffer) - for name, values in data.items(): - elem_name = name - file_type = values['file_type'] - elem_data = values['elem'] - print(elem_data.shape) - buffer = io.BytesIO() - np.save(buffer, elem_data) - buffer.seek(0) - - cur_datetime = datetime.now().strftime('%d_%m_%Y_%H_%M_%S') - file_name = f'alarm_{elem_name}_{module_name}_{freq}_{cur_datetime}.{file_type}' - path = f"{path_to_save}{module_name}/{str(freq)}/{elem_name}/" - path_to_file = f"{path}{file_name}" - folders, cur_path = prepare_folders_paths(path) - - for i in range(len(folders)): - cur_path = cur_path + folders[i] - try: - conn.listPath(share_folder, cur_path) - except Exception: - conn.createDirectory(share_folder, cur_path) - - conn.storeFile(share_folder, path_to_file, buffer) +import os +import io +import csv +import time +import itertools +import requests +import numpy as np +from datetime import datetime + + +_telemetry_error_last_ts = 0.0 + + +def pack_elems(names, file_types, *elems): + if len(names) != len(file_types) or len(names) != len(elems): + raise ValueError('Длин массивов имен и типов файлов и не совпадает с количество элементов для сохранения') + return {name: {'file_type': file_type, 'elem': elem} for name, file_type, elem in zip(names, file_types, elems)} + + +def agregator(freq, alarm): + if alarm: + amplitude = 9 + else: + amplitude = 0 + + data = {"freq": freq, + "amplitude": amplitude + } + return data + + +def send_data(data, localhost, localport, endpoint): + """ + Отправка данных по POST на модуль сервер. + :param data: Данные для отправки. + :param localhost: Хост модуль сервера. + :param localport: Порт модуль сервера. + """ + + def _post(port): + url = "http://{0}:{1}/{2}".format(localhost, port, endpoint) + return requests.post(url, json=data), url + + try: + response, url = _post(localport) + if response.status_code == 200: + print("Данные успешно отправлены и приняты!", url) + return + + # Частый кейс: порт 5000 занят локальным registry (DroneDetectPCSoft). + # Пробуем порт модуля сервера из env (например, 5010). + fallback_port = os.getenv('GENERAL_SERVER_PORT') + if response.status_code == 404 and fallback_port and str(localport) != str(fallback_port): + response_fb, url_fb = _post(fallback_port) + if response_fb.status_code == 200: + return + print("Ошибка при отправке данных:", response_fb.status_code, url_fb) + return + + print("Ошибка при отправке данных:", response.status_code, url) + except Exception as e: + print(str(e)) + + +def send_telemetry(data, host, port, endpoint='telemetry', timeout_sec=0.30): + """ + Best-effort отправка телеметрии на отдельный telemetry-server. + Ошибки намеренно не пробрасываются, чтобы не влиять на основной детект/аларм поток. + """ + global _telemetry_error_last_ts + + host = '' if host is None else str(host).strip() + port = '' if port is None else str(port).strip() + endpoint = str(endpoint or 'telemetry').strip().lstrip('/') + + if not host or not port: + return + + try: + url = f"http://{host}:{port}/{endpoint}" + response = requests.post(url, json=data, timeout=float(timeout_sec)) + if response.status_code == 200: + return + + now = time.time() + if now - _telemetry_error_last_ts >= 10.0: + print(f"telemetry http error: {response.status_code} {url}") + _telemetry_error_last_ts = now + except Exception as exc: + now = time.time() + if now - _telemetry_error_last_ts >= 10.0: + print(f"telemetry send failed: {exc}") + _telemetry_error_last_ts = now + + +def save_data(path_to_save, freq, *args): + """ + Сохранение данных в csv файл. Используется для сохранения метрик и медиан сигнала на каналах с датой и временем + - для анализа. + :param path_to_save: Путь для сохранения. + :param freq: Обрабатываемая частота. + :param args: Что сохраняем в файл. + """ + + try: + if not os.path.exists(path_to_save): + print('Folder was created.') + os.makedirs(path_to_save) + + with open(path_to_save + 'data_' + str(freq) + '.csv', 'a', newline='') as f: + writer = csv.writer(f) + args2 = itertools.chain(*(arg if isinstance(arg, list) else [arg] for arg in args)) + writer.writerow(args2) + print('Write csv.') + + except Exception as e: + print(str(e)) + + +def prepare_folders_paths(path): + folders = path.split('/') + folders.pop() + folders = [elem + '/' for elem in folders] + print(folders) + cur_path = '' + print(cur_path) + return folders, cur_path + + +def remote_save_data(conn, data, module_name, freq, share_folder, path_to_save): + """ + Сохранение данных (сигнала) в файл на удаленный диск. + :param conn: + :param data: + :param module_name: + :param freq: + :param share_folder: + :param path_to_save: + :return: + """ + # cur_datetime = datetime.now().strftime('%d_%m_%Y_%H_%M_%S') + # file_name = f'alarm_{module_name}_{freq}_{cur_datetime}.npy' + # path = f"{path_to_save_medians}{module_name}/{str(freq)}/" + # path_to_file = f"{path}{file_name}" + # print(path_to_file) + # + # folders, cur_path = prepare_folders_paths(path) + # + # buffer = io.BytesIO() + # np.save(buffer, data) + # buffer.seek(0) + # + # for i in range(len(folders)): + # cur_path = cur_path + folders[i] + # try: + # conn.listPath(share_folder, cur_path) + # except Exception: + # conn.createDirectory(share_folder, cur_path) + # + # conn.storeFile(share_folder, path_to_file, buffer) + for name, values in data.items(): + elem_name = name + file_type = values['file_type'] + elem_data = values['elem'] + print(elem_data.shape) + buffer = io.BytesIO() + np.save(buffer, elem_data) + buffer.seek(0) + + cur_datetime = datetime.now().strftime('%d_%m_%Y_%H_%M_%S') + file_name = f'alarm_{elem_name}_{module_name}_{freq}_{cur_datetime}.{file_type}' + path = f"{path_to_save}{module_name}/{str(freq)}/{elem_name}/" + path_to_file = f"{path}{file_name}" + folders, cur_path = prepare_folders_paths(path) + + for i in range(len(folders)): + cur_path = cur_path + folders[i] + try: + conn.listPath(share_folder, cur_path) + except Exception: + conn.createDirectory(share_folder, cur_path) + + conn.storeFile(share_folder, path_to_file, buffer)