You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

313 lines
9.9 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List, Tuple, Optional
import os
import tomllib
import logging
logger = logging.getLogger("PTZTracker")
try:
from . import config # type: ignore
except Exception:
config = None # type: ignore
# ------------------------------ Порядок ключей ------------------------------
CAM_KEYS_ORDER = [
"ip", "username", "password", "ptz_channel",
"preset1", "preset2", "sweep_sign",
"preset1_deg", "preset2_deg",
"preset1_tilt_deg", "preset2_tilt_deg",
"sector_min_deg", "sector_max_deg",
"pan_sign", "tilt_sign", "pan_offset_deg", "tilt_offset_deg",
"mac",
"north_offset_deg",
"preset1_deg_geo", "preset2_deg_geo",
"sector_min_deg_geo", "sector_max_deg_geo",
]
# Мягкие дефолты для новых полей (и некоторых старых)
DEFAULTS: Dict[str, Any] = {
"ptz_channel": 1,
"preset1": None,
"preset2": None,
"sweep_sign": 1,
"preset1_deg": None,
"preset2_deg": None,
"preset1_tilt_deg": None,
"preset2_tilt_deg": None,
"sector_min_deg": None,
"sector_max_deg": None,
"pan_sign": None,
"tilt_sign": None,
"pan_offset_deg": 0.0,
"tilt_offset_deg": 0.0,
"mac": None,
"north_offset_deg": None,
"preset1_deg_geo": None,
"preset2_deg_geo": None,
"sector_min_deg_geo": None,
"sector_max_deg_geo": None,
}
# ------------------------------ IO helpers ------------------------------
def _cameras_file_path() -> Path:
env = os.getenv("CAMERAS_FILE")
if env:
return Path(env).resolve()
here = Path(__file__).resolve()
for parent in [here.parent, *here.parents]:
if parent.name == "ptz_tracker_modular":
return (parent / "cameras.toml").resolve()
# fallback — рядом с текущим модулем
return (here.parent / "cameras.toml").resolve()
def _apply_defaults(cfg: Dict[str, Any]) -> Dict[str, Any]:
"""Мягко подставляет дефолты; в файл None не пишем."""
for k, v in DEFAULTS.items():
cfg.setdefault(k, v)
return cfg
def _load_all() -> Dict[int, Dict[str, Any]]:
path = _cameras_file_path()
if not path.exists():
return {}
if path.suffix.lower() != ".toml":
raise RuntimeError("Для веб-редактирования ожидается TOML (cameras.toml)")
with open(path, "rb") as f:
data = tomllib.load(f)
arr = data.get("camera") or data.get("cameras") or []
res: Dict[int, Dict[str, Any]] = {}
for item in arr:
cid = int(item["id"])
res[cid] = _apply_defaults(dict(item))
return res
def _dump_toml(cams: Dict[int, Dict[str, Any]]) -> str:
lines: List[str] = []
for cid in sorted(cams.keys()):
cfg = dict(cams[cid])
lines.append("[[camera]]")
lines.append(f"id = {cid}")
# Сначала печатаем ключи в фиксированном порядке
for k in CAM_KEYS_ORDER:
if k not in cfg or cfg[k] is None:
continue
v = cfg[k]
if isinstance(v, str):
v = v.replace("\\", "\\\\").replace('"', '\\"')
lines.append(f'{k} = "{v}"')
elif isinstance(v, bool):
lines.append(f'{k} = {"true" if v else "false"}')
else:
lines.append(f"{k} = {v}")
# Затем доп. ключи, которых нет в CAM_KEYS_ORDER
for k, v in cfg.items():
if k in CAM_KEYS_ORDER or k == "id":
continue
if v is None:
continue
if isinstance(v, str):
v = v.replace("\\", "\\\\").replace('"', '\\"')
lines.append(f'{k} = "{v}"')
elif isinstance(v, bool):
lines.append(f'{k} = {"true" if v else "false"}')
else:
lines.append(f"{k} = {v}")
lines.append("") # пустая строка между камерами
return "\n".join(lines).rstrip() + "\n"
def _write_all(cams: Dict[int, Dict[str, Any]]) -> None:
path = _cameras_file_path()
txt = _dump_toml(cams)
path.write_text(txt, encoding="utf-8")
# ------------------------------ Публичные утилиты ------------------------------
def get_all() -> Dict[int, Dict[str, Any]]:
"""Вернёт все камеры с подставленными дефолтами (в памяти)."""
return _load_all()
def get(cid: int) -> Optional[Dict[str, Any]]:
"""Вернёт конфиг одной камеры или None."""
return _load_all().get(int(cid))
def update(cid: int, patch: Dict[str, Any]) -> Dict[int, Dict[str, Any]]:
"""
Патчит одну камеру по id и сохраняет файл.
Возвращает обновлённый словарь всех камер.
"""
cid = int(cid)
cams = _load_all()
if cid not in cams:
raise KeyError(f"Camera id {cid} not found")
base = cams[cid]
# нормализация типов
if "ptz_channel" in patch and patch["ptz_channel"] is not None:
patch["ptz_channel"] = int(patch["ptz_channel"])
if "sweep_sign" in patch and patch["sweep_sign"] is not None:
patch["sweep_sign"] = int(patch["sweep_sign"])
base.update(patch)
cams[cid] = _apply_defaults(base)
_write_all(cams)
return cams
# ------------------------------ CRUD (твоя логика) ------------------------------
def add_or_update_camera(cam: Dict[str, Any]) -> Tuple[int, Dict[int, Dict[str, Any]]]:
"""
Одиночная запись (как у тебя было).
cam: {id? , ip, username, password, ptz_channel, preset1, preset2, sweep_sign, ...}
Если id не задан берём max+1. Возвращает (real_id, updated_dict).
"""
cams = _load_all()
# Вычисляем id
real_id = int(cam["id"]) if ("id" in cam and cam["id"] is not None) else (
(max(cams.keys()) + 1) if cams else 0
)
# Нормализация/дефолты
rec = dict(cam)
rec["id"] = real_id
if "ptz_channel" in rec and rec["ptz_channel"] is not None:
rec["ptz_channel"] = int(rec["ptz_channel"])
if "sweep_sign" in rec and rec["sweep_sign"] is not None:
rec["sweep_sign"] = int(rec["sweep_sign"])
cams[real_id] = _apply_defaults(rec)
_write_all(cams)
# Аккуратный лог: покажем, какие ID по этому IP уже есть (ch=1/ch=2)
ip = rec.get("ip")
ptz_id = None
pan_id = None
try:
for cid, c in cams.items():
if c.get("ip") == ip:
ch = int(c.get("ptz_channel", 1))
if ch == 1:
ptz_id = cid
elif ch == 2:
pan_id = cid
except Exception:
pass
logger.info(
f"[CFG] add_or_update_camera ip={ip} -> saved id={real_id} ch={rec.get('ptz_channel', 1)}; "
f"present: PTZ id={ptz_id}, PAN id={pan_id} (total={len(cams)})"
)
return real_id, cams
def delete_camera(cid: int) -> Dict[int, Dict[str, Any]]:
cams = _load_all()
if cid in cams:
del cams[cid]
_write_all(cams)
return cams
# ------------------------------ ПАРНАЯ запись PTZ+PAN ------------------------------
def add_or_update_camera_pair(cam: Dict[str, Any]) -> Tuple[int, Dict[int, Dict[str, Any]]]:
"""
Создаёт/обновляет ДВЕ записи на один IP:
- PTZ (канал 1) с пресетами preset1/preset2
- PAN (канал 2) без пресетов
Если одна из записей существует — аккуратно обновит её и досоздаст вторую.
Возвращает (ptz_id, весь словарь камер).
"""
cams = _load_all()
# Входные данные
ip = cam["ip"]
username = cam.get("username")
password = cam.get("password")
sweep_sign = int(cam.get("sweep_sign", 1))
preset1 = cam.get("preset1")
preset2 = cam.get("preset2")
mac = cam.get("mac")
have = {cid: c for cid, c in cams.items() if c.get("ip") == ip}
by_ch: Dict[int, Tuple[int, Dict[str, Any]]] = {}
for cid, c in have.items():
try:
ch = int(c.get("ptz_channel", 1))
except Exception:
ch = 1
by_ch[ch] = (cid, c)
# --- PTZ (ch = 1) ---
ptz_patch = {
"ip": ip,
"username": username,
"password": password,
"ptz_channel": 1,
"preset1": preset1,
"preset2": preset2,
"sweep_sign": sweep_sign,
"mac": mac,
}
if 1 in by_ch:
ptz_id, base = by_ch[1]
# не затираем None-ами
for k, v in ptz_patch.items():
if v is not None:
base[k] = v
cams[ptz_id] = _apply_defaults(base)
else:
ptz_id = (max(cams.keys()) + 1) if cams else 0
rec = {"id": ptz_id, **ptz_patch}
cams[ptz_id] = _apply_defaults(rec)
# --- PAN (ch = 2) ---
pan_patch = {
"ip": ip,
"username": username,
"password": password,
"ptz_channel": 2,
"preset1": None,
"preset2": None,
"sweep_sign": sweep_sign,
"mac": mac,
}
if 2 in by_ch:
pan_id, base2 = by_ch[2]
# не затираем None-ами
for k, v in pan_patch.items():
if v is not None:
base2[k] = v
cams[pan_id] = _apply_defaults(base2)
else:
# подберём свободный id, не наступая на ptz_id
new_id = (max(cams.keys()) + 1) if cams else (1 if ptz_id == 0 else 0)
if new_id == ptz_id:
new_id += 1
rec2 = {"id": new_id, **pan_patch}
cams[new_id] = _apply_defaults(rec2)
_write_all(cams)
return ptz_id, cams