from __future__ import annotations import time import math import logging from typing import Any, List, Tuple import numpy as np import torch import cv2 from . import config, state from .utils import clipf, wrap_deg, deg_to_compass from .preview import maybe_downscale, publish_preview from . import alarms from .notify import notify_detected from .ptz_io import move_ptz_bounded, call_autofocus, stop_ptz from .patrol import return_to_preset, ensure_patrol_resume logger = logging.getLogger("PTZTracker") _device = "cuda" if torch.cuda.is_available() else "cpu" _targets_tensor = torch.tensor(config.TARGET_CLASSES, device=_device) # === Zoom / устойчивость === _Z_HOLD_AFTER_LOSS = getattr(config, "ZOOM_LOSS_HOLD_SEC", 0.70) _Z_SIGN_HYST = getattr(config, "ZOOM_SIGN_HYST", 0.020) _Z_MIN_CMD = getattr(config, "Z_MIN_CMD", 0.05) _Z_IN_RAMP = getattr(config, "Z_RAMP_TIME", 0.45) _Z_FF_GAIN = getattr(config, "Z_FF_GAIN", 0.55) _Z_P = getattr(config, "Z_P", 2.4) _Z_I = getattr(config, "Z_I", 0.16) _Z_INT_CLAMP = getattr(config, "Z_INT_CLAMP", 0.15) _Z_SPEED_LIMIT_OUT = getattr(config, "Z_SPEED_LIMIT_OUT", -0.65) _Z_SPEED_LIMIT_IN = getattr(config, "Z_SPEED_LIMIT_IN", 0.45) _Z_CENTER_GATE_IN = getattr(config, "Z_CENTER_GATE_BASE", 0.06) _Z_CENTER_GATE_SCALE = getattr(config, "Z_CENTER_GATE_SCALE", 0.20) _Z_UNLOCK_EDGE = getattr(config, "Z_UNLOCK_EDGE", 0.050) _Z_PAN_BOOST_IN = getattr(config, "Z_PAN_BOOST_WHEN_IN", getattr(config, "Z_PAN_BOOST", 1.35)) # === TRACK lifecycle === TRACK_END_LOSS_SEC = float(getattr(config, "TRACK_END_LOSS_SEC", 0.90)) # сколько ждать без детекции, чтобы завершить TRACK TRACK_END_RETURN_COOLDOWN_SEC = float(getattr(config, "TRACK_END_RETURN_COOLDOWN_SEC", 2.5)) # анти-спам goto TRACK_END_RESUME_DELAY_SEC = float(getattr(config, "TRACK_END_RESUME_DELAY_SEC", 1.0)) # после парковки def _publish_with_boxes( cam_id: int, img_bgr: np.ndarray, boxes_xyxy: List[Tuple[int, int, int, int]], labels: List[str], ) -> None: if config.PUBLISH_ONLY_IF_CLIENTS and not state.video_clients: return down, sx, sy = maybe_downscale(img_bgr, config.PREVIEW_TARGET_W) if boxes_xyxy: for (x1, y1, x2, y2), lab in zip(boxes_xyxy, labels): if sx != 1.0 or sy != 1.0: x1 = int(x1 * sx); y1 = int(y1 * sy) x2 = int(x2 * sx); y2 = int(y2 * sy) cv2.rectangle(down, (x1, y1), (x2, y2), (0, 255, 0), 2, lineType=cv2.LINE_4) cv2.putText( down, lab, (x1, max(0, y1 - 6)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1, cv2.LINE_4, ) publish_preview(cam_id, down) def choose_auto_action(cam_idx: int, now: float) -> str: st = state.ptz_states[cam_idx] dt_cam = now - st["last_seen"] if dt_cam <= config.SHORT_LOSS_SEC: return "FOLLOW_SHORT" if dt_cam <= config.LONG_LOSS_SEC: return "SEARCH" ema_g = state.detect_stats_global["ema_interval"] if ema_g is None: return "SEARCH" dyn_thr = max(config.ABS_MIN_FALLBACK, 3.0 * float(ema_g)) dyn_thr = min(dyn_thr, config.ABS_MAX_FALLBACK) if config.FALLBACK_PRESET_CAP and config.FALLBACK_PRESET_CAP > 0: dyn_thr = min(dyn_thr, config.FALLBACK_PRESET_CAP) if dt_cam > dyn_thr: return "FALLBACK" return "SEARCH" if ema_g < 20.0 else "HOLD" def _maybe_return_and_resume(cam_idx: int, reason: str, min_interval_sec: float) -> bool: st = state.ptz_states[cam_idx] now = time.time() last = float(st.get("last_return_at", 0.0) or 0.0) if (now - last) < float(min_interval_sec): return False st["last_return_at"] = now return_to_preset(cam_idx, reason) ensure_patrol_resume(cam_idx, delay=1.0) return True def _enter_track(cam_idx: int) -> None: st = state.ptz_states[cam_idx] if not st.get("tracking_active"): # запоминаем, был ли патруль активен — чтобы потом восстановить st["_patrol_was_active_before_track"] = bool(st.get("patrol_active", False)) st["tracking_active"] = True st["mode"] = "TRACK" # патрульные команды должны замолчать st["patrol_active"] = False st["sync_wait"] = False def _end_track_and_park(cam_idx: int, *, reason: str) -> None: st = state.ptz_states[cam_idx] now = time.time() # если уже едем на пресет — не дублируем if now < float(st.get("goto_in_progress_until", 0.0) or 0.0): return last = float(st.get("last_track_end_return_at", 0.0) or 0.0) if (now - last) < TRACK_END_RETURN_COOLDOWN_SEC: return st["last_track_end_return_at"] = now # СРАЗУ ставим "за нято goto", чтобы второй источник не успел добавить ещё один goto settle = float(getattr(config, "GOTO_SETTLE_SEC", 2.5)) st["goto_in_progress_until"] = now + settle st["tracking_active"] = False st["mode"] = "IDLE" notify_detected(cam_idx, False, force=True) stop_ptz(cam_idx) return_to_preset(cam_idx, reason=reason) ensure_patrol_resume(cam_idx, delay=max(0.8, TRACK_END_RESUME_DELAY_SEC)) def postprocess_control(cam_idx: int, frame_bgr: np.ndarray, res: Any, is_ptz: bool) -> None: boxes_t = getattr(res, "boxes", None) h, w = frame_bgr.shape[:2] w_inv, h_inv = (1.0 / w), (1.0 / h) now = time.time() st = state.ptz_states.setdefault(cam_idx, {}) st.setdefault("mode", "auto" if config.AUTO_STRATEGY else "manual") st.setdefault("last_seen", 0.0) st.setdefault("loss_since", None) st.setdefault("last_bbox", None) st.setdefault("prev_t", None) st.setdefault("prev_cx", 0.5) st.setdefault("prev_cy", 0.5) st.setdefault("vx", 0.0) st.setdefault("vy", 0.0) st.setdefault("ix", 0.0) st.setdefault("iy", 0.0) st.setdefault("w_frac_ema", None) st.setdefault("prev_w_frac", None) st.setdefault("w_growth", 0.0) st.setdefault("zoom_state", 0) st.setdefault("zoom_prev_cmd", 0.0) st.setdefault("zoom_int", 0.0) st.setdefault("zoom_last_change", 0.0) st.setdefault("zoom_ramp_start", 0.0) st.setdefault("zoom_lock", False) st.setdefault("lock_candidate_since", None) st.setdefault("last_zoom_seen", 0.0) st.setdefault("last_dx", 0.0) st.setdefault("last_dy", 0.0) st.setdefault("stable_frames", 0) st.setdefault("fallback_done", False) st.setdefault("patrol_active", False) st.setdefault("tracking_active", False) st.setdefault("last_return_at", 0.0) st.setdefault("last_track_end_return_at", 0.0) st.setdefault("attack_origin_pan", None) st.setdefault("prev_pan_deg", None) st.setdefault("prev_pan_t", None) st.setdefault("last_heading_log", 0.0) _cfg = config.CAMERA_CONFIG.get(int(cam_idx), {}) if hasattr(config, "CAMERA_CONFIG") else {} _PAN_SIGN: float = float(_cfg.get("pan_sign", 1) or 1) _TILT_SIGN: float = float(_cfg.get("tilt_sign", -1) or -1) MAX_ANN_BOXES = 12 det_xyxy: List[Tuple[int, int, int, int]] = [] det_labels: List[str] = [] best_target = None if boxes_t is not None and len(boxes_t) > 0: confs = boxes_t.conf clss = boxes_t.cls mask = (confs >= config.DETECT_CONF) & torch.isin(clss, _targets_tensor) if mask.any(): xyxy_all = boxes_t.xyxy[mask].detach().cpu().numpy().astype(int) conf_all = confs[mask].detach().cpu().numpy() cls_all = clss[mask].detach().cpu().numpy().astype(int) x1 = xyxy_all[:, 0]; y1 = xyxy_all[:, 1] x2 = xyxy_all[:, 2]; y2 = xyxy_all[:, 3] cy = (y1 + y2) * 0.5 * h_inv score_all = conf_all + 0.05 * ((x2 - x1) * w_inv) + ((cy < 0.45).astype(np.float32) * 0.12) order = score_all.argsort()[::-1] topk = order[:MAX_ANN_BOXES] for i in topk: lab = f"{config.CLASS_NAMES.get(int(cls_all[i]), 'obj')} {float(conf_all[i]):.2f}" det_xyxy.append((int(xyxy_all[i, 0]), int(xyxy_all[i, 1]), int(xyxy_all[i, 2]), int(xyxy_all[i, 3]))) det_labels.append(lab) ibest = int(order[0]) best_target = (xyxy_all[ibest].tolist(), float(conf_all[ibest]), int(cls_all[ibest])) # ========================= # ===== есть детекция ====== # ========================= if best_target is not None: (x1, y1, x2, y2), best_conf, _ = best_target st["loss_since"] = None st["last_bbox"] = (int(x1), int(y1), int(x2), int(y2)) st["last_zoom_seen"] = now st["last_seen"] = now _publish_with_boxes(cam_idx, frame_bgr, det_xyxy, det_labels) # --------- Панорама ---------- if not is_ptz: notify_detected(cam_idx, True) cx = (x1 + x2) * 0.5; cy = (y1 + y2) * 0.5 cxn = cx * w_inv; cyn = cy * h_inv if st["prev_t"] is not None: dt = max(now - st["prev_t"], 1e-3) st["vx"] = 0.85 * st["vx"] + 0.15 * ((cxn - st["prev_cx"]) / dt) st["vy"] = 0.85 * st["vy"] + 0.15 * ((cyn - st["prev_cy"]) / dt) st["prev_t"], st["prev_cx"], st["prev_cy"] = now, cxn, cyn try: from . import geo_autocal geo_autocal.feed_pano_sample(cam_idx, cxn, now) except Exception: pass base_az = float(config.get_cam_base_azimuth_deg(cam_idx)) hfov = float(config.get_cam_hfov_deg(cam_idx)) bearing = wrap_deg(base_az + (cxn - 0.5) * hfov) if st.get("attack_origin_pan") is None: st["attack_origin_pan"] = bearing st["prev_pan_deg"] = bearing st["prev_pan_t"] = now st["last_heading_log"] = 0.0 origin = float(st["attack_origin_pan"]) cur_compass = deg_to_compass(float(bearing)) org_compass = deg_to_compass(origin) delta = wrap_deg(float(bearing) - origin) cw_ccw = "CW" if delta >= 0.0 else "CCW" pan_rate = None prev_t = st.get("prev_pan_t") prev_p = st.get("prev_pan_deg") if prev_t is not None and prev_p is not None: dtp = max(1e-3, now - float(prev_t)) pan_rate = wrap_deg(float(bearing) - float(prev_p)) / dtp st["prev_pan_deg"] = float(bearing) st["prev_pan_t"] = now if (now - float(st.get("last_heading_log", 0.0))) >= 0.7: if pan_rate is None: logger.info( "[ATTACK/PANO] cam %d from %s (%.1f°) -> %s (%.1f°), Δ=%.1f° %s", cam_idx, org_compass, origin, cur_compass, float(bearing), float(delta), cw_ccw ) else: logger.info( "[ATTACK/PANO] cam %d from %s (%.1f°) -> %s (%.1f°), Δ=%.1f° %s, rate=%.1f°/s", cam_idx, org_compass, origin, cur_compass, float(bearing), float(delta), cw_ccw, float(pan_rate) ) st["last_heading_log"] = now ptz_id = config.PAN_TO_PTZ.get(cam_idx) if ptz_id is not None: pst = state.ptz_states.setdefault(ptz_id, {}) pst["last_seen"] = now pst["loss_since"] = None pst["last_return_at"] = max(float(pst.get("last_return_at", 0.0) or 0.0), now) alarms.ingest_detection(cam_idx, True) return # --------- PTZ: включаем TRACK сразу ---------- _enter_track(cam_idx) notify_detected(cam_idx, True) cx = (x1 + x2) * 0.5; cy = (y1 + y2) * 0.5 cxn = cx * w_inv; cyn = cy * h_inv w_frac = (x2 - x1) * w_inv prev_t = st.get("prev_t") if prev_t is not None: dt = max(now - float(prev_t), 1e-3) vx_i = (cxn - st["prev_cx"]) / dt vy_i = (cyn - st["prev_cy"]) / dt vel_smooth = clipf(float(getattr(config, "VEL_SMOOTH", 0.85)), 0.0, 0.99) st["vx"] = vel_smooth * st["vx"] + (1.0 - vel_smooth) * vx_i st["vy"] = vel_smooth * st["vy"] + (1.0 - vel_smooth) * vy_i wf = st["w_frac_ema"] alpha = getattr(config, "W_EMA_ALPHA", 0.30) st["w_frac_ema"] = w_frac if wf is None else (1.0 - alpha) * wf + alpha * w_frac w_smooth = st["w_frac_ema"] if st["prev_w_frac"] is not None and prev_t is not None: dtw = max(now - float(prev_t), 1e-3) st["w_growth"] = (w_smooth - st["prev_w_frac"]) / dtw else: st["w_growth"] = 0.0 # optional clamp to avoid spikes from detector jitter wg_clamp = float(getattr(config, "W_GROWTH_CLAMP", 2.0)) st["w_growth"] = clipf(float(st.get("w_growth", 0.0) or 0.0), -wg_clamp, wg_clamp) st["prev_w_frac"] = w_smooth # update prev_* only after derivative computations st["prev_t"], st["prev_cx"], st["prev_cy"] = now, cxn, cyn speed = abs(st["vx"]) + abs(st["vy"]) far_factor = clipf(1.0 - max(w_smooth, (y2 - y1) * h_inv), 0.0, 1.0) base_lead = float(getattr(config, "BASE_LEAD", 0.12)) lead_gain = float(getattr(config, "LEAD_GAIN", 0.35)) lead_max = float(getattr(config, "LEAD_MAX", 0.30)) lead_hi = max(base_lead, lead_max) lead_time = clipf(base_lead + lead_gain * speed * far_factor, base_lead, lead_hi) # If target is far from center, disable aggressive lead to avoid overshoot during large slews. err_mag = max(abs(cxn - 0.5), abs(cyn - 0.5)) lead_off = float(getattr(config, "LEAD_OFF_ERR", 0.18)) if err_mag > lead_off: lead_time = base_lead pred_min = float(getattr(config, "PRED_CLAMP_MIN", 0.05)) pred_max = float(getattr(config, "PRED_CLAMP_MAX", 0.95)) cxn_pred = clipf(cxn + st["vx"] * lead_time, pred_min, pred_max) cyn_pred = clipf(cyn + st["vy"] * lead_time, pred_min, pred_max) dx = cxn_pred - 0.5 dy = cyn_pred - 0.5 sm_alpha = getattr(config, "AIM_SMOOTH", 0.55) sm_dx = st["last_dx"] * (1 - sm_alpha) + dx * sm_alpha sm_dy = st["last_dy"] * (1 - sm_alpha) + dy * sm_alpha if abs(sm_dx) > getattr(config, "DEAD_ZONE", 0.018): st["ix"] = clipf(st["ix"] + sm_dx * 0.02, -0.20, 0.20) else: st["ix"] *= 0.9 if abs(sm_dy) > getattr(config, "DEAD_ZONE", 0.018): st["iy"] = clipf(st["iy"] + sm_dy * 0.02, -0.20, 0.20) else: st["iy"] *= 0.9 ddx = sm_dx - st["last_dx"] ddy = sm_dy - st["last_dy"] st["last_dx"], st["last_dy"] = sm_dx, sm_dy conf_norm = clipf((best_conf - config.DETECT_CONF) / max(1e-3, 0.40 - config.DETECT_CONF), 0.0, 1.0) aim_gain = 0.6 + 0.4 * conf_norm pan_speed = clipf( (getattr(config, "AIM_P", 1.85) * sm_dx + getattr(config, "AIM_I", 0.07) * st["ix"] + getattr(config, "AIM_D", 0.15) * ddx) * aim_gain, -1.0, 1.0 ) tilt_speed = clipf( (getattr(config, "AIM_P", 1.85) * sm_dy + getattr(config, "AIM_I", 0.07) * st["iy"] + getattr(config, "AIM_D", 0.15) * ddy) * aim_gain, -1.0, 1.0 ) center_err = max(abs(dx), abs(dy)) left = x1 * w_inv; right = 1.0 - x2 * w_inv top = y1 * h_inv; bottom = 1.0 - y2 * h_inv min_margin = min(left, right, top, bottom) # === zoom logic === state_zoom = st.get("zoom_state", 0) can_switch = (now - float(st.get("zoom_last_change", 0.0) or 0.0)) >= float(getattr(config, "Z_MIN_SWITCH_INTERVAL", 0.50)) gate_dyn = max(0.05, _Z_CENTER_GATE_IN + _Z_CENTER_GATE_SCALE * (1.0 - clipf(w_smooth, 0.0, 1.0))) center_good = center_err < gate_dyn vel = abs(st["vx"]) + abs(st["vy"]) z_target = (1.0 - clipf(vel / max(1e-6, float(getattr(config, "Z_VEL_REF", 0.45))), 0.0, 1.0)) * float(getattr(config, "Z_TARGET_SLOW", 0.16)) \ + clipf(vel / max(1e-6, float(getattr(config, "Z_VEL_REF", 0.45))), 0.0, 1.0) * float(getattr(config, "Z_TARGET_FAST", 0.12)) z_band_min = z_target - float(getattr(config, "Z_DELTA_IN", 0.008)) z_band_max = z_target + float(getattr(config, "Z_DELTA_OUT", 0.060)) out_edge_crit = min_margin < float(getattr(config, "Z_OUT_EDGE", 0.07)) near_edge = (min_margin < 0.05) if st.get("zoom_lock", False): if ( out_edge_crit or (vel > float(getattr(config, "Z_UNLOCK_VEL_THR", 0.060))) or (abs(float(st.get("w_growth", 0.0) or 0.0)) > float(getattr(config, "Z_UNLOCK_GROWTH_THR", 0.015))) or (center_err > float(getattr(config, "Z_UNLOCK_CENTER_THR", 0.100))) or (min_margin < _Z_UNLOCK_EDGE) ): st["zoom_lock"] = False st["lock_candidate_since"] = None else: locked_candidate = ( center_good and (vel < float(getattr(config, "Z_UNLOCK_VEL_THR", 0.060))) and (abs(float(st.get("w_growth", 0.0) or 0.0)) < float(getattr(config, "Z_FREEZE_GROWTH_THR", 0.004))) and (min_margin >= float(getattr(config, "Z_SAFETY_EDGE", 0.05))) ) if locked_candidate: if st.get("lock_candidate_since") is None: st["lock_candidate_since"] = now elif (now - float(st["lock_candidate_since"])) >= float(getattr(config, "Z_LOCK_ARM_SECS", 0.8)): st["zoom_lock"] = True else: st["lock_candidate_since"] = None want_in = (w_smooth < (z_band_min - _Z_SIGN_HYST)) and center_good and (not near_edge) and (not st.get("zoom_lock", False)) want_out = (w_smooth > (z_band_max + _Z_SIGN_HYST)) or out_edge_crit next_state = state_zoom if state_zoom >= 0 and want_in and can_switch: next_state = +1 elif state_zoom <= 0 and want_out and can_switch: next_state = -1 elif not want_in and not want_out: next_state = 0 if next_state != state_zoom: st["zoom_state"] = next_state st["zoom_last_change"] = now err_w = z_target - w_smooth if abs(err_w) < float(getattr(config, "Z_DEADBAND", 0.010)): err_w = 0.0 st["zoom_int"] = clipf(float(st.get("zoom_int", 0.0) or 0.0) + err_w * 0.04, -_Z_INT_CLAMP, _Z_INT_CLAMP) zoom_pi = _Z_P * err_w + _Z_I * st["zoom_int"] zoom_ff = _Z_FF_GAIN * (-(float(st.get("w_growth", 0.0) or 0.0))) zoom_cmd = clipf(zoom_pi + zoom_ff, _Z_SPEED_LIMIT_OUT, _Z_SPEED_LIMIT_IN) if st.get("zoom_lock", False): zoom_cmd = min(zoom_cmd, -0.20) if out_edge_crit else 0.0 st["zoom_int"] *= 0.90 else: if st["zoom_state"] > 0: zoom_cmd = max(zoom_cmd, 0.0) elif st["zoom_state"] < 0: zoom_cmd = min(zoom_cmd, 0.0) else: zoom_cmd = 0.0 if abs(zoom_cmd) < _Z_MIN_CMD: zoom_cmd = 0.0 cmd_mag_est = max(abs(pan_speed), abs(tilt_speed)) if (cmd_mag_est > float(getattr(config, "Z_VELOCITY_GATE", 0.45))) and (zoom_cmd > 0.0): zoom_cmd = 0.0 err_gate = float(getattr(config, "Z_ERR_GATE", 0.18)) if (max(abs(sm_dx), abs(sm_dy)) > err_gate) and (zoom_cmd > 0.0): zoom_cmd = 0.0 if near_edge and zoom_cmd > 0.0: zoom_cmd = 0.0 if min_margin < float(getattr(config, "Z_SAFETY_EDGE", 0.05)): zoom_cmd = min(zoom_cmd, -0.25) prev_z = float(st.get("zoom_prev_cmd", 0.0) or 0.0) if zoom_cmd > 0.0: if prev_z <= 0.0: st["zoom_ramp_start"] = now frac = clipf((now - float(st.get("zoom_ramp_start", now) or now)) / max(1e-3, _Z_IN_RAMP), 0.0, 1.0) zoom_cmd *= (0.25 + 0.75 * frac) else: st["zoom_ramp_start"] = 0.0 st["zoom_prev_cmd"] = zoom_cmd if zoom_cmd > 0.0 and center_good and not near_edge: call_autofocus(cam_idx) dx_abs, dy_abs = abs(dx), abs(dy) if (dx_abs < 0.040) and (dy_abs < 0.040): st["stable_frames"] = int(st.get("stable_frames", 0) or 0) + 1 else: st["stable_frames"] = 0 st["fallback_done"] = False adj_pan, adj_tilt = pan_speed, tilt_speed if zoom_cmd > 0.0: adj_pan = clipf(adj_pan * _Z_PAN_BOOST_IN, -1.0, 1.0) adj_tilt = clipf(adj_tilt * _Z_PAN_BOOST_IN, -1.0, 1.0) move_ptz_bounded(cam_idx, _PAN_SIGN * adj_pan, _TILT_SIGN * adj_tilt, zoom_cmd) alarms.ingest_detection(cam_idx, True) return # ========================= # ===== детекций нет ====== # ========================= alarms.ingest_detection(cam_idx, False) notify_detected(cam_idx, False) # если мы были в TRACK и детекция пропала — заканчиваем TRACK и паркуем камеру в пресет if is_ptz and st.get("tracking_active"): # считаем, что "потеряли" если нет детекции > TRACK_END_LOSS_SEC if (now - float(st.get("last_seen", 0.0) or 0.0)) >= TRACK_END_LOSS_SEC: _end_track_and_park(cam_idx, reason="track_end_park") _publish_with_boxes(cam_idx, frame_bgr, [], []) return # ниже — твоя старая логика “потерь/возврата” (оставляем) st["zoom_lock"] = False st["lock_candidate_since"] = None if st.get("loss_since") is None: st["loss_since"] = now st["attack_origin_pan"] = None recently_tracked = (st.get("last_seen") is not None) and ((now - float(st["last_seen"])) < 10.0) near_edge_flag = False lb = st.get("last_bbox") if lb is not None: x1, y1, x2, y2 = lb left = x1 * w_inv; right = 1.0 - x2 * w_inv top = y1 * h_inv; bottom = 1.0 - y2 * h_inv if ( (left < config.EDGE_LOSS_MARGIN) or (right < config.EDGE_LOSS_MARGIN) or (top < config.VERT_LOSS_MARGIN) or (bottom < config.VERT_LOSS_MARGIN) ): near_edge_flag = True if is_ptz and ((now - float(st.get("last_zoom_seen", 0.0) or 0.0)) < _Z_HOLD_AFTER_LOSS): move_ptz_bounded( cam_idx, _PAN_SIGN * (float(st.get("last_dx", 0.0) or 0.0) * 0.6), _TILT_SIGN * (float(st.get("last_dy", 0.0) or 0.0) * 0.6), float(getattr(config, "RECOVER_ZOOM_OUT", -0.35)) * 0.25, ) _publish_with_boxes(cam_idx, frame_bgr, [], []) return edge_grace = float(getattr(config, "EDGE_LOSS_GRACE_SEC", 1.8)) timed_out = (st.get("loss_since") is not None) and ((now - float(st["loss_since"])) >= float(config.LOSS_TO_PRESET_SEC)) near_edge_and_lost = ( near_edge_flag and (st.get("loss_since") is not None) and ((now - float(st["loss_since"])) >= edge_grace) ) if is_ptz and recently_tracked and (near_edge_and_lost or timed_out): _maybe_return_and_resume( cam_idx, "loss_or_timeout", float(getattr(config, "LOSS_RETURN_COOLDOWN_SEC", 8.0)), ) _publish_with_boxes(cam_idx, frame_bgr, [], []) return if config.AUTO_STRATEGY and is_ptz: action = choose_auto_action(cam_idx, now) # если патруль активен и не TRACK — не мешаем патрулю if st.get("patrol_active", False) and st.get("mode") in ("IDLE", "SEARCH"): st["stable_frames"] = 0 # ---------- устойчивое поведение при кратких провалах детекции ---------- last_seen = float(st.get("last_seen", 0.0) or 0.0) dt_loss = now - last_seen def _predict_dxdy() -> Tuple[float, float]: cx0 = float(st.get("prev_cx", 0.5) or 0.5) cy0 = float(st.get("prev_cy", 0.5) or 0.5) vx = float(st.get("vx", 0.0) or 0.0) vy = float(st.get("vy", 0.0) or 0.0) w0 = st.get("w_frac_ema") w0 = float(w0) if w0 is not None else 0.08 # типичный "далёкий" объект speed = float((vx * vx + vy * vy) ** 0.5) far_factor = clipf(1.0 - w0, 0.0, 1.0) base_lead = float(getattr(config, "BASE_LEAD", 0.12)) lead_gain = float(getattr(config, "LEAD_GAIN", 0.35)) lead_max = float(getattr(config, "LEAD_MAX", 0.30)) lead_hi = max(base_lead, lead_max) lead_time = clipf(base_lead + lead_gain * speed * far_factor, base_lead, lead_hi) err_mag = max(abs(cx0 - 0.5), abs(cy0 - 0.5)) lead_off = float(getattr(config, "LEAD_OFF_ERR", 0.18)) if err_mag > lead_off: lead_time = base_lead pred_min = float(getattr(config, "PRED_CLAMP_MIN", 0.05)) pred_max = float(getattr(config, "PRED_CLAMP_MAX", 0.95)) cxp = clipf(cx0 + vx * lead_time, pred_min, pred_max) cyp = clipf(cy0 + vy * lead_time, pred_min, pred_max) dx = cxp - 0.5 dy = cyp - 0.5 dcl = float(getattr(config, "PRED_DX_CLAMP", 0.35)) return clipf(dx, -dcl, dcl), clipf(dy, -dcl, dcl) pred_dx, pred_dy = _predict_dxdy() short_thr = float(getattr(config, "SHORT_LOSS_SEC", 0.25)) long_thr = float(getattr(config, "LONG_LOSS_SEC", 1.2)) if dt_loss <= short_thr: decay = 1.0 zoom_loss = 0.0 # freeze zoom на коротких провалах elif dt_loss <= long_thr: tau = float(getattr(config, "LOSS_DECAY_TAU", 0.35)) decay = float(math.exp(-(dt_loss - short_thr) / max(1e-3, tau))) zoom_loss = float(getattr(config, "RECOVER_ZOOM_OUT", -0.35)) * 0.35 # мягко расширяем кадр else: decay = 0.0 zoom_loss = float(getattr(config, "RECOVER_ZOOM_OUT", -0.35)) * 0.6 st["stable_frames"] = 0 # HOLD_PREDICT: пока цель могла "мигнуть" — ведём по прогнозу, не накапливая задержку if dt_loss <= long_thr: # обновим last_dx/last_dy прогнозом, чтобы другие ветки (если сработают) не тянули старьё st["last_dx"], st["last_dy"] = float(pred_dx), float(pred_dy) move_ptz_bounded( cam_idx, _PAN_SIGN * float(pred_dx) * float(decay), _TILT_SIGN * float(pred_dy) * float(decay), float(zoom_loss), ) _publish_with_boxes(cam_idx, frame_bgr, [], []) return if action == "FOLLOW_SHORT": move_ptz_bounded( cam_idx, _PAN_SIGN * float(st.get("last_dx", 0.0) or 0.0), _TILT_SIGN * float(st.get("last_dy", 0.0) or 0.0), float(getattr(config, "RECOVER_ZOOM_OUT", -0.35)) * 0.6, ) elif action in ("SEARCH", "HOLD"): if action == "HOLD" and (now - float(st.get("last_seen", now) or now)) < 8.0: st["last_dx"] = float(st.get("last_dx", 0.0) or 0.0) * 0.85 st["last_dy"] = float(st.get("last_dy", 0.0) or 0.0) * 0.85 move_ptz_bounded( cam_idx, _PAN_SIGN * float(st.get("last_dx", 0.0) or 0.0), _TILT_SIGN * float(st.get("last_dy", 0.0) or 0.0), float(getattr(config, "RECOVER_ZOOM_OUT", -0.35)) * 0.4, ) else: stop_ptz(cam_idx) elif action == "FALLBACK": if not st.get("patrol_active", False): _maybe_return_and_resume( cam_idx, "fallback", float(getattr(config, "FALLBACK_RETURN_COOLDOWN_SEC", 12.0)), ) st["stable_frames"] = 0 _publish_with_boxes(cam_idx, frame_bgr, [], [])