from __future__ import annotations import asyncio import logging from typing import Optional, Tuple from . import config, state from .geom import normalize360, ang_diff_signed from .ptz_io import read_ptz_azimuth_deg, goto_preset_token from .toml_persist import persist_geo_fields logger = logging.getLogger("PTZTracker") def sector_contains(az_deg: float, sector_min_deg: float, sector_max_deg: float) -> bool: """Проверка попадания азимута в сектор. Сектор задаётся двумя границами по кругу [0..360): - если sector_min_deg <= sector_max_deg: обычный интервал - если sector_min_deg > sector_max_deg: сектор «пересекает 0» (wrap-around) """ az = normalize360(float(az_deg)) a = normalize360(float(sector_min_deg)) b = normalize360(float(sector_max_deg)) if a <= b: return a <= az <= b # wrap через 0: например [345..147] return az >= a or az <= b def sector_clamp(az_deg: float, sector_min_deg: float, sector_max_deg: float) -> float: """Если азимут вне сектора — возвращаем ближайшую границу, иначе az_deg.""" az = normalize360(float(az_deg)) a = normalize360(float(sector_min_deg)) b = normalize360(float(sector_max_deg)) if sector_contains(az, a, b): return az # выберем ближайшую границу по угловой дистанции da = abs(ang_diff_signed(a, az)) db = abs(ang_diff_signed(b, az)) return a if da <= db else b def _compute_sector_bounds_from_hfov(ptz_cam_id: int) -> Optional[Tuple[float, float, float]]: pano_id = config.PTZ_TO_PAN.get(ptz_cam_id) if pano_id is None: return None ptz_cfg = config.CAMERA_CONFIG[ptz_cam_id] pano_cfg = config.CAMERA_CONFIG[pano_id] center = normalize360(float(ptz_cfg.get("pan_offset_deg", 0.0))) hfov = float(pano_cfg.get("bullet_hfov_deg", 89.0)) half = max(0.0, hfov * 0.5 - config.SECTOR_MARGIN_DEG) left = normalize360(center - half) right = normalize360(center + half) return (center, left, right) async def sector_autocal_from_presets() -> None: if not config.USE_PRESET_EDGES_FOR_SECTOR: return for cam in config.PTZ_CAM_IDS: cfg = config.CAMERA_CONFIG[cam] p1, p2 = cfg.get("preset1"), cfg.get("preset2") if not (p1 and p2): continue goto_preset_token(cam, p1) await asyncio.sleep(0.8) a = read_ptz_azimuth_deg(cam) goto_preset_token(cam, p2) await asyncio.sleep(0.8) b = read_ptz_azimuth_deg(cam) if a is None or b is None: logger.warning("[SECTOR] cam %s: can't read azimuths", cam) continue span = abs(ang_diff_signed(b, a)) center = normalize360(a + ang_diff_signed(b, a) * 0.5) half = max(0.0, span * 0.5 - config.SECTOR_MARGIN_DEG) left = normalize360(center - half) right = normalize360(center + half) st = state.ptz_states[cam] st["sector_center_deg"] = center st["sector_left_deg"] = left st["sector_right_deg"] = right st["sector_min_deg"] = left st["sector_max_deg"] = right cfg["pan_offset_deg"] = center cfg["sector_min_deg"] = left cfg["sector_max_deg"] = right cfg["preset1_deg"] = a cfg["preset2_deg"] = b # сохраняем значения в cameras.toml persist_geo_fields(cam, { "preset1_deg": a, "preset2_deg": b, "sector_min_deg": left, "sector_max_deg": right, "sector_left_deg": left, "sector_right_deg": right, "pan_offset_deg": center, }, source="sector_autocal") # обновим HFOV у пушки, если можем pano_id = config.PTZ_TO_PAN.get(cam) if pano_id is not None: config.CAMERA_CONFIG[pano_id]["bullet_hfov_deg"] = span logger.info("[SECTOR] cam %s: span=%.1f°, center=%.1f°, L=%.1f°, R=%.1f°", cam, span, center, left, right) def sector_init_on_startup() -> None: if config.USE_PRESET_EDGES_FOR_SECTOR: for cam in config.PTZ_CAM_IDS: st = state.ptz_states[cam] if st.get("sector_left_deg") is None: b = _compute_sector_bounds_from_hfov(cam) if b is None: continue center, left, right = b st["sector_center_deg"] = center st["sector_left_deg"] = left st["sector_right_deg"] = right st["sector_min_deg"] = left st["sector_max_deg"] = right return for cam in config.PTZ_CAM_IDS: b = _compute_sector_bounds_from_hfov(cam) if b is None: continue center, left, right = b st = state.ptz_states[cam] st["sector_center_deg"] = center st["sector_left_deg"] = left st["sector_right_deg"] = right st["sector_min_deg"] = left st["sector_max_deg"] = right