import asyncio import os import time from collections import defaultdict, deque from typing import Any, Deque, Dict, List, Optional from fastapi import FastAPI, Query, WebSocket, WebSocketDisconnect from fastapi.responses import HTMLResponse from pydantic import BaseModel, Field from common.runtime import load_root_env load_root_env(__file__) TELEMETRY_BIND_HOST = os.getenv('telemetry_bind_host', os.getenv('lochost', '0.0.0.0')) TELEMETRY_BIND_PORT = int(os.getenv('telemetry_bind_port', os.getenv('telemetry_port', '5020'))) TELEMETRY_HISTORY_SEC = int(float(os.getenv('telemetry_history_sec', '900'))) TELEMETRY_MAX_POINTS_PER_FREQ = int(os.getenv('telemetry_max_points_per_freq', '5000')) def _new_buffer() -> Deque[Dict[str, Any]]: return deque(maxlen=TELEMETRY_MAX_POINTS_PER_FREQ) app = FastAPI(title='DroneDetector Telemetry Server') _buffers: Dict[str, Deque[Dict[str, Any]]] = defaultdict(_new_buffer) _ws_clients: List[WebSocket] = [] _state_lock = asyncio.Lock() class TelemetryPoint(BaseModel): freq: str ts: float = Field(default_factory=lambda: time.time()) dbfs_current: float dbfs_threshold: Optional[float] = None alarm: bool = False channel_idx: int = 0 channels_total: int = 1 def _prune_freq_locked(freq: str, now_ts: float) -> None: cutoff = now_ts - TELEMETRY_HISTORY_SEC buf = _buffers[freq] while buf and float(buf[0].get('ts', 0.0)) < cutoff: buf.popleft() def _copy_series_locked(seconds: int, freq: Optional[str] = None) -> Dict[str, List[Dict[str, Any]]]: now_ts = time.time() cutoff = now_ts - seconds if freq is not None: data = [point for point in _buffers.get(freq, []) if float(point.get('ts', 0.0)) >= cutoff] return {freq: data} series: Dict[str, List[Dict[str, Any]]] = {} for key, buf in _buffers.items(): series[key] = [point for point in buf if float(point.get('ts', 0.0)) >= cutoff] return series async def _broadcast(message: Dict[str, Any]) -> None: dead: List[WebSocket] = [] for ws in list(_ws_clients): try: await ws.send_json(message) except Exception: dead.append(ws) if dead: async with _state_lock: for ws in dead: if ws in _ws_clients: _ws_clients.remove(ws) @app.post('/telemetry') async def ingest_telemetry(point: TelemetryPoint): payload = point.model_dump() freq = str(payload['freq']) now_ts = time.time() async with _state_lock: _buffers[freq].append(payload) _prune_freq_locked(freq, now_ts) await _broadcast({'type': 'point', 'data': payload}) return {'ok': True} @app.get('/telemetry/history') async def telemetry_history( freq: Optional[str] = Query(default=None), seconds: int = Query(default=300, ge=10, le=86400), ): seconds = min(seconds, TELEMETRY_HISTORY_SEC) async with _state_lock: series = _copy_series_locked(seconds=seconds, freq=freq) return {'seconds': seconds, 'series': series} @app.websocket('/telemetry/ws') async def telemetry_ws(websocket: WebSocket): await websocket.accept() async with _state_lock: _ws_clients.append(websocket) snapshot = _copy_series_locked(seconds=min(300, TELEMETRY_HISTORY_SEC), freq=None) await websocket.send_json({'type': 'snapshot', 'data': snapshot}) try: while True: # Keepalive channel from browser; content is ignored. await websocket.receive_text() except WebSocketDisconnect: pass finally: async with _state_lock: if websocket in _ws_clients: _ws_clients.remove(websocket) MONITOR_HTML = """