|
|
from __future__ import annotations
|
|
|
from pathlib import Path
|
|
|
from typing import Any, Dict, List, Tuple, Optional
|
|
|
import os
|
|
|
import tomllib
|
|
|
import logging
|
|
|
logger = logging.getLogger("PTZTracker")
|
|
|
|
|
|
|
|
|
try:
|
|
|
from . import config # type: ignore
|
|
|
except Exception:
|
|
|
config = None # type: ignore
|
|
|
|
|
|
|
|
|
# ------------------------------ Порядок ключей ------------------------------
|
|
|
|
|
|
CAM_KEYS_ORDER = [
|
|
|
"ip", "username", "password", "ptz_channel",
|
|
|
"preset1", "preset2", "sweep_sign",
|
|
|
"preset1_deg", "preset2_deg",
|
|
|
"preset1_tilt_deg", "preset2_tilt_deg",
|
|
|
"sector_min_deg", "sector_max_deg",
|
|
|
"pan_sign", "tilt_sign", "pan_offset_deg", "tilt_offset_deg",
|
|
|
"mac",
|
|
|
|
|
|
"north_offset_deg",
|
|
|
"preset1_deg_geo", "preset2_deg_geo",
|
|
|
"sector_min_deg_geo", "sector_max_deg_geo",
|
|
|
]
|
|
|
|
|
|
# Мягкие дефолты для новых полей (и некоторых старых)
|
|
|
DEFAULTS: Dict[str, Any] = {
|
|
|
"ptz_channel": 1,
|
|
|
"preset1": None,
|
|
|
"preset2": None,
|
|
|
"sweep_sign": 1,
|
|
|
"preset1_deg": None,
|
|
|
"preset2_deg": None,
|
|
|
"preset1_tilt_deg": None,
|
|
|
"preset2_tilt_deg": None,
|
|
|
"sector_min_deg": None,
|
|
|
"sector_max_deg": None,
|
|
|
"pan_sign": None,
|
|
|
"tilt_sign": None,
|
|
|
"pan_offset_deg": 0.0,
|
|
|
"tilt_offset_deg": 0.0,
|
|
|
"mac": None,
|
|
|
"north_offset_deg": None,
|
|
|
"preset1_deg_geo": None,
|
|
|
"preset2_deg_geo": None,
|
|
|
"sector_min_deg_geo": None,
|
|
|
"sector_max_deg_geo": None,
|
|
|
}
|
|
|
|
|
|
|
|
|
# ------------------------------ IO helpers ------------------------------
|
|
|
|
|
|
def _cameras_file_path() -> Path:
|
|
|
env = os.getenv("CAMERAS_FILE")
|
|
|
if env:
|
|
|
return Path(env).resolve()
|
|
|
|
|
|
here = Path(__file__).resolve()
|
|
|
for parent in [here.parent, *here.parents]:
|
|
|
if parent.name == "ptz_tracker_modular":
|
|
|
return (parent / "cameras.toml").resolve()
|
|
|
|
|
|
# fallback — рядом с текущим модулем
|
|
|
return (here.parent / "cameras.toml").resolve()
|
|
|
|
|
|
|
|
|
def _apply_defaults(cfg: Dict[str, Any]) -> Dict[str, Any]:
|
|
|
"""Мягко подставляет дефолты; в файл None не пишем."""
|
|
|
for k, v in DEFAULTS.items():
|
|
|
cfg.setdefault(k, v)
|
|
|
return cfg
|
|
|
|
|
|
|
|
|
def _load_all() -> Dict[int, Dict[str, Any]]:
|
|
|
path = _cameras_file_path()
|
|
|
if not path.exists():
|
|
|
return {}
|
|
|
if path.suffix.lower() != ".toml":
|
|
|
raise RuntimeError("Для веб-редактирования ожидается TOML (cameras.toml)")
|
|
|
with open(path, "rb") as f:
|
|
|
data = tomllib.load(f)
|
|
|
arr = data.get("camera") or data.get("cameras") or []
|
|
|
res: Dict[int, Dict[str, Any]] = {}
|
|
|
for item in arr:
|
|
|
cid = int(item["id"])
|
|
|
res[cid] = _apply_defaults(dict(item))
|
|
|
return res
|
|
|
|
|
|
|
|
|
def _dump_toml(cams: Dict[int, Dict[str, Any]]) -> str:
|
|
|
lines: List[str] = []
|
|
|
for cid in sorted(cams.keys()):
|
|
|
cfg = dict(cams[cid])
|
|
|
lines.append("[[camera]]")
|
|
|
lines.append(f"id = {cid}")
|
|
|
|
|
|
# Сначала печатаем ключи в фиксированном порядке
|
|
|
for k in CAM_KEYS_ORDER:
|
|
|
if k not in cfg or cfg[k] is None:
|
|
|
continue
|
|
|
v = cfg[k]
|
|
|
if isinstance(v, str):
|
|
|
v = v.replace("\\", "\\\\").replace('"', '\\"')
|
|
|
lines.append(f'{k} = "{v}"')
|
|
|
elif isinstance(v, bool):
|
|
|
lines.append(f'{k} = {"true" if v else "false"}')
|
|
|
else:
|
|
|
lines.append(f"{k} = {v}")
|
|
|
|
|
|
# Затем доп. ключи, которых нет в CAM_KEYS_ORDER
|
|
|
for k, v in cfg.items():
|
|
|
if k in CAM_KEYS_ORDER or k == "id":
|
|
|
continue
|
|
|
if v is None:
|
|
|
continue
|
|
|
if isinstance(v, str):
|
|
|
v = v.replace("\\", "\\\\").replace('"', '\\"')
|
|
|
lines.append(f'{k} = "{v}"')
|
|
|
elif isinstance(v, bool):
|
|
|
lines.append(f'{k} = {"true" if v else "false"}')
|
|
|
else:
|
|
|
lines.append(f"{k} = {v}")
|
|
|
|
|
|
lines.append("") # пустая строка между камерами
|
|
|
return "\n".join(lines).rstrip() + "\n"
|
|
|
|
|
|
|
|
|
def _write_all(cams: Dict[int, Dict[str, Any]]) -> None:
|
|
|
path = _cameras_file_path()
|
|
|
txt = _dump_toml(cams)
|
|
|
path.write_text(txt, encoding="utf-8")
|
|
|
|
|
|
|
|
|
# ------------------------------ Публичные утилиты ------------------------------
|
|
|
|
|
|
def get_all() -> Dict[int, Dict[str, Any]]:
|
|
|
"""Вернёт все камеры с подставленными дефолтами (в памяти)."""
|
|
|
return _load_all()
|
|
|
|
|
|
|
|
|
def get(cid: int) -> Optional[Dict[str, Any]]:
|
|
|
"""Вернёт конфиг одной камеры или None."""
|
|
|
return _load_all().get(int(cid))
|
|
|
|
|
|
|
|
|
def update(cid: int, patch: Dict[str, Any]) -> Dict[int, Dict[str, Any]]:
|
|
|
"""
|
|
|
Патчит одну камеру по id и сохраняет файл.
|
|
|
Возвращает обновлённый словарь всех камер.
|
|
|
"""
|
|
|
cid = int(cid)
|
|
|
cams = _load_all()
|
|
|
if cid not in cams:
|
|
|
raise KeyError(f"Camera id {cid} not found")
|
|
|
base = cams[cid]
|
|
|
|
|
|
# нормализация типов
|
|
|
if "ptz_channel" in patch and patch["ptz_channel"] is not None:
|
|
|
patch["ptz_channel"] = int(patch["ptz_channel"])
|
|
|
if "sweep_sign" in patch and patch["sweep_sign"] is not None:
|
|
|
patch["sweep_sign"] = int(patch["sweep_sign"])
|
|
|
|
|
|
base.update(patch)
|
|
|
cams[cid] = _apply_defaults(base)
|
|
|
_write_all(cams)
|
|
|
return cams
|
|
|
|
|
|
|
|
|
# ------------------------------ CRUD (твоя логика) ------------------------------
|
|
|
|
|
|
def add_or_update_camera(cam: Dict[str, Any]) -> Tuple[int, Dict[int, Dict[str, Any]]]:
|
|
|
"""
|
|
|
Одиночная запись (как у тебя было).
|
|
|
cam: {id? , ip, username, password, ptz_channel, preset1, preset2, sweep_sign, ...}
|
|
|
Если id не задан – берём max+1. Возвращает (real_id, updated_dict).
|
|
|
"""
|
|
|
cams = _load_all()
|
|
|
|
|
|
# Вычисляем id
|
|
|
real_id = int(cam["id"]) if ("id" in cam and cam["id"] is not None) else (
|
|
|
(max(cams.keys()) + 1) if cams else 0
|
|
|
)
|
|
|
|
|
|
# Нормализация/дефолты
|
|
|
rec = dict(cam)
|
|
|
rec["id"] = real_id
|
|
|
if "ptz_channel" in rec and rec["ptz_channel"] is not None:
|
|
|
rec["ptz_channel"] = int(rec["ptz_channel"])
|
|
|
if "sweep_sign" in rec and rec["sweep_sign"] is not None:
|
|
|
rec["sweep_sign"] = int(rec["sweep_sign"])
|
|
|
|
|
|
cams[real_id] = _apply_defaults(rec)
|
|
|
_write_all(cams)
|
|
|
|
|
|
# Аккуратный лог: покажем, какие ID по этому IP уже есть (ch=1/ch=2)
|
|
|
ip = rec.get("ip")
|
|
|
ptz_id = None
|
|
|
pan_id = None
|
|
|
try:
|
|
|
for cid, c in cams.items():
|
|
|
if c.get("ip") == ip:
|
|
|
ch = int(c.get("ptz_channel", 1))
|
|
|
if ch == 1:
|
|
|
ptz_id = cid
|
|
|
elif ch == 2:
|
|
|
pan_id = cid
|
|
|
except Exception:
|
|
|
pass
|
|
|
|
|
|
logger.info(
|
|
|
f"[CFG] add_or_update_camera ip={ip} -> saved id={real_id} ch={rec.get('ptz_channel', 1)}; "
|
|
|
f"present: PTZ id={ptz_id}, PAN id={pan_id} (total={len(cams)})"
|
|
|
)
|
|
|
|
|
|
return real_id, cams
|
|
|
|
|
|
|
|
|
def delete_camera(cid: int) -> Dict[int, Dict[str, Any]]:
|
|
|
cams = _load_all()
|
|
|
if cid in cams:
|
|
|
del cams[cid]
|
|
|
_write_all(cams)
|
|
|
return cams
|
|
|
|
|
|
|
|
|
# ------------------------------ ПАРНАЯ запись PTZ+PAN ------------------------------
|
|
|
|
|
|
def add_or_update_camera_pair(cam: Dict[str, Any]) -> Tuple[int, Dict[int, Dict[str, Any]]]:
|
|
|
"""
|
|
|
Создаёт/обновляет ДВЕ записи на один IP:
|
|
|
- PTZ (канал 1) с пресетами preset1/preset2
|
|
|
- PAN (канал 2) без пресетов
|
|
|
Если одна из записей существует — аккуратно обновит её и досоздаст вторую.
|
|
|
Возвращает (ptz_id, весь словарь камер).
|
|
|
"""
|
|
|
cams = _load_all()
|
|
|
|
|
|
# Входные данные
|
|
|
ip = cam["ip"]
|
|
|
username = cam.get("username")
|
|
|
password = cam.get("password")
|
|
|
sweep_sign = int(cam.get("sweep_sign", 1))
|
|
|
preset1 = cam.get("preset1")
|
|
|
preset2 = cam.get("preset2")
|
|
|
mac = cam.get("mac")
|
|
|
|
|
|
have = {cid: c for cid, c in cams.items() if c.get("ip") == ip}
|
|
|
by_ch: Dict[int, Tuple[int, Dict[str, Any]]] = {}
|
|
|
for cid, c in have.items():
|
|
|
try:
|
|
|
ch = int(c.get("ptz_channel", 1))
|
|
|
except Exception:
|
|
|
ch = 1
|
|
|
by_ch[ch] = (cid, c)
|
|
|
|
|
|
# --- PTZ (ch = 1) ---
|
|
|
ptz_patch = {
|
|
|
"ip": ip,
|
|
|
"username": username,
|
|
|
"password": password,
|
|
|
"ptz_channel": 1,
|
|
|
"preset1": preset1,
|
|
|
"preset2": preset2,
|
|
|
"sweep_sign": sweep_sign,
|
|
|
"mac": mac,
|
|
|
}
|
|
|
if 1 in by_ch:
|
|
|
ptz_id, base = by_ch[1]
|
|
|
# не затираем None-ами
|
|
|
for k, v in ptz_patch.items():
|
|
|
if v is not None:
|
|
|
base[k] = v
|
|
|
cams[ptz_id] = _apply_defaults(base)
|
|
|
else:
|
|
|
ptz_id = (max(cams.keys()) + 1) if cams else 0
|
|
|
rec = {"id": ptz_id, **ptz_patch}
|
|
|
cams[ptz_id] = _apply_defaults(rec)
|
|
|
|
|
|
# --- PAN (ch = 2) ---
|
|
|
pan_patch = {
|
|
|
"ip": ip,
|
|
|
"username": username,
|
|
|
"password": password,
|
|
|
"ptz_channel": 2,
|
|
|
"preset1": None,
|
|
|
"preset2": None,
|
|
|
"sweep_sign": sweep_sign,
|
|
|
"mac": mac,
|
|
|
}
|
|
|
if 2 in by_ch:
|
|
|
pan_id, base2 = by_ch[2]
|
|
|
# не затираем None-ами
|
|
|
for k, v in pan_patch.items():
|
|
|
if v is not None:
|
|
|
base2[k] = v
|
|
|
cams[pan_id] = _apply_defaults(base2)
|
|
|
else:
|
|
|
# подберём свободный id, не наступая на ptz_id
|
|
|
new_id = (max(cams.keys()) + 1) if cams else (1 if ptz_id == 0 else 0)
|
|
|
if new_id == ptz_id:
|
|
|
new_id += 1
|
|
|
rec2 = {"id": new_id, **pan_patch}
|
|
|
cams[new_id] = _apply_defaults(rec2)
|
|
|
|
|
|
_write_all(cams)
|
|
|
return ptz_id, cams
|