|
|
from __future__ import annotations
|
|
|
import cv2
|
|
|
import numpy as np
|
|
|
from typing import Any, Tuple, Optional
|
|
|
from . import state, config
|
|
|
|
|
|
CV2_JPEG_PARAMS = [
|
|
|
int(cv2.IMWRITE_JPEG_QUALITY), int(config.PREVIEW_JPEG_QUALITY),
|
|
|
int(cv2.IMWRITE_JPEG_PROGRESSIVE), 0,
|
|
|
int(cv2.IMWRITE_JPEG_OPTIMIZE), 0
|
|
|
]
|
|
|
|
|
|
|
|
|
def encode_jpeg_bgr(img_bgr: np.ndarray) -> Optional[bytes]:
|
|
|
ok, jpeg = cv2.imencode(".jpg", img_bgr, CV2_JPEG_PARAMS)
|
|
|
return jpeg.tobytes() if ok else None
|
|
|
|
|
|
|
|
|
def maybe_downscale(img: np.ndarray, target_w: int = config.PREVIEW_TARGET_W) -> Tuple[np.ndarray, float, float]:
|
|
|
h, w = img.shape[:2]
|
|
|
if w <= target_w:
|
|
|
return img, 1.0, 1.0
|
|
|
scale = target_w / float(w)
|
|
|
return cv2.resize(img, (target_w, int(h * scale)), interpolation=cv2.INTER_AREA), scale, scale
|
|
|
|
|
|
|
|
|
# ----------------------------- HUD (новое) -----------------------------
|
|
|
|
|
|
def _fmt(v: Optional[float], suffix: str = "°", nd: int = 1) -> str:
|
|
|
if v is None:
|
|
|
return "—"
|
|
|
try:
|
|
|
return f"{float(v):.{nd}f}{suffix}"
|
|
|
except Exception:
|
|
|
return "—"
|
|
|
|
|
|
def _bearing_to_compass(b: Optional[float]) -> str:
|
|
|
if b is None:
|
|
|
return "—"
|
|
|
names = ["N","NNE","NE","ENE","E","ESE","SE","SSE",
|
|
|
"S","SSW","SW","WSW","W","WNW","NW","NNW"]
|
|
|
i = int(((float(b) % 360.0) + 11.25) // 22.5) % 16
|
|
|
return names[i]
|
|
|
|
|
|
def _draw_text_block(img: np.ndarray, lines: list[str], org=(10, 24)) -> None:
|
|
|
"""
|
|
|
Рисует компактный полупрозрачный блок с текстом в левом верхнем углу.
|
|
|
"""
|
|
|
x0, y0 = org
|
|
|
pad_x, pad_y = 8, 6
|
|
|
line_h = 22
|
|
|
# вычислим ширину по самой длинной строке
|
|
|
max_w = 0
|
|
|
for t in lines:
|
|
|
(tw, th), _ = cv2.getTextSize(t, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
|
|
|
max_w = max(max_w, tw)
|
|
|
box_w = max_w + pad_x * 2
|
|
|
box_h = line_h * len(lines) + pad_y * 2
|
|
|
|
|
|
x1, y1 = x0, y0 - 18
|
|
|
x2, y2 = x1 + box_w, y1 + box_h
|
|
|
x2 = min(x2, img.shape[1] - 1)
|
|
|
y2 = min(y2, img.shape[0] - 1)
|
|
|
|
|
|
# полупрозрачный фон
|
|
|
overlay = img.copy()
|
|
|
cv2.rectangle(overlay, (x1, y1), (x2, y2), (0, 0, 0), -1)
|
|
|
cv2.addWeighted(overlay, 0.35, img, 0.65, 0, img)
|
|
|
|
|
|
# текст
|
|
|
y = y1 + pad_y + 16
|
|
|
for t in lines:
|
|
|
cv2.putText(img, t, (x1 + pad_x, y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2, cv2.LINE_AA)
|
|
|
y += line_h
|
|
|
|
|
|
def draw_status_hud(cam_id: int, img_bgr: np.ndarray) -> None:
|
|
|
"""
|
|
|
Отрисовывает HUD поверх кадра:
|
|
|
Bearing (гео), Δ от preset1, Attack (курс+румб), raw pan/tilt.
|
|
|
Ничего не делает, если данных нет.
|
|
|
"""
|
|
|
st = state.ptz_states.get(cam_id, {})
|
|
|
bearing = st.get("bearing_curr_geo")
|
|
|
delta = st.get("delta_from_preset1_geo")
|
|
|
attack = st.get("attack_bearing_geo")
|
|
|
attack_compass = st.get("attack_compass")
|
|
|
az_deg = st.get("az_deg")
|
|
|
tilt_deg = st.get("tilt_deg")
|
|
|
|
|
|
lines = []
|
|
|
if bearing is not None:
|
|
|
lines.append(f"Bearing: {_fmt(bearing)} ({_bearing_to_compass(bearing)})")
|
|
|
if delta is not None:
|
|
|
sign = "+" if float(delta) >= 0 else ""
|
|
|
lines.append(f"ΔPreset1: {sign}{_fmt(delta)}")
|
|
|
if attack is not None or attack_compass:
|
|
|
comp = attack_compass if isinstance(attack_compass, str) else _bearing_to_compass(attack)
|
|
|
lines.append(f"Attack: {_fmt(attack)} ({comp})")
|
|
|
if az_deg is not None or tilt_deg is not None:
|
|
|
lines.append(f"raw pan/tilt: {_fmt(az_deg)} / {_fmt(tilt_deg)}")
|
|
|
|
|
|
if lines:
|
|
|
_draw_text_block(img_bgr, lines, org=(10, 28))
|
|
|
|
|
|
|
|
|
# --------------------------- публикация ----------------------------
|
|
|
|
|
|
def publish_preview(cam_id: int, img: np.ndarray) -> None:
|
|
|
"""
|
|
|
Публикатор JPEG-превью. При наличии флага config.PREVIEW_DRAW_HUD
|
|
|
поверх кадра будет нарисован компактный HUD с курсом/дельтой/атакой.
|
|
|
"""
|
|
|
if getattr(config, "PREVIEW_DRAW_HUD", False):
|
|
|
try:
|
|
|
draw_status_hud(cam_id, img)
|
|
|
except Exception:
|
|
|
# в превью важнее стабильность, ошибки HUD не должны мешать стриму
|
|
|
pass
|
|
|
|
|
|
if config.PUBLISH_ONLY_IF_CLIENTS and not state.video_clients:
|
|
|
return
|
|
|
|
|
|
jpeg = encode_jpeg_bgr(img)
|
|
|
if not jpeg:
|
|
|
return
|
|
|
fid = state.frame_id_by_cam.get(cam_id, 0) + 1
|
|
|
state.frame_id_by_cam[cam_id] = fid
|
|
|
state.latest_jpeg_by_cam[cam_id] = (fid, jpeg)
|