You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
DroneDetector/telemetry/telemetry_server.py

466 lines
13 KiB
Python

import asyncio
import os
import time
from collections import defaultdict, deque
from typing import Any, Deque, Dict, List, Optional
from fastapi import FastAPI, Query, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from pydantic import BaseModel, Field
from common.runtime import load_root_env
load_root_env(__file__)
TELEMETRY_BIND_HOST = os.getenv('telemetry_bind_host', os.getenv('lochost', '0.0.0.0'))
TELEMETRY_BIND_PORT = int(os.getenv('telemetry_bind_port', os.getenv('telemetry_port', '5020')))
TELEMETRY_HISTORY_SEC = int(float(os.getenv('telemetry_history_sec', '900')))
TELEMETRY_MAX_POINTS_PER_FREQ = int(os.getenv('telemetry_max_points_per_freq', '5000'))
def _new_buffer() -> Deque[Dict[str, Any]]:
return deque(maxlen=TELEMETRY_MAX_POINTS_PER_FREQ)
app = FastAPI(title='DroneDetector Telemetry Server')
_buffers: Dict[str, Deque[Dict[str, Any]]] = defaultdict(_new_buffer)
_ws_clients: List[WebSocket] = []
_state_lock = asyncio.Lock()
class TelemetryPoint(BaseModel):
freq: str
ts: float = Field(default_factory=lambda: time.time())
dbfs_current: float
dbfs_threshold: Optional[float] = None
alarm: bool = False
channel_idx: int = 0
channels_total: int = 1
channel_values: Optional[List[float]] = None
channel_thresholds: Optional[List[Optional[float]]] = None
alarm_channels: Optional[List[int]] = None
def _prune_freq_locked(freq: str, now_ts: float) -> None:
cutoff = now_ts - TELEMETRY_HISTORY_SEC
buf = _buffers[freq]
while buf and float(buf[0].get('ts', 0.0)) < cutoff:
buf.popleft()
def _copy_series_locked(seconds: int, freq: Optional[str] = None) -> Dict[str, List[Dict[str, Any]]]:
now_ts = time.time()
cutoff = now_ts - seconds
if freq is not None:
data = [point for point in _buffers.get(freq, []) if float(point.get('ts', 0.0)) >= cutoff]
return {freq: data}
series: Dict[str, List[Dict[str, Any]]] = {}
for key, buf in _buffers.items():
series[key] = [point for point in buf if float(point.get('ts', 0.0)) >= cutoff]
return series
async def _broadcast(message: Dict[str, Any]) -> None:
dead: List[WebSocket] = []
for ws in list(_ws_clients):
try:
await ws.send_json(message)
except Exception:
dead.append(ws)
if dead:
async with _state_lock:
for ws in dead:
if ws in _ws_clients:
_ws_clients.remove(ws)
@app.post('/telemetry')
async def ingest_telemetry(point: TelemetryPoint):
payload = point.model_dump()
freq = str(payload['freq'])
now_ts = time.time()
async with _state_lock:
_buffers[freq].append(payload)
_prune_freq_locked(freq, now_ts)
await _broadcast({'type': 'point', 'data': payload})
return {'ok': True}
@app.get('/telemetry/history')
async def telemetry_history(
freq: Optional[str] = Query(default=None),
seconds: int = Query(default=300, ge=10, le=86400),
):
seconds = min(seconds, TELEMETRY_HISTORY_SEC)
async with _state_lock:
series = _copy_series_locked(seconds=seconds, freq=freq)
return {'seconds': seconds, 'series': series}
@app.websocket('/telemetry/ws')
async def telemetry_ws(websocket: WebSocket):
await websocket.accept()
async with _state_lock:
_ws_clients.append(websocket)
snapshot = _copy_series_locked(seconds=min(300, TELEMETRY_HISTORY_SEC), freq=None)
await websocket.send_json({'type': 'snapshot', 'data': snapshot})
try:
while True:
# Keepalive channel from browser; content is ignored.
await websocket.receive_text()
except WebSocketDisconnect:
pass
finally:
async with _state_lock:
if websocket in _ws_clients:
_ws_clients.remove(websocket)
MONITOR_HTML = """
<!doctype html>
<html>
<head>
<meta charset=\"utf-8\" />
<meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />
<title>DroneDetector Telemetry</title>
<script src=\"https://cdn.plot.ly/plotly-2.35.2.min.js\"></script>
<style>
:root {
--bg: #f6f8fb;
--card: #ffffff;
--line: #d9dde5;
--text: #1c232e;
--green: #12b76a;
--red: #ef4444;
--muted: #5b6574;
}
body { margin: 0; background: var(--bg); color: var(--text); font-family: system-ui, -apple-system, Segoe UI, sans-serif; }
.wrap { max-width: 1800px; margin: 0 auto; padding: 14px; }
.head { display: flex; justify-content: space-between; align-items: center; margin-bottom: 10px; }
.meta { font-size: 13px; color: var(--muted); }
.grid { display: flex; flex-direction: column; gap: 10px; }
.card { width: 100%; background: var(--card); border: 1px solid var(--line); border-radius: 10px; padding: 8px 8px 8px; }
.title-row { display: flex; justify-content: space-between; align-items: center; margin: 4px 8px; }
.title { font-size: 20px; font-weight: 700; }
.ctrl { display: flex; align-items: center; gap: 6px; }
.ctrl label { font-size: 12px; color: var(--muted); }
.ctrl select { border: 1px solid var(--line); border-radius: 6px; padding: 2px 6px; }
.plot { height: 260px; width: 100%; }
.events-title { font-size: 12px; color: var(--muted); margin: 2px 8px 4px; }
.events { max-height: 110px; overflow-y: auto; border-top: 1px dashed var(--line); margin: 0 8px; padding-top: 4px; }
.ev { display: flex; justify-content: space-between; font-size: 12px; line-height: 1.4; color: var(--text); }
.ev-t { color: var(--muted); }
.ev-empty { color: var(--muted); font-size: 12px; }
</style>
</head>
<body>
<div class=\"wrap\">
<div class=\"head\">
<div>
<h2 style=\"margin:0;\">DroneDetector Telemetry Monitor</h2>
<div class=\"meta\">Green: dBFS current, Red: channel threshold, Red dots: alarm points</div>
</div>
<div class=\"meta\" id=\"status\">connecting...</div>
</div>
<div class=\"grid\" id=\"plots\"></div>
</div>
<script>
const windowSec = 300;
const state = {}; // freq -> points[]
const selectedChannel = {}; // freq -> 'max' | channel index as string
function numericSortFreq(a, b) {
return Number(a) - Number(b);
}
function formatTime(ts) {
return new Date(Number(ts) * 1000).toLocaleTimeString('ru-RU', {hour12: false});
}
function getChannelCount(freq) {
const pts = state[freq] || [];
let maxCount = 1;
for (const p of pts) {
if (Number.isFinite(Number(p.channels_total))) {
maxCount = Math.max(maxCount, Number(p.channels_total));
}
if (Array.isArray(p.channel_values)) {
maxCount = Math.max(maxCount, p.channel_values.length);
}
}
return maxCount;
}
function ensurePlot(freq) {
if (document.getElementById(`plot-${freq}`)) return;
const card = document.createElement('div');
card.className = 'card';
card.innerHTML = `
<div class=\"title-row\">
<div class=\"title\">${freq} MHz</div>
<div class=\"ctrl\">
<label for=\"chan-${freq}\">channel</label>
<select id=\"chan-${freq}\"></select>
</div>
</div>
<div class=\"plot\" id=\"plot-${freq}\"></div>
<div class=\"events-title\">Alarms (time -> channel)</div>
<div class=\"events\" id=\"events-${freq}\"></div>
`;
document.getElementById('plots').appendChild(card);
selectedChannel[freq] = 'max';
const sel = document.getElementById(`chan-${freq}`);
sel.addEventListener('change', () => {
selectedChannel[freq] = sel.value;
render(freq);
});
}
function updateChannelSelector(freq) {
const sel = document.getElementById(`chan-${freq}`);
if (!sel) return;
const prev = selectedChannel[freq] ?? 'max';
const count = getChannelCount(freq);
const opts = ['max'];
for (let i = 0; i < count; i += 1) opts.push(String(i));
sel.innerHTML = '';
for (const v of opts) {
const option = document.createElement('option');
option.value = v;
option.textContent = v === 'max' ? 'max' : `ch ${v}`;
sel.appendChild(option);
}
selectedChannel[freq] = opts.includes(prev) ? prev : 'max';
sel.value = selectedChannel[freq];
}
function trimPoints(freq) {
const arr = state[freq] || [];
const cutoff = Date.now() / 1000 - windowSec;
state[freq] = arr.filter(p => Number(p.ts) >= cutoff);
}
function getPointValueForSelection(point, selection) {
if (selection === 'max') {
return {
y: point.dbfs_current ?? null,
threshold: point.dbfs_threshold ?? null,
};
}
const idx = Number(selection);
if (!Number.isInteger(idx)) {
return {y: null, threshold: null};
}
const y = Array.isArray(point.channel_values) && idx < point.channel_values.length
? point.channel_values[idx]
: null;
const threshold = Array.isArray(point.channel_thresholds) && idx < point.channel_thresholds.length
? point.channel_thresholds[idx]
: null;
return {y, threshold};
}
function isAlarmForSelection(point, selection) {
if (point.alarm !== true) return false;
if (selection === 'max') return true;
const idx = Number(selection);
if (!Number.isInteger(idx)) return false;
if (Array.isArray(point.alarm_channels) && point.alarm_channels.length > 0) {
return point.alarm_channels.includes(idx);
}
return Number(point.channel_idx) === idx;
}
function renderAlarmEvents(freq, pts) {
const el = document.getElementById(`events-${freq}`);
if (!el) return;
const alarmPts = pts.filter(p => p.alarm === true);
if (alarmPts.length === 0) {
el.innerHTML = '<div class=\"ev-empty\">no alarms</div>';
return;
}
const rows = alarmPts.slice(-20).reverse().map((p) => {
const channels = Array.isArray(p.alarm_channels) && p.alarm_channels.length > 0
? p.alarm_channels.join(',')
: String(p.channel_idx ?? '-');
return `<div class=\"ev\"><span class=\"ev-t\">${formatTime(p.ts)}</span><span>ch ${channels}</span></div>`;
});
el.innerHTML = rows.join('');
}
function render(freq) {
ensurePlot(freq);
trimPoints(freq);
updateChannelSelector(freq);
const pts = state[freq] || [];
const sel = selectedChannel[freq] ?? 'max';
const x = [];
const y = [];
const thr = [];
const alarmX = [];
const alarmY = [];
for (const p of pts) {
const metric = getPointValueForSelection(p, sel);
if (metric.y === null || metric.y === undefined) {
continue;
}
const ts = new Date(Number(p.ts) * 1000);
x.push(ts);
y.push(metric.y);
thr.push(metric.threshold);
if (isAlarmForSelection(p, sel)) {
alarmX.push(ts);
alarmY.push(metric.y);
}
}
const labelSuffix = sel === 'max' ? 'max' : `ch ${sel}`;
const traces = [
{
x,
y,
mode: 'lines',
name: `dBFS (${labelSuffix})`,
line: {color: '#12b76a', width: 2},
},
{
x,
y: thr,
mode: 'lines',
name: `Threshold (${labelSuffix})`,
line: {color: '#ef4444', width: 2, dash: 'dash'},
},
{
x: alarmX,
y: alarmY,
mode: 'markers',
name: 'Alarm',
marker: {color: '#ef4444', size: 6, symbol: 'circle'},
},
];
Plotly.react(`plot-${freq}`, traces, {
margin: {l: 40, r: 12, t: 12, b: 32},
showlegend: true,
legend: {orientation: 'h', y: 1.16},
xaxis: {
title: 'time',
tickformat: '%H:%M:%S',
hoverformat: '%H:%M:%S',
range: [new Date(Date.now() - windowSec * 1000), new Date()],
},
yaxis: {title: 'dBFS'},
}, {displayModeBar: false, responsive: true});
renderAlarmEvents(freq, pts);
}
function renderAll() {
const freqs = Object.keys(state).sort(numericSortFreq);
freqs.forEach(render);
}
async function loadInitial() {
const res = await fetch(`/telemetry/history?seconds=${windowSec}`);
const payload = await res.json();
const series = payload.series || {};
for (const [freq, points] of Object.entries(series)) {
state[freq] = points;
}
renderAll();
}
function connectWs() {
const proto = location.protocol === 'https:' ? 'wss' : 'ws';
const ws = new WebSocket(`${proto}://${location.host}/telemetry/ws`);
ws.onopen = () => {
document.getElementById('status').textContent = 'ws connected';
setInterval(() => {
if (ws.readyState === 1) ws.send('ping');
}, 20000);
};
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
if (msg.type === 'snapshot' && msg.data) {
for (const [freq, points] of Object.entries(msg.data)) {
state[freq] = points;
}
renderAll();
return;
}
if (msg.type !== 'point') return;
const p = msg.data;
const freq = String(p.freq);
if (!state[freq]) state[freq] = [];
state[freq].push(p);
render(freq);
};
ws.onclose = () => {
document.getElementById('status').textContent = 'ws disconnected, retrying...';
setTimeout(connectWs, 1500);
};
ws.onerror = () => {
document.getElementById('status').textContent = 'ws error';
};
}
setInterval(() => {
renderAll();
}, 1000);
loadInitial().then(connectWs).catch((e) => {
document.getElementById('status').textContent = `init error: ${e}`;
connectWs();
});
</script>
</body>
</html>
"""
@app.get('/', response_class=HTMLResponse)
@app.get('/monitor', response_class=HTMLResponse)
async def monitor_page():
return HTMLResponse(content=MONITOR_HTML)
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host=TELEMETRY_BIND_HOST, port=TELEMETRY_BIND_PORT)