добавил блок телеметрии в datas_processing

fft
Sergey Revyakin 2 weeks ago
parent 40d7e9ecde
commit b8ab9b366d

@ -1,149 +1,183 @@
import os import os
import io import io
import csv import csv
import itertools import time
import requests import itertools
import numpy as np import requests
from datetime import datetime import numpy as np
from datetime import datetime
def pack_elems(names, file_types, *elems):
if len(names) != len(file_types) or len(names) != len(elems): _telemetry_error_last_ts = 0.0
raise ValueError('Длин массивов имен и типов файлов и не совпадает с количество элементов для сохранения')
return {name: {'file_type': file_type, 'elem': elem} for name, file_type, elem in zip(names, file_types, elems)}
def pack_elems(names, file_types, *elems):
if len(names) != len(file_types) or len(names) != len(elems):
def agregator(freq, alarm): raise ValueError('Длин массивов имен и типов файлов и не совпадает с количество элементов для сохранения')
if alarm: return {name: {'file_type': file_type, 'elem': elem} for name, file_type, elem in zip(names, file_types, elems)}
amplitude = 9
else:
amplitude = 0 def agregator(freq, alarm):
if alarm:
data = {"freq": freq, amplitude = 9
"amplitude": amplitude else:
} amplitude = 0
return data
data = {"freq": freq,
"amplitude": amplitude
def send_data(data, localhost, localport, endpoint): }
""" return data
Отправка данных по POST на модуль сервер.
:param data: Данные для отправки.
:param localhost: Хост модуль сервера. def send_data(data, localhost, localport, endpoint):
:param localport: Порт модуль сервера. """
""" Отправка данных по POST на модуль сервер.
:param data: Данные для отправки.
def _post(port): :param localhost: Хост модуль сервера.
url = "http://{0}:{1}/{2}".format(localhost, port, endpoint) :param localport: Порт модуль сервера.
return requests.post(url, json=data), url """
try: def _post(port):
response, url = _post(localport) url = "http://{0}:{1}/{2}".format(localhost, port, endpoint)
if response.status_code == 200: return requests.post(url, json=data), url
print("Данные успешно отправлены и приняты!", url)
return try:
response, url = _post(localport)
# Частый кейс: порт 5000 занят локальным registry (DroneDetectPCSoft). if response.status_code == 200:
# Пробуем порт модуля сервера из env (например, 5010). print("Данные успешно отправлены и приняты!", url)
fallback_port = os.getenv('GENERAL_SERVER_PORT') return
if response.status_code == 404 and fallback_port and str(localport) != str(fallback_port):
response_fb, url_fb = _post(fallback_port) # Частый кейс: порт 5000 занят локальным registry (DroneDetectPCSoft).
if response_fb.status_code == 200: # Пробуем порт модуля сервера из env (например, 5010).
#print("Данные успешно отправлены и приняты!", url_fb) fallback_port = os.getenv('GENERAL_SERVER_PORT')
return if response.status_code == 404 and fallback_port and str(localport) != str(fallback_port):
print("Ошибка при отправке данных:", response_fb.status_code, url_fb) response_fb, url_fb = _post(fallback_port)
return if response_fb.status_code == 200:
return
print("Ошибка при отправке данных:", response.status_code, url) print("Ошибка при отправке данных:", response_fb.status_code, url_fb)
except Exception as e: return
print(str(e))
print("Ошибка при отправке данных:", response.status_code, url)
except Exception as e:
def save_data(path_to_save, freq, *args): print(str(e))
"""
Сохранение данных в csv файл. Используется для сохранения метрик и медиан сигнала на каналах с датой и временем
- для анализа. def send_telemetry(data, host, port, endpoint='telemetry', timeout_sec=0.30):
:param path_to_save: Путь для сохранения. """
:param freq: Обрабатываемая частота. Best-effort отправка телеметрии на отдельный telemetry-server.
:param args: Что сохраняем в файл. Ошибки намеренно не пробрасываются, чтобы не влиять на основной детект/аларм поток.
""" """
global _telemetry_error_last_ts
try:
if not os.path.exists(path_to_save): host = '' if host is None else str(host).strip()
print('Folder was created.') port = '' if port is None else str(port).strip()
os.makedirs(path_to_save) endpoint = str(endpoint or 'telemetry').strip().lstrip('/')
with open(path_to_save + 'data_' + str(freq) + '.csv', 'a', newline='') as f: if not host or not port:
writer = csv.writer(f) return
args2 = itertools.chain(*(arg if isinstance(arg, list) else [arg] for arg in args))
writer.writerow(args2) try:
print('Write csv.') url = f"http://{host}:{port}/{endpoint}"
response = requests.post(url, json=data, timeout=float(timeout_sec))
except Exception as e: if response.status_code == 200:
print(str(e)) return
now = time.time()
def prepare_folders_paths(path): if now - _telemetry_error_last_ts >= 10.0:
folders = path.split('/') print(f"telemetry http error: {response.status_code} {url}")
folders.pop() _telemetry_error_last_ts = now
folders = [elem + '/' for elem in folders] except Exception as exc:
print(folders) now = time.time()
cur_path = '' if now - _telemetry_error_last_ts >= 10.0:
print(cur_path) print(f"telemetry send failed: {exc}")
return folders, cur_path _telemetry_error_last_ts = now
def remote_save_data(conn, data, module_name, freq, share_folder, path_to_save): def save_data(path_to_save, freq, *args):
""" """
Сохранение данных (сигнала) в файл на удаленный диск. Сохранение данных в csv файл. Используется для сохранения метрик и медиан сигнала на каналах с датой и временем
:param conn: - для анализа.
:param data: :param path_to_save: Путь для сохранения.
:param module_name: :param freq: Обрабатываемая частота.
:param freq: :param args: Что сохраняем в файл.
:param share_folder: """
:param path_to_save:
:return: try:
""" if not os.path.exists(path_to_save):
# cur_datetime = datetime.now().strftime('%d_%m_%Y_%H_%M_%S') print('Folder was created.')
# file_name = f'alarm_{module_name}_{freq}_{cur_datetime}.npy' os.makedirs(path_to_save)
# path = f"{path_to_save_medians}{module_name}/{str(freq)}/"
# path_to_file = f"{path}{file_name}" with open(path_to_save + 'data_' + str(freq) + '.csv', 'a', newline='') as f:
# print(path_to_file) writer = csv.writer(f)
# args2 = itertools.chain(*(arg if isinstance(arg, list) else [arg] for arg in args))
# folders, cur_path = prepare_folders_paths(path) writer.writerow(args2)
# print('Write csv.')
# buffer = io.BytesIO()
# np.save(buffer, data) except Exception as e:
# buffer.seek(0) print(str(e))
#
# for i in range(len(folders)):
# cur_path = cur_path + folders[i] def prepare_folders_paths(path):
# try: folders = path.split('/')
# conn.listPath(share_folder, cur_path) folders.pop()
# except Exception: folders = [elem + '/' for elem in folders]
# conn.createDirectory(share_folder, cur_path) print(folders)
# cur_path = ''
# conn.storeFile(share_folder, path_to_file, buffer) print(cur_path)
for name, values in data.items(): return folders, cur_path
elem_name = name
file_type = values['file_type']
elem_data = values['elem'] def remote_save_data(conn, data, module_name, freq, share_folder, path_to_save):
print(elem_data.shape) """
buffer = io.BytesIO() Сохранение данных (сигнала) в файл на удаленный диск.
np.save(buffer, elem_data) :param conn:
buffer.seek(0) :param data:
:param module_name:
cur_datetime = datetime.now().strftime('%d_%m_%Y_%H_%M_%S') :param freq:
file_name = f'alarm_{elem_name}_{module_name}_{freq}_{cur_datetime}.{file_type}' :param share_folder:
path = f"{path_to_save}{module_name}/{str(freq)}/{elem_name}/" :param path_to_save:
path_to_file = f"{path}{file_name}" :return:
folders, cur_path = prepare_folders_paths(path) """
# cur_datetime = datetime.now().strftime('%d_%m_%Y_%H_%M_%S')
for i in range(len(folders)): # file_name = f'alarm_{module_name}_{freq}_{cur_datetime}.npy'
cur_path = cur_path + folders[i] # path = f"{path_to_save_medians}{module_name}/{str(freq)}/"
try: # path_to_file = f"{path}{file_name}"
conn.listPath(share_folder, cur_path) # print(path_to_file)
except Exception: #
conn.createDirectory(share_folder, cur_path) # folders, cur_path = prepare_folders_paths(path)
#
conn.storeFile(share_folder, path_to_file, buffer) # buffer = io.BytesIO()
# np.save(buffer, data)
# buffer.seek(0)
#
# for i in range(len(folders)):
# cur_path = cur_path + folders[i]
# try:
# conn.listPath(share_folder, cur_path)
# except Exception:
# conn.createDirectory(share_folder, cur_path)
#
# conn.storeFile(share_folder, path_to_file, buffer)
for name, values in data.items():
elem_name = name
file_type = values['file_type']
elem_data = values['elem']
print(elem_data.shape)
buffer = io.BytesIO()
np.save(buffer, elem_data)
buffer.seek(0)
cur_datetime = datetime.now().strftime('%d_%m_%Y_%H_%M_%S')
file_name = f'alarm_{elem_name}_{module_name}_{freq}_{cur_datetime}.{file_type}'
path = f"{path_to_save}{module_name}/{str(freq)}/{elem_name}/"
path_to_file = f"{path}{file_name}"
folders, cur_path = prepare_folders_paths(path)
for i in range(len(folders)):
cur_path = cur_path + folders[i]
try:
conn.listPath(share_folder, cur_path)
except Exception:
conn.createDirectory(share_folder, cur_path)
conn.storeFile(share_folder, path_to_file, buffer)

Loading…
Cancel
Save