# alarms.py from __future__ import annotations import asyncio import json import logging from datetime import datetime, timezone from typing import Any, Dict, Optional, List, Tuple # ====== Импорты локальных модулей с fallback (пакет/скрипт) ====== try: from . import config as _config from . import state as _state # ВАЖНО: модуль, а не значения # для «живого» чтения камер из cameras.toml try: from . import cameras_io # type: ignore except Exception: cameras_io = None # type: ignore except Exception: # запуск из корня проекта import config as _config # type: ignore import state as _state # type: ignore try: import cameras_io # type: ignore except Exception: cameras_io = None # type: ignore config = _config logger = logging.getLogger("PTZTracker") # ===================================================================== # Параметры (из config.py, с безопасными дефолтами) # ===================================================================== # Пороги гистерезиса по кадрам ALARM_COUNT_ON: int = int(getattr(config, "ALARM_COUNT_ON", 20)) ALARM_COUNT_OFF: int = int(getattr(config, "ALARM_COUNT_OFF", 80)) # WS для одно-битного флага video_alarm ALARM_WS_URL: str = getattr(config, "ALARM_WS_URL", "ws://127.0.0.1/api/core/ws") # HTTP bulk (всегда http/https, без /ws в конце) ALARM_HTTP_ENABLE: bool = bool(getattr(config, "ALARM_HTTP_ENABLE", True)) ALARM_HTTP_BASE: str = getattr(config, "ALARM_HTTP_BASE", "http://127.0.0.1/api/core") # bulk путь БЕЗ {mac} ALARM_HTTP_PATH: str = getattr(config, "ALARM_HTTP_PATH", "/videodata") # минутный heartbeat (тот же bulk формат) ALARM_HEARTBEAT_SEND_EMPTY: bool = bool(getattr(config, "ALARM_HEARTBEAT_SEND_EMPTY", True)) ALARM_HEARTBEAT_SEC: int = int(getattr(config, "ALARM_HEARTBEAT_SEC", 60)) # Микробатчинг очередей HTTP_BATCH_MAX: int = int(getattr(config, "ALARM_HTTP_BATCH_MAX", 16)) HTTP_BATCH_MS: int = int(getattr(config, "ALARM_HTTP_BATCH_MS", 120)) WS_BATCH_MAX: int = int(getattr(config, "ALARM_WS_BATCH_MAX", 16)) WS_BATCH_MS: int = int(getattr(config, "ALARM_WS_BATCH_MS", 120)) # ===================================================================== # Очереди # ===================================================================== def _ensure_ws_queue() -> asyncio.Queue: if _state.alarm_queue is None: _state.alarm_queue = asyncio.Queue() logger.debug("[ALARM] created alarm_queue") return _state.alarm_queue _http_queue: Optional[asyncio.Queue] = None def _ensure_http_queue() -> asyncio.Queue: global _http_queue if _http_queue is None: _http_queue = asyncio.Queue() logger.debug("[ALARM][HTTP] created _http_queue") return _http_queue # ===================================================================== # Утилиты # ===================================================================== def _now_iso_ms_utc() -> str: return ( datetime.now(timezone.utc) .isoformat(timespec="milliseconds") .replace("+00:00", "Z") ) def _norm_mac(s: str) -> str: s = (s or "").strip().upper().replace("-", ":") parts = s.split(":") if len(parts) != 6 or any(len(p) != 2 for p in parts): raise ValueError(f"Bad MAC: {s!r}") return s def _get_cam_mac(cam_idx: int) -> Optional[str]: # 1) предпочтительно читаем cameras_io (актуально по файлу) if cameras_io is not None: try: rec = cameras_io.get_all().get(int(cam_idx)) # type: ignore[attr-defined] mac = (rec or {}).get("mac") if isinstance(mac, str) and mac: return _norm_mac(mac) except Exception: pass # 2) фоллбэк — config (если кто-то синхронизирует его отдельно) try: mac = (_config.CAMERA_CONFIG[int(cam_idx)]).get("mac") # type: ignore[attr-defined] except Exception: mac = None if not isinstance(mac, str) or not mac: return None try: return _norm_mac(mac) except Exception: return None def _build_swagger_camera(mac: str, alarm: bool, status: bool = True) -> Dict[str, Any]: return {"mac": _norm_mac(mac), "status": bool(status), "alarm": bool(alarm)} def _build_swagger_payload(cameras: List[Dict[str, Any]]) -> Dict[str, Any]: return {"registeredAt": _now_iso_ms_utc(), "cameras": cameras} def _sanitize_http_base(v: str) -> str: """Лечим ошибки вида ws://... или хвост /ws у базы HTTP.""" v = (v or "").strip() if v.startswith("ws://"): v = "http://" + v[5:] if v.startswith("wss://"): v = "https://" + v[6:] if v.endswith("/ws"): v = v[:-3] return v.rstrip("/") def _bulk_snapshot_for_all_cams() -> Tuple[str, Dict[str, Any]]: """ Собираем bulk-payload по всем камерам (mac/status/alarm). Возвращает (url, json_body). """ # первоисточник камер if cameras_io is not None: try: src = cameras_io.get_all() # type: ignore[attr-defined] except Exception: src = None else: src = None if src is None: src = getattr(config, "CAMERA_CONFIG", {}) detected = getattr(_state, "detected_cameras", set()) or set() cameras_payload: List[Dict[str, Any]] = [] try: items = sorted(src.items(), key=lambda kv: int(kv[0])) except Exception: items = list((src or {}).items()) for cid, cfg in items: try: cam_id = int(cid) except Exception: continue mac = (cfg or {}).get("mac") if not isinstance(mac, str) or not mac: continue try: mac_n = _norm_mac(mac) except Exception: continue online = bool(cam_id in detected) armed = bool((_H.get(cam_id) or {}).get("armed", False)) cameras_payload.append(_build_swagger_camera(mac_n, alarm=armed, status=online)) base = _sanitize_http_base(ALARM_HTTP_BASE) path = str(ALARM_HTTP_PATH) # heartbeat/bulk путь обязан быть БЕЗ {mac} if "{mac}" in path: path = path.replace("{mac}", "") url = f"{base}/{path.lstrip('/')}" return url, _build_swagger_payload(cameras_payload) def _dispatch_ws(minimal: Dict[str, Any]) -> None: q = _ensure_ws_queue() loop = _state.MAIN_LOOP if loop is not None: try: loop.call_soon_threadsafe(q.put_nowait, minimal) return except Exception: pass q.put_nowait(minimal) def _dispatch_http(url: str, body: Dict[str, Any]) -> None: q = _ensure_http_queue() loop = _state.MAIN_LOOP item = {"url": url, "json": body} if loop is not None: try: loop.call_soon_threadsafe(q.put_nowait, item) return except Exception: pass q.put_nowait(item) # ===================================================================== # Гистерезис по кадрам (состояние «armed» на камеру) # ===================================================================== _H: Dict[int, Dict[str, Any]] = {} # cam_idx -> {"on":int, "off":int, "armed":bool} def _hyst_state(cam_idx: int) -> Dict[str, Any]: st = _H.get(cam_idx) if st is None: st = {"on": 0, "off": 0, "armed": False} _H[cam_idx] = st return st def _prune_hysteresis(valid_ids: set[int]) -> None: """Удалить состояния для камер, которых больше нет в конфиге.""" for cid in list(_H.keys()): if cid not in valid_ids: _H.pop(cid, None) def ingest_detection(cam_idx: int, detected: bool) -> None: """ True -> есть детект на кадре False -> нет детекта При изменении «armed»: - шлём WS одно событие {"type":"video_alarm","data": } (для совместимости) - параллельно шлём HTTP bulk со всеми камерами (mac/status/alarm) """ st = _hyst_state(cam_idx) if detected: st["on"] += 1 st["off"] = 0 if (not st["armed"]) and st["on"] >= ALARM_COUNT_ON: st["armed"] = True _emit_alarm_change(True) else: st["off"] += 1 st["on"] = 0 if st["armed"] and st["off"] >= ALARM_COUNT_OFF: st["armed"] = False _emit_alarm_change(False) # ===================================================================== # Эмиттер: WS + HTTP bulk # ===================================================================== def _emit_alarm_change(alarm_state: bool) -> None: # 1) WS минимальный флаг (оставляем — «как раньше») minimal = {"type": "video_alarm", "data": bool(alarm_state)} logger.info("[ALARM][WS][ENQ] data=%s", minimal["data"]) _dispatch_ws(minimal) # 2) HTTP bulk по всем камерам if ALARM_HTTP_ENABLE: url, body = _bulk_snapshot_for_all_cams() cams_cnt = len(body.get("cameras") or []) logger.info("[ALARM][HTTP-BULK][ENQ] url=%s cameras=%d", url, cams_cnt) _dispatch_http(url, body) # ===================================================================== # Дополнительно: прямой smoke-тест по конкретному MAC (bulk + опц. WS) # ===================================================================== def queue_alarm_mac(mac: str, alarm: bool, also_ws: bool = True) -> None: """ Вручную положить тревогу: на WS один бит, а на HTTP — bulk со всеми камерами, где у камеры с данным MAC выставлен alarm, остальные — по текущему _H/online. """ try: mac_n = _norm_mac(mac) except Exception: logger.error("[ALARM][SMOKE] bad MAC: %r", mac) return # WS (минимальный формат) — опционально, чтобы сразу засветить on/off if also_ws: minimal = {"type": "video_alarm", "data": bool(alarm)} logger.info("[ALARM][WS][ENQ][SMOKE] data=%s", minimal["data"]) _dispatch_ws(minimal) # HTTP bulk — строим полный срез и принудительно меняем alarm у данного MAC if not ALARM_HTTP_ENABLE: return url, body = _bulk_snapshot_for_all_cams() # подменим/добавим запись с mac_n cams = body.get("cameras") or [] found = False for c in cams: if str(c.get("mac", "")).upper() == mac_n: c["alarm"] = bool(alarm) found = True break if not found: # если в конфиге нет — добавим как online с заданной тревогой cams.append(_build_swagger_camera(mac_n, alarm=bool(alarm), status=True)) body["cameras"] = cams logger.info("[ALARM][HTTP-BULK][ENQ][SMOKE] url=%s cameras=%d", url, len(cams)) _dispatch_http(url, body) # ===================================================================== # Отправители: WS и HTTP # ===================================================================== async def _wait_state_ready(kind: str = "WS") -> None: t0 = asyncio.get_running_loop().time() while _state.MAIN_LOOP is None or (kind == "WS" and _state.alarm_queue is None): await asyncio.sleep(0.05) if asyncio.get_running_loop().time() - t0 > 5.0: if _state.MAIN_LOOP is None: _state.MAIN_LOOP = asyncio.get_running_loop() if kind == "WS" and _state.alarm_queue is None: _ensure_ws_queue() break async def alarm_sender() -> None: """ Отправка video_alarm по WS c микробатчем (сливаем X сообщений за WS_BATCH_MS и отправляем последнее значение). """ await _wait_state_ready("WS") import websockets q = _ensure_ws_queue() backoff = 1.0 while True: try: logger.info("[ALARM][WS] Connecting to %s ...", ALARM_WS_URL) async with websockets.connect( ALARM_WS_URL, ping_interval=20, close_timeout=1.0, compression=None, ) as ws: logger.info("[ALARM][WS] Connected") backoff = 1.0 while True: item = await q.get() try: # коалесцируем и берём последнее значение last = item t0 = asyncio.get_running_loop().time() bucket = 1 while bucket < WS_BATCH_MAX and (asyncio.get_running_loop().time() - t0) < (WS_BATCH_MS / 1000.0): try: nxt = q.get_nowait() last = nxt bucket += 1 except asyncio.QueueEmpty: await asyncio.sleep(0.01) data_bit = bool(last.get("data", False)) if isinstance(last, dict) else bool(last) to_send = {"type": "video_alarm", "data": data_bit} await ws.send(json.dumps(to_send, separators=(",", ":"))) logger.info("[ALARM][WS][SENT] data=%s (batched=%d)", data_bit, bucket) except Exception as exc: logger.error("[ALARM][WS] send failed: %s", exc) try: q.put_nowait(item) except Exception: pass break finally: try: q.task_done() except Exception: pass except Exception as exc: logger.error("[ALARM][WS] connection error: %s", exc) await asyncio.sleep(backoff) logger.info("[ALARM][WS] reconnect in %.1fs", backoff) backoff = min(backoff * 2, 10.0) async def alarm_http_sender() -> None: """ POST bulk-payload на HTTP с коалесцированием по URL (оставляем последний body для данного URL). """ if not ALARM_HTTP_ENABLE: logger.info("[ALARM][HTTP] disabled") return import requests from requests.adapters import HTTPAdapter try: from urllib3.util.retry import Retry # type: ignore except Exception: Retry = None # type: ignore sess = requests.Session() sess.trust_env = False if Retry is not None: adapter = HTTPAdapter( pool_connections=16, pool_maxsize=16, max_retries=Retry( total=2, backoff_factor=0.3, status_forcelist=[500, 502, 503, 504], raise_on_status=False, ), ) else: adapter = HTTPAdapter(pool_connections=16, pool_maxsize=16, max_retries=0) sess.mount("http://", adapter) sess.mount("https://", adapter) headers = {"Content-Type": "application/json", "Connection": "keep-alive"} q = _ensure_http_queue() loop = asyncio.get_running_loop() def _merge_http_bucket(bucket: List[Dict[str, Any]]) -> Tuple[str, Dict[str, Any]]: # Все элементы в bucket — с одинаковым URL. Берём ПОСЛЕДНИЙ body. url = bucket[0].get("url", "") last_body = bucket[-1].get("json") or {} return url, last_body while True: item = await q.get() try: bucket = [item] t0 = asyncio.get_running_loop().time() while len(bucket) < HTTP_BATCH_MAX and (asyncio.get_running_loop().time() - t0) < (HTTP_BATCH_MS / 1000.0): try: nxt = q.get_nowait() if nxt.get("url") != bucket[0].get("url"): # другой URL — вернём назад q.put_nowait(nxt) break bucket.append(nxt) except asyncio.QueueEmpty: await asyncio.sleep(0.01) url_raw, body = _merge_http_bucket(bucket) base = _sanitize_http_base(ALARM_HTTP_BASE) # если кто-то положил в элемент абсолютный url — используем его; иначе соберём из base if url_raw.startswith("http://") or url_raw.startswith("https://"): url = url_raw else: path = str(ALARM_HTTP_PATH).replace("{mac}", "") url = f"{base}/{path.lstrip('/')}" cams_cnt = len(body.get("cameras") or []) logger.info("[ALARM][HTTP-BULK][SEND] url=%s cameras=%d", url, cams_cnt) def _post(): return sess.post(url, json=body, headers=headers, timeout=3.0) resp = await loop.run_in_executor(None, _post) if 200 <= resp.status_code < 300: logger.info("[ALARM][HTTP-BULK][OK] %s (%s) cameras=%d", url, resp.status_code, cams_cnt) else: logger.warning("[ALARM][HTTP-BULK][FAIL] %s HTTP %s", url, resp.status_code) logger.debug("[ALARM][HTTP-BULK][RESP] %s", (resp.text or "")[:200]) # вернуть последний элемент на ретрай try: q.put_nowait(bucket[-1]) except Exception: pass except Exception as exc: logger.error("[ALARM][HTTP-BULK] error: %s", exc) try: q.put_nowait(item) except Exception: pass finally: # аккуратно закрываем все task_done for _ in range(len(bucket)): try: q.task_done() except Exception: pass # ===================================================================== # Периодический heartbeat HTTP со всеми камерами (alarm=false) # ===================================================================== async def periodic_status_sender() -> None: """ Раз в ALARM_HEARTBEAT_SEC отправляет массив камер на HTTP-адрес БЕЗ {mac}. Формат: { "registeredAt": "...Z", "cameras": [{"mac": "...", "status": true|false, "alarm": false}, ...] } """ if not ALARM_HTTP_ENABLE: logger.info("[ALARM][HEARTBEAT] disabled because HTTP disabled") return import requests from requests.adapters import HTTPAdapter try: from urllib3.util.retry import Retry # type: ignore except Exception: Retry = None # type: ignore period = int(ALARM_HEARTBEAT_SEC) send_empty = bool(ALARM_HEARTBEAT_SEND_EMPTY) base = _sanitize_http_base(ALARM_HTTP_BASE) hb_path = str(ALARM_HTTP_PATH) if "{mac}" in hb_path: logger.warning("[ALARM][HEARTBEAT] ALARM_HTTP_PATH содержит {mac}; это путь для bulk. Уберу {mac}.") hb_path = hb_path.replace("{mac}", "") url = f"{base}/{hb_path.lstrip('/')}" # подготовим HTTP клиент sess = requests.Session() sess.trust_env = False if Retry is not None: adapter = HTTPAdapter( pool_connections=8, pool_maxsize=8, max_retries=Retry( total=2, backoff_factor=0.3, status_forcelist=[500, 502, 503, 504], raise_on_status=False, ), ) else: adapter = HTTPAdapter(pool_connections=8, pool_maxsize=8, max_retries=0) sess.mount("http://", adapter) sess.mount("https://", adapter) headers = {"Content-Type": "application/json", "Connection": "keep-alive"} loop = asyncio.get_running_loop() def _snapshot_cameras() -> Tuple[List[Dict[str, Any]], set[int]]: """Вернёт (список-камер-для-отправки, множество валидных id)""" cams: List[Dict[str, Any]] = [] valid_ids: set[int] = set() # предпочтительно читаем из cameras_io, иначе — из config src = None if cameras_io is not None: try: src = cameras_io.get_all() # type: ignore[attr-defined] except Exception: src = None if src is None: src = getattr(config, "CAMERA_CONFIG", {}) try: items = sorted(src.items(), key=lambda kv: int(kv[0])) except Exception: items = list((src or {}).items()) detected = getattr(_state, "detected_cameras", set()) or set() for cid, cfg in items: try: cid_i = int(cid) valid_ids.add(cid_i) except Exception: continue mac = (cfg or {}).get("mac") if isinstance(mac, str): try: mac_n = _norm_mac(mac) cams.append(_build_swagger_camera(mac_n, alarm=False, status=(cid_i in detected))) except Exception: # плохой MAC — пропускаем pass return cams, valid_ids while True: try: cameras, valid_ids = _snapshot_cameras() # Чистим гистерезис от удалённых камер _prune_hysteresis(valid_ids) if cameras or send_empty: body = _build_swagger_payload(cameras) cams_cnt = len(cameras) logger.info("[ALARM][HEARTBEAT][SEND] url=%s cameras=%d", url, cams_cnt) def _post(): return sess.post(url, json=body, headers=headers, timeout=3.0) resp = await loop.run_in_executor(None, _post) if 200 <= resp.status_code < 300: logger.info("[ALARM][HEARTBEAT][OK] %s (%s) cameras=%d", url, resp.status_code, cams_cnt) else: logger.warning("[ALARM][HEARTBEAT][FAIL] %s HTTP %s", url, resp.status_code) logger.debug("[ALARM][HEARTBEAT][RESP] %s", (resp.text or "")[:200]) else: logger.info("[ALARM][HEARTBEAT] skip: no cameras (send_empty=False)") except Exception as exc: logger.error("[ALARM][HEARTBEAT] error: %s", exc) finally: await asyncio.sleep(max(5, period))