import os import io import csv import itertools import requests import numpy as np from datetime import datetime def pack_elems(names, file_types, *elems): if len(names) != len(file_types) or len(names) != len(elems): raise ValueError('Длин массивов имен и типов файлов и не совпадает с количество элементов для сохранения') return {name: {'file_type': file_type, 'elem': elem} for name, file_type, elem in zip(names, file_types, elems)} def agregator(freq, alarm): if alarm: amplitude = 9 else: amplitude = 0 data = {"freq": freq, "amplitude": amplitude } return data def send_data(data, localhost, localport, endpoint): """ Отправка данных по POST на модуль сервер. :param data: Данные для отправки. :param localhost: Хост модуль сервера. :param localport: Порт модуль сервера. """ def _post(port): url = "http://{0}:{1}/{2}".format(localhost, port, endpoint) return requests.post(url, json=data), url try: response, url = _post(localport) if response.status_code == 200: print("Данные успешно отправлены и приняты!", url) return # Частый кейс: порт 5000 занят локальным registry (DroneDetectPCSoft). # Пробуем порт модуля сервера из env (например, 5010). fallback_port = os.getenv('GENERAL_SERVER_PORT') if response.status_code == 404 and fallback_port and str(localport) != str(fallback_port): response_fb, url_fb = _post(fallback_port) if response_fb.status_code == 200: print("Данные успешно отправлены и приняты!", url_fb) return print("Ошибка при отправке данных:", response_fb.status_code, url_fb) return print("Ошибка при отправке данных:", response.status_code, url) except Exception as e: print(str(e)) def save_data(path_to_save, freq, *args): """ Сохранение данных в csv файл. Используется для сохранения метрик и медиан сигнала на каналах с датой и временем - для анализа. :param path_to_save: Путь для сохранения. :param freq: Обрабатываемая частота. :param args: Что сохраняем в файл. """ try: if not os.path.exists(path_to_save): print('Folder was created.') os.makedirs(path_to_save) with open(path_to_save + 'data_' + str(freq) + '.csv', 'a', newline='') as f: writer = csv.writer(f) args2 = itertools.chain(*(arg if isinstance(arg, list) else [arg] for arg in args)) writer.writerow(args2) print('Write csv.') except Exception as e: print(str(e)) def prepare_folders_paths(path): folders = path.split('/') folders.pop() folders = [elem + '/' for elem in folders] print(folders) cur_path = '' print(cur_path) return folders, cur_path def remote_save_data(conn, data, module_name, freq, share_folder, path_to_save): """ Сохранение данных (сигнала) в файл на удаленный диск. :param conn: :param data: :param module_name: :param freq: :param share_folder: :param path_to_save: :return: """ # cur_datetime = datetime.now().strftime('%d_%m_%Y_%H_%M_%S') # file_name = f'alarm_{module_name}_{freq}_{cur_datetime}.npy' # path = f"{path_to_save_medians}{module_name}/{str(freq)}/" # path_to_file = f"{path}{file_name}" # print(path_to_file) # # folders, cur_path = prepare_folders_paths(path) # # buffer = io.BytesIO() # np.save(buffer, data) # buffer.seek(0) # # for i in range(len(folders)): # cur_path = cur_path + folders[i] # try: # conn.listPath(share_folder, cur_path) # except Exception: # conn.createDirectory(share_folder, cur_path) # # conn.storeFile(share_folder, path_to_file, buffer) for name, values in data.items(): elem_name = name file_type = values['file_type'] elem_data = values['elem'] print(elem_data.shape) buffer = io.BytesIO() np.save(buffer, elem_data) buffer.seek(0) cur_datetime = datetime.now().strftime('%d_%m_%Y_%H_%M_%S') file_name = f'alarm_{elem_name}_{module_name}_{freq}_{cur_datetime}.{file_type}' path = f"{path_to_save}{module_name}/{str(freq)}/{elem_name}/" path_to_file = f"{path}{file_name}" folders, cur_path = prepare_folders_paths(path) for i in range(len(folders)): cur_path = cur_path + folders[i] try: conn.listPath(share_folder, cur_path) except Exception: conn.createDirectory(share_folder, cur_path) conn.storeFile(share_folder, path_to_file, buffer)