import asyncio import os import re import time from collections import defaultdict, deque from pathlib import Path from typing import Any, Deque, Dict, List, Optional from fastapi import FastAPI, HTTPException, Query, WebSocket, WebSocketDisconnect from fastapi.responses import FileResponse, HTMLResponse from pydantic import BaseModel, Field from common.runtime import load_root_env load_root_env(__file__) TELEMETRY_BIND_HOST = os.getenv('telemetry_bind_host', os.getenv('lochost', '0.0.0.0')) TELEMETRY_BIND_PORT = int(os.getenv('telemetry_bind_port', os.getenv('telemetry_port', '5020'))) TELEMETRY_HISTORY_SEC = int(float(os.getenv('telemetry_history_sec', '900'))) TELEMETRY_MAX_POINTS_PER_FREQ = int(os.getenv('telemetry_max_points_per_freq', '5000')) INFERENCE_HISTORY_SEC = int(float(os.getenv('inference_history_sec', str(TELEMETRY_HISTORY_SEC)))) INFERENCE_MAX_RESULTS_PER_FREQ = int(os.getenv('inference_max_results_per_freq', '100')) INFERENCE_RESULT_DIR = Path(os.getenv('inference_result_dir', '/app/inference_result')).resolve() INFERENCE_IMAGE_RE = re.compile(r"_inference_(\d+)_") def _new_buffer() -> Deque[Dict[str, Any]]: return deque(maxlen=TELEMETRY_MAX_POINTS_PER_FREQ) def _new_inference_buffer() -> Deque[Dict[str, Any]]: return deque(maxlen=INFERENCE_MAX_RESULTS_PER_FREQ) app = FastAPI(title='DroneDetector Telemetry Server') _buffers: Dict[str, Deque[Dict[str, Any]]] = defaultdict(_new_buffer) _ws_clients: List[WebSocket] = [] _inference_buffers: Dict[str, Deque[Dict[str, Any]]] = defaultdict(_new_inference_buffer) _inference_ws_clients: List[WebSocket] = [] _state_lock = asyncio.Lock() class TelemetryPoint(BaseModel): freq: str ts: float = Field(default_factory=lambda: time.time()) dbfs_current: float dbfs_threshold: Optional[float] = None alarm: bool = False channel_idx: int = 0 channels_total: int = 1 channel_values: Optional[List[float]] = None channel_thresholds: Optional[List[Optional[float]]] = None alarm_channels: Optional[List[int]] = None class InferenceResult(BaseModel): result_id: int ts: float = Field(default_factory=lambda: time.time()) freq: str model: str prediction: str probability: float drone_probability: Optional[float] = None drone_threshold: Optional[str] = None images: List[str] = Field(default_factory=list) def _prune_freq_locked(freq: str, now_ts: float) -> None: cutoff = now_ts - TELEMETRY_HISTORY_SEC buf = _buffers[freq] while buf and float(buf[0].get('ts', 0.0)) < cutoff: buf.popleft() def _copy_series_locked(seconds: int, freq: Optional[str] = None) -> Dict[str, List[Dict[str, Any]]]: now_ts = time.time() cutoff = now_ts - seconds if freq is not None: data = [point for point in _buffers.get(freq, []) if float(point.get('ts', 0.0)) >= cutoff] return {freq: data} series: Dict[str, List[Dict[str, Any]]] = {} for key, buf in _buffers.items(): series[key] = [point for point in buf if float(point.get('ts', 0.0)) >= cutoff] return series def _prune_inference_freq_locked(freq: str, now_ts: float) -> None: cutoff = now_ts - INFERENCE_HISTORY_SEC buf = _inference_buffers[freq] while buf and float(buf[0].get('ts', 0.0)) < cutoff: buf.popleft() def _copy_inference_series_locked(limit: int, freq: Optional[str] = None) -> Dict[str, List[Dict[str, Any]]]: now_ts = time.time() cutoff = now_ts - INFERENCE_HISTORY_SEC def _slice(buf: Deque[Dict[str, Any]]) -> List[Dict[str, Any]]: recent = [item for item in buf if float(item.get('ts', 0.0)) >= cutoff] return recent[-limit:] if freq is not None: return {freq: _slice(_inference_buffers.get(freq, deque()))} series: Dict[str, List[Dict[str, Any]]] = {} for key, buf in _inference_buffers.items(): series[key] = _slice(buf) return series def _sanitize_image_names(names: List[str]) -> List[str]: safe_names: List[str] = [] for name in names: base = Path(str(name)).name if not base or not base.endswith('.png'): continue safe_names.append(base) return safe_names def _resolve_latest_images_for_model(payload: Dict[str, Any]) -> Dict[str, Any]: model_name = str(payload.get('model', '')) if not model_name or not INFERENCE_RESULT_DIR.is_dir(): payload['images'] = _sanitize_image_names(payload.get('images', [])) return payload model_suffix = f"_{model_name}.png" grouped: Dict[int, List[str]] = {} for path in INFERENCE_RESULT_DIR.iterdir(): if not path.is_file(): continue name = path.name if not name.endswith(model_suffix): continue match = INFERENCE_IMAGE_RE.search(name) if match is None: continue grouped.setdefault(int(match.group(1)), []).append(name) if not grouped: payload['images'] = _sanitize_image_names(payload.get('images', [])) return payload current_id = int(payload.get('result_id', 0) or 0) if current_id in grouped: payload['images'] = sorted(grouped[current_id]) else: payload['images'] = _sanitize_image_names(payload.get('images', [])) return payload async def _broadcast(message: Dict[str, Any]) -> None: dead: List[WebSocket] = [] for ws in list(_ws_clients): try: await ws.send_json(message) except Exception: dead.append(ws) if dead: async with _state_lock: for ws in dead: if ws in _ws_clients: _ws_clients.remove(ws) async def _broadcast_inference(message: Dict[str, Any]) -> None: dead: List[WebSocket] = [] for ws in list(_inference_ws_clients): try: await ws.send_json(message) except Exception: dead.append(ws) if dead: async with _state_lock: for ws in dead: if ws in _inference_ws_clients: _inference_ws_clients.remove(ws) @app.post('/telemetry') async def ingest_telemetry(point: TelemetryPoint): payload = point.model_dump() freq = str(payload['freq']) now_ts = time.time() async with _state_lock: _buffers[freq].append(payload) _prune_freq_locked(freq, now_ts) await _broadcast({'type': 'point', 'data': payload}) return {'ok': True} @app.get('/telemetry/history') async def telemetry_history( freq: Optional[str] = Query(default=None), seconds: int = Query(default=300, ge=10, le=86400), ): seconds = min(seconds, TELEMETRY_HISTORY_SEC) async with _state_lock: series = _copy_series_locked(seconds=seconds, freq=freq) return {'seconds': seconds, 'series': series} @app.post('/inference/result') async def ingest_inference_result(result: InferenceResult): payload = result.model_dump() payload = _resolve_latest_images_for_model(payload) freq = str(payload['freq']) now_ts = time.time() async with _state_lock: _inference_buffers[freq].append(payload) _prune_inference_freq_locked(freq, now_ts) await _broadcast_inference({'type': 'inference_result', 'data': payload}) return {'ok': True} @app.get('/inference/history') async def inference_history( freq: Optional[str] = Query(default=None), limit: int = Query(default=20, ge=1, le=200), ): async with _state_lock: series = _copy_inference_series_locked(limit=limit, freq=freq) return {'limit': limit, 'series': series} @app.websocket('/telemetry/ws') async def telemetry_ws(websocket: WebSocket): await websocket.accept() async with _state_lock: _ws_clients.append(websocket) snapshot = _copy_series_locked(seconds=min(300, TELEMETRY_HISTORY_SEC), freq=None) await websocket.send_json({'type': 'snapshot', 'data': snapshot}) try: while True: # Keepalive channel from browser; content is ignored. await websocket.receive_text() except WebSocketDisconnect: pass finally: async with _state_lock: if websocket in _ws_clients: _ws_clients.remove(websocket) @app.websocket('/inference/ws') async def inference_ws(websocket: WebSocket): await websocket.accept() async with _state_lock: _inference_ws_clients.append(websocket) snapshot = _copy_inference_series_locked(limit=20, freq=None) await websocket.send_json({'type': 'snapshot', 'data': snapshot}) try: while True: await websocket.receive_text() except WebSocketDisconnect: pass finally: async with _state_lock: if websocket in _inference_ws_clients: _inference_ws_clients.remove(websocket) MONITOR_HTML = """