|
|
# ptz_tracker_modular/servers.py
|
|
|
from __future__ import annotations
|
|
|
|
|
|
import asyncio
|
|
|
import logging
|
|
|
import struct
|
|
|
from typing import Any, Dict, Tuple, List
|
|
|
|
|
|
import websockets
|
|
|
|
|
|
# --- config / utils / shared state ---
|
|
|
try:
|
|
|
from . import config
|
|
|
from .utils import json_dumps, json_loads
|
|
|
from .state import (
|
|
|
video_clients,
|
|
|
vue_control_clients,
|
|
|
latest_jpeg_by_cam,
|
|
|
detected_cameras,
|
|
|
detection_queue,
|
|
|
ptz_states,
|
|
|
)
|
|
|
from .preview import publish_preview, maybe_downscale
|
|
|
from .cameras_io import add_or_update_camera_pair, get_all as get_all_cameras
|
|
|
except Exception: # запуск вне пакета (не рекомендуется)
|
|
|
import config # type: ignore
|
|
|
from utils import json_dumps, json_loads # type: ignore
|
|
|
from state import ( # type: ignore
|
|
|
video_clients,
|
|
|
vue_control_clients,
|
|
|
latest_jpeg_by_cam,
|
|
|
detected_cameras,
|
|
|
detection_queue,
|
|
|
ptz_states,
|
|
|
)
|
|
|
from preview import publish_preview, maybe_downscale # type: ignore
|
|
|
from cameras_io import add_or_update_camera_pair, get_all as get_all_cameras # type: ignore
|
|
|
|
|
|
# >>> import of the wiper trigger (ISAPI)
|
|
|
try:
|
|
|
from .wiper import trigger_wiper_once
|
|
|
except Exception: # запасной путь
|
|
|
try:
|
|
|
from wiper import trigger_wiper_once # type: ignore
|
|
|
except Exception:
|
|
|
trigger_wiper_once = None # type: ignore
|
|
|
|
|
|
# Logger used by the WebSocket handlers and broadcasters defined below.
logger = logging.getLogger("PTZTracker")
|
|
|
|
|
|
# Binary header prepended to each JPEG frame sent over the video WebSocket:
# big-endian uint16 camera id followed by uint32 frame id (6 bytes in total).
PACKER_HI = struct.Struct(">HI") # cam_id:uint16, frame_id:uint32
|
|
|
|
|
|
|
|
|
# ===================== VIDEO WS =====================
|
|
|
|
|
|
async def video_ws_handler(ws, path=None) -> None:
    """Keep a video subscriber registered for the lifetime of its connection.

    The connection is added to ``video_clients`` and the coroutine then simply
    waits for the socket to close; nothing is read from the client. The
    subscriber is always removed again on exit, whatever the reason.

    Args:
        ws: The accepted WebSocket connection.
        path: Request path passed by the server (used only for logging).
    """
    peer = getattr(ws, "remote_address", None)
    logger.info("[VIDEO WS] client connected: %s path=%s", peer, path)

    video_clients.add(ws)
    try:
        # Frames are pushed by the broadcaster; here we only wait for the end.
        await ws.wait_closed()
    finally:
        video_clients.discard(ws)
        logger.info("[VIDEO WS] client disconnected: %s", peer)
|
|
|
|
|
|
|
|
|
async def video_broadcaster() -> None:
    """
    Push the latest JPEG frame of every camera to all video WS subscribers.

    Each message is ``PACKER_HI`` header (camera id, frame id) followed by the
    JPEG bytes. Pacing is tracked per client:

    * a client is served at most once per its current ``min_interval``;
    * the interval grows by 25% (up to ``config.VIDEO_MIN_INTERVAL_SLOW``)
      when a send burst takes longer than ``send_slow_sec`` or the transport
      write buffer exceeds ``buf_soft``, and shrinks by 10% (down to
      ``config.VIDEO_MIN_INTERVAL_FAST``) otherwise;
    * a client whose write buffer exceeds ``buf_hard`` is closed and dropped;
    * a client whose ``send`` raises is dropped.

    Per-client state is discarded as soon as the client is no longer present
    in ``video_clients``, so connections that close on their own do not
    accumulate in memory.

    The coroutine never returns; errors inside one tick are logged and the
    loop continues.
    """
    loop = asyncio.get_running_loop()

    # ws -> {"last_fid": {cam_id: fid}, "min_interval": float, "last_sent": float}
    client_state: Dict[Any, Dict[str, Any]] = {}
    min_interval_fast = config.VIDEO_MIN_INTERVAL_FAST
    min_interval_slow = config.VIDEO_MIN_INTERVAL_SLOW
    send_slow_sec = 0.02           # a send burst slower than this counts as "slow"
    buf_soft = 4 * 1024 * 1024     # write-buffer size that triggers slowing down
    buf_hard = 8 * 1024 * 1024     # write-buffer size that gets the client dropped
    loop_tick = 1.0 / 200.0

    while True:
        try:
            # Forget clients that were unregistered elsewhere (normal
            # disconnects are removed from video_clients by the WS handler,
            # not by this loop). Without this the dict grows forever.
            if client_state:
                for gone in [c for c in client_state if c not in video_clients]:
                    del client_state[gone]

            if not video_clients or not latest_jpeg_by_cam:
                await asyncio.sleep(loop_tick)
                continue

            now = loop.time()
            dead: set[Any] = set()

            for ws in list(video_clients):
                st = client_state.setdefault(
                    ws, {"last_fid": {}, "min_interval": min_interval_fast, "last_sent": 0.0}
                )
                if (now - st["last_sent"]) < st["min_interval"]:
                    continue

                transport = getattr(ws, "transport", None)
                buf_sz = transport.get_write_buffer_size() if transport else 0
                if buf_sz > buf_hard:
                    # The client cannot keep up at all: close and drop it.
                    try:
                        await ws.close()
                    except Exception:
                        pass
                    dead.add(ws)
                    continue

                last_fid: Dict[int, int] = st["last_fid"]

                # Frames this client has not received yet.
                fresh: List[Tuple[int, int, bytes]] = [
                    (cam_id, fid, jpeg)
                    for cam_id, (fid, jpeg) in latest_jpeg_by_cam.items()
                    if last_fid.get(cam_id, -1) != fid
                ]
                if config.SEND_DUPLICATES_WHEN_IDLE and not fresh:
                    # Nothing new: optionally resend the current frames.
                    fresh = [
                        (cam_id, fid, jpeg)
                        for cam_id, (fid, jpeg) in latest_jpeg_by_cam.items()
                    ]
                if not fresh:
                    continue

                # Cap the burst; keep the frames with the highest frame ids.
                max_per_tick = 12 if st["min_interval"] <= (1.0 / 30.0) else 6
                if len(fresh) > max_per_tick:
                    fresh.sort(key=lambda x: x[1], reverse=True)
                    fresh = fresh[:max_per_tick]

                # Pack only what will really be sent. Done outside the send
                # ``try`` so that a packing error is not mistaken for a dead
                # client.
                pending: List[Tuple[int, int, bytes]] = [
                    (cam_id, fid, PACKER_HI.pack(int(cam_id), int(fid)) + jpeg)
                    for cam_id, fid, jpeg in fresh
                ]

                t0 = loop.time()
                try:
                    send = ws.send
                    for cam_id, fid, payload in pending:
                        await send(payload)
                        last_fid[cam_id] = fid
                except Exception:
                    dead.add(ws)
                    continue
                finally:
                    st["last_sent"] = loop.time()

                # Adapt this client's pace to how the burst went.
                send_time = st["last_sent"] - t0
                if (send_time > send_slow_sec) or (buf_sz > buf_soft):
                    st["min_interval"] = min(st["min_interval"] * 1.25, min_interval_slow)
                else:
                    st["min_interval"] = max(st["min_interval"] * 0.90, min_interval_fast)

            for d in dead:
                video_clients.discard(d)
                client_state.pop(d, None)

        except Exception as exc:
            logger.error("video_broadcaster error: %s", exc)

        await asyncio.sleep(loop_tick)
|
|
|
|
|
|
|
|
|
# ===================== CONTROL WS =====================
|
|
|
|
|
|
async def _ctrl_send(ws, payload: Dict[str, Any]) -> None:
    """Best-effort send of one JSON control message.

    ``payload`` is serialized with ``json_dumps`` and sent to ``ws``. If that
    fails for any reason, the connection is closed (errors ignored) and
    removed from ``vue_control_clients``. No exception derived from
    ``Exception`` escapes to the caller.
    """
    try:
        message = json_dumps(payload)
        await ws.send(message)
        return
    except Exception:
        # Fall through to the cleanup below.
        pass

    # Sending failed: shut the connection down and unregister it.
    try:
        await ws.close()
    except Exception:
        pass
    vue_control_clients.discard(ws)
|
|
|
|
|
|
|
|
|
def _ctrl_redact_secrets(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Return a shallow copy of *payload* with the password masked, for logging."""
    return {key: ("***" if key == "password" else value) for key, value in payload.items()}


async def _ctrl_on_policy(ws, addr, data: Dict[str, Any]) -> None:
    """Handle ``policy``: switch ``config.AUTO_STRATEGY`` between auto and manual."""
    auto = data.get("auto")
    logger.info("[CTRL WS] policy request from %s: auto=%r", addr, auto)
    if not isinstance(auto, bool):
        # Always answer, so the client is not left waiting for an ack.
        await _ctrl_send(ws, {"type": "policy_ack", "ok": False, "error": "'auto' must be a boolean"})
        logger.warning("[CTRL WS] policy rejected: auto=%r is not a boolean", auto)
        return

    # Use the module-level ``config`` so the flag is set on the same module
    # object this file was loaded with (package or fallback import).
    config.AUTO_STRATEGY = auto
    await _ctrl_send(ws, {"type": "policy_ack", "auto": config.AUTO_STRATEGY})
    logger.info("[CTRL WS] policy set to auto=%r", config.AUTO_STRATEGY)


async def _ctrl_on_number_camera(ws, addr, data: Dict[str, Any]) -> None:
    """Handle ``numberCamera``: forward the selected canvas id to ``detection_queue``."""
    try:
        payload_data = data.get("data", {})
        canvas_id = payload_data.get("canvasId", None)
        if isinstance(canvas_id, str):
            canvas_id = int(canvas_id.strip())
        elif isinstance(canvas_id, float):
            canvas_id = int(canvas_id)
        elif not isinstance(canvas_id, int):
            raise ValueError("canvasId must be int/str/float")

        payload = {"type": "numberCamera", "data": {"canvasId": canvas_id}}
        if detection_queue is not None:
            detection_queue.put_nowait(payload)
            await _ctrl_send(ws, {"type": "numberCamera_ack", "ok": True})
            logger.info("[CTRL WS] numberCamera -> %s", canvas_id)
        else:
            await _ctrl_send(ws, {"type": "numberCamera_ack", "ok": False, "error": "no_queue"})
            logger.warning("[CTRL WS] numberCamera: no_queue")
    except Exception as e:
        logger.exception("[CTRL WS] numberCamera error: %s", e)
        await _ctrl_send(ws, {"type": "numberCamera_ack", "ok": False, "error": str(e)})


async def _ctrl_on_wiper(ws, addr, data: Dict[str, Any]) -> None:
    """Handle ``wiper``: run the wiper once on camera ``cam`` for ``sec`` seconds (default 3)."""
    if trigger_wiper_once is None:
        await _ctrl_send(ws, {"type": "wiper_ack", "ok": False, "error": "wiper not available"})
        logger.warning("[CTRL WS] wiper requested but not available")
        return
    try:
        payload = data.get("data") or {}
        cam = int(payload.get("cam"))
        sec = int(payload.get("sec", 3))
        logger.info("[CTRL WS] WIPER request from %s -> cam=%s sec=%s", addr, cam, sec)

        ok = await trigger_wiper_once(cam, sec)
        await _ctrl_send(ws, {"type": "wiper_ack", "ok": bool(ok), "cam": cam, "sec": sec})
        logger.info("[CTRL WS] WIPER ack ok=%r cam=%s sec=%s", bool(ok), cam, sec)
    except Exception as e:
        logger.exception("[CTRL WS] wiper error: %s", e)
        await _ctrl_send(ws, {"type": "wiper_ack", "ok": False, "error": str(e)})


async def _ctrl_on_add_camera(ws, addr, data: Dict[str, Any]) -> None:
    """Handle ``add_camera``: create or update a camera pair (PTZ + PAN) by IP."""
    # Expected message:
    # {
    #   "type": "add_camera",
    #   "data": {
    #     "ip": "...", "username": "...", "password": "...",
    #     "preset1": "1", "preset2": "2",   # for the PTZ
    #     "ptz_channel": 1,                 # 1 or 2 may be sent; ignored, a pair is created
    #     "sweep_sign": 1
    #   }
    # }
    try:
        payload = data.get("data") or {}
        if not isinstance(payload, dict):
            raise ValueError("'data' must be an object")
        # Never write camera credentials to the log.
        logger.info("[CTRL WS] add_camera from %s: %s", addr, _ctrl_redact_secrets(payload))
        if "ip" not in payload:
            raise ValueError("missing 'ip'")

        # Create or update the pair of records.
        ptz_id, cams = add_or_update_camera_pair(payload)

        # Rebuild the derived structures; a failure here is not fatal.
        try:
            config.reload_cameras()
        except Exception as e:
            logger.warning("reload_cameras failed: %s", e)

        await _ctrl_send(ws, {
            "type": "add_camera_ack",
            "ok": True,
            "ptz_id": ptz_id,
            "count": len(cams),
        })
        logger.info("[CTRL WS] add_camera OK -> ptz_id=%s total=%s", ptz_id, len(cams))
    except Exception as e:
        logger.exception("[CTRL WS] add_camera error: %s", e)
        await _ctrl_send(ws, {"type": "add_camera_ack", "ok": False, "error": str(e)})


async def _ctrl_on_list_cameras(ws, addr, data: Dict[str, Any]) -> None:
    """Handle ``list_cameras``: reply with the result of ``get_all_cameras()``."""
    try:
        cams = get_all_cameras()
        await _ctrl_send(ws, {"type": "list_cameras_ack", "ok": True, "cameras": cams})
        logger.info("[CTRL WS] list_cameras -> %s items", len(cams))
    except Exception as e:
        await _ctrl_send(ws, {"type": "list_cameras_ack", "ok": False, "error": str(e)})
        logger.exception("[CTRL WS] list_cameras error: %s", e)


# Message "type" -> coroutine(ws, addr, data) that handles it.
_CTRL_HANDLERS = {
    "policy": _ctrl_on_policy,
    "numberCamera": _ctrl_on_number_camera,
    "wiper": _ctrl_on_wiper,
    "add_camera": _ctrl_on_add_camera,
    "list_cameras": _ctrl_on_list_cameras,
}


async def handle_vue_control(ws, path=None) -> None:
    """Serve one control WebSocket client.

    Registers the connection in ``vue_control_clients``, sends the initial
    ``init`` message (camera count followed by the sorted detected camera
    ids) and then processes incoming JSON messages until the connection ends.
    Supported message types: ``policy``, ``numberCamera``, ``wiper``,
    ``add_camera`` and ``list_cameras``; each one is answered with a
    ``<type>_ack`` message. Malformed messages (invalid JSON, or JSON that is
    not an object) are logged and skipped without closing the connection;
    unknown types are logged at debug level.

    Args:
        ws: The accepted WebSocket connection.
        path: Request path passed by the server (used only for logging).
    """
    addr = getattr(ws, "remote_address", None)
    logger.info("[CTRL WS] client connected: %s path=%s", addr, path)
    vue_control_clients.add(ws)
    try:
        # Initial state for the front end.
        await _ctrl_send(ws, {"type": "init", "data": [len(detected_cameras)] + sorted(detected_cameras)})

        async for msg in ws:
            try:
                data = json_loads(msg)
            except Exception:
                logger.warning("[CTRL WS] bad JSON from %s: %r", addr, msg)
                continue

            # Valid JSON that is not an object has no "type"; skip it instead
            # of letting an AttributeError tear the connection down.
            if not isinstance(data, dict):
                logger.warning("[CTRL WS] non-object message from %s: %r", addr, msg)
                continue

            mtype = data.get("type")
            # Only strings can be handler keys (and are guaranteed hashable).
            handler = _CTRL_HANDLERS.get(mtype) if isinstance(mtype, str) else None
            if handler is None:
                logger.debug("[CTRL WS] unknown message type: %s", mtype)
                continue

            await handler(ws, addr, data)

    except Exception as exc:
        logger.warning("[CTRL WS] error: %s", exc)
    finally:
        vue_control_clients.discard(ws)
        logger.info("[CTRL WS] client disconnected: %s", addr)
|
|
|
|
|
|
|
|
|
# ===================== CONTROL STATUS BROADCASTER =====================
|
|
|
|
|
|
async def control_broadcaster(interval_sec: float = 0.25) -> None:
    """
    Periodically send the front end an aggregated status of all cameras.

    Every ``interval_sec`` seconds, if at least one control client is
    connected, one message built from ``ptz_states`` is sent to every client
    in ``vue_control_clients``; clients whose send fails are unregistered.
    When ``config.LOG_STATUS_BATCH`` is truthy, a one-line summary for the
    cameras listed in ``config.PTZ_CAM_IDS`` is also written to the log.

    Message format:
    {
      "type": "camera_status_batch",
      "items": [
        {
          "cam": 0,
          "az": 123.4, "tilt": -12.3,
          "mode": "auto", "zoom_state": 0,
          "bearing_geo": 278.1,
          "delta_from_preset1_geo": -5.2,
          "attack_bearing_geo": 260.0,
          "attack_compass": "W",
          "last_seen": 1699999999.12
        }, ...
      ]
    }
    """
    # (key in the outgoing item, key in the per-camera state), in output order.
    exported_fields = (
        ("az", "az_deg"),
        ("tilt", "tilt_deg"),
        ("mode", "mode"),
        ("zoom_state", "zoom_state"),
        ("bearing_geo", "bearing_curr_geo"),
        ("delta_from_preset1_geo", "delta_from_preset1_geo"),
        ("attack_bearing_geo", "attack_bearing_geo"),
        ("attack_compass", "attack_compass"),
        ("last_seen", "last_seen"),
    )

    def _num(value: Any, template: str, fallback: str = "") -> str:
        # Format numeric values only; anything else yields the fallback text.
        return template.format(value) if isinstance(value, (int, float)) else fallback

    while True:
        try:
            if vue_control_clients:
                items: List[Dict[str, Any]] = []
                for cam_id, state in list(ptz_states.items()):
                    try:
                        entry: Dict[str, Any] = {"cam": cam_id}
                        for out_key, state_key in exported_fields:
                            entry[out_key] = state.get(state_key)
                    except Exception:
                        # Skip a camera whose state cannot be read.
                        continue
                    items.append(entry)

                if items:
                    msg = json_dumps({"type": "camera_status_batch", "items": items})

                    failed: set[Any] = set()
                    for client in list(vue_control_clients):
                        try:
                            await client.send(msg)
                        except Exception:
                            failed.add(client)
                    for client in failed:
                        vue_control_clients.discard(client)

                    # (optional) short summary in the log
                    if getattr(config, "LOG_STATUS_BATCH", False):
                        ptz_ids = set(getattr(config, "PTZ_CAM_IDS", []))
                        summaries = []
                        for item in items:
                            cid = item.get("cam")
                            if cid not in ptz_ids:
                                continue
                            summaries.append(
                                f"cam{cid}: "
                                + _num(item.get("az"), "az={:.1f}°", "az=n/a")
                                + _num(item.get("bearing_geo"), " geo={:.1f}°")
                                + _num(item.get("delta_from_preset1_geo"), " dP1={:+.1f}°")
                                + _num(item.get("attack_bearing_geo"), " atk={:.1f}°")
                            )
                        if summaries:
                            logger.info("[STATUS] " + " | ".join(summaries))

        except Exception as e:
            logger.debug("control_broadcaster error: %s", e)

        await asyncio.sleep(interval_sec)
|