from __future__ import annotations from pathlib import Path from typing import Any, Dict, List, Tuple, Optional import os import tomllib import logging logger = logging.getLogger("PTZTracker") try: from . import config # type: ignore except Exception: config = None # type: ignore # ------------------------------ Порядок ключей ------------------------------ CAM_KEYS_ORDER = [ "ip", "username", "password", "ptz_channel", "preset1", "preset2", "sweep_sign", "preset1_deg", "preset2_deg", "preset1_tilt_deg", "preset2_tilt_deg", "sector_min_deg", "sector_max_deg", "pan_sign", "tilt_sign", "pan_offset_deg", "tilt_offset_deg", "mac", "north_offset_deg", "preset1_deg_geo", "preset2_deg_geo", "sector_min_deg_geo", "sector_max_deg_geo", ] # Мягкие дефолты для новых полей (и некоторых старых) DEFAULTS: Dict[str, Any] = { "ptz_channel": 1, "preset1": None, "preset2": None, "sweep_sign": 1, "preset1_deg": None, "preset2_deg": None, "preset1_tilt_deg": None, "preset2_tilt_deg": None, "sector_min_deg": None, "sector_max_deg": None, "pan_sign": None, "tilt_sign": None, "pan_offset_deg": 0.0, "tilt_offset_deg": 0.0, "mac": None, "north_offset_deg": None, "preset1_deg_geo": None, "preset2_deg_geo": None, "sector_min_deg_geo": None, "sector_max_deg_geo": None, } # ------------------------------ IO helpers ------------------------------ def _cameras_file_path() -> Path: env = os.getenv("CAMERAS_FILE") if env: return Path(env).resolve() here = Path(__file__).resolve() for parent in [here.parent, *here.parents]: if parent.name == "ptz_tracker_modular": return (parent / "cameras.toml").resolve() # fallback — рядом с текущим модулем return (here.parent / "cameras.toml").resolve() def _apply_defaults(cfg: Dict[str, Any]) -> Dict[str, Any]: """Мягко подставляет дефолты; в файл None не пишем.""" for k, v in DEFAULTS.items(): cfg.setdefault(k, v) return cfg def _load_all() -> Dict[int, Dict[str, Any]]: path = _cameras_file_path() if not path.exists(): return {} if path.suffix.lower() != ".toml": raise RuntimeError("Для веб-редактирования ожидается TOML (cameras.toml)") with open(path, "rb") as f: data = tomllib.load(f) arr = data.get("camera") or data.get("cameras") or [] res: Dict[int, Dict[str, Any]] = {} for item in arr: cid = int(item["id"]) res[cid] = _apply_defaults(dict(item)) return res def _dump_toml(cams: Dict[int, Dict[str, Any]]) -> str: lines: List[str] = [] for cid in sorted(cams.keys()): cfg = dict(cams[cid]) lines.append("[[camera]]") lines.append(f"id = {cid}") # Сначала печатаем ключи в фиксированном порядке for k in CAM_KEYS_ORDER: if k not in cfg or cfg[k] is None: continue v = cfg[k] if isinstance(v, str): v = v.replace("\\", "\\\\").replace('"', '\\"') lines.append(f'{k} = "{v}"') elif isinstance(v, bool): lines.append(f'{k} = {"true" if v else "false"}') else: lines.append(f"{k} = {v}") # Затем доп. ключи, которых нет в CAM_KEYS_ORDER for k, v in cfg.items(): if k in CAM_KEYS_ORDER or k == "id": continue if v is None: continue if isinstance(v, str): v = v.replace("\\", "\\\\").replace('"', '\\"') lines.append(f'{k} = "{v}"') elif isinstance(v, bool): lines.append(f'{k} = {"true" if v else "false"}') else: lines.append(f"{k} = {v}") lines.append("") # пустая строка между камерами return "\n".join(lines).rstrip() + "\n" def _write_all(cams: Dict[int, Dict[str, Any]]) -> None: path = _cameras_file_path() txt = _dump_toml(cams) path.write_text(txt, encoding="utf-8") # ------------------------------ Публичные утилиты ------------------------------ def get_all() -> Dict[int, Dict[str, Any]]: """Вернёт все камеры с подставленными дефолтами (в памяти).""" return _load_all() def get(cid: int) -> Optional[Dict[str, Any]]: """Вернёт конфиг одной камеры или None.""" return _load_all().get(int(cid)) def update(cid: int, patch: Dict[str, Any]) -> Dict[int, Dict[str, Any]]: """ Патчит одну камеру по id и сохраняет файл. Возвращает обновлённый словарь всех камер. """ cid = int(cid) cams = _load_all() if cid not in cams: raise KeyError(f"Camera id {cid} not found") base = cams[cid] # нормализация типов if "ptz_channel" in patch and patch["ptz_channel"] is not None: patch["ptz_channel"] = int(patch["ptz_channel"]) if "sweep_sign" in patch and patch["sweep_sign"] is not None: patch["sweep_sign"] = int(patch["sweep_sign"]) base.update(patch) cams[cid] = _apply_defaults(base) _write_all(cams) return cams # ------------------------------ CRUD (твоя логика) ------------------------------ def add_or_update_camera(cam: Dict[str, Any]) -> Tuple[int, Dict[int, Dict[str, Any]]]: """ Одиночная запись (как у тебя было). cam: {id? , ip, username, password, ptz_channel, preset1, preset2, sweep_sign, ...} Если id не задан – берём max+1. Возвращает (real_id, updated_dict). """ cams = _load_all() # Вычисляем id real_id = int(cam["id"]) if ("id" in cam and cam["id"] is not None) else ( (max(cams.keys()) + 1) if cams else 0 ) # Нормализация/дефолты rec = dict(cam) rec["id"] = real_id if "ptz_channel" in rec and rec["ptz_channel"] is not None: rec["ptz_channel"] = int(rec["ptz_channel"]) if "sweep_sign" in rec and rec["sweep_sign"] is not None: rec["sweep_sign"] = int(rec["sweep_sign"]) cams[real_id] = _apply_defaults(rec) _write_all(cams) # Аккуратный лог: покажем, какие ID по этому IP уже есть (ch=1/ch=2) ip = rec.get("ip") ptz_id = None pan_id = None try: for cid, c in cams.items(): if c.get("ip") == ip: ch = int(c.get("ptz_channel", 1)) if ch == 1: ptz_id = cid elif ch == 2: pan_id = cid except Exception: pass logger.info( f"[CFG] add_or_update_camera ip={ip} -> saved id={real_id} ch={rec.get('ptz_channel', 1)}; " f"present: PTZ id={ptz_id}, PAN id={pan_id} (total={len(cams)})" ) return real_id, cams def delete_camera(cid: int) -> Dict[int, Dict[str, Any]]: cams = _load_all() if cid in cams: del cams[cid] _write_all(cams) return cams # ------------------------------ ПАРНАЯ запись PTZ+PAN ------------------------------ def add_or_update_camera_pair(cam: Dict[str, Any]) -> Tuple[int, Dict[int, Dict[str, Any]]]: """ Создаёт/обновляет ДВЕ записи на один IP: - PTZ (канал 1) с пресетами preset1/preset2 - PAN (канал 2) без пресетов Если одна из записей существует — аккуратно обновит её и досоздаст вторую. Возвращает (ptz_id, весь словарь камер). """ cams = _load_all() # Входные данные ip = cam["ip"] username = cam.get("username") password = cam.get("password") sweep_sign = int(cam.get("sweep_sign", 1)) preset1 = cam.get("preset1") preset2 = cam.get("preset2") mac = cam.get("mac") have = {cid: c for cid, c in cams.items() if c.get("ip") == ip} by_ch: Dict[int, Tuple[int, Dict[str, Any]]] = {} for cid, c in have.items(): try: ch = int(c.get("ptz_channel", 1)) except Exception: ch = 1 by_ch[ch] = (cid, c) # --- PTZ (ch = 1) --- ptz_patch = { "ip": ip, "username": username, "password": password, "ptz_channel": 1, "preset1": preset1, "preset2": preset2, "sweep_sign": sweep_sign, "mac": mac, } if 1 in by_ch: ptz_id, base = by_ch[1] # не затираем None-ами for k, v in ptz_patch.items(): if v is not None: base[k] = v cams[ptz_id] = _apply_defaults(base) else: ptz_id = (max(cams.keys()) + 1) if cams else 0 rec = {"id": ptz_id, **ptz_patch} cams[ptz_id] = _apply_defaults(rec) # --- PAN (ch = 2) --- pan_patch = { "ip": ip, "username": username, "password": password, "ptz_channel": 2, "preset1": None, "preset2": None, "sweep_sign": sweep_sign, "mac": mac, } if 2 in by_ch: pan_id, base2 = by_ch[2] # не затираем None-ами for k, v in pan_patch.items(): if v is not None: base2[k] = v cams[pan_id] = _apply_defaults(base2) else: # подберём свободный id, не наступая на ptz_id new_id = (max(cams.keys()) + 1) if cams else (1 if ptz_id == 0 else 0) if new_id == ptz_id: new_id += 1 rec2 = {"id": new_id, **pan_patch} cams[new_id] = _apply_defaults(rec2) _write_all(cams) return ptz_id, cams