# ptz_tracker_modular/servers.py from __future__ import annotations import asyncio import logging import struct from typing import Any, Dict, Tuple, List import websockets # --- конфиг/утилы/состояние --- try: from . import config from .utils import json_dumps, json_loads from .state import ( video_clients, vue_control_clients, latest_jpeg_by_cam, detected_cameras, detection_queue, ptz_states, ) from .preview import publish_preview, maybe_downscale from .cameras_io import add_or_update_camera_pair, get_all as get_all_cameras except Exception: # запуск вне пакета (не рекомендуется) import config # type: ignore from utils import json_dumps, json_loads # type: ignore from state import ( # type: ignore video_clients, vue_control_clients, latest_jpeg_by_cam, detected_cameras, detection_queue, ptz_states, ) from preview import publish_preview, maybe_downscale # type: ignore from cameras_io import add_or_update_camera_pair, get_all as get_all_cameras # type: ignore # >>> импорт триггера дворника (ISAPI) try: from .wiper import trigger_wiper_once except Exception: # запасной путь try: from wiper import trigger_wiper_once # type: ignore except Exception: trigger_wiper_once = None # type: ignore logger = logging.getLogger("PTZTracker") PACKER_HI = struct.Struct(">HI") # cam_id:uint16, frame_id:uint32 # ===================== VIDEO WS ===================== async def video_ws_handler(ws, path=None) -> None: addr = getattr(ws, "remote_address", None) logger.info("[VIDEO WS] client connected: %s path=%s", addr, path) video_clients.add(ws) try: await ws.wait_closed() finally: video_clients.discard(ws) logger.info("[VIDEO WS] client disconnected: %s", addr) async def video_broadcaster() -> None: """ Раздает JPEG-кадры всем подписчикам video WS. Держит темп для каждого клиента отдельно и выкидывает тех, у кого буфер распух. """ client_state: Dict[Any, Dict[str, Any]] = {} min_interval_fast = config.VIDEO_MIN_INTERVAL_FAST min_interval_slow = config.VIDEO_MIN_INTERVAL_SLOW send_slow_sec = 0.02 buf_soft = 4 * 1024 * 1024 buf_hard = 8 * 1024 * 1024 loop_tick = 1.0 / 200.0 while True: try: if not video_clients or not latest_jpeg_by_cam: await asyncio.sleep(loop_tick) continue now = asyncio.get_running_loop().time() dead: set[Any] = set() for ws in list(video_clients): st = client_state.setdefault( ws, {"last_fid": {}, "min_interval": min_interval_fast, "last_sent": 0.0} ) if (now - st["last_sent"]) < st["min_interval"]: continue transport = getattr(ws, "transport", None) buf_sz = transport.get_write_buffer_size() if transport else 0 if buf_sz > buf_hard: try: await ws.close() except Exception: pass dead.add(ws) continue pending: List[Tuple[int, int, bytes]] = [] last_fid: Dict[int, int] = st["last_fid"] for cam_id, (fid, jpeg) in latest_jpeg_by_cam.items(): if last_fid.get(cam_id, -1) != fid: payload = PACKER_HI.pack(int(cam_id), int(fid)) + jpeg pending.append((cam_id, fid, payload)) if config.SEND_DUPLICATES_WHEN_IDLE and not pending: for cam_id, (fid, jpeg) in latest_jpeg_by_cam.items(): payload = PACKER_HI.pack(int(cam_id), int(fid)) + jpeg pending.append((cam_id, fid, payload)) if not pending: continue max_per_tick = 12 if st["min_interval"] <= (1.0 / 30.0) else 6 if len(pending) > max_per_tick: pending.sort(key=lambda x: x[1], reverse=True) pending = pending[:max_per_tick] t0 = asyncio.get_running_loop().time() try: send = ws.send for cam_id, fid, payload in pending: await send(payload) last_fid[cam_id] = fid except Exception: dead.add(ws) continue finally: st["last_sent"] = asyncio.get_running_loop().time() send_time = st["last_sent"] - t0 if (send_time > send_slow_sec) or (buf_sz > buf_soft): st["min_interval"] = min(st["min_interval"] * 1.25, min_interval_slow) else: st["min_interval"] = max(st["min_interval"] * 0.90, min_interval_fast) for d in dead: video_clients.discard(d) client_state.pop(d, None) except Exception as exc: logger.error("video_broadcaster error: %s", exc) await asyncio.sleep(loop_tick) # ===================== CONTROL WS ===================== async def _ctrl_send(ws, payload: Dict[str, Any]) -> None: try: await ws.send(json_dumps(payload)) except Exception: try: await ws.close() except Exception: pass vue_control_clients.discard(ws) async def handle_vue_control(ws, path=None) -> None: addr = getattr(ws, "remote_address", None) logger.info("[CTRL WS] client connected: %s path=%s", addr, path) vue_control_clients.add(ws) try: # начальная инициализация фронта await _ctrl_send(ws, {"type": "init", "data": [len(detected_cameras)] + sorted(detected_cameras)}) async for msg in ws: try: data = json_loads(msg) except Exception: logger.warning("[CTRL WS] bad JSON from %s: %r", addr, msg) continue mtype = data.get("type") # === переключение стратегии авто/ручной if mtype == "policy": auto = data.get("auto") logger.info("[CTRL WS] policy request from %s: auto=%r", addr, auto) if isinstance(auto, bool): try: import importlib import ptz_tracker_modular.config as _cfg # type: ignore _cfg.AUTO_STRATEGY = auto await _ctrl_send(ws, {"type": "policy_ack", "auto": _cfg.AUTO_STRATEGY}) logger.info("[CTRL WS] policy set to auto=%r", _cfg.AUTO_STRATEGY) except Exception as e: await _ctrl_send(ws, {"type": "policy_ack", "ok": False, "error": str(e)}) logger.exception("[CTRL WS] policy error: %s", e) continue # === полноэкран / выбор камеры if mtype == "numberCamera": try: payload_data = data.get("data", {}) canvas_id = payload_data.get("canvasId", None) if isinstance(canvas_id, str): canvas_id = int(canvas_id.strip()) elif isinstance(canvas_id, float): canvas_id = int(canvas_id) elif not isinstance(canvas_id, int): raise ValueError("canvasId must be int/str/float") payload = {"type": "numberCamera", "data": {"canvasId": canvas_id}} if detection_queue is not None: detection_queue.put_nowait(payload) await _ctrl_send(ws, {"type": "numberCamera_ack", "ok": True}) logger.info("[CTRL WS] numberCamera -> %s", canvas_id) else: await _ctrl_send(ws, {"type": "numberCamera_ack", "ok": False, "error": "no_queue"}) logger.warning("[CTRL WS] numberCamera: no_queue") except Exception as e: logger.exception("[CTRL WS] numberCamera error: %s", e) await _ctrl_send(ws, {"type": "numberCamera_ack", "ok": False, "error": str(e)}) continue # === дворник (ISAPI) if mtype == "wiper": if trigger_wiper_once is None: await _ctrl_send(ws, {"type": "wiper_ack", "ok": False, "error": "wiper not available"}) logger.warning("[CTRL WS] wiper requested but not available") continue try: payload = data.get("data") or {} cam = int(payload.get("cam")) sec = int(payload.get("sec", 3)) logger.info("[CTRL WS] WIPER request from %s -> cam=%s sec=%s", addr, cam, sec) ok = await trigger_wiper_once(cam, sec) await _ctrl_send(ws, {"type": "wiper_ack", "ok": bool(ok), "cam": cam, "sec": sec}) logger.info("[CTRL WS] WIPER ack ok=%r cam=%s sec=%s", bool(ok), cam, sec) except Exception as e: logger.exception("[CTRL WS] wiper error: %s", e) await _ctrl_send(ws, {"type": "wiper_ack", "ok": False, "error": str(e)}) continue # === добавить КАМЕРУ (создать пару PTZ+PAN по IP) if mtype == "add_camera": """ Ожидаемый payload: { "type": "add_camera", "data": { "ip": "...", "username": "...", "password": "...", "preset1": "1", "preset2": "2", # для PTZ "ptz_channel": 1, # можно прислать 1 или 2 — игнорится, мы создаём пару "sweep_sign": 1 } } """ try: payload = data.get("data") or {} logger.info("[CTRL WS] add_camera from %s: %s", addr, payload) if "ip" not in payload: raise ValueError("missing 'ip'") # создаём/обновляем пару записей ptz_id, cams = add_or_update_camera_pair(payload) # пересобираем производные структуры try: config.reload_cameras() except Exception as e: logger.warning("reload_cameras failed: %s", e) await _ctrl_send(ws, { "type": "add_camera_ack", "ok": True, "ptz_id": ptz_id, "count": len(cams), }) logger.info("[CTRL WS] add_camera OK -> ptz_id=%s total=%s", ptz_id, len(cams)) except Exception as e: logger.exception("[CTRL WS] add_camera error: %s", e) await _ctrl_send(ws, {"type": "add_camera_ack", "ok": False, "error": str(e)}) continue # === список камер по запросу (опционально) if mtype == "list_cameras": try: cams = get_all_cameras() await _ctrl_send(ws, {"type": "list_cameras_ack", "ok": True, "cameras": cams}) logger.info("[CTRL WS] list_cameras -> %s items", len(cams)) except Exception as e: await _ctrl_send(ws, {"type": "list_cameras_ack", "ok": False, "error": str(e)}) logger.exception("[CTRL WS] list_cameras error: %s", e) continue logger.debug("[CTRL WS] unknown message type: %s", mtype) except Exception as exc: logger.warning("[CTRL WS] error: %s", exc) finally: vue_control_clients.discard(ws) logger.info("[CTRL WS] client disconnected: %s", addr) # ===================== CONTROL STATUS BROADCASTER ===================== async def control_broadcaster(interval_sec: float = 0.25) -> None: """ Периодически шлёт фронтенду агрегированный статус по всем камерам. Формат сообщения: { "type": "camera_status_batch", "items": [ { "cam": 0, "az": 123.4, "tilt": -12.3, "mode": "auto", "zoom_state": 0, "bearing_geo": 278.1, "delta_from_preset1_geo": -5.2, "attack_bearing_geo": 260.0, "attack_compass": "W", "last_seen": 1699999999.12 }, ... ] } """ while True: try: if not vue_control_clients: await asyncio.sleep(interval_sec) continue items: List[Dict[str, Any]] = [] for cam, st in list(ptz_states.items()): try: items.append({ "cam": cam, "az": st.get("az_deg"), "tilt": st.get("tilt_deg"), "mode": st.get("mode"), "zoom_state": st.get("zoom_state"), "bearing_geo": st.get("bearing_curr_geo"), "delta_from_preset1_geo": st.get("delta_from_preset1_geo"), "attack_bearing_geo": st.get("attack_bearing_geo"), "attack_compass": st.get("attack_compass"), "last_seen": st.get("last_seen"), }) except Exception: continue if items: payload = {"type": "camera_status_batch", "items": items} msg = json_dumps(payload) dead: set[Any] = set() for ws in list(vue_control_clients): try: await ws.send(msg) except Exception: dead.add(ws) for d in dead: vue_control_clients.discard(d) # (опционально) краткая сводка в лог if getattr(config, "LOG_STATUS_BATCH", False): ptz_ids = set(getattr(config, "PTZ_CAM_IDS", [])) line_parts = [] for it in items: cid = it.get("cam") if cid in ptz_ids: az = it.get("az") b = it.get("bearing_geo") d = it.get("delta_from_preset1_geo") a = it.get("attack_bearing_geo") s_az = f"az={az:.1f}°" if isinstance(az, (int, float)) else "az=n/a" s_b = f" geo={b:.1f}°" if isinstance(b, (int, float)) else "" s_d = f" dP1={d:+.1f}°" if isinstance(d, (int, float)) else "" s_a = f" atk={a:.1f}°" if isinstance(a, (int, float)) else "" line_parts.append(f"cam{cid}: {s_az}{s_b}{s_d}{s_a}") if line_parts: logger.info("[STATUS] " + " | ".join(line_parts)) except Exception as e: logger.debug("control_broadcaster error: %s", e) await asyncio.sleep(interval_sec)