You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

230 lines
9.0 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# ptz_tracker_modular/wiper.py
from __future__ import annotations
import asyncio
import logging
from typing import Optional, Tuple
import requests
from requests.auth import HTTPDigestAuth
log = logging.getLogger("PTZTracker.WIPER")
_sess = requests.Session()
_sess.trust_env = False
try:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
except Exception:
pass
# Базовое значение; можно переопределить в config.py как WIPER_MIN_INTERVAL_SEC
_DEFAULT_MIN_INTERVAL_SEC = 1.5
_last_call: dict[int, float] = {}
def _resolve_cam_id(cam_token: int) -> int:
"""cam_token = реальный ID из cameras.toml ИЛИ индекс в PTZ_CAM_IDS."""
from . import config
if cam_token in config.CAMERA_CONFIG:
return cam_token
if 0 <= cam_token < len(config.PTZ_CAM_IDS):
return config.PTZ_CAM_IDS[cam_token]
raise KeyError(f"Unknown camera token: {cam_token}")
def _base(cam_id: int):
"""Вернёт (base_url, auth, verify, timeout, meta, channel) для указанной камеры."""
from . import config as C
cfg = C.CAMERA_CONFIG[cam_id]
scheme = cfg.get("scheme") or ("https" if cfg.get("https") else "http")
host = cfg["ip"]
port = int(cfg.get("port") or (443 if scheme == "https" else 80))
# ВАЖНО: сначала берём wiper_channel, потом ptz_channel, по умолчанию 1
ch = int(cfg.get("wiper_channel", cfg.get("ptz_channel", 1)))
user = cfg.get("username") or cfg.get("user") or cfg.get("login") or "admin"
pwd = cfg.get("password") or cfg.get("pass") or cfg.get("pwd") or ""
auth = HTTPDigestAuth(user, pwd)
base = f"{scheme}://{host}:{port}"
verify = bool(cfg.get("verify_tls", False))
timeout = int(cfg.get("ptz_timeout_sec", 6))
meta = {"ip": host, "port": port, "scheme": scheme, "channel": ch, "user": user}
return base, auth, verify, timeout, meta, ch
def _log_resp(tag: str, r: requests.Response | None, exc: Exception | None = None):
"""
Единая точка логирования ответов.
Переведено в DEBUG, чтобы не шуметь в INFO. Никаких print().
Возвращает кортеж (ok, http_code|None, body_str).
"""
if r is not None:
body = (r.text or "")[:300]
# детальный лог — только в DEBUG
if log.isEnabledFor(logging.DEBUG):
log.debug("%s -> HTTP %s | %r", tag, r.status_code, body)
return (200 <= r.status_code < 300), r.status_code, body
else:
log.warning("%s -> EXC %s", tag, exc)
return False, None, str(exc) if exc else ""
def _PUT(url, *, auth, verify, timeout, data=b"", headers=None):
try:
return _sess.put(url, data=data, headers=headers or {}, auth=auth, timeout=timeout, verify=verify), None
except Exception as e:
return None, e
def _GET(url, *, auth, verify, timeout):
try:
return _sess.get(url, auth=auth, timeout=timeout, verify=verify), None
except Exception as e:
return None, e
def _try_ptz(base, ch, auth, verify, timeout):
"""
Основная линейка эндпоинтов:
1) manualWiper (длинная протирка — предпочтительный путь)
2) Wiper(single) через XML
3) auxiliary?name=wiper:single
4) auxiliary?name=wiper:on
"""
# 1) manualWiper
r, e = _PUT(f"{base}/ISAPI/PTZCtrl/channels/{ch}/manualWiper",
auth=auth, verify=verify, timeout=timeout)
ok, code, body = _log_resp("manualWiper", r, e)
if ok:
return True, "manualWiper", code, body
# 2) Wiper(single) XML
xml = b'<?xml version="1.0" encoding="utf-8"?><Wiper><wiperworkMode>single</wiperworkMode></Wiper>'
r, e = _PUT(f"{base}/ISAPI/PTZCtrl/channels/{ch}/Wiper",
auth=auth, verify=verify, timeout=timeout,
data=xml, headers={"Content-Type": "application/xml"})
ok, code, body = _log_resp("Wiper(single)", r, e)
if ok:
return True, "Wiper(single)", code, body
# 3) auxiliary wiper:single
r, e = _PUT(f"{base}/ISAPI/PTZCtrl/channels/{ch}/auxiliary?name=wiper:single",
auth=auth, verify=verify, timeout=timeout)
ok, code, body = _log_resp("auxiliary?wiper:single", r, e)
if ok:
return True, "auxiliary(wiper:single)", code, body
# 4) auxiliary wiper:on
r, e = _PUT(f"{base}/ISAPI/PTZCtrl/channels/{ch}/auxiliary?name=wiper:on",
auth=auth, verify=verify, timeout=timeout)
ok, code, body = _log_resp("auxiliary?wiper:on", r, e)
if ok:
return True, "auxiliary(wiper:on)", code, body
return False, "ptz_endpoints_failed", code, body
def _probe_io(base, auth, verify, timeout):
"""Попытка найти дискретный выход, содержащий 'wiper' в имени."""
r, e = _GET(f"{base}/ISAPI/System/IO/outputs",
auth=auth, verify=verify, timeout=timeout)
ok, code, body = _log_resp("GET IO/outputs", r, e)
if not ok or not body:
return None
import re
ports = re.findall(r"<IOPort>(.*?)</IOPort>", body, flags=re.S)
for chunk in ports:
id_m = re.search(r"<id>\s*(\d+)\s*</id>", chunk)
name_m = re.search(r"<name>\s*([^<]+)\s*</name>", chunk)
if id_m and name_m and ("wiper" in name_m.group(1).strip().lower()):
out_id = int(id_m.group(1))
log.debug("IO: found output id=%s", out_id)
return out_id
log.debug("IO: no output named 'wiper' found")
return None
def _try_io(base, out_id, auth, verify, timeout, pulse_ms: int):
xmlA = f'<?xml version="1.0" encoding="UTF-8"?><IOPortData><outputState>high</outputState><transTime>{pulse_ms}</transTime></IOPortData>'.encode("utf-8")
r, e = _PUT(f"{base}/ISAPI/System/IO/outputs/{out_id}/status",
auth=auth, verify=verify, timeout=timeout,
data=xmlA, headers={"Content-Type": "application/xml"})
ok, code, body = _log_resp(f"PUT IO/outputs/{out_id}/status", r, e)
if ok:
return True, "io_status", code, body
xmlB = f'<?xml version="1.0" encoding="UTF-8"?><trigger><pulseTime>{pulse_ms}</pulseTime></trigger>'.encode("utf-8")
r, e = _PUT(f"{base}/ISAPI/System/IO/outputs/{out_id}/trigger",
auth=auth, verify=verify, timeout=timeout,
data=xmlB, headers={"Content-Type": "application/xml"})
ok, code, body = _log_resp(f"PUT IO/outputs/{out_id}/trigger", r, e)
if ok:
return True, "io_trigger", code, body
return False, "io_failed", code, body
def run(cam_token: int, sec: int = 3) -> Tuple[bool, str, Optional[int], str, int]:
"""
Синхронный запуск дворника.
Возвращает: (ok, endpoint, http_status|None, detail, cam_id)
"""
cam_id = _resolve_cam_id(cam_token)
base, auth, verify, timeout, meta, ch = _base(cam_id)
log.info("WIPER start: cam_token=%s cam_id=%s meta=%s", cam_token, cam_id, meta)
# 1) Пытаемся через PTZ эндпоинты
ok, endpoint, code, body = _try_ptz(base, ch, auth, verify, timeout)
if ok:
return True, endpoint, code, body, cam_id
# 2) Фоллбек через дискретный выход, если нашли
out_id = _probe_io(base, auth, verify, timeout)
if out_id is not None:
pulse_ms = max(500, int(sec * 1000))
ok2, tag, code2, body2 = _try_io(base, out_id, auth, verify, timeout, pulse_ms)
return (ok2, tag, code2, body2, cam_id)
# 3) Совсем не получилось
return False, endpoint, code, body, cam_id
async def trigger_wiper_once(cam_token: int, sec: float = 3.0) -> Tuple[bool, str, Optional[int], str, int]:
"""
Асинхронный вызов с анти-дребезгом.
Супрессия теперь возвращает ok=False (чтобы фронт видел «слишком часто»).
Интервал берём из config.WIPER_MIN_INTERVAL_SEC при наличии.
"""
from . import config # локальный импорт, чтобы не тянуть конфиг на уровне модуля
loop = asyncio.get_running_loop()
cam_id = _resolve_cam_id(cam_token)
min_interval = float(getattr(config, "WIPER_MIN_INTERVAL_SEC", _DEFAULT_MIN_INTERVAL_SEC))
now = loop.time()
last = _last_call.get(cam_id, 0.0)
if (now - last) < min_interval:
delta = now - last
log.debug("WIPER suppressed: cam_id=%s delta=%.2f < %.2f", cam_id, delta, min_interval)
# ok=False, чтобы UI мог показать «слишком часто»
return False, "suppressed", None, f"too-frequent ({delta:.2f}s < {min_interval:.2f}s)", cam_id
_last_call[cam_id] = now
# нормализация sec
try:
sec_i = int(round(float(sec)))
except Exception:
sec_i = 3
if sec_i <= 0:
sec_i = 1
return await asyncio.to_thread(run, cam_token, sec_i)