#! /usr/bin/env python # -*- coding: utf-8 -*- import os import json import re import httpx import asyncio import requests import websockets import socket import uuid from copy import deepcopy from fastapi import FastAPI from common.runtime import load_root_env, validate_env, as_bool, as_float, as_int, as_str from datetime import datetime, timedelta import logging logging.basicConfig(level=logging.INFO) app = FastAPI() ############################################################################ # VARIABLES ############################################################################ load_root_env(__file__) validate_env("src/server_to_master.py", { "lochost": as_str, "locport": as_int, "jamhost": as_str, "jamport": as_int, "master_server_ip": as_str, "master_server_port": as_int, "freqs": as_str, "num_of_clear_packs": as_int, "threshold_to_alarm": as_int, "time_to_jam": as_int, "time_to_fresh": as_int, "active_interval_to_send": as_int, "passive_interval_to_send": as_int, "jammer_timeout": as_int, "master_timeout": as_int, "debug_module_flag": as_bool, "send_to_module_flag": as_bool, "send_to_master_flag": as_bool, "send_to_jammer_flag": as_bool, "latitude": as_float, "longitude": as_float, }) lochost = os.getenv('lochost') locport = os.getenv('locport') jamhost = os.getenv('jamhost') jamport = os.getenv('jamport') master_server_ip = os.getenv('master_server_ip') master_server_port = os.getenv('master_server_port') freqs = [str(x) for x in os.getenv('freqs').split(',')] num_of_clear_packs = int(os.getenv('num_of_clear_packs')) threshold_to_alarm = int(os.getenv('threshold_to_alarm')) time_to_jam = int(os.getenv('time_to_jam')) time_to_fresh = int(os.getenv('time_to_fresh')) active_interval_to_send = int(os.getenv('active_interval_to_send')) passive_interval_to_send = int(os.getenv('passive_interval_to_send')) jammer_timeout = int(os.getenv('jammer_timeout')) master_timeout = int(os.getenv('master_timeout')) debug_module_flag = as_bool(os.getenv('debug_module_flag', '0')) send_to_module_flag = as_bool(os.getenv('send_to_module_flag', '0')) send_to_master_flag = as_bool(os.getenv('send_to_master_flag', '0')) send_to_jammer_flag = as_bool(os.getenv('send_to_jammer_flag', '0')) latitude = float(os.getenv('latitude')) longitude = float(os.getenv('longitude')) i = 0 flag = 0 max_len_bulk = 1 bulk_data = [] sending_data_task = None jam_server_connect = None alarm = False jammer_event = False data_queue = [None] * len(freqs) freqs_alarm = {freq: 0 for freq in freqs} #TODO: # 1. Вырезать flag, если нужно и возможно. # 2. Пофиксить костыль с asyncio.create_task(sending_data) - после jammer_event'a перестает подавать признаки жизни, # поэтому гасим, когда включается глушилка и запускаем заново, когда глушилка отключается. # 3. Пофиксить момент с data_queue:из-за асинхронных функций и старой реализации сервака происходит так ,что # прилетает пакет аларма, система это видит, хочет его отправить, начинает отправку и из-за того, что это немного # долгий процесс, то успевает прилететь чистый пакет и на мастер улетает чистый пакет. # 4. Добавить print, только если deub_module_flag. ############################################################################ # GPS MODULE - INACTIVE ############################################################################ # Создание планировщика # scheduler = BackgroundScheduler(daemon=True) # scheduler.start() # @app.route('/get_gps', methods = ['POST']) # @scheduler.scheduled_job(IntervalTrigger(minutes=1)) # def update_gps_coordinates(): # # data_gps = request.json # result = { # 'latitude': latitude, # 'longitude': longitude # } # try: # url = "http://{0}:{1}/data/gps/{2}".format(master_server_ip, master_server_port, mac_address) # response = requests.post(url, json=result) # if response.status_code == 200: # print('gps успешно отправлен') # else: # print('gps не был отправлен') # # except Exception: # print('gps не были отправлены из-за отстутствия сервера в поле видимости') # return result # # # @scheduler.scheduled_job(IntervalTrigger(seconds=10)) # def send_gps_to_master(): # try: # subprocess.run(["python3", "GPS_get_coords.py"]) # #mac_address = get_mac_address() # data_gps = update_gps_coordinates() # url = "http://{0}:{1}/data/gps/{2}".format(master_server_ip, master_server_port, mac_address) # response = requests.post(url, json=data_gps) # if response.status_code == 200: # print('gps успешно отправлен') # else: # print('gps не был отправлены по какой-то причине') # except Exception: # print('gps не были отправлены по причине отсутствия сервера в поле видимости') # ############################################################################ # MODULE RIGISTR ############################################################################ def _normalize_mac(value: str | None): if not value: return None value = value.strip().lower().replace('-', ':') if not re.fullmatch(r"[0-9a-f]{2}(:[0-9a-f]{2}){5}", value): return None return value def get_mac_address(interface='enp5s0'): """ Получить MAC текущего устройства. Приоритет: module_mac из env -> uuid.getnode(). """ env_mac = _normalize_mac(os.getenv('module_mac')) if env_mac: return env_mac try: mac_int = uuid.getnode() mac = ':'.join(f'{(mac_int >> shift) & 0xff:02x}' for shift in range(40, -1, -8)) return _normalize_mac(mac) except Exception as e: print('Ошибка при получении MAC-адреса:' + str(e)) return None def get_ip_address(interface='enp5s0'): """ Получить IP текущего устройства. Приоритет: module_ip из env -> исходящий IP до master_server. """ env_ip = os.getenv('module_ip') if env_ip: return env_ip.strip() try: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: sock.connect((master_server_ip, int(master_server_port))) return sock.getsockname()[0] except Exception as e: print('Ошибка при получении IP-адреса:' + str(e)) return None def register_module(): """ Регистрация модуля на мастер сервере. """ data = {'mac': get_mac_address(), 'ip': get_ip_address()} try: url = f"http://{master_server_ip}:{master_server_port}/module/register" response = requests.post(url, json=data) response.raise_for_status() # Проверка успешности запроса print("Модуль зарегистрирован успешно = ", data) except requests.exceptions.RequestException as e: flag = 1 print("Ошибка при регистрации модуля:" + str(e), data) ############################################################################ # SEND DATA TO MASTER ############################################################################ async def send_to_master(ModuleDataSingleV2, flag): """ Отправка данных на мастер по посту или через булк. По посту установлен лимит времени на отправку. В случае его превышения - данные не отправлены. В случа неудачи отправки по любому из методов - данные не отправлены. :param ModuleDataSingleV2: Пакет данных. :param flag: :return: """ mac_address = get_mac_address() if not mac_address: print('MAC адрес не определен, отправка на master пропущена') return async with httpx.AsyncClient() as client: try: if flag == 0: url = f"http://{master_server_ip}:{master_server_port}/data/single/{mac_address}" else: url = f"http://{master_server_ip}:{master_server_port}/data/bulk/{mac_address}" response = await client.post(url, json=ModuleDataSingleV2, timeout=master_timeout) if response.status_code == 200: print('Данные успешно отправлены') flag = 0 bulk_data.clear() else: flag = 1 if len(bulk_data) > max_len_bulk: # Если лимит bulk_data превышен, то удаляем первый элемент списка bulk_data.pop(0) bulk_data.append(ModuleDataSingleV2) print('Данные не были отправлены по какой-то причине') except (httpx.RequestError, asyncio.TimeoutError) as e: if len(bulk_data) > max_len_bulk: # Если лимит bulk_data превышен, то удаляем первый элемент списка bulk_data.pop(0) bulk_data.append(ModuleDataSingleV2) flag = 1 print('Данные не были отправлены по причине отсутствия сервера в поле видимости') ############################################################################ # PROCESS DATA ############################################################################ async def check_alarm(amplitude: int): """ Проверка амплитуды на превышение границы отработки системы. :param amplitude: Амплитуда. :return: Превышает/не превышает. """ if amplitude > threshold_to_alarm: return True else: return False async def agregate_data(data_to_agregate: list): """ Сбор пакета для отправки на мастер сервер. :param data_to_agregate: Список из частотных пакетов. Длина списка = количесту обнаруживаемых частот. Может содержать None, если частота ничего не присылает на модуль сервер. :return: Пакет данных для отправки на мастер. """ data = [] if any(item is not None for item in data_to_agregate): for item in data_to_agregate: if item is not None: item['freq'] = int(item['freq']) data.append(item) now = datetime.utcnow() - timedelta(seconds=2) now = now.strftime("%Y-%m-%d %H:%M:%S") data = { "registeredAt": now, "data": data } for i in range(len(freqs)): data_queue[i] = None return data async def sending_data(): #TODO: Надо по-хорошему нормально эту функцию переписать """ Отправка пакета данных на мастер сервер раз в некоторое время в определенном формате. Время отправки зависит от текущего статуса тревоги (аларм/не аларм). """ global i global alarm global jammer_event if i == 0: while True: i=1 print('while true!') ModuleDataSingleV2 = await agregate_data(deepcopy(data_queue)) if send_to_master_flag: print(f'На Мастер будет отправлена следующая информация: {ModuleDataSingleV2}') await send_to_master(ModuleDataSingleV2, flag) # Если перед отправкой на мастер все было чисто, то ждем 60 сек. # Если во время этих 60 сек. пришел пакет с алармом, то рассматриваем ситуации: if not alarm: for i in range(passive_interval_to_send, 0, -1): print('ТАЙМЕР ', i) await asyncio.sleep(1) if alarm: break # Если стоит флаг отправить данные на джеммер и при этом еще не был получен ивент на глушилку, то # отправляем на джеммер данные. elif alarm and send_to_jammer_flag and not jammer_event: if await send_jam_server_alarm(): print('Отправили на сервис подавления и все дошло успешно') else: print('Не смогли отправить на сервис подавления') # Сюда почему-то не заходит и вообще функция не подает признаков жизни после запуска подваителя(( if alarm and jammer_event: print('ПОДАВИТЕЛЬ РАБОТАЕТ РАЗБЕГАЙСЯ ААААААААААААААААААА') # В случае аларма ждем секунду перед новой отправкой данных. if alarm: await asyncio.sleep(active_interval_to_send) i = 0 @app.post('/waterfall') async def waterfall(data: dict): print('Received data: ', data) @app.post('/process_data') async def process_data(data: dict): """ Прием данных со скриптов детекции в формате data = {"freq": freq, "amplitude": amplitude } где freq - строка, amplitude - int и их первичная обработка. :param data: словарь с двумя ключами и значениями. """ global alarm print('Received data: ', data) data_dict = deepcopy(data) # Агрегируем N пакетов данных от частот в один общий список, он используется в функции agregate data. # Каждая позиция списка фиксируется за отдельной частотой. freq = str(data_dict.get('freq')) for i in range(len(freqs)): if freq == freqs[i]: #Так делаем потому, что сервак является центром принятия решений по триггеру. trigger = await check_alarm(data_dict['amplitude']) data_dict.update({'triggered': trigger}) data_queue[i] = deepcopy(data_dict) data_dict.clear() # Если прилетел триггер и глушилка не включена, то запускаем/обновляем счетчик чистых пакетов на этой # частоте. if trigger and not jammer_event: freqs_alarm[freq] = num_of_clear_packs print(f'freqs_alarm выглядит следующим обазом: {freqs_alarm}') #Если прилетел триггер и модуль еще не заалармлен, то алармим. if trigger and not alarm: print('Приелет триггерa со сканнера. Работаем, ребята!') alarm = True # Если прилетел триггер и модуль заалармлен, но при этом глушилка не работает и счетчик чистых пакетов # данной частоты не равен нулю, то уменьшаем его. А когда в словаре счетчиков все нули, то убираем alarm. elif not trigger and alarm and not jammer_event and freqs_alarm[freq] != 0: freqs_alarm[freq] -= 1 print(f'Чистый пакет. Уменьшаем в выбранной частоте: {freqs_alarm}') if all(value == 0 for value in freqs_alarm.values()): alarm = False print(f'Прилетело {num_of_clear_packs}. Отключаем аларм и freqs_alarm выглядит так: {freqs_alarm}') else: continue print('После получения данных data_queue выглядит следующим образом: ', data_queue) ############################################################################ # JAMMER ############################################################################ async def jammer_active(): """ Включение подавителя. Отменяем таску на отправку данных на мастер. Зануляем словарь чистых пакетов и объявляем о прилете ивента с сервиса подавления. """ global jammer_event global freqs_alarm global sending_data_task if sending_data_task is not None: sending_data_task.cancel() freqs_alarm = {freq: 0 for freq in freqs} jammer_event = True print('АКТИВИРУЕМ ПОДАВИТЕЛЬ ААААААААААААААААААААААААААААААААААААААААААААААА!!!!') print('-' * 20) print('Статус по переменным:') print(f'freqs_alarm: {freqs_alarm}') print(f'jammer_event: {jammer_event}') print(f'alarm: {alarm}') print('-' * 20) async def jammer_deactive(): """ Отключение подавителя. Отрубаем аларм на модуле, отрубаем ивент сервера подавителей и запускаем таску отправки данных на мастер. :return: """ global jammer_event global alarm global sending_data_task alarm = False jammer_event = False sending_data_task = asyncio.create_task(sending_data()) print('ОТКЛЮАЕМ ПОДАВИТЕЛЬ ААААААААААААААААААААААААААААААААААААААААААААААААА!!!!') print('-' * 20) print('Статус по переменным:') print(f'freqs_alarm: {freqs_alarm}') print(f'jammer_event: {jammer_event}') print(f'alarm: {alarm}') print('-' * 20) async def send_jam_server_alarm(): """ Отправка алармовского пакета на сервер подавления по вебсокету. На отправку дается jammer_timeout секунд. При неудаче - данные не отправлены. """ global jam_server_connect msg = {'type': 'freq_alarm', 'data': True} if jam_server_connect: try: await jam_server_connect.send(json.dumps(msg)) await asyncio.wait_for(jam_server_connect.recv(), jammer_timeout) return True except (asyncio.TimeoutError, websockets.exceptions.WebSocketException) as e: print(f"WebSocket error or timeout: {e}") return False else: return False async def jam_server(): """ Прием данных по вебсокету с сервера подавления и их обработка. Включение/отключение подавителя. При разрыве соединения принудительно отключаем подавитель. """ uri = f'ws://{jamhost}:{jamport}/ws' global jam_server_connect while True: try: jam_server_connect = await websockets.connect(uri) while True: data_from_jam_server = await jam_server_connect.recv() data_from_jam_server = json.loads(data_from_jam_server) print('Принял с сервера глушилок: ', data_from_jam_server) if data_from_jam_server['type'] == 'run': alarm_status = (data_from_jam_server['data'])['state'] print(alarm_status) if alarm_status: await jammer_active() else: await jammer_deactive() except Exception as e: jam_server_connect = None if jammer_event: await jammer_deactive() @app.on_event("startup") async def startup_event(): """ Запускаем параллельно задачи jam_server и sending_data. """ global sending_data_task asyncio.create_task(jam_server()) sending_data_task = asyncio.create_task(sending_data()) if __name__ == '__main__': import uvicorn # update_gps_coordinates() register_module() # Регистрация модуля на сервере uvicorn.run(app, host=lochost, port=int(locport))