diff --git a/NN_server/server.py b/NN_server/server.py index 1900052..b1dc7db 100644 --- a/NN_server/server.py +++ b/NN_server/server.py @@ -14,6 +14,7 @@ import shutil import json import gc import logging +import time TORCHSIG_PATH = "/app/torchsig" if TORCHSIG_PATH not in sys.path: @@ -164,6 +165,10 @@ gen_server_ip = config['GENERAL_SERVER_IP'] gen_server_port = config['GENERAL_SERVER_PORT'] drone_streaks = {} MODEL_SPECS = build_model_specs() +INFERENCE_TELEMETRY_HOST = os.getenv('telemetry_host', '127.0.0.1') +INFERENCE_TELEMETRY_PORT = os.getenv('telemetry_port', '5020') +INFERENCE_TELEMETRY_ENDPOINT = os.getenv('telemetry_inference_endpoint', 'inference/result') +INFERENCE_TELEMETRY_TIMEOUT_SEC = float(os.getenv('telemetry_inference_timeout_sec', '0.30')) def recreate_directory(path): @@ -172,6 +177,40 @@ def recreate_directory(path): os.makedirs(path, exist_ok=True) +def get_result_dir(): + if not MODEL_SPECS: + return '' + return MODEL_SPECS[0]['src_result'] + + +def collect_inference_images(result_id): + result_dir = get_result_dir() + if not result_dir or not os.path.isdir(result_dir): + return [] + + needle = f"_inference_{result_id}_" + images = [] + for name in sorted(os.listdir(result_dir)): + if needle in name and name.endswith('.png'): + images.append(name) + return images + + +def send_inference_result(payload): + try: + requests.post( + "http://{0}:{1}/{2}".format( + INFERENCE_TELEMETRY_HOST, + INFERENCE_TELEMETRY_PORT, + INFERENCE_TELEMETRY_ENDPOINT.lstrip('/'), + ), + json=payload, + timeout=INFERENCE_TELEMETRY_TIMEOUT_SEC, + ) + except Exception as exc: + print(str(exc)) + + def init_data_for_inference(): try: if MODEL_SPECS: @@ -227,9 +266,12 @@ def find_model_for_freq(freq): def receive_data(): try: print() - data = json.loads(request.json) + data = request.json + if isinstance(data, str): + data = json.loads(data) print('#' * 100) print('Получен пакет ' + str(Model.get_ind_inference())) + result_id = Model.get_ind_inference() freq = 
int(data['freq']) print('Частота: ' + str(freq)) @@ -256,6 +298,18 @@ def receive_data(): result_msg[str(model.get_model_name())]['drone_probability'] = str(drone_probability) result_msg[str(model.get_model_name())]['drone_threshold'] = str(get_required_drone_prob(freq)) prediction_list.append(prediction) + inference_event = { + 'result_id': result_id, + 'ts': time.time(), + 'freq': str(freq), + 'model': model.get_model_name(), + 'prediction': prediction, + 'probability': float(probability), + 'drone_probability': drone_probability, + 'drone_threshold': str(get_required_drone_prob(freq)), + 'images': collect_inference_images(result_id), + } + send_inference_result(inference_event) print('-' * 100) print() diff --git a/deploy/docker/docker-compose.yml b/deploy/docker/docker-compose.yml index 62aa353..50a5a7b 100644 --- a/deploy/docker/docker-compose.yml +++ b/deploy/docker/docker-compose.yml @@ -97,6 +97,7 @@ services: - ../../.env:/app/.env:ro - ../../telemetry:/app/telemetry - ../../common:/app/common + - ../../NN_server/result:/app/inference_result:ro networks: - dronedetector-net diff --git a/telemetry/telemetry_server.py b/telemetry/telemetry_server.py index d507fb4..f3a5796 100644 --- a/telemetry/telemetry_server.py +++ b/telemetry/telemetry_server.py @@ -2,10 +2,11 @@ import asyncio import os import time from collections import defaultdict, deque +from pathlib import Path from typing import Any, Deque, Dict, List, Optional -from fastapi import FastAPI, Query, WebSocket, WebSocketDisconnect -from fastapi.responses import HTMLResponse +from fastapi import FastAPI, HTTPException, Query, WebSocket, WebSocketDisconnect +from fastapi.responses import FileResponse, HTMLResponse from pydantic import BaseModel, Field from common.runtime import load_root_env @@ -16,15 +17,24 @@ TELEMETRY_BIND_HOST = os.getenv('telemetry_bind_host', os.getenv('lochost', '0.0 TELEMETRY_BIND_PORT = int(os.getenv('telemetry_bind_port', os.getenv('telemetry_port', '5020'))) 
TELEMETRY_HISTORY_SEC = int(float(os.getenv('telemetry_history_sec', '900'))) TELEMETRY_MAX_POINTS_PER_FREQ = int(os.getenv('telemetry_max_points_per_freq', '5000')) +INFERENCE_HISTORY_SEC = int(float(os.getenv('inference_history_sec', str(TELEMETRY_HISTORY_SEC)))) +INFERENCE_MAX_RESULTS_PER_FREQ = int(os.getenv('inference_max_results_per_freq', '100')) +INFERENCE_RESULT_DIR = Path(os.getenv('inference_result_dir', '/app/inference_result')).resolve() def _new_buffer() -> Deque[Dict[str, Any]]: return deque(maxlen=TELEMETRY_MAX_POINTS_PER_FREQ) +def _new_inference_buffer() -> Deque[Dict[str, Any]]: + return deque(maxlen=INFERENCE_MAX_RESULTS_PER_FREQ) + + app = FastAPI(title='DroneDetector Telemetry Server') _buffers: Dict[str, Deque[Dict[str, Any]]] = defaultdict(_new_buffer) _ws_clients: List[WebSocket] = [] +_inference_buffers: Dict[str, Deque[Dict[str, Any]]] = defaultdict(_new_inference_buffer) +_inference_ws_clients: List[WebSocket] = [] _state_lock = asyncio.Lock() @@ -41,6 +51,18 @@ class TelemetryPoint(BaseModel): alarm_channels: Optional[List[int]] = None +class InferenceResult(BaseModel): + result_id: int + ts: float = Field(default_factory=lambda: time.time()) + freq: str + model: str + prediction: str + probability: float + drone_probability: Optional[float] = None + drone_threshold: Optional[str] = None + images: List[str] = Field(default_factory=list) + + def _prune_freq_locked(freq: str, now_ts: float) -> None: cutoff = now_ts - TELEMETRY_HISTORY_SEC buf = _buffers[freq] @@ -62,6 +84,40 @@ def _copy_series_locked(seconds: int, freq: Optional[str] = None) -> Dict[str, L return series +def _prune_inference_freq_locked(freq: str, now_ts: float) -> None: + cutoff = now_ts - INFERENCE_HISTORY_SEC + buf = _inference_buffers[freq] + while buf and float(buf[0].get('ts', 0.0)) < cutoff: + buf.popleft() + + +def _copy_inference_series_locked(limit: int, freq: Optional[str] = None) -> Dict[str, List[Dict[str, Any]]]: + now_ts = time.time() + cutoff = now_ts - 
INFERENCE_HISTORY_SEC + + def _slice(buf: Deque[Dict[str, Any]]) -> List[Dict[str, Any]]: + recent = [item for item in buf if float(item.get('ts', 0.0)) >= cutoff] + return recent[-limit:] + + if freq is not None: + return {freq: _slice(_inference_buffers.get(freq, deque()))} + + series: Dict[str, List[Dict[str, Any]]] = {} + for key, buf in _inference_buffers.items(): + series[key] = _slice(buf) + return series + + +def _sanitize_image_names(names: List[str]) -> List[str]: + safe_names: List[str] = [] + for name in names: + base = Path(str(name)).name + if not base or not base.endswith('.png'): + continue + safe_names.append(base) + return safe_names + + async def _broadcast(message: Dict[str, Any]) -> None: dead: List[WebSocket] = [] for ws in list(_ws_clients): @@ -77,6 +133,21 @@ async def _broadcast(message: Dict[str, Any]) -> None: _ws_clients.remove(ws) +async def _broadcast_inference(message: Dict[str, Any]) -> None: + dead: List[WebSocket] = [] + for ws in list(_inference_ws_clients): + try: + await ws.send_json(message) + except Exception: + dead.append(ws) + + if dead: + async with _state_lock: + for ws in dead: + if ws in _inference_ws_clients: + _inference_ws_clients.remove(ws) + + @app.post('/telemetry') async def ingest_telemetry(point: TelemetryPoint): payload = point.model_dump() @@ -102,6 +173,31 @@ async def telemetry_history( return {'seconds': seconds, 'series': series} +@app.post('/inference/result') +async def ingest_inference_result(result: InferenceResult): + payload = result.model_dump() + payload['images'] = _sanitize_image_names(payload.get('images', [])) + freq = str(payload['freq']) + now_ts = time.time() + + async with _state_lock: + _inference_buffers[freq].append(payload) + _prune_inference_freq_locked(freq, now_ts) + + await _broadcast_inference({'type': 'inference_result', 'data': payload}) + return {'ok': True} + + +@app.get('/inference/history') +async def inference_history( + freq: Optional[str] = Query(default=None), + limit: 
int = Query(default=20, ge=1, le=200), +): + async with _state_lock: + series = _copy_inference_series_locked(limit=limit, freq=freq) + return {'limit': limit, 'series': series} + + @app.websocket('/telemetry/ws') async def telemetry_ws(websocket: WebSocket): await websocket.accept() @@ -123,6 +219,26 @@ async def telemetry_ws(websocket: WebSocket): _ws_clients.remove(websocket) +@app.websocket('/inference/ws') +async def inference_ws(websocket: WebSocket): + await websocket.accept() + async with _state_lock: + _inference_ws_clients.append(websocket) + snapshot = _copy_inference_series_locked(limit=20, freq=None) + + await websocket.send_json({'type': 'snapshot', 'data': snapshot}) + + try: + while True: + await websocket.receive_text() + except WebSocketDisconnect: + pass + finally: + async with _state_lock: + if websocket in _inference_ws_clients: + _inference_ws_clients.remove(websocket) + + MONITOR_HTML = """ @@ -453,12 +569,294 @@ loadInitial().then(connectWs).catch((e) => { """ +INFERENCE_VIEWER_HTML = """ + + + + + + DroneDetector Inference Viewer + + + +
+
+
+

DroneDetector Inference Viewer

+
Latest inference card per frequency. Browser keeps last 20 results per frequency.
+
+
+
connecting...
+ +
+
+
+
+ + + + +""" + + @app.get('/', response_class=HTMLResponse) @app.get('/monitor', response_class=HTMLResponse) async def monitor_page(): return HTMLResponse(content=MONITOR_HTML) +@app.get('/inference-viewer', response_class=HTMLResponse) +async def inference_viewer_page(): return HTMLResponse(content=INFERENCE_VIEWER_HTML) + + +@app.get('/inference/images/{filename}') +async def inference_image(filename: str): + safe_name = Path(filename).name + if safe_name != filename: + raise HTTPException(status_code=404, detail='image not found') + + image_path = (INFERENCE_RESULT_DIR / safe_name).resolve() + if image_path.parent != INFERENCE_RESULT_DIR or not image_path.is_file(): + raise HTTPException(status_code=404, detail='image not found') + + return FileResponse(image_path) + + if __name__ == '__main__': import uvicorn