You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

385 lines
16 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# ptz_tracker_modular/servers.py
from __future__ import annotations
import asyncio
import logging
import struct
from typing import Any, Dict, Tuple, List
import websockets
# --- конфиг/утилы/состояние ---
try:
from . import config
from .utils import json_dumps, json_loads
from .state import (
video_clients,
vue_control_clients,
latest_jpeg_by_cam,
detected_cameras,
detection_queue,
ptz_states,
)
from .preview import publish_preview, maybe_downscale
from .cameras_io import add_or_update_camera_pair, get_all as get_all_cameras
except Exception: # запуск вне пакета (не рекомендуется)
import config # type: ignore
from utils import json_dumps, json_loads # type: ignore
from state import ( # type: ignore
video_clients,
vue_control_clients,
latest_jpeg_by_cam,
detected_cameras,
detection_queue,
ptz_states,
)
from preview import publish_preview, maybe_downscale # type: ignore
from cameras_io import add_or_update_camera_pair, get_all as get_all_cameras # type: ignore
# >>> импорт триггера дворника (ISAPI)
try:
from .wiper import trigger_wiper_once
except Exception: # запасной путь
try:
from wiper import trigger_wiper_once # type: ignore
except Exception:
trigger_wiper_once = None # type: ignore
logger = logging.getLogger("PTZTracker")
PACKER_HI = struct.Struct(">HI") # cam_id:uint16, frame_id:uint32
# ===================== VIDEO WS =====================
async def video_ws_handler(ws, path=None) -> None:
addr = getattr(ws, "remote_address", None)
logger.info("[VIDEO WS] client connected: %s path=%s", addr, path)
video_clients.add(ws)
try:
await ws.wait_closed()
finally:
video_clients.discard(ws)
logger.info("[VIDEO WS] client disconnected: %s", addr)
async def video_broadcaster() -> None:
"""
Раздает JPEG-кадры всем подписчикам video WS.
Держит темп для каждого клиента отдельно и выкидывает тех, у кого буфер распух.
"""
client_state: Dict[Any, Dict[str, Any]] = {}
min_interval_fast = config.VIDEO_MIN_INTERVAL_FAST
min_interval_slow = config.VIDEO_MIN_INTERVAL_SLOW
send_slow_sec = 0.02
buf_soft = 4 * 1024 * 1024
buf_hard = 8 * 1024 * 1024
loop_tick = 1.0 / 200.0
while True:
try:
if not video_clients or not latest_jpeg_by_cam:
await asyncio.sleep(loop_tick)
continue
now = asyncio.get_running_loop().time()
dead: set[Any] = set()
for ws in list(video_clients):
st = client_state.setdefault(
ws, {"last_fid": {}, "min_interval": min_interval_fast, "last_sent": 0.0}
)
if (now - st["last_sent"]) < st["min_interval"]:
continue
transport = getattr(ws, "transport", None)
buf_sz = transport.get_write_buffer_size() if transport else 0
if buf_sz > buf_hard:
try:
await ws.close()
except Exception:
pass
dead.add(ws)
continue
pending: List[Tuple[int, int, bytes]] = []
last_fid: Dict[int, int] = st["last_fid"]
for cam_id, (fid, jpeg) in latest_jpeg_by_cam.items():
if last_fid.get(cam_id, -1) != fid:
payload = PACKER_HI.pack(int(cam_id), int(fid)) + jpeg
pending.append((cam_id, fid, payload))
if config.SEND_DUPLICATES_WHEN_IDLE and not pending:
for cam_id, (fid, jpeg) in latest_jpeg_by_cam.items():
payload = PACKER_HI.pack(int(cam_id), int(fid)) + jpeg
pending.append((cam_id, fid, payload))
if not pending:
continue
max_per_tick = 12 if st["min_interval"] <= (1.0 / 30.0) else 6
if len(pending) > max_per_tick:
pending.sort(key=lambda x: x[1], reverse=True)
pending = pending[:max_per_tick]
t0 = asyncio.get_running_loop().time()
try:
send = ws.send
for cam_id, fid, payload in pending:
await send(payload)
last_fid[cam_id] = fid
except Exception:
dead.add(ws)
continue
finally:
st["last_sent"] = asyncio.get_running_loop().time()
send_time = st["last_sent"] - t0
if (send_time > send_slow_sec) or (buf_sz > buf_soft):
st["min_interval"] = min(st["min_interval"] * 1.25, min_interval_slow)
else:
st["min_interval"] = max(st["min_interval"] * 0.90, min_interval_fast)
for d in dead:
video_clients.discard(d)
client_state.pop(d, None)
except Exception as exc:
logger.error("video_broadcaster error: %s", exc)
await asyncio.sleep(loop_tick)
# ===================== CONTROL WS =====================
async def _ctrl_send(ws, payload: Dict[str, Any]) -> None:
try:
await ws.send(json_dumps(payload))
except Exception:
try:
await ws.close()
except Exception:
pass
vue_control_clients.discard(ws)
async def handle_vue_control(ws, path=None) -> None:
addr = getattr(ws, "remote_address", None)
logger.info("[CTRL WS] client connected: %s path=%s", addr, path)
vue_control_clients.add(ws)
try:
# начальная инициализация фронта
await _ctrl_send(ws, {"type": "init", "data": [len(detected_cameras)] + sorted(detected_cameras)})
async for msg in ws:
try:
data = json_loads(msg)
except Exception:
logger.warning("[CTRL WS] bad JSON from %s: %r", addr, msg)
continue
mtype = data.get("type")
# === переключение стратегии авто/ручной
if mtype == "policy":
auto = data.get("auto")
logger.info("[CTRL WS] policy request from %s: auto=%r", addr, auto)
if isinstance(auto, bool):
try:
import importlib
import ptz_tracker_modular.config as _cfg # type: ignore
_cfg.AUTO_STRATEGY = auto
await _ctrl_send(ws, {"type": "policy_ack", "auto": _cfg.AUTO_STRATEGY})
logger.info("[CTRL WS] policy set to auto=%r", _cfg.AUTO_STRATEGY)
except Exception as e:
await _ctrl_send(ws, {"type": "policy_ack", "ok": False, "error": str(e)})
logger.exception("[CTRL WS] policy error: %s", e)
continue
# === полноэкран / выбор камеры
if mtype == "numberCamera":
try:
payload_data = data.get("data", {})
canvas_id = payload_data.get("canvasId", None)
if isinstance(canvas_id, str):
canvas_id = int(canvas_id.strip())
elif isinstance(canvas_id, float):
canvas_id = int(canvas_id)
elif not isinstance(canvas_id, int):
raise ValueError("canvasId must be int/str/float")
payload = {"type": "numberCamera", "data": {"canvasId": canvas_id}}
if detection_queue is not None:
detection_queue.put_nowait(payload)
await _ctrl_send(ws, {"type": "numberCamera_ack", "ok": True})
logger.info("[CTRL WS] numberCamera -> %s", canvas_id)
else:
await _ctrl_send(ws, {"type": "numberCamera_ack", "ok": False, "error": "no_queue"})
logger.warning("[CTRL WS] numberCamera: no_queue")
except Exception as e:
logger.exception("[CTRL WS] numberCamera error: %s", e)
await _ctrl_send(ws, {"type": "numberCamera_ack", "ok": False, "error": str(e)})
continue
# === дворник (ISAPI)
if mtype == "wiper":
if trigger_wiper_once is None:
await _ctrl_send(ws, {"type": "wiper_ack", "ok": False, "error": "wiper not available"})
logger.warning("[CTRL WS] wiper requested but not available")
continue
try:
payload = data.get("data") or {}
cam = int(payload.get("cam"))
sec = int(payload.get("sec", 3))
logger.info("[CTRL WS] WIPER request from %s -> cam=%s sec=%s", addr, cam, sec)
ok = await trigger_wiper_once(cam, sec)
await _ctrl_send(ws, {"type": "wiper_ack", "ok": bool(ok), "cam": cam, "sec": sec})
logger.info("[CTRL WS] WIPER ack ok=%r cam=%s sec=%s", bool(ok), cam, sec)
except Exception as e:
logger.exception("[CTRL WS] wiper error: %s", e)
await _ctrl_send(ws, {"type": "wiper_ack", "ok": False, "error": str(e)})
continue
# === добавить КАМЕРУ (создать пару PTZ+PAN по IP)
if mtype == "add_camera":
"""
Ожидаемый payload:
{
"type": "add_camera",
"data": {
"ip": "...", "username": "...", "password": "...",
"preset1": "1", "preset2": "2", # для PTZ
"ptz_channel": 1, # можно прислать 1 или 2 — игнорится, мы создаём пару
"sweep_sign": 1
}
}
"""
try:
payload = data.get("data") or {}
logger.info("[CTRL WS] add_camera from %s: %s", addr, payload)
if "ip" not in payload:
raise ValueError("missing 'ip'")
# создаём/обновляем пару записей
ptz_id, cams = add_or_update_camera_pair(payload)
# пересобираем производные структуры
try:
config.reload_cameras()
except Exception as e:
logger.warning("reload_cameras failed: %s", e)
await _ctrl_send(ws, {
"type": "add_camera_ack",
"ok": True,
"ptz_id": ptz_id,
"count": len(cams),
})
logger.info("[CTRL WS] add_camera OK -> ptz_id=%s total=%s", ptz_id, len(cams))
except Exception as e:
logger.exception("[CTRL WS] add_camera error: %s", e)
await _ctrl_send(ws, {"type": "add_camera_ack", "ok": False, "error": str(e)})
continue
# === список камер по запросу (опционально)
if mtype == "list_cameras":
try:
cams = get_all_cameras()
await _ctrl_send(ws, {"type": "list_cameras_ack", "ok": True, "cameras": cams})
logger.info("[CTRL WS] list_cameras -> %s items", len(cams))
except Exception as e:
await _ctrl_send(ws, {"type": "list_cameras_ack", "ok": False, "error": str(e)})
logger.exception("[CTRL WS] list_cameras error: %s", e)
continue
logger.debug("[CTRL WS] unknown message type: %s", mtype)
except Exception as exc:
logger.warning("[CTRL WS] error: %s", exc)
finally:
vue_control_clients.discard(ws)
logger.info("[CTRL WS] client disconnected: %s", addr)
# ===================== CONTROL STATUS BROADCASTER =====================
async def control_broadcaster(interval_sec: float = 0.25) -> None:
"""
Периодически шлёт фронтенду агрегированный статус по всем камерам.
Формат сообщения:
{
"type": "camera_status_batch",
"items": [
{
"cam": 0,
"az": 123.4, "tilt": -12.3,
"mode": "auto", "zoom_state": 0,
"bearing_geo": 278.1,
"delta_from_preset1_geo": -5.2,
"attack_bearing_geo": 260.0,
"attack_compass": "W",
"last_seen": 1699999999.12
}, ...
]
}
"""
while True:
try:
if not vue_control_clients:
await asyncio.sleep(interval_sec)
continue
items: List[Dict[str, Any]] = []
for cam, st in list(ptz_states.items()):
try:
items.append({
"cam": cam,
"az": st.get("az_deg"),
"tilt": st.get("tilt_deg"),
"mode": st.get("mode"),
"zoom_state": st.get("zoom_state"),
"bearing_geo": st.get("bearing_curr_geo"),
"delta_from_preset1_geo": st.get("delta_from_preset1_geo"),
"attack_bearing_geo": st.get("attack_bearing_geo"),
"attack_compass": st.get("attack_compass"),
"last_seen": st.get("last_seen"),
})
except Exception:
continue
if items:
payload = {"type": "camera_status_batch", "items": items}
msg = json_dumps(payload)
dead: set[Any] = set()
for ws in list(vue_control_clients):
try:
await ws.send(msg)
except Exception:
dead.add(ws)
for d in dead:
vue_control_clients.discard(d)
# (опционально) краткая сводка в лог
if getattr(config, "LOG_STATUS_BATCH", False):
ptz_ids = set(getattr(config, "PTZ_CAM_IDS", []))
line_parts = []
for it in items:
cid = it.get("cam")
if cid in ptz_ids:
az = it.get("az")
b = it.get("bearing_geo")
d = it.get("delta_from_preset1_geo")
a = it.get("attack_bearing_geo")
s_az = f"az={az:.1f}°" if isinstance(az, (int, float)) else "az=n/a"
s_b = f" geo={b:.1f}°" if isinstance(b, (int, float)) else ""
s_d = f" dP1={d:+.1f}°" if isinstance(d, (int, float)) else ""
s_a = f" atk={a:.1f}°" if isinstance(a, (int, float)) else ""
line_parts.append(f"cam{cid}: {s_az}{s_b}{s_d}{s_a}")
if line_parts:
logger.info("[STATUS] " + " | ".join(line_parts))
except Exception as e:
logger.debug("control_broadcaster error: %s", e)
await asyncio.sleep(interval_sec)