You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

401 lines
17 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
from __future__ import annotations
# --- bootstrap for direct script run ---
# Позволяет запускать файл как обычный скрипт: python ptz_tracker_modular/main.py
# и при этом импортировать пакет ptz_tracker_modular
if __package__ is None or __package__ == "":
import sys, pathlib
pkg_root = pathlib.Path(__file__).resolve().parent.parent # папка-родитель, содержащая пакет
if str(pkg_root) not in sys.path:
sys.path.insert(0, str(pkg_root))
__package__ = "ptz_tracker_modular"
# ---------------------------------------
import asyncio
import logging
import os
import threading
from queue import Queue
from typing import Iterable
def _parse_level(name: str, default=logging.ERROR) -> int:
name = (name or "").strip().upper()
return getattr(logging, name, default)
def setup_logging() -> logging.Logger:
"""
Лог только в консоль, по умолчанию уровень ERROR.
Файлы и папка logs не создаются.
"""
# По умолчанию только ошибки; можно переопределить LOG_LEVEL=WARNING/INFO/DEBUG
level = _parse_level(os.getenv("LOG_LEVEL", "ERROR"))
logging.basicConfig(
level=level,
format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
handlers=[logging.StreamHandler()],
)
logger = logging.getLogger("PTZTracker")
# приглушим лишний шум от сторонних библиотек
logging.getLogger("websockets.client").setLevel(logging.WARNING)
logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
logger.error("Logging initialized, level=%s (console only, no file)", logging.getLevelName(level))
return logger
logger = setup_logging()
from ptz_tracker_modular import config, state
from ptz_tracker_modular.streaming import (
MicroBatcher,
video_broadcaster,
cpp_ws_loop,
cpp_detection_loop,
build_servers,
)
from ptz_tracker_modular.status import ptz_status_poller
from ptz_tracker_modular.heartbeat import heartbeat_pinger
from ptz_tracker_modular.sector import sector_autocal_from_presets, sector_init_on_startup
from ptz_tracker_modular.calibration import calibrate_presets, autocal_pan_sign, park_to_patrol_start
from ptz_tracker_modular.patrol import (
patrol_init_on_startup,
patrol_supervisor_tick_bounded,
patrol_supervisor_tick,
sync_patrol_coordinator_loop,
track_return_watchdog, # <-- импорт вотчдога
)
from ptz_tracker_modular import ptz_io # содержит _ptz_worker
# (опционально) uvloop
try:
import uvloop # type: ignore
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except Exception:
pass
def _dump_alarm_config():
"""Красиво логируем ключевые параметры тревог при старте (если уровень позволяет)."""
ws_url = getattr(config, "ALARM_WS_URL", "ws://127.0.0.1:3000/ws")
ws_fmt = getattr(config, "ALARM_WS_FORMAT", "swagger")
mac_only = bool(getattr(config, "ALARM_MAC_ONLY", False))
http_en = bool(getattr(config, "ALARM_HTTP_ENABLE", False))
http_mirror = bool(getattr(config, "ALARM_HTTP_MIRROR", False))
http_base = getattr(config, "ALARM_HTTP_BASE", "http://127.0.0.1:3000")
http_path = getattr(config, "ALARM_HTTP_PATH", "/videodata")
http_tmpl = getattr(config, "ALARM_HTTP_PATH_TMPL", "/videodata")
ws_bmax = getattr(config, "ALARM_WS_BATCH_MAX", getattr(config, "WS_BATCH_MAX", 16))
ws_bms = getattr(config, "ALARM_WS_BATCH_MS", getattr(config, "WS_BATCH_MS", 120))
http_bmax = getattr(config, "ALARM_HTTP_BATCH_MAX", getattr(config, "HTTP_BATCH_MAX", 16))
http_bms = getattr(config, "ALARM_HTTP_BATCH_MS", getattr(config, "HTTP_BATCH_MS", 120))
hb_sec = int(getattr(config, "ALARM_HEARTBEAT_SEC", 60))
logger.info("[ALARM] WS_URL=%s format=%s mac_only=%s", ws_url, ws_fmt, mac_only)
logger.info("[ALARM] HTTP enable=%s mirror=%s base=%s path=%s tmpl=%s",
http_en, http_mirror, http_base, http_path, http_tmpl)
logger.info("[ALARM] Batching: WS max=%s wait_ms=%s | HTTP max=%s wait_ms=%s",
ws_bmax, ws_bms, http_bmax, http_bms)
logger.info("[ALARM] Heartbeat: every %ss to %s", hb_sec, http_path)
def watch(coro: asyncio.Future | asyncio.Task | asyncio.coroutines, name: str) -> asyncio.Task:
"""
Обёртка для задач: если задача упала, логируем исключение.
Возвращает Task (подходит для gather).
"""
async def _guard():
try:
return await coro # type: ignore[arg-type]
except asyncio.CancelledError:
logger.info("[%s] cancelled", name)
raise
except Exception as e:
logger.exception("[%s] crashed: %s", name, e)
raise
if not isinstance(coro, (asyncio.Future, asyncio.Task)):
coro = asyncio.create_task(coro) # type: ignore[assignment]
task = asyncio.create_task(_guard(), name=name) # type: ignore[arg-type]
return task
async def main() -> None:
# --- Инициализация общего состояния ---
state.MAIN_LOOP = asyncio.get_running_loop()
state.alarm_queue = asyncio.Queue()
state.detection_queue = asyncio.Queue()
state.batcher = MicroBatcher(
max_batch=config.BATCH_MIN,
max_wait_ms=2.0,
adaptive=config.ADAPTIVE_BATCH_ENABLE,
bmin=config.BATCH_MIN,
bmax=config.BATCH_MAX,
target_ms=config.BATCH_TARGET_MS,
)
# Импорт alarms ПОСЛЕ инициализации state.*, чтобы внутри не было None
from ptz_tracker_modular import alarms # -> alarms.alarm_sender(), alarms.alarm_http_sender()
_dump_alarm_config()
# PTZ worker thread + очередь команд
state.ptz_cmd_q = Queue(maxsize=2048)
state._ptz_thread = threading.Thread(
target=ptz_io._ptz_worker, name="PTZWorker", daemon=True
)
state._ptz_thread.start()
logger.info("[PTZ] worker thread started")
# --- ВАЖНО: гарантируем базовые поля стейта для PTZ, иначе patrol может skip'ать камеры ---
# (частая причина "патруль не на всех PTZ": st.get("mode") == None)
for cam_id in config.PTZ_CAM_IDS:
st = state.ensure_cam_state(cam_id, is_ptz=True)
if st.get("mode") is None:
st["mode"] = "IDLE"
# ---- КАЛИБРОВКИ ----
for cam_id in config.PTZ_CAM_IDS:
cfg = config.CAMERA_CONFIG[cam_id]
need = (
cfg.get("preset1_deg") is None
or cfg.get("preset2_deg") is None
or cfg.get("sector_min_deg") is None
or cfg.get("sector_max_deg") is None
)
if need:
ok = await calibrate_presets(cam_id)
if not ok:
logger.warning(
"Failed to calibrate camera %s, using fallback for sector clamp.", cam_id
)
if config.USE_PRESET_EDGES_FOR_SECTOR:
need_sector = any(state.ptz_states[c].get("sector_left_deg") is None for c in config.PTZ_CAM_IDS)
if need_sector:
await sector_autocal_from_presets()
sector_init_on_startup()
for cam in config.PTZ_CAM_IDS:
try:
await autocal_pan_sign(cam)
except Exception as e:
logger.debug("autocal pan_sign cam %s: %s", cam, e)
await park_to_patrol_start()
# ---- Патруль ----
have_any_ptz = bool(config.PTZ_CAM_IDS)
if have_any_ptz and config.PATROL_ENABLE_ON_START:
if config.SYNC_PATROL_ENABLE and config.SYNC_COHORTS:
logger.info(
"SYNC PTZ patrol enabled (cohorts): %s",
", ".join(config.SYNC_COHORTS.keys()),
)
else:
patrol_init_on_startup()
logger.info("PTZ patrol started (standalone)")
elif not have_any_ptz:
logger.info("PTZ patrol skipped: no PTZ cameras configured")
else:
logger.info("PTZ patrol is DISABLED on start")
logger.info("PTZ tracking + HARD sector clamp + bounded patrol (resync enabled)")
# ---- WS servers ----
ctrl_server, video_server = await build_servers()
logger.info("[CTRL WS] listening on 0.0.0.0:%s", config.VUE_CONTROL_WS_PORT)
logger.info("[VIDEO WS] listening on 0.0.0.0:%s", config.VUE_VIDEO_WS_PORT)
async def patrol_supervisor_loop():
while True:
try:
patrol_supervisor_tick_bounded()
patrol_supervisor_tick()
except Exception as e:
logger.error("patrol_supervisor error: %s", e)
await asyncio.sleep(0.10)
async def track_watchdog_loop() -> None:
"""
Supervisor для per-camera watchdog'ов.
ВАЖНО: track_return_watchdog(cam_id) внутри себя бесконечный цикл.
Поэтому здесь мы НЕ await'им его по очереди, а запускаем отдельной Task
на каждую PTZ-камеру и следим, чтобы:
- вотчдог был запущен на всех cam_id из config.PTZ_CAM_IDS,
- если вотчдог упал — логируем причину и перезапускаем,
- если список PTZ камер изменился (reload) — отменяем лишние задачи.
"""
tasks_by_cam: dict[int, asyncio.Task] = {}
restart_after: dict[int, float] = {} # cam_id -> loop.time(), когда можно рестартить
loop = asyncio.get_running_loop()
tick_sec = 1.0
restart_cooldown_sec = 2.0
try:
while True:
now = loop.time()
# Снимок актуального списка PTZ камер (может меняться после reload)
ptz_ids = list(getattr(config, "PTZ_CAM_IDS", []) or [])
ptz_set = set(ptz_ids)
# 1) Запускаем/перезапускаем вотчдоги для актуальных камер
for cam_id in ptz_ids:
# гарантируем, что стейт камеры существует (чтобы внутри watchdog не было KeyError)
try:
state.ensure_cam_state(cam_id, is_ptz=True)
except Exception as e:
logger.error("[WATCHDOG] ensure_cam_state cam=%s failed: %s", cam_id, e)
continue
t = tasks_by_cam.get(cam_id)
if t is None:
# нет задачи — создаём, если прошёл cooldown
if now >= restart_after.get(cam_id, 0.0):
tasks_by_cam[cam_id] = asyncio.create_task(
track_return_watchdog(cam_id),
name=f"track_return_watchdog[{cam_id}]",
)
continue
if t.done():
# задача завершилась — логируем (если было исключение) и планируем рестарт
try:
exc = t.exception()
except asyncio.CancelledError:
exc = None
except Exception as e:
exc = e
if exc is not None:
logger.exception(
"[WATCHDOG] cam=%s track_return_watchdog crashed: %s",
cam_id,
exc,
)
tasks_by_cam.pop(cam_id, None)
restart_after[cam_id] = now + restart_cooldown_sec
# 2) Камеры, которые больше не PTZ — отменяем их watchdog'и
for cam_id in list(tasks_by_cam.keys()):
if cam_id not in ptz_set:
t = tasks_by_cam.pop(cam_id)
t.cancel()
restart_after.pop(cam_id, None)
# Можно подсказать, что камера теперь не PTZ (не обязательно)
try:
state.ensure_cam_state(cam_id, is_ptz=False)
except Exception:
pass
await asyncio.sleep(tick_sec)
except asyncio.CancelledError:
raise
finally:
# Корректно гасим все per-camera задачи
for t in tasks_by_cam.values():
t.cancel()
if tasks_by_cam:
await asyncio.gather(*tasks_by_cam.values(), return_exceptions=True)
# ---- Опциональный smoke-тест тревоги (по первому MAC) ----
if os.getenv("ALARM_SMOKE", "0") in ("1", "true", "TRUE", "yes", "YES"):
async def _smoke():
try:
# Берём первый доступный MAC из конфигурации
macs: Iterable[str] = (
cfg.get("mac") for _id, cfg in sorted(config.CAMERA_CONFIG.items())
if isinstance(cfg.get("mac"), str)
)
mac = next((m for m in macs if m), None)
if not mac:
logger.warning("[SMOKE] no MACs in CAMERA_CONFIG — skip")
return
logger.info("[SMOKE] sending test alarms for MAC=%s", mac)
alarms.queue_alarm_mac(mac, True)
await asyncio.sleep(0.2)
alarms.queue_alarm_mac(mac, False)
logger.info("[SMOKE] done")
except Exception as e:
logger.error("[SMOKE] failed: %s", e)
asyncio.create_task(_smoke())
# ---- Таски ----
tasks: list[asyncio.Task | asyncio.Future] = [
watch(video_broadcaster(), "video_broadcaster"),
watch(cpp_ws_loop(), "cpp_ws_loop"),
watch(cpp_detection_loop(), "cpp_detection_loop"),
watch(alarms.alarm_sender(), "alarm_sender"), # WS-alarms
watch(alarms.alarm_http_sender(), "alarm_http_sender"), # HTTP-alarms (если включено)
watch(alarms.periodic_status_sender(), "periodic_status_sender"), # heartbeat-массив камер
watch(ptz_status_poller(), "ptz_status_poller"),
watch(heartbeat_pinger(), "heartbeat_pinger"),
asyncio.create_task(ctrl_server.wait_closed(), name="ctrl_ws_wait_closed"),
asyncio.create_task(video_server.wait_closed(), name="video_ws_wait_closed"),
]
# Запускаем патрульный цикл, если он включён
if config.PATROL_ENABLE_ON_START and config.PTZ_CAM_IDS:
tasks.insert(8, watch(patrol_supervisor_loop(), "patrol_supervisor_loop"))
# Вотчдог по трекингу/IDLE — запускаем всегда, если есть PTZ-камеры
if config.PTZ_CAM_IDS:
tasks.insert(8, watch(track_watchdog_loop(), "track_watchdog_loop"))
# SYNC patrol при необходимости
if config.SYNC_PATROL_ENABLE and config.SYNC_COHORTS:
tasks.insert(8, watch(sync_patrol_coordinator_loop(), "sync_patrol_coordinator_loop"))
try:
await asyncio.gather(*tasks)
finally:
ctrl_server.close()
video_server.close()
await ctrl_server.wait_closed()
await video_server.wait_closed()
def run():
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Stopped by user")
finally:
try:
if state.ptz_cmd_q is not None:
state.ptz_cmd_q.put_nowait(None) # sentinel для PTZ-воркера
if state._ptz_thread is not None:
state._ptz_thread.join(timeout=0.5)
except Exception:
pass
for _cid, _st in state.ptz_states.items():
if _st.get("zoom_reset_timer"):
try:
_st["zoom_reset_timer"].cancel()
except Exception:
pass
if _st.get("preset_timer"):
try:
_st["preset_timer"].cancel()
except Exception:
pass
logger.info("Shutdown complete")
if __name__ == "__main__":
run()