# calibration.py from __future__ import annotations import asyncio import logging import math from typing import Optional, Tuple from . import config, state from .geom import ang_diff_signed, normalize360 from .ptz_io import read_ptz_azimuth_deg, goto_preset_token, get_ptz_status # Опционально: запись в cameras.toml try: from .toml_persist import ( persist_north_offset, # (cam_id, north_deg, source) -> bool persist_geo_fields, # (cam_id, fields: dict[str, float], source) -> bool ) except Exception: persist_north_offset = None # type: ignore[assignment] persist_geo_fields = None # type: ignore[assignment] logger = logging.getLogger("PTZTracker") # ============================== # Внутренние утилиты # ============================== async def _wait_az_stable(cam: int, timeout: float = 2.5, tol_deg: float = 0.3) -> Optional[float]: """ Ждём стабилизации pan: возвращает угол, когда изменения <= tol_deg. """ loop = asyncio.get_event_loop() t0 = loop.time() last = None stable = 0 while loop.time() - t0 < timeout: a = read_ptz_azimuth_deg(cam) if a is None: await asyncio.sleep(0.12) continue a = normalize360(a) if last is None or abs(ang_diff_signed(a, last)) > tol_deg: stable = 0 else: stable += 1 if stable >= 1: return a last = a await asyncio.sleep(0.12) return last async def get_absolute_ptz_position(cam_idx: int) -> Optional[Tuple[float, float]]: """ Читает абсолютные оценки pan/tilt (deg), pan нормализует в 0..360. FIX: tilt у части Hikvision может не возвращаться/не парситься. Для калибровки сектора критичен pan, поэтому: - если pan есть, а tilt нет -> tilt=0.0 """ pan, tilt = get_ptz_status(cam_idx) if pan is None: return None pan_n = normalize360(float(pan)) tilt_f = float(tilt) if tilt is not None else 0.0 return pan_n, tilt_f def to_geo_bearing(pan_abs_cam_deg: float, north_offset_deg: float | None) -> float: """geo = pan_abs + north_offset (в 0..360).""" return normalize360(pan_abs_cam_deg + (north_offset_deg or 0.0)) def shortest_signed(a: float, b: float) -> float: """Подписанное отклонение угла a -> b в (-180; 180].""" return ang_diff_signed(b, a) async def compute_north_offset_on_preset1(cam_idx: int, ref_bearing_geo: float) -> Optional[float]: """ Находясь на preset1: north_offset = ref_bearing_geo - pan_abs_cam. """ a = await _wait_az_stable(cam_idx) if a is None: return None return normalize360(ref_bearing_geo - a) async def get_bearing_curr_geo(cam_idx: int) -> Optional[float]: """ Текущий гео-азимут взгляда (0..360), если есть north_offset_deg. """ cfg = config.CAMERA_CONFIG[cam_idx] north = cfg.get("north_offset_deg") if north is None: return None a = await _wait_az_stable(cam_idx) if a is None: return None return to_geo_bearing(a, float(north)) async def get_delta_from_preset1_geo(cam_idx: int) -> Optional[float]: """ Подписанная дельта от preset1 в гео-СК. Положит. — вправо. """ cfg = config.CAMERA_CONFIG[cam_idx] p1_geo = cfg.get("preset1_deg_geo") if p1_geo is None: return None b = await get_bearing_curr_geo(cam_idx) if b is None: return None return shortest_signed(p1_geo, b) async def calibrate_presets(cam_idx: int) -> bool: """ Снимаем pan/tilt на preset1/preset2 и выставляем сектор во «внутренних» углах. FIX: - не требуем tilt (если его нет — tilt=0.0) - сохраняем preset*_deg / sector_* в cameras.toml через persist_geo_fields (если доступно) - НЕ запускаем патруль здесь (иначе легко получить "дерганье" и спам пресетов) """ cfg = config.CAMERA_CONFIG[cam_idx] p1, p2 = cfg.get("preset1"), cfg.get("preset2") if not (p1 and p2): return False start_az = read_ptz_azimuth_deg(cam_idx) logger.info("Calibrating presets for camera %s", cam_idx) # preset1 goto_preset_token(cam_idx, p1) await asyncio.sleep(0.6) _ = await _wait_az_stable(cam_idx) pos1 = await get_absolute_ptz_position(cam_idx) if pos1 is None: logger.error("Failed to get pan for preset1 on camera %s", cam_idx) return False # preset2 goto_preset_token(cam_idx, p2) await asyncio.sleep(0.6) _ = await _wait_az_stable(cam_idx) pos2 = await get_absolute_ptz_position(cam_idx) if pos2 is None: logger.error("Failed to get pan for preset2 on camera %s", cam_idx) return False az1, tilt1 = pos1 az2, tilt2 = pos2 cfg["preset1_deg"] = float(az1) cfg["preset2_deg"] = float(az2) cfg["preset1_tilt_deg"] = float(tilt1) cfg["preset2_tilt_deg"] = float(tilt2) span = ang_diff_signed(az2, az1) if span >= 0: sector_min = az1 - config.SECTOR_MARGIN_DEG sector_max = az2 + config.SECTOR_MARGIN_DEG else: sector_min = az2 - config.SECTOR_MARGIN_DEG sector_max = az1 + config.SECTOR_MARGIN_DEG cfg["sector_min_deg"] = float(normalize360(sector_min)) cfg["sector_max_deg"] = float(normalize360(sector_max)) logger.info( "Camera %s sector calibrated: %.2f°..%.2f° (p1=%.2f°, p2=%.2f°)", cam_idx, cfg["sector_min_deg"], cfg["sector_max_deg"], az1, az2 ) # Persist в cameras.toml (если доступно) if callable(persist_geo_fields): try: persist_geo_fields( cam_idx, { "preset1_deg": float(az1), "preset2_deg": float(az2), "preset1_tilt_deg": float(tilt1), "preset2_tilt_deg": float(tilt2), "sector_min_deg": float(cfg["sector_min_deg"]), "sector_max_deg": float(cfg["sector_max_deg"]), }, source="calibrate_presets", ) except Exception as e: logger.warning("[CAL] persist_geo_fields failed for cam %s: %s", cam_idx, e) # вернуть ближе к исходному положению if start_az is not None: token = p1 if abs(ang_diff_signed(start_az, az1)) <= abs(ang_diff_signed(start_az, az2)) else p2 else: token = p1 goto_preset_token(cam_idx, token) return True async def calibrate_presets_geo(cam_idx: int, ref_bearing_geo: float) -> bool: """ 1) preset1 → pan/tilt → preset1_deg/preset1_tilt_deg 2) north_offset = ref_bearing_geo - pan_abs(preset1) → сохранить 3) preset2 → pan/tilt 4) preset*_deg_geo, sector_*_geo → сохранить """ cfg = config.CAMERA_CONFIG[cam_idx] p1, p2 = cfg.get("preset1"), cfg.get("preset2") if not (p1 and p2): logger.error("[GEO] Geo-calibration requires preset1 and preset2 on camera %s", cam_idx) return False start_az = read_ptz_azimuth_deg(cam_idx) logger.info("[GEO] Calibrating presets (with north) for camera %s", cam_idx) # --- preset1 goto_preset_token(cam_idx, p1) await asyncio.sleep(0.6) _ = await _wait_az_stable(cam_idx) pos1 = await get_absolute_ptz_position(cam_idx) if pos1 is None: logger.error("[GEO] Failed to get pan for preset1 on camera %s", cam_idx) return False az1, tilt1 = pos1 # --- north offset ref_bearing_geo = normalize360(ref_bearing_geo) north_offset = await compute_north_offset_on_preset1(cam_idx, ref_bearing_geo) if north_offset is None: logger.error("[GEO] Failed to compute north_offset for camera %s", cam_idx) return False north_offset = normalize360(north_offset) # записать в TOML (если доступно) if callable(persist_north_offset): try: changed = persist_north_offset(cam_idx, north_offset, source="calibrate_presets_geo") if changed: logger.info("[GEO] cameras.toml updated with north_offset for cam %s (%.2f°)", cam_idx, north_offset) else: logger.info("[GEO] north_offset for cam %s already up-to-date (%.2f°)", cam_idx, north_offset) except Exception as e: logger.warning("[GEO] persist_north_offset failed for cam %s: %s", cam_idx, e) else: logger.warning("[GEO] persist_north_offset is unavailable; north won't be written to cameras.toml") # --- preset2 goto_preset_token(cam_idx, p2) await asyncio.sleep(0.6) _ = await _wait_az_stable(cam_idx) pos2 = await get_absolute_ptz_position(cam_idx) if pos2 is None: logger.error("[GEO] Failed to get pan for preset2 on camera %s", cam_idx) return False az2, tilt2 = pos2 # --- внутренние углы cfg["preset1_deg"] = float(az1) cfg["preset2_deg"] = float(az2) cfg["preset1_tilt_deg"] = float(tilt1) cfg["preset2_tilt_deg"] = float(tilt2) # --- гео-углы пресетов cfg["north_offset_deg"] = float(north_offset) cfg["preset1_deg_geo"] = to_geo_bearing(az1, north_offset) cfg["preset2_deg_geo"] = to_geo_bearing(az2, north_offset) # --- сектор в гео span_geo = ang_diff_signed(cfg["preset2_deg_geo"], cfg["preset1_deg_geo"]) if span_geo >= 0: sector_min_geo = cfg["preset1_deg_geo"] - config.SECTOR_MARGIN_DEG sector_max_geo = cfg["preset2_deg_geo"] + config.SECTOR_MARGIN_DEG else: sector_min_geo = cfg["preset2_deg_geo"] - config.SECTOR_MARGIN_DEG sector_max_geo = cfg["preset1_deg_geo"] + config.SECTOR_MARGIN_DEG cfg["sector_min_deg_geo"] = normalize360(sector_min_geo) cfg["sector_max_deg_geo"] = normalize360(sector_max_geo) # сохранить полезные geo-поля в TOML (опц.) if callable(persist_geo_fields): try: persist_geo_fields( cam_idx, { "preset1_deg": float(az1), "preset2_deg": float(az2), "preset1_tilt_deg": float(tilt1), "preset2_tilt_deg": float(tilt2), "north_offset_deg": float(north_offset), "preset1_deg_geo": float(cfg["preset1_deg_geo"]), "preset2_deg_geo": float(cfg["preset2_deg_geo"]), "sector_min_deg_geo": float(cfg["sector_min_deg_geo"]), "sector_max_deg_geo": float(cfg["sector_max_deg_geo"]), }, source="calibrate_presets_geo", ) except Exception: pass logger.info( "[GEO] Camera %s north_offset=%.2f°, p1_geo=%.2f°, p2_geo=%.2f°, sector_geo=%.1f°..%.1f°", cam_idx, cfg["north_offset_deg"], cfg["preset1_deg_geo"], cfg["preset2_deg_geo"], cfg["sector_min_deg_geo"], cfg["sector_max_deg_geo"] ) # вернуть ближе к исходному token = p1 if start_az is not None: token = p1 if abs(ang_diff_signed(start_az, az1)) <= abs(ang_diff_signed(start_az, az2)) else p2 goto_preset_token(cam_idx, token) return True async def autocal_pan_sign(cam: int): """ Определение знака панорамирования через небольшой толчок вправо. """ a0 = read_ptz_azimuth_deg(cam) if a0 is None: return from .ptz_io import move_ptz, stop_ptz move_ptz(cam, +0.12, 0.0, 0.0) await asyncio.sleep(0.25) stop_ptz(cam) await asyncio.sleep(0.20) a1 = read_ptz_azimuth_deg(cam) if a1 is None: return delta = ang_diff_signed(a1, a0) if abs(delta) < 0.05: logger.info( "[PAN_SIGN] cam %s: no movement (near wall?) — keep pan_sign=%s", cam, config.CAMERA_CONFIG[cam].get("pan_sign") ) return config.CAMERA_CONFIG[cam]["pan_sign"] = +1 if delta > 0 else -1 logger.info("[PAN_SIGN] cam %s set to %s (delta=%.2f°)", cam, config.CAMERA_CONFIG[cam]["pan_sign"], delta) async def park_to_patrol_start(): """ Поставить камеры на старт патруля согласно PATROL_START_DIR: если +1 — preset1, если -1 — preset2. """ from .ptz_io import stop_ptz, goto_preset_token for cam in config.PTZ_ORDER: cfg = config.CAMERA_CONFIG[cam] dir_sign = config.PATROL_START_DIR.get(cam, +1) start_token = cfg.get("preset1") if dir_sign >= 0 else cfg.get("preset2") if start_token: stop_ptz(cam) goto_preset_token(cam, start_token) await asyncio.sleep(1.2) def _pano_center_geo_from_state(pano_id: int) -> Optional[float]: """ Достаём последний абсолютный азимут центра панорамы из state. Поддерживаем несколько возможных ключей. """ s = getattr(state, "pano_states", {}).get(pano_id) or {} for k in ("center_bearing_deg", "bearing_deg", "geo_bearing_deg", "attack_bearing_deg"): v = s.get(k) if isinstance(v, (int, float)): return normalize360(float(v)) # fallback: иногда кладут в ptz_states[pano_id]["pan_deg_geo"] ss = getattr(state, "ptz_states", {}).get(pano_id) or {} v = ss.get("pan_deg_geo") if isinstance(v, (int, float)): return normalize360(float(v)) return None async def auto_calibrate_north_from_pano_edges(ptz_id: int) -> Optional[float]: """ Вычисляет и сохраняет north_offset_deg для PTZ, используя спаренную панораму: - pano_id = config.PTZ_TO_PAN[ptz_id] - center_geo = центр панорамы из state (0..360) - hfov = ширина обзора панорамы в градусах (из cameras.toml) - preset1/preset2 считаем левой/правой границей (или наоборот при preset1_is_left=false) - измеряем внутренние pan на обоих пресетах (a1, a2) - считаем два независимых north: n1=left_geo-a1, n2=right_geo-a2 - берём среднее на окружности и пишем в TOML Вернёт north_offset_deg или None. """ # 0) привязка PTZ↔PANO pano_id = config.PTZ_TO_PAN.get(ptz_id) if pano_id is None: logger.warning("[AUTO-NORTH] PTZ %s: no paired pano in PTZ_TO_PAN", ptz_id) return None # 1) центр панорамы center_geo = _pano_center_geo_from_state(pano_id) if center_geo is None: logger.warning("[AUTO-NORTH] PTZ %s: pano %s center bearing is unknown", ptz_id, pano_id) return None # 2) HFOV панорамы hfov = config.get_cam_hfov_deg(pano_id) if not isinstance(hfov, (int, float)) or hfov <= 0: logger.warning("[AUTO-NORTH] PTZ %s: pano %s HFOV unknown", ptz_id, pano_id) return None left_geo = normalize360(center_geo - hfov * 0.5) right_geo = normalize360(center_geo + hfov * 0.5) cfg = config.CAMERA_CONFIG.get(ptz_id, {}) p1, p2 = cfg.get("preset1"), cfg.get("preset2") if not (p1 and p2): logger.warning("[AUTO-NORTH] PTZ %s: presets 1/2 are required", ptz_id) return None # какие пресеты — левый/правый край? p1_is_left = cfg.get("preset1_is_left") if p1_is_left is None: # если явно не задано — ориентируемся по sweep_sign (или pan_sign) sweep = int(cfg.get("sweep_sign", cfg.get("pan_sign", 1)) or 1) p1_is_left = True if sweep >= 0 else False target_geo_for_p1 = left_geo if p1_is_left else right_geo target_geo_for_p2 = right_geo if p1_is_left else left_geo # helper: стабилизация pan async def _stable_pan(cam: int) -> Optional[float]: return await _wait_az_stable(cam, timeout=2.5, tol_deg=0.3) # 3) измеряем пресеты goto_preset_token(ptz_id, p1); await asyncio.sleep(0.6) a1 = await _stable_pan(ptz_id) if a1 is None: logger.warning("[AUTO-NORTH] PTZ %s: cannot read pan on preset1", ptz_id) return None goto_preset_token(ptz_id, p2); await asyncio.sleep(0.6) a2 = await _stable_pan(ptz_id) if a2 is None: logger.warning("[AUTO-NORTH] PTZ %s: cannot read pan on preset2", ptz_id) return None # 4) два независимых north + среднее на окружности n1 = normalize360(target_geo_for_p1 - a1) n2 = normalize360(target_geo_for_p2 - a2) sx = math.cos(math.radians(n1)) + math.cos(math.radians(n2)) sy = math.sin(math.radians(n1)) + math.sin(math.radians(n2)) north = normalize360(math.degrees(math.atan2(sy, sx))) if (sx != 0 or sy != 0) else n1 # 5) сохраняем в TOML и рантайм if callable(persist_north_offset): try: persist_north_offset(ptz_id, north, source="auto_from_pano_edges") except Exception: pass cfg["north_offset_deg"] = north # опционально сохраним справочно preset1_ref_geo_deg if callable(persist_geo_fields): try: persist_geo_fields(ptz_id, {"preset1_ref_geo_deg": target_geo_for_p1}, source="auto_from_pano_edges") except Exception: pass logger.info( "[AUTO-NORTH] PTZ %s: north_offset=%.2f°, p1->%.2f°, p2->%.2f° (center=%.2f°, hfov=%.1f°)", ptz_id, north, target_geo_for_p1, target_geo_for_p2, center_geo, hfov ) return north async def autocal_geo_from_panorama(cam_id: int) -> bool: north = await auto_calibrate_north_from_pano_edges(cam_id) return north is not None