diff --git a/telemetry/__init__.py b/telemetry/__init__.py new file mode 100644 index 0000000..11ccedd --- /dev/null +++ b/telemetry/__init__.py @@ -0,0 +1 @@ +# telemetry package diff --git a/telemetry/telemetry_server.py b/telemetry/telemetry_server.py new file mode 100644 index 0000000..1cacf14 --- /dev/null +++ b/telemetry/telemetry_server.py @@ -0,0 +1,299 @@ +import asyncio +import os +import time +from collections import defaultdict, deque +from typing import Any, Deque, Dict, List, Optional + +from fastapi import FastAPI, Query, WebSocket, WebSocketDisconnect +from fastapi.responses import HTMLResponse +from pydantic import BaseModel, Field + +from common.runtime import load_root_env + +load_root_env(__file__) + +TELEMETRY_BIND_HOST = os.getenv('telemetry_bind_host', os.getenv('lochost', '0.0.0.0')) +TELEMETRY_BIND_PORT = int(os.getenv('telemetry_bind_port', os.getenv('telemetry_port', '5020'))) +TELEMETRY_HISTORY_SEC = int(float(os.getenv('telemetry_history_sec', '900'))) +TELEMETRY_MAX_POINTS_PER_FREQ = int(os.getenv('telemetry_max_points_per_freq', '5000')) + + +def _new_buffer() -> Deque[Dict[str, Any]]: + return deque(maxlen=TELEMETRY_MAX_POINTS_PER_FREQ) + + +app = FastAPI(title='DroneDetector Telemetry Server') +_buffers: Dict[str, Deque[Dict[str, Any]]] = defaultdict(_new_buffer) +_ws_clients: List[WebSocket] = [] +_state_lock = asyncio.Lock() + + +class TelemetryPoint(BaseModel): + freq: str + ts: float = Field(default_factory=lambda: time.time()) + dbfs_current: float + dbfs_threshold: Optional[float] = None + alarm: bool = False + channel_idx: int = 0 + channels_total: int = 1 + + +def _prune_freq_locked(freq: str, now_ts: float) -> None: + cutoff = now_ts - TELEMETRY_HISTORY_SEC + buf = _buffers[freq] + while buf and float(buf[0].get('ts', 0.0)) < cutoff: + buf.popleft() + + +def _copy_series_locked(seconds: int, freq: Optional[str] = None) -> Dict[str, List[Dict[str, Any]]]: + now_ts = time.time() + cutoff = now_ts - seconds + + if freq is not None: + data = [point for point in _buffers.get(freq, []) if float(point.get('ts', 0.0)) >= cutoff] + return {freq: data} + + series: Dict[str, List[Dict[str, Any]]] = {} + for key, buf in _buffers.items(): + series[key] = [point for point in buf if float(point.get('ts', 0.0)) >= cutoff] + return series + + +async def _broadcast(message: Dict[str, Any]) -> None: + dead: List[WebSocket] = [] + for ws in list(_ws_clients): + try: + await ws.send_json(message) + except Exception: + dead.append(ws) + + if dead: + async with _state_lock: + for ws in dead: + if ws in _ws_clients: + _ws_clients.remove(ws) + + +@app.post('/telemetry') +async def ingest_telemetry(point: TelemetryPoint): + payload = point.model_dump() + freq = str(payload['freq']) + now_ts = time.time() + + async with _state_lock: + _buffers[freq].append(payload) + _prune_freq_locked(freq, now_ts) + + await _broadcast({'type': 'point', 'data': payload}) + return {'ok': True} + + +@app.get('/telemetry/history') +async def telemetry_history( + freq: Optional[str] = Query(default=None), + seconds: int = Query(default=300, ge=10, le=86400), +): + seconds = min(seconds, TELEMETRY_HISTORY_SEC) + async with _state_lock: + series = _copy_series_locked(seconds=seconds, freq=freq) + return {'seconds': seconds, 'series': series} + + +@app.websocket('/telemetry/ws') +async def telemetry_ws(websocket: WebSocket): + await websocket.accept() + async with _state_lock: + _ws_clients.append(websocket) + snapshot = _copy_series_locked(seconds=min(300, TELEMETRY_HISTORY_SEC), freq=None) + + await websocket.send_json({'type': 'snapshot', 'data': snapshot}) + + try: + while True: + # Keepalive channel from browser; content is ignored. + await websocket.receive_text() + except WebSocketDisconnect: + pass + finally: + async with _state_lock: + if websocket in _ws_clients: + _ws_clients.remove(websocket) + + +MONITOR_HTML = """ + + + + + + DroneDetector Telemetry + + + + +
+
+
+

DroneDetector Telemetry Monitor

+
Green: dBFS current, Red: dynamic alarm threshold
+
+
connecting...
+
+
+
+ + + + +""" + + +@app.get('/', response_class=HTMLResponse) +@app.get('/monitor', response_class=HTMLResponse) +async def monitor_page(): + return HTMLResponse(content=MONITOR_HTML) + + +if __name__ == '__main__': + import uvicorn + + uvicorn.run(app, host=TELEMETRY_BIND_HOST, port=TELEMETRY_BIND_PORT)