inference_viewer

Automatica-3
Sergey Revyakin 2 days ago
parent 3bf93aab3f
commit 98f6fbdbdc

@ -15,6 +15,8 @@ import shutil
import json
import gc
import logging
import time
import re
TORCHSIG_PATH = "/app/torchsig"
if TORCHSIG_PATH not in sys.path:
@ -51,20 +53,89 @@ config = dict(dotenv_values(ROOT_ENV))
if not config:
raise RuntimeError("[NN_server/server.py] .env was loaded but no keys were parsed")
if not any(key.startswith("NN_") for key in config):
MODEL_ENV_RE = re.compile(r"^NN_\d+$")
if not any(MODEL_ENV_RE.match(key) for key in config):
raise RuntimeError("[NN_server/server.py] no NN_* model entries configured")
logging.info("NN config loaded from %s", ROOT_ENV)
gen_server_ip = config['GENERAL_SERVER_IP']
gen_server_port = config['GENERAL_SERVER_PORT']
INFERENCE_TELEMETRY_HOST = os.getenv('telemetry_host', '127.0.0.1')
INFERENCE_TELEMETRY_PORT = os.getenv('telemetry_port', '5020')
INFERENCE_TELEMETRY_ENDPOINT = os.getenv('telemetry_inference_endpoint', 'inference/result')
INFERENCE_TELEMETRY_TIMEOUT_SEC = float(os.getenv('telemetry_inference_timeout_sec', '0.30'))
INFERENCE_IMAGE_RE = re.compile(r"_inference_(\d+)_")
def get_result_dir():
return config.get('SRC_RESULT', '')
def collect_inference_images(result_id, model_name=''):
result_dir = get_result_dir()
if not result_dir or not os.path.isdir(result_dir):
return result_id, []
needle = f"_inference_{result_id}_"
model_suffix = f"_{model_name}.png" if model_name else ''
exact_images = []
grouped_images = {}
for name in sorted(os.listdir(result_dir)):
if not name.endswith('.png'):
continue
if model_suffix and not name.endswith(model_suffix):
continue
match = INFERENCE_IMAGE_RE.search(name)
if match is None:
continue
image_result_id = int(match.group(1))
grouped_images.setdefault(image_result_id, []).append(name)
if image_result_id == result_id and needle in name:
exact_images.append(name)
if exact_images:
return result_id, exact_images
if not grouped_images:
return result_id, []
latest_result_id = max(grouped_images)
return latest_result_id, grouped_images[latest_result_id]
def send_inference_result(payload):
try:
requests.post(
"http://{0}:{1}/{2}".format(
INFERENCE_TELEMETRY_HOST,
INFERENCE_TELEMETRY_PORT,
INFERENCE_TELEMETRY_ENDPOINT.lstrip('/'),
),
json=payload,
timeout=INFERENCE_TELEMETRY_TIMEOUT_SEC,
)
except Exception as exc:
print(str(exc))
def reset_directory_contents(path):
os.makedirs(path, exist_ok=True)
for name in os.listdir(path):
full_path = os.path.join(path, name)
try:
if os.path.isdir(full_path) and not os.path.islink(full_path):
shutil.rmtree(full_path)
else:
os.remove(full_path)
except FileNotFoundError:
continue
def init_data_for_inference():
try:
if os.path.isdir(config['SRC_RESULT']):
shutil.rmtree(config['SRC_RESULT'])
os.mkdir(config['SRC_RESULT'])
if os.path.isdir(config['SRC_EXAMPLE']):
shutil.rmtree(config['SRC_EXAMPLE'])
os.mkdir(config['SRC_EXAMPLE'])
reset_directory_contents(config['SRC_RESULT'])
reset_directory_contents(config['SRC_EXAMPLE'])
except Exception as exc:
print(str(exc))
print()
@ -72,7 +143,7 @@ def init_data_for_inference():
try:
global model_list
for key in config.keys():
if key.startswith('NN_'):
if MODEL_ENV_RE.match(key):
params = config[key].split(' && ')
module = importlib.import_module('Models.' + params[4])
classes = {}
@ -111,9 +182,12 @@ def run_example():
def receive_data():
try:
print()
data = json.loads(request.json)
data = request.json
if isinstance(data, str):
data = json.loads(data)
print('#' * 100)
print('Получен пакет ' + str(Model.get_ind_inference()))
result_id = Model.get_ind_inference()
freq = int(data['freq'])
print('Частота: ' + str(freq))
# print('Канал: ' + str(data['channel']))
@ -133,6 +207,18 @@ def receive_data():
result_msg[str(model.get_model_name())]['prediction'] = prediction
result_msg[str(model.get_model_name())]['probability'] = str(probability)
prediction_list.append(prediction)
image_result_id, images = collect_inference_images(result_id, model.get_model_name())
send_inference_result({
'result_id': image_result_id,
'ts': time.time(),
'freq': str(freq),
'model': model.get_model_name(),
'prediction': prediction,
'probability': float(probability),
'drone_probability': float(probability) if prediction == 'drone' else 0.0,
'drone_threshold': None,
'images': images,
})
print('-' * 100)
print()

@ -37,6 +37,7 @@ services:
environment:
- PYTHONPATH=/app:/app/NN_server
- NN_HOT_RELOAD=${NN_HOT_RELOAD:-1}
- telemetry_host=dronedetector-telemetry-server
working_dir: /app/NN_server
command:
- sh
@ -76,6 +77,7 @@ services:
- ../../.env:/app/.env:ro
- ../../telemetry:/app/telemetry
- ../../common:/app/common
- ../../NN_server/result:/app/inference_result:ro
networks:
- dronedetector-net

@ -1,11 +1,13 @@
import asyncio
import os
import re
import time
from collections import defaultdict, deque
from pathlib import Path
from typing import Any, Deque, Dict, List, Optional
from fastapi import FastAPI, Query, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi import FastAPI, HTTPException, Query, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse, HTMLResponse
from pydantic import BaseModel, Field
from common.runtime import load_root_env
@ -16,15 +18,25 @@ TELEMETRY_BIND_HOST = os.getenv('telemetry_bind_host', os.getenv('lochost', '0.0
TELEMETRY_BIND_PORT = int(os.getenv('telemetry_bind_port', os.getenv('telemetry_port', '5020')))
TELEMETRY_HISTORY_SEC = int(float(os.getenv('telemetry_history_sec', '900')))
TELEMETRY_MAX_POINTS_PER_FREQ = int(os.getenv('telemetry_max_points_per_freq', '5000'))
INFERENCE_HISTORY_SEC = int(float(os.getenv('inference_history_sec', str(TELEMETRY_HISTORY_SEC))))
INFERENCE_MAX_RESULTS_PER_FREQ = int(os.getenv('inference_max_results_per_freq', '100'))
INFERENCE_RESULT_DIR = Path(os.getenv('inference_result_dir', '/app/inference_result')).resolve()
INFERENCE_IMAGE_RE = re.compile(r"_inference_(\d+)_")
def _new_buffer() -> Deque[Dict[str, Any]]:
return deque(maxlen=TELEMETRY_MAX_POINTS_PER_FREQ)
def _new_inference_buffer() -> Deque[Dict[str, Any]]:
return deque(maxlen=INFERENCE_MAX_RESULTS_PER_FREQ)
app = FastAPI(title='DroneDetector Telemetry Server')
_buffers: Dict[str, Deque[Dict[str, Any]]] = defaultdict(_new_buffer)
_ws_clients: List[WebSocket] = []
_inference_buffers: Dict[str, Deque[Dict[str, Any]]] = defaultdict(_new_inference_buffer)
_inference_ws_clients: List[WebSocket] = []
_state_lock = asyncio.Lock()
@ -41,6 +53,18 @@ class TelemetryPoint(BaseModel):
alarm_channels: Optional[List[int]] = None
class InferenceResult(BaseModel):
result_id: int
ts: float = Field(default_factory=lambda: time.time())
freq: str
model: str
prediction: str
probability: float
drone_probability: Optional[float] = None
drone_threshold: Optional[str] = None
images: List[str] = Field(default_factory=list)
def _prune_freq_locked(freq: str, now_ts: float) -> None:
cutoff = now_ts - TELEMETRY_HISTORY_SEC
buf = _buffers[freq]
@ -62,6 +86,70 @@ def _copy_series_locked(seconds: int, freq: Optional[str] = None) -> Dict[str, L
return series
def _prune_inference_freq_locked(freq: str, now_ts: float) -> None:
cutoff = now_ts - INFERENCE_HISTORY_SEC
buf = _inference_buffers[freq]
while buf and float(buf[0].get('ts', 0.0)) < cutoff:
buf.popleft()
def _copy_inference_series_locked(limit: int, freq: Optional[str] = None) -> Dict[str, List[Dict[str, Any]]]:
now_ts = time.time()
cutoff = now_ts - INFERENCE_HISTORY_SEC
def _slice(buf: Deque[Dict[str, Any]]) -> List[Dict[str, Any]]:
recent = [item for item in buf if float(item.get('ts', 0.0)) >= cutoff]
return recent[-limit:]
if freq is not None:
return {freq: _slice(_inference_buffers.get(freq, deque()))}
series: Dict[str, List[Dict[str, Any]]] = {}
for key, buf in _inference_buffers.items():
series[key] = _slice(buf)
return series
def _sanitize_image_names(names: List[str]) -> List[str]:
safe_names: List[str] = []
for name in names:
base = Path(str(name)).name
if not base or not base.endswith('.png'):
continue
safe_names.append(base)
return safe_names
def _resolve_latest_images_for_model(payload: Dict[str, Any]) -> Dict[str, Any]:
model_name = str(payload.get('model', ''))
if not model_name or not INFERENCE_RESULT_DIR.is_dir():
payload['images'] = _sanitize_image_names(payload.get('images', []))
return payload
model_suffix = f"_{model_name}.png"
grouped: Dict[int, List[str]] = {}
for path in INFERENCE_RESULT_DIR.iterdir():
if not path.is_file():
continue
name = path.name
if not name.endswith(model_suffix):
continue
match = INFERENCE_IMAGE_RE.search(name)
if match is None:
continue
grouped.setdefault(int(match.group(1)), []).append(name)
if not grouped:
payload['images'] = _sanitize_image_names(payload.get('images', []))
return payload
current_id = int(payload.get('result_id', 0) or 0)
resolved_id = current_id if current_id in grouped else max(grouped)
payload['result_id'] = resolved_id
payload['images'] = sorted(grouped[resolved_id])
return payload
async def _broadcast(message: Dict[str, Any]) -> None:
dead: List[WebSocket] = []
for ws in list(_ws_clients):
@ -77,6 +165,21 @@ async def _broadcast(message: Dict[str, Any]) -> None:
_ws_clients.remove(ws)
async def _broadcast_inference(message: Dict[str, Any]) -> None:
dead: List[WebSocket] = []
for ws in list(_inference_ws_clients):
try:
await ws.send_json(message)
except Exception:
dead.append(ws)
if dead:
async with _state_lock:
for ws in dead:
if ws in _inference_ws_clients:
_inference_ws_clients.remove(ws)
@app.post('/telemetry')
async def ingest_telemetry(point: TelemetryPoint):
payload = point.model_dump()
@ -102,6 +205,31 @@ async def telemetry_history(
return {'seconds': seconds, 'series': series}
@app.post('/inference/result')
async def ingest_inference_result(result: InferenceResult):
payload = result.model_dump()
payload = _resolve_latest_images_for_model(payload)
freq = str(payload['freq'])
now_ts = time.time()
async with _state_lock:
_inference_buffers[freq].append(payload)
_prune_inference_freq_locked(freq, now_ts)
await _broadcast_inference({'type': 'inference_result', 'data': payload})
return {'ok': True}
@app.get('/inference/history')
async def inference_history(
freq: Optional[str] = Query(default=None),
limit: int = Query(default=20, ge=1, le=200),
):
async with _state_lock:
series = _copy_inference_series_locked(limit=limit, freq=freq)
return {'limit': limit, 'series': series}
@app.websocket('/telemetry/ws')
async def telemetry_ws(websocket: WebSocket):
await websocket.accept()
@ -123,6 +251,26 @@ async def telemetry_ws(websocket: WebSocket):
_ws_clients.remove(websocket)
@app.websocket('/inference/ws')
async def inference_ws(websocket: WebSocket):
await websocket.accept()
async with _state_lock:
_inference_ws_clients.append(websocket)
snapshot = _copy_inference_series_locked(limit=20, freq=None)
await websocket.send_json({'type': 'snapshot', 'data': snapshot})
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
pass
finally:
async with _state_lock:
if websocket in _inference_ws_clients:
_inference_ws_clients.remove(websocket)
MONITOR_HTML = """
<!doctype html>
<html>
@ -453,12 +601,294 @@ loadInitial().then(connectWs).catch((e) => {
"""
INFERENCE_VIEWER_HTML = """
<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>DroneDetector Inference Viewer</title>
<style>
:root {
--bg: #f4f6f8;
--card: #ffffff;
--line: #d7dde5;
--text: #1c232e;
--muted: #647084;
--ok: #16a34a;
--warn: #b45309;
--accent: #0f6fff;
}
* { box-sizing: border-box; }
body { margin: 0; background: var(--bg); color: var(--text); font-family: system-ui, -apple-system, Segoe UI, sans-serif; }
.wrap { max-width: 1800px; margin: 0 auto; padding: 16px; }
.head { display: flex; justify-content: space-between; align-items: center; gap: 16px; margin-bottom: 16px; }
.meta { color: var(--muted); font-size: 13px; }
.actions { display: flex; gap: 8px; align-items: center; }
button { border: 1px solid var(--line); background: var(--card); color: var(--text); border-radius: 8px; padding: 8px 12px; cursor: pointer; }
button.primary { background: var(--accent); color: #fff; border-color: var(--accent); }
.grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(520px, 1fr)); gap: 14px; }
.card { background: var(--card); border: 1px solid var(--line); border-radius: 14px; padding: 14px; }
.card-head { display: flex; justify-content: space-between; align-items: start; gap: 12px; margin-bottom: 10px; }
.freq { font-size: 28px; font-weight: 700; }
.pill { display: inline-flex; align-items: center; gap: 6px; border-radius: 999px; padding: 4px 10px; font-size: 12px; }
.pill.live { background: #dcfce7; color: #166534; }
.pill.paused { background: #fef3c7; color: #92400e; }
.summary { display: grid; grid-template-columns: repeat(2, minmax(0, 1fr)); gap: 8px 12px; margin-bottom: 12px; }
.summary-item { border: 1px solid var(--line); border-radius: 10px; padding: 8px 10px; }
.summary-label { color: var(--muted); font-size: 12px; margin-bottom: 4px; }
.summary-value { font-size: 14px; font-weight: 600; word-break: break-word; }
.images { display: grid; grid-template-columns: repeat(auto-fit, minmax(150px, 1fr)); gap: 10px; margin-bottom: 12px; }
.image-card { border: 1px solid var(--line); border-radius: 10px; overflow: hidden; background: #f8fafc; }
.image-card img { width: 100%; height: 180px; object-fit: contain; display: block; background: #fff; }
.image-card .cap { padding: 6px 8px; font-size: 12px; color: var(--muted); border-top: 1px solid var(--line); }
.history-title { font-size: 13px; font-weight: 600; margin-bottom: 8px; }
.history { display: flex; gap: 8px; overflow-x: auto; padding-bottom: 4px; }
.thumb { min-width: 110px; max-width: 110px; border: 1px solid var(--line); border-radius: 10px; background: #fff; padding: 6px; cursor: pointer; }
.thumb.active { border-color: var(--accent); box-shadow: inset 0 0 0 1px var(--accent); }
.thumb img { width: 100%; height: 70px; object-fit: contain; display: block; background: #f8fafc; border-radius: 6px; }
.thumb .t1 { font-size: 12px; font-weight: 600; margin-top: 6px; }
.thumb .t2 { font-size: 11px; color: var(--muted); }
.empty { color: var(--muted); font-size: 13px; padding: 16px 0; }
</style>
</head>
<body>
<div class="wrap">
<div class="head">
<div>
<h2 style="margin:0 0 4px;">DroneDetector Inference Viewer</h2>
<div class="meta">Latest inference card per frequency. Browser keeps last 20 results per frequency.</div>
</div>
<div class="actions">
<div class="meta" id="ws-status">connecting...</div>
<button id="pause-btn" class="primary">Pause updates</button>
</div>
</div>
<div class="grid" id="cards"></div>
</div>
<script>
const MAX_RESULTS = 20;
const state = {};
const selected = {};
let paused = false;
let wsKeepalive = null;
function freqSort(a, b) { return Number(a) - Number(b); }
function formatTs(ts) {
return new Date(Number(ts) * 1000).toLocaleString('ru-RU', { hour12: false });
}
function escapeHtml(value) {
return String(value ?? '').replaceAll('&', '&amp;').replaceAll('<', '&lt;').replaceAll('>', '&gt;').replaceAll('"', '&quot;');
}
function imageUrl(name) {
return `/inference/images/${encodeURIComponent(name)}`;
}
function primaryImage(result) {
return (result.images || [])[0] || '';
}
function trimResults(freq) {
const arr = state[freq] || [];
state[freq] = arr.slice(-MAX_RESULTS);
}
function ensureCard(freq) {
if (document.getElementById(`card-${freq}`)) return;
const card = document.createElement('div');
card.className = 'card';
card.id = `card-${freq}`;
card.innerHTML = `
<div class="card-head">
<div class="freq">${escapeHtml(freq)} MHz</div>
<div class="pill ${paused ? 'paused' : 'live'}" id="badge-${freq}">${paused ? 'paused' : 'live'}</div>
</div>
<div class="summary" id="summary-${freq}"></div>
<div class="images" id="images-${freq}"></div>
<div class="history-title">Recent results</div>
<div class="history" id="history-${freq}"></div>
`;
document.getElementById('cards').appendChild(card);
}
function renderSummary(freq, result) {
const el = document.getElementById(`summary-${freq}`);
el.innerHTML = `
<div class="summary-item"><div class="summary-label">Model</div><div class="summary-value">${escapeHtml(result.model)}</div></div>
<div class="summary-item"><div class="summary-label">Prediction</div><div class="summary-value">${escapeHtml(result.prediction)}</div></div>
<div class="summary-item"><div class="summary-label">Confidence</div><div class="summary-value">${Number(result.probability).toFixed(3)}</div></div>
<div class="summary-item"><div class="summary-label">Time</div><div class="summary-value">${escapeHtml(formatTs(result.ts))}</div></div>
`;
}
function renderImages(freq, result) {
const el = document.getElementById(`images-${freq}`);
const images = result.images || [];
if (!images.length) {
el.innerHTML = '<div class="empty">No images for this inference.</div>';
return;
}
el.innerHTML = images.map((name) => `
<div class="image-card">
<img loading="lazy" src="${imageUrl(name)}" alt="${escapeHtml(name)}" />
<div class="cap">${escapeHtml(name)}</div>
</div>
`).join('');
}
function renderHistory(freq) {
const el = document.getElementById(`history-${freq}`);
const results = state[freq] || [];
if (!results.length) {
el.innerHTML = '<div class="empty">No results yet.</div>';
return;
}
el.innerHTML = results.slice().reverse().map((result) => {
const active = String(selected[freq]) === String(result.result_id) ? 'active' : '';
const image = primaryImage(result);
return `
<button class="thumb ${active}" data-freq="${escapeHtml(freq)}" data-result-id="${result.result_id}">
${image ? `<img loading="lazy" src="${imageUrl(image)}" alt="${escapeHtml(result.prediction)}" />` : '<div class="empty" style="padding:20px 0;">no image</div>'}
<div class="t1">${escapeHtml(result.prediction)}</div>
<div class="t2">p=${Number(result.probability).toFixed(2)}</div>
</button>
`;
}).join('');
el.querySelectorAll('.thumb').forEach((node) => {
node.addEventListener('click', () => {
selected[freq] = Number(node.dataset.resultId);
render(freq);
});
});
}
function render(freq) {
ensureCard(freq);
trimResults(freq);
const badge = document.getElementById(`badge-${freq}`);
badge.textContent = paused ? 'paused' : 'live';
badge.className = `pill ${paused ? 'paused' : 'live'}`;
const results = state[freq] || [];
if (!results.length) {
document.getElementById(`summary-${freq}`).innerHTML = '<div class="empty">No inference results yet.</div>';
document.getElementById(`images-${freq}`).innerHTML = '';
document.getElementById(`history-${freq}`).innerHTML = '';
return;
}
const active = results.find((item) => String(item.result_id) === String(selected[freq])) || results[results.length - 1];
selected[freq] = active.result_id;
renderSummary(freq, active);
renderImages(freq, active);
renderHistory(freq);
}
function renderAll() {
Object.keys(state).sort(freqSort).forEach(render);
}
function applySnapshot(series) {
const next = {};
for (const [freq, results] of Object.entries(series || {})) {
next[String(freq)] = Array.isArray(results) ? results.slice(-MAX_RESULTS) : [];
}
for (const freq of Object.keys(next)) {
state[freq] = next[freq];
if (state[freq].length) {
selected[freq] = state[freq][state[freq].length - 1].result_id;
}
}
renderAll();
}
function ingestResult(result) {
const freq = String(result.freq);
if (!state[freq]) state[freq] = [];
state[freq].push(result);
trimResults(freq);
selected[freq] = result.result_id;
render(freq);
}
async function loadHistory() {
const res = await fetch(`/inference/history?limit=${MAX_RESULTS}`);
const payload = await res.json();
applySnapshot(payload.series || {});
}
function setPaused(nextPaused) {
paused = nextPaused;
const btn = document.getElementById('pause-btn');
btn.textContent = paused ? 'Resume updates' : 'Pause updates';
btn.className = paused ? '' : 'primary';
renderAll();
}
function connectWs() {
const proto = location.protocol === 'https:' ? 'wss' : 'ws';
const ws = new WebSocket(`${proto}://${location.host}/inference/ws`);
ws.onopen = () => {
document.getElementById('ws-status').textContent = paused ? 'ws connected (paused)' : 'ws connected';
if (wsKeepalive) clearInterval(wsKeepalive);
wsKeepalive = setInterval(() => {
if (ws.readyState === 1) ws.send('ping');
}, 20000);
};
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
if (msg.type === 'snapshot' && msg.data) {
if (!paused) applySnapshot(msg.data);
return;
}
if (msg.type !== 'inference_result' || paused) return;
ingestResult(msg.data);
};
ws.onclose = () => {
document.getElementById('ws-status').textContent = 'ws disconnected, retrying...';
if (wsKeepalive) clearInterval(wsKeepalive);
setTimeout(connectWs, 1500);
};
ws.onerror = () => {
document.getElementById('ws-status').textContent = 'ws error';
};
}
document.getElementById('pause-btn').addEventListener('click', async () => {
if (paused) {
setPaused(false);
document.getElementById('ws-status').textContent = 'syncing...';
try {
await loadHistory();
document.getElementById('ws-status').textContent = 'ws connected';
} catch (err) {
document.getElementById('ws-status').textContent = `history error: ${err}`;
}
} else {
setPaused(true);
document.getElementById('ws-status').textContent = 'ws connected (paused)';
}
});
loadHistory().then(connectWs).catch((err) => {
document.getElementById('ws-status').textContent = `init error: ${err}`;
connectWs();
});
</script>
</body>
</html>
"""
@app.get('/', response_class=HTMLResponse)
@app.get('/monitor', response_class=HTMLResponse)
async def monitor_page():
return HTMLResponse(content=MONITOR_HTML)
@app.get('/inference-viewer', response_class=HTMLResponse)
async def inference_viewer_page():
return HTMLResponse(content=INFERENCE_VIEWER_HTML)
@app.get('/inference/images/{filename}')
async def inference_image(filename: str):
safe_name = Path(filename).name
if safe_name != filename:
raise HTTPException(status_code=404, detail='image not found')
image_path = (INFERENCE_RESULT_DIR / safe_name).resolve()
if image_path.parent != INFERENCE_RESULT_DIR or not image_path.is_file():
raise HTTPException(status_code=404, detail='image not found')
return FileResponse(image_path)
if __name__ == '__main__':
import uvicorn

Loading…
Cancel
Save