From 7f04b1ddb1ffc3ba58675a2356cb785af23f64c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D1=80=D0=BE=D1=81=D0=BB=D0=B0=D0=B2=20=D0=9A=D0=B0?= =?UTF-8?q?=D1=80=D1=82=D0=B0=D1=88=D0=BE=D0=B2?= Date: Thu, 12 Feb 2026 16:13:11 +0700 Subject: [PATCH] First commit --- bytetrack_min.py | 340 +++++++++++++++++ fpv_lock_bytetrack.py | 849 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 1189 insertions(+) create mode 100644 bytetrack_min.py create mode 100644 fpv_lock_bytetrack.py diff --git a/bytetrack_min.py b/bytetrack_min.py new file mode 100644 index 0000000..faf2962 --- /dev/null +++ b/bytetrack_min.py @@ -0,0 +1,340 @@ +# bytetrack_min.py +import numpy as np + +def tlbr_to_xyah(tlbr): + x1, y1, x2, y2 = tlbr + w = max(1.0, x2 - x1) + h = max(1.0, y2 - y1) + cx = x1 + w * 0.5 + cy = y1 + h * 0.5 + a = w / h + return np.array([cx, cy, a, h], dtype=np.float32) + +def xyah_to_tlbr(xyah): + cx, cy, a, h = [float(v) for v in xyah] + h = max(2.0, h) + w = max(2.0, a * h) + x1 = cx - w * 0.5 + y1 = cy - h * 0.5 + x2 = cx + w * 0.5 + y2 = cy + h * 0.5 + return np.array([x1, y1, x2, y2], dtype=np.float32) + +def iou_tlbr(a, b): + ax1, ay1, ax2, ay2 = a + bx1, by1, bx2, by2 = b + ix1, iy1 = max(ax1, bx1), max(ay1, by1) + ix2, iy2 = min(ax2, bx2), min(ay2, by2) + iw = max(0.0, ix2 - ix1) + ih = max(0.0, iy2 - iy1) + inter = iw * ih + if inter <= 0: + return 0.0 + area_a = max(1.0, (ax2 - ax1)) * max(1.0, (ay2 - ay1)) + area_b = max(1.0, (bx2 - bx1)) * max(1.0, (by2 - by1)) + union = area_a + area_b - inter + return float(inter / union) if union > 0 else 0.0 + +def iou_matrix(tracks_tlbr, dets_tlbr): + if len(tracks_tlbr) == 0 or len(dets_tlbr) == 0: + return np.zeros((len(tracks_tlbr), len(dets_tlbr)), dtype=np.float32) + M = np.zeros((len(tracks_tlbr), len(dets_tlbr)), dtype=np.float32) + for i, t in enumerate(tracks_tlbr): + for j, d in enumerate(dets_tlbr): + M[i, j] = iou_tlbr(t, d) + return M + +# Hungarian assignment (min-cost). We pass cost = 1 - IoU. 
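+#
+# For example, with two tracks and two detections whose pairwise IoU is
+#     [[0.72, 0.10],
+#      [0.05, 0.55]]
+# the cost matrix 1 - IoU is [[0.28, 0.90], [0.95, 0.45]] and the min-cost
+# assignment is [(0, 0), (1, 1)] (total cost 0.73). If SciPy is available,
+# scipy.optimize.linear_sum_assignment(cost) is a drop-in alternative that
+# returns the same pairs:
+#     rows, cols = scipy.optimize.linear_sum_assignment(cost)
+#     matches = list(zip(rows.tolist(), cols.tolist()))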
+def hungarian(cost):
+    # Min-cost assignment via the potentials (Jonker-Volgenant style) method,
+    # O(N^3). Internally 1-indexed: row 0 / column 0 are dummies and the real
+    # entries live in [1..N], so every real row and column can be matched.
+    n, m = cost.shape
+    if n == 0 or m == 0:
+        return []
+    N = max(n, m)
+
+    # Pad to square; padded cells cost more than any real entry, so they are
+    # only used when a row or column has no real partner left.
+    big = float(cost.max() + 1.0) if cost.size else 1.0
+    a = np.full((N + 1, N + 1), big, dtype=np.float32)
+    a[1:n + 1, 1:m + 1] = cost
+
+    u = np.zeros(N + 1, dtype=np.float32)   # row potentials
+    v = np.zeros(N + 1, dtype=np.float32)   # column potentials
+    p = np.zeros(N + 1, dtype=np.int32)     # p[j] = row matched to column j
+    way = np.zeros(N + 1, dtype=np.int32)   # alternating-path back-pointers
+
+    for i in range(1, N + 1):
+        p[0] = i
+        j0 = 0
+        minv = np.full(N + 1, np.inf, dtype=np.float32)
+        used = np.zeros(N + 1, dtype=bool)
+
+        while True:
+            used[j0] = True
+            i0 = p[j0]
+            delta = np.inf
+            j1 = 0
+            for j in range(1, N + 1):
+                if not used[j]:
+                    cur = a[i0, j] - u[i0] - v[j]
+                    if cur < minv[j]:
+                        minv[j] = cur
+                        way[j] = j0
+                    if minv[j] < delta:
+                        delta = minv[j]
+                        j1 = j
+            for j in range(N + 1):
+                if used[j]:
+                    u[p[j]] += delta
+                    v[j] -= delta
+                else:
+                    minv[j] -= delta
+            j0 = j1
+            if p[j0] == 0:
+                break
+
+        while True:
+            j1 = way[j0]
+            p[j0] = p[j1]
+            j0 = j1
+            if j0 == 0:
+                break
+
+    # Back to 0-based indices; drop pairs that fall inside the padding.
+    matches = []
+    for j in range(1, N + 1):
+        r, c = int(p[j]) - 1, j - 1
+        if r < n and c < m:
+            matches.append((r, c))
+    return matches
+
+class KalmanXYAH:
+    # State: [cx,cy,a,h, vcx,vcy,va,vh]
+    def __init__(self):
+        self.ndim = 4
+        self.dim_x = 8
+        self._motion_mat = np.eye(self.dim_x, dtype=np.float32)
+        for i in range(self.ndim):
+            self._motion_mat[i, self.ndim + i] = 1.0
+        self._update_mat = np.eye(self.ndim, self.dim_x, dtype=np.float32)
+
+        # Noise (tunable)
+        self.std_pos = np.array([2.0, 2.0, 1e-2, 2.5], dtype=np.float32)
+        self.std_vel = np.array([6.0, 6.0, 1e-2, 6.0], dtype=np.float32)
+
+    def initiate(self, measurement_xyah):
+        mean = np.zeros((self.dim_x,), dtype=np.float32)
+        mean[:4] = measurement_xyah
+        cov = np.eye(self.dim_x, dtype=np.float32)
+        cov[:4, :4] *= 50.0
+        cov[4:, 4:] *= 200.0
+        return mean, cov
+
+    def predict(self, mean, cov, dt=1.0):
+        # dt enters through the transition matrix; Q stays per-step (tunable).
+        motion_mat = self._motion_mat.copy()
+        for i in range(self.ndim):
+            motion_mat[i, self.ndim + i] = float(dt)
+
+        std_pos = self.std_pos
+        std_vel = self.std_vel
+        Q = np.diag(np.concatenate([std_pos**2, std_vel**2]).astype(np.float32))
+
+        mean = motion_mat @ mean
+        cov = motion_mat @ cov @ motion_mat.T + Q
+        return mean, cov
+
+    def update(self, mean, cov, measurement_xyah):
+        R = np.diag((self.std_pos ** 2).astype(np.float32))
+        S = self._update_mat @ cov @ self._update_mat.T + R
+        K = cov @ self._update_mat.T @ np.linalg.inv(S)
+        y = measurement_xyah - (self._update_mat @ mean)
+        mean = mean + K @ y
+        cov = (np.eye(self.dim_x, dtype=np.float32) - K @ self._update_mat) @ cov
+        return mean, cov
+
+class TrackState:
+    Tracked = 1
+    Lost = 2
+    Removed = 3
+
+class STrack:
+    _next_id = 1
+
+    def __init__(self, tlbr, score, kf: KalmanXYAH):
+        self.kf = kf
+        self.mean = None
+        self.cov = None
+        self.tlbr = np.array(tlbr, dtype=np.float32)
+        self.score = float(score)
+
+        self.track_id = STrack._next_id
+        STrack._next_id += 1
+
+        self.state = TrackState.Tracked
+        self.is_activated = False
+
+        self.frame_id = 0
+        self.start_frame = 0
+
+        self.time_since_update = 0
+        self.hits = 0
+
+    def activate(self, frame_id):
+        self.frame_id = frame_id
+        self.start_frame = frame_id
+        self.time_since_update = 0
+        xyah = tlbr_to_xyah(self.tlbr)
+        self.mean, self.cov = self.kf.initiate(xyah)
+        self.is_activated = True
+        self.hits = 1
+        self.state = TrackState.Tracked
+
+    def predict(self, dt=1.0):
+        if self.mean 
is None: + return + self.mean, self.cov = self.kf.predict(self.mean, self.cov, dt=dt) + self.tlbr = xyah_to_tlbr(self.mean[:4]) + self.time_since_update += 1 + + def update(self, tlbr, score, frame_id): + self.frame_id = frame_id + self.time_since_update = 0 + self.score = float(score) + self.hits += 1 + xyah = tlbr_to_xyah(tlbr) + self.mean, self.cov = self.kf.update(self.mean, self.cov, xyah) + self.tlbr = xyah_to_tlbr(self.mean[:4]) + self.state = TrackState.Tracked + self.is_activated = True + + def mark_lost(self): + self.state = TrackState.Lost + + def mark_removed(self): + self.state = TrackState.Removed + +class BYTETracker: + def __init__( + self, + track_high_thresh=0.35, + track_low_thresh=0.12, + new_track_thresh=0.35, + match_thresh=0.70, + track_buffer=50, + min_hits=2, + ): + self.high = float(track_high_thresh) + self.low = float(track_low_thresh) + self.new = float(new_track_thresh) + self.match_thresh = float(match_thresh) + self.track_buffer = int(track_buffer) + self.min_hits = int(min_hits) + + self.kf = KalmanXYAH() + self.frame_id = 0 + + self.tracked = [] + self.lost = [] + self.removed = [] + + def update(self, dets_tlbr_score, dt=1.0): + self.frame_id += 1 + frame_id = self.frame_id + + if dets_tlbr_score is None: + dets_tlbr_score = np.zeros((0, 5), dtype=np.float32) + dets = dets_tlbr_score.astype(np.float32, copy=False) + + if len(dets) == 0: + scores = np.zeros((0,), dtype=np.float32) + else: + scores = dets[:, 4] + + high_mask = scores >= self.high + low_mask = (scores >= self.low) & (scores < self.high) + + dets_high = dets[high_mask] + dets_low = dets[low_mask] + + for t in self.tracked: + t.predict(dt=dt) + + matches_a, u_trk_a, u_det_a = self._associate(self.tracked, dets_high, iou_thresh=self.match_thresh) + + for ti, di in matches_a: + trk = self.tracked[ti] + d = dets_high[di] + trk.update(d[:4], d[4], frame_id) + + remaining_tracks = [self.tracked[i] for i in u_trk_a] + matches_b, u_trk_b, _u_det_b = self._associate( + remaining_tracks, dets_low, iou_thresh=max(0.10, self.match_thresh - 0.15) + ) + + for ti, di in matches_b: + trk = remaining_tracks[ti] + d = dets_low[di] + trk.update(d[:4], d[4], frame_id) + + unmatched_after_b = [remaining_tracks[i] for i in u_trk_b] + for trk in unmatched_after_b: + trk.mark_lost() + + new_tracked = [t for t in self.tracked if t.state == TrackState.Tracked] + newly_lost = [t for t in self.tracked if t.state == TrackState.Lost] + self.tracked = new_tracked + self.lost.extend(newly_lost) + + for di in u_det_a: + d = dets_high[di] + if float(d[4]) >= self.new: + nt = STrack(d[:4], d[4], self.kf) + nt.activate(frame_id) + self.tracked.append(nt) + + kept_lost = [] + for t in self.lost: + if (frame_id - t.frame_id) <= self.track_buffer: + kept_lost.append(t) + else: + t.mark_removed() + self.removed.append(t) + self.lost = kept_lost + + out = [] + for t in self.tracked: + if t.hits >= self.min_hits: + out.append(t) + return out + + def _associate(self, tracks, dets, iou_thresh): + if len(tracks) == 0 or len(dets) == 0: + return [], list(range(len(tracks))), list(range(len(dets))) + + trk_boxes = [t.tlbr for t in tracks] + det_boxes = [d[:4] for d in dets] + + ious = iou_matrix(trk_boxes, det_boxes) + cost = 1.0 - ious + + matches = hungarian(cost) + + matched = [] + u_trk = set(range(len(tracks))) + u_det = set(range(len(dets))) + + thr = float(iou_thresh) + for ti, di in matches: + if ious[ti, di] >= thr: + matched.append((ti, di)) + u_trk.discard(ti) + u_det.discard(di) + + return matched, sorted(list(u_trk)), 
sorted(list(u_det)) diff --git a/fpv_lock_bytetrack.py b/fpv_lock_bytetrack.py new file mode 100644 index 0000000..1e0682e --- /dev/null +++ b/fpv_lock_bytetrack.py @@ -0,0 +1,849 @@ +import cv2 +import numpy as np +import time +import threading +from collections import deque +import torch +from ultralytics import YOLO + +from bytetrack_min import BYTETracker # <-- add this file nearby + + +# USER SETTINGS + +MODEL_PATH = r"C:\Users\Legion\PycharmProjects\YOLOTrain\.venv\runs\detect\train30\weights\best.pt" +DEVICE = 0 +USE_HALF = True + +USE_VIDEO_FILE = True +VIDEO_PATH = r"C:\Users\Legion\PycharmProjects\YOLOTrain\videoMAI\Высший пилотаж.mp4" +VIDEO_REALTIME = True + +CAM_INDEX = 0 +CAP_BACKEND = cv2.CAP_DSHOW # or cv2.CAP_MSMF + +SHOW_OUTPUT = True +WINDOW_NAME = "FPV LOCK PAL + ByteTrack" +TARGET_OUT_FPS = 20 # best-effort pacing + +# Effective resolution policy (for analog PAL) +EFFECTIVE_W = 720 +EFFECTIVE_H = 576 +FORCE_EFFECTIVE_PAL = True + +# YOLO inference settings +CONF = 0.22 # confidence used INSIDE YOLO inference call +IMG_SIZE_ROI = 384 +IMG_SIZE_FULL = 640 +MAX_DET = 60 +TARGET_CLASS_ID = 0 # set int if you want only one class + +# Filters (small drones far away) +MIN_BOX_AREA = 30 +MAX_BOX_AREA = 0 +ASPECT_RANGE = (0.15, 6.0) + +# FSM +CONFIRM_HITS = 2 +MAX_MISSES = 10 +RECOVER_FULLSCAN_EVERY = 18 +RECOVER_FORCED_DET_EVERY = 2 + +# ROI sizing (in effective pixels) +BASE_RADIUS = 150 +MAX_RADIUS = 520 +SPEED_RADIUS_FACTOR = 18.0 +FORWARD_BIAS = 1.8 +SIDE_BIAS = 1.2 + +# KLT optical flow tracker (effective) +KLT_MAX_CORNERS = 140 +KLT_QUALITY = 0.01 +KLT_MIN_DIST = 6 +KLT_WIN = (21, 21) +KLT_MAX_LEVEL = 3 +KLT_OUTLIER_THRESH = 10.0 + +# KLT validity gates (anti-drift) +KLT_OK_Q = 0.35 +KLT_OK_PTS = 30 +KLT_REINIT_MIN_POINTS = 30 + +# YOLO safety +YOLO_NO_DET_LIMIT = 3 +YOLO_FRESH_SEC = 0.50 +YOLO_FORCE_DET_WHEN_WEAK = True + +# Appearance gate (HSV hist) in effective image +USE_HSV_GATE = False +HSV_HIST_BINS = (16, 16) +HSV_GATE_MIN_SIM = 0.25 +HSV_UPDATE_EVERY = 25 + +# Async YOLO worker +YOLO_QUEUE_MAX = 1 + +# Debug / draw +DEBUG = True +DRAW_ALL_BOXES = True +DRAW_LOCK_BOX = True +DRAW_KALMAN = True +DRAW_KLT_POINTS = False +DRAW_BT_TRACKS = True +BT_DRAW_ONLY_CONFIRMED = False + + +# ByteTrack SETTINGS (important!) 
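+#
+# ByteTrack associates detections to tracks in two passes: detections with
+# score >= BT_HIGH go through the first (strict) IoU match; tracks still
+# unmatched then get a second chance against low-score detections
+# (BT_LOW <= score < BT_HIGH) at a relaxed IoU gate (BT_MATCH_IOU - 0.15,
+# floored at 0.10). Only unmatched high-score detections with score >= BT_NEW
+# may start new tracks. With the values below: a det scored 0.40 can extend a
+# track or start one, a det scored 0.20 can only extend an existing track,
+# and a det scored 0.10 is discarded.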
+ + +BT_HIGH = 0.35 +BT_LOW = 0.12 +BT_NEW = 0.35 +BT_MATCH_IOU = 0.70 +BT_BUFFER = 50 +BT_MIN_HITS = 1 + + +# HELPERS + + +def clamp(x, a, b): + return max(a, min(b, x)) + +def box_center(box): + x1, y1, x2, y2 = box + return np.array([(x1 + x2) * 0.5, (y1 + y2) * 0.5], dtype=np.float32) + +def box_wh(box): + x1, y1, x2, y2 = box + return np.array([max(1.0, x2 - x1), max(1.0, y2 - y1)], dtype=np.float32) + +def box_area(box): + w, h = box_wh(box) + return float(w * h) + +def iou(a, b): + ax1, ay1, ax2, ay2 = a + bx1, by1, bx2, by2 = b + ix1, iy1 = max(ax1, bx1), max(ay1, by1) + ix2, iy2 = min(ax2, bx2), min(ay2, by2) + inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1) + if inter <= 0: + return 0.0 + union = box_area(a) + box_area(b) - inter + return 0.0 if union <= 0 else float(inter / union) + +def clip_box(box, w, h): + x1, y1, x2, y2 = box + x1 = clamp(float(x1), 0.0, float(w - 2)) + y1 = clamp(float(y1), 0.0, float(h - 2)) + x2 = clamp(float(x2), 1.0, float(w - 1)) + y2 = clamp(float(y2), 1.0, float(h - 1)) + if x2 <= x1 + 1: + x2 = x1 + 2 + if y2 <= y1 + 1: + y2 = y1 + 2 + return np.array([x1, y1, x2, y2], dtype=np.float32) + +def crop_roi(frame, roi_box): + x1, y1, x2, y2 = map(int, roi_box) + return frame[y1:y2, x1:x2], x1, y1 + +def compute_hsv_hist(frame, box): + x1, y1, x2, y2 = map(int, box) + patch = frame[y1:y2, x1:x2] + if patch.size == 0: + return None + hsv = cv2.cvtColor(patch, cv2.COLOR_BGR2HSV) + hist = cv2.calcHist([hsv], [0, 1], None, list(HSV_HIST_BINS), [0, 180, 0, 256]) + cv2.normalize(hist, hist, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX) + return hist + +def hsv_sim(h1, h2): + if h1 is None or h2 is None: + return 0.0 + d = cv2.compareHist(h1, h2, cv2.HISTCMP_BHATTACHARYYA) + return float(1.0 - d) + +def filter_yolo_boxes_with_scores(result, offset_x=0, offset_y=0, min_conf=0.12): + dets_out = [] + if result.boxes is None or len(result.boxes) == 0: + return dets_out + + xyxy = result.boxes.xyxy.detach().cpu().numpy() + confs = result.boxes.conf.detach().cpu().numpy() + clss = result.boxes.cls.detach().cpu().numpy().astype(int) + + for b, c, cls_id in zip(xyxy, confs, clss): + score = float(c) + if score < float(min_conf): + continue + if TARGET_CLASS_ID is not None and cls_id != TARGET_CLASS_ID: + continue + + x1, y1, x2, y2 = map(float, b) + ww = max(0.0, x2 - x1) + hh = max(0.0, y2 - y1) + if ww <= 1.0 or hh <= 1.0: + continue + + a = ww * hh + if a < MIN_BOX_AREA: + continue + if MAX_BOX_AREA > 0 and a > MAX_BOX_AREA: + continue + + ar = ww / hh + if not (ASPECT_RANGE[0] <= ar <= ASPECT_RANGE[1]): + continue + + dets_out.append(np.array([x1 + offset_x, y1 + offset_y, x2 + offset_x, y2 + offset_y, score], dtype=np.float32)) + + return dets_out + + +# --- scale mapping between original and effective --- + +def get_effective_frame(orig_bgr): + oh, ow = orig_bgr.shape[:2] + if not FORCE_EFFECTIVE_PAL: + return orig_bgr, 1.0, 1.0 + eff = cv2.resize(orig_bgr, (EFFECTIVE_W, EFFECTIVE_H), interpolation=cv2.INTER_AREA) + sx = EFFECTIVE_W / float(ow) + sy = EFFECTIVE_H / float(oh) + return eff, sx, sy + +def unscale_box(box_eff, sx, sy): + x1, y1, x2, y2 = box_eff + return np.array([x1 / sx, y1 / sy, x2 / sx, y2 / sy], dtype=np.float32) + +# ========================= +# KALMAN 8D (cx,cy,w,h + velocities) with dt +# ========================= + +class Kalman8D: + def __init__(self): + self.x = np.zeros((8, 1), dtype=np.float32) + self.P = np.eye(8, dtype=np.float32) * 50.0 + + self.H = np.zeros((4, 8), dtype=np.float32) + self.H[0, 0] = 1.0 + self.H[1, 1] = 1.0 + 
self.H[2, 2] = 1.0 + self.H[3, 3] = 1.0 + + self.R = np.eye(4, dtype=np.float32) + self.R[0, 0] = 16.0 + self.R[1, 1] = 16.0 + self.R[2, 2] = 25.0 + self.R[3, 3] = 25.0 + + self.initialized = False + self.q_pos = 2.0 + self.q_size = 3.0 + self.q_vel = 4.0 + + def F(self, dt): + F = np.eye(8, dtype=np.float32) + F[0, 4] = dt + F[1, 5] = dt + F[2, 6] = dt + F[3, 7] = dt + return F + + def Q(self, dt): + q = np.zeros((8, 8), dtype=np.float32) + s1 = dt * dt + s2 = dt + q[0, 0] = self.q_pos * s1 + q[1, 1] = self.q_pos * s1 + q[2, 2] = self.q_size * s1 + q[3, 3] = self.q_size * s1 + q[4, 4] = self.q_vel * s2 + q[5, 5] = self.q_vel * s2 + q[6, 6] = self.q_vel * s2 + q[7, 7] = self.q_vel * s2 + return q + + def predict(self, dt): + dt = float(max(1e-3, dt)) + F = self.F(dt) + self.x = F @ self.x + self.P = F @ self.P @ F.T + self.Q(dt) + return self.x[:4].flatten() + + def update(self, z): + z = np.array(z, dtype=np.float32).reshape(4, 1) + y = z - self.H @ self.x + S = self.H @ self.P @ self.H.T + self.R + K = self.P @ self.H.T @ np.linalg.inv(S) + self.x = self.x + K @ y + I = np.eye(8, dtype=np.float32) + self.P = (I - K @ self.H) @ self.P + + def init_from_box(self, box): + cx, cy = box_center(box) + w, h = box_wh(box) + self.x[:] = 0 + self.x[0, 0] = cx + self.x[1, 0] = cy + self.x[2, 0] = w + self.x[3, 0] = h + self.P = np.eye(8, dtype=np.float32) * 50.0 + self.initialized = True + + def to_box(self): + cx = float(self.x[0, 0]) + cy = float(self.x[1, 0]) + w = float(max(2.0, self.x[2, 0])) + h = float(max(2.0, self.x[3, 0])) + return np.array([cx - w * 0.5, cy - h * 0.5, cx + w * 0.5, cy + h * 0.5], dtype=np.float32) + + def uncertainty(self): + return float(self.P[0, 0] + self.P[1, 1] + self.P[2, 2] + self.P[3, 3]) + +# ========================= +# KLT TRACKER (effective) +# ========================= + +class KLTTracker: + def __init__(self): + self.prev_gray = None + self.pts = None + self.box = None + self.good_count = 0 + self.quality = 0.0 + + def reset(self): + self.prev_gray = None + self.pts = None + self.box = None + self.good_count = 0 + self.quality = 0.0 + + def init(self, frame_bgr, box): + gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY) + x1, y1, x2, y2 = map(int, box) + mask = np.zeros_like(gray) + cv2.rectangle(mask, (x1, y1), (x2, y2), 255, -1) + + pts = cv2.goodFeaturesToTrack( + gray, + maxCorners=KLT_MAX_CORNERS, + qualityLevel=KLT_QUALITY, + minDistance=KLT_MIN_DIST, + mask=mask + ) + self.prev_gray = gray + self.pts = pts + self.box = box.copy() + self.good_count = 0 + self.quality = 0.0 + + def update(self, frame_bgr): + if self.prev_gray is None or self.pts is None or len(self.pts) == 0 or self.box is None: + self.quality = 0.0 + self.good_count = 0 + return None + + gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY) + next_pts, st, err = cv2.calcOpticalFlowPyrLK( + self.prev_gray, gray, self.pts, None, + winSize=KLT_WIN, + maxLevel=KLT_MAX_LEVEL, + criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01) + ) + + if next_pts is None or st is None: + self.quality = 0.0 + self.good_count = 0 + self.prev_gray = gray + return None + + st = st.reshape(-1) + good_old = self.pts[st == 1].reshape(-1, 2) + good_new = next_pts[st == 1].reshape(-1, 2) + + self.good_count = int(len(good_new)) + if self.good_count < 6: + self.quality = 0.0 + self.prev_gray = gray + self.pts = good_new.reshape(-1, 1, 2) if self.good_count > 0 else None + return None + + disp = good_new - good_old + med = np.median(disp, axis=0) + d = np.linalg.norm(disp - med[None, :], axis=1) + 
inliers = d < KLT_OUTLIER_THRESH + in_cnt = int(np.count_nonzero(inliers)) + + if in_cnt < 6: + self.quality = 0.0 + self.prev_gray = gray + self.pts = good_new.reshape(-1, 1, 2) + return None + + disp_in = disp[inliers] + med = np.median(disp_in, axis=0) + dx, dy = float(med[0]), float(med[1]) + + x1, y1, x2, y2 = self.box + new_box = np.array([x1 + dx, y1 + dy, x2 + dx, y2 + dy], dtype=np.float32) + + frac = float(in_cnt) / float(len(inliers)) + self.quality = clamp(frac * (self.good_count / max(1, KLT_MAX_CORNERS)), 0.0, 1.0) + + self.prev_gray = gray + self.pts = good_new[inliers].reshape(-1, 1, 2) + self.box = new_box + return new_box + +# ========================= +# ASYNC YOLO WORKER (effective) +# ========================= + +class YOLOWorker: + def __init__(self, model): + self.model = model + self.req = deque(maxlen=YOLO_QUEUE_MAX) + self.res = deque(maxlen=1) + self.lock = threading.Lock() + self.running = False + self.thread = threading.Thread(target=self._loop, daemon=True) + + def start(self): + self.running = True + self.thread.start() + + def stop(self): + self.running = False + self.thread.join(timeout=1.0) + + def submit(self, frame_eff_bgr, roi_box_eff, mode, ts): + with self.lock: + self.req.append((frame_eff_bgr, roi_box_eff, mode, ts)) + + def try_get(self): + with self.lock: + if not self.res: + return None + return self.res.pop() + + def _loop(self): + while self.running: + item = None + with self.lock: + if self.req: + item = self.req.pop() + self.req.clear() + + if item is None: + time.sleep(0.001) + continue + + frame, roi_box, mode, ts = item + h, w = frame.shape[:2] + dets = [] + infer_ms = 0.0 + used_roi = False + + try: + if roi_box is not None: + roi_box = clip_box(roi_box, w, h) + crop, ox, oy = crop_roi(frame, roi_box) + if crop.size > 0: + used_roi = True + t0 = time.perf_counter() + with torch.inference_mode(): + r = self.model(crop, conf=CONF, imgsz=IMG_SIZE_ROI, verbose=False, max_det=MAX_DET)[0] + infer_ms = (time.perf_counter() - t0) * 1000.0 + dets = filter_yolo_boxes_with_scores(r, offset_x=ox, offset_y=oy, min_conf=BT_LOW) + + else: + sh, sw = frame.shape[:2] + short = min(sh, sw) + scale = 1.0 + target = IMG_SIZE_FULL + if short > target: + scale = target / float(short) + small = cv2.resize(frame, (int(sw * scale), int(sh * scale)), interpolation=cv2.INTER_AREA) + else: + small = frame + + t0 = time.perf_counter() + with torch.inference_mode(): + r = self.model(small, conf=CONF, imgsz=IMG_SIZE_FULL, verbose=False, max_det=MAX_DET)[0] + infer_ms = (time.perf_counter() - t0) * 1000.0 + + dets_s = filter_yolo_boxes_with_scores(r, 0, 0, min_conf=BT_LOW) + if scale != 1.0: + inv = 1.0 / scale + dets = [np.array([d[0]*inv, d[1]*inv, d[2]*inv, d[3]*inv, d[4]], dtype=np.float32) for d in dets_s] + else: + dets = dets_s + + except Exception: + dets = [] + infer_ms = 0.0 + + with self.lock: + self.res.append((dets, ts, mode, infer_ms, used_roi)) + +# ========================= +# MAIN +# ========================= + +def main(): + print("Loading YOLO...") + model = YOLO(MODEL_PATH) + + if torch.cuda.is_available(): + model.to(f"cuda:{DEVICE}") + if USE_HALF: + try: + model.model.half() + except Exception: + pass + + if torch.cuda.is_available(): + dummy = np.zeros((IMG_SIZE_ROI, IMG_SIZE_ROI, 3), dtype=np.uint8) + with torch.inference_mode(): + for _ in range(3): + _ = model(dummy, conf=CONF, imgsz=IMG_SIZE_ROI, verbose=False, max_det=MAX_DET) + + cap = cv2.VideoCapture(VIDEO_PATH) if USE_VIDEO_FILE else cv2.VideoCapture(CAM_INDEX, CAP_BACKEND) + if not 
cap.isOpened(): + print("Capture open failed") + return + + if SHOW_OUTPUT: + cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL) + + yolo_worker = YOLOWorker(model) + yolo_worker.start() + + kf = Kalman8D() + klt = KLTTracker() + + bt = BYTETracker( + track_high_thresh=BT_HIGH, + track_low_thresh=BT_LOW, + new_track_thresh=BT_NEW, + match_thresh=BT_MATCH_IOU, + track_buffer=BT_BUFFER, + min_hits=BT_MIN_HITS, + ) + target_id = None # single-target lock over ByteTrack tracks + + locked_box_eff = None + confirmed = False + hit_streak = 0 + miss_streak = 0 + ref_hist = None + + prev_ts = None + frame_id = 0 + last_det_frame = -999 + last_fullscan_frame = -999 + + yolo_no_det = 0 + last_yolo_ok_ts = -1e9 + + perf_hist = deque(maxlen=120) + yolo_hist = deque(maxlen=120) + + print("Start") + + while True: + ret, frame_orig = cap.read() + ts = time.perf_counter() + if not ret: + break + + frame_eff, sx, sy = get_effective_frame(frame_orig) + eh, ew = frame_eff.shape[:2] + + # dt + if prev_ts is None: + dt = 1.0 / 25.0 + else: + dt = float(max(1e-3, ts - prev_ts)) + prev_ts = ts + + iter_start = time.perf_counter() + + # 1) Kalman predict + pred_box_eff = None + if kf.initialized: + kf.predict(dt) + pred_box_eff = clip_box(kf.to_box(), ew, eh) + + # speed/unc (effective px) + speed = 0.0 + unc = 0.0 + if kf.initialized: + vx = float(kf.x[4, 0]); vy = float(kf.x[5, 0]) + speed = (vx * vx + vy * vy) ** 0.5 + unc = kf.uncertainty() + + # 2) KLT update (effective) with validity gates + klt_valid = False + if confirmed and locked_box_eff is not None: + if klt.prev_gray is None or klt.pts is None or len(klt.pts) < KLT_REINIT_MIN_POINTS: + klt.init(frame_eff, locked_box_eff) + + klt_box = klt.update(frame_eff) + if klt_box is not None: + klt_box = clip_box(klt_box, ew, eh) + klt_valid = (klt.quality >= KLT_OK_Q) and (klt.good_count >= KLT_OK_PTS) + if klt_valid: + cx, cy = box_center(klt_box) + bw, bh = box_wh(klt_box) + kf.update([cx, cy, bw, bh]) + locked_box_eff = klt_box + + # 3) Adaptive detect schedule + base = 6 if confirmed else RECOVER_FORCED_DET_EVERY + fast_bonus = int(clamp(speed / 28.0, 0, 4)) + unc_bonus = int(clamp(unc / 160.0, 0, 4)) + miss_bonus = int(clamp(miss_streak, 0, 3)) + klt_bonus = 2 if (confirmed and not klt_valid) else 0 + det_every = clamp(base - (fast_bonus + unc_bonus + miss_bonus + klt_bonus), 1, 10) + + yolo_fresh = (ts - last_yolo_ok_ts) <= YOLO_FRESH_SEC + if YOLO_FORCE_DET_WHEN_WEAK and confirmed and (not klt_valid) and (not yolo_fresh): + det_every = 1 + + run_det = (frame_id - last_det_frame) >= det_every + need_fullscan = (not confirmed) and (frame_id - last_fullscan_frame) >= RECOVER_FULLSCAN_EVERY + + # 4) Submit YOLO (effective) + if run_det: + roi_box = None + if (not need_fullscan) and kf.initialized: + cx = float(kf.x[0, 0]); cy = float(kf.x[1, 0]) + vx = float(kf.x[4, 0]); vy = float(kf.x[5, 0]) + vnorm = (vx * vx + vy * vy) ** 0.5 + 1e-6 + ux, uy = vx / vnorm, vy / vnorm + + radius = BASE_RADIUS + SPEED_RADIUS_FACTOR * vnorm + radius = clamp(radius, BASE_RADIUS, MAX_RADIUS) + + fx = ux * radius * (FORWARD_BIAS - 1.0) + fy = uy * radius * (FORWARD_BIAS - 1.0) + + scx = cx + fx * 0.35 + scy = cy + fy * 0.35 + rx = radius * FORWARD_BIAS + ry = radius * SIDE_BIAS + + roi_box = clip_box([scx - rx, scy - ry, scx + rx, scy + ry], ew, eh) + + mode = "ROI" if roi_box is not None else "FULL" + if need_fullscan: + roi_box = None + mode = "FULL" + last_fullscan_frame = frame_id + + yolo_worker.submit(frame_eff, roi_box, mode, ts) + last_det_frame = frame_id + + # 5) Consume 
YOLO result
+        yolo = yolo_worker.try_get()
+        dets_eff = []
+        infer_ms = 0.0
+        used_roi = False
+        if yolo is not None:
+            dets_eff, ts_det, mode, infer_ms, used_roi = yolo
+            yolo_hist.append(infer_ms)
+
+        if len(dets_eff) == 0:
+            yolo_no_det += 1
+        else:
+            yolo_no_det = 0
+            last_yolo_ok_ts = ts
+
+        # ---- DEBUG: print one detection sample every 30 frames ----
+        if DEBUG and dets_eff and (frame_id % 30 == 0):
+            print("sample det:", dets_eff[0])
+
+        # 6) ByteTrack update + choose single target
+        dets_np = np.array(dets_eff, dtype=np.float32) if dets_eff else np.zeros((0, 5), dtype=np.float32)
+        tracks = bt.update(dets_np, dt=dt)
+
+        chosen = None
+        target_track = None
+
+        # keep current target_id if possible
+        if target_id is not None:
+            for t in tracks:
+                if t.track_id == target_id:
+                    target_track = t
+                    break
+
+        # reacquire if needed: prefer tracks close to the Kalman prediction
+        # (high IoU, small center distance), with a small bonus for det score
+        if target_track is None and len(tracks) > 0:
+            if kf.initialized:
+                pred = clip_box(kf.to_box(), ew, eh)
+                pc = box_center(pred)
+                best = None
+                best_score = -1e9
+                for t in tracks:
+                    b = clip_box(t.tlbr, ew, eh)
+                    c = box_center(b)
+                    dist = float(np.linalg.norm(c - pc))
+                    i = iou(b, pred)
+                    s = (2.0 * i) + (1.0 / (1.0 + dist)) + 0.2 * float(t.score)
+                    if s > best_score:
+                        best_score = s
+                        best = t
+                target_track = best
+            else:
+                target_track = max(tracks, key=lambda tt: box_area(tt.tlbr))
+
+        # apply chosen
+        if target_track is not None:
+            target_id = target_track.track_id
+            chosen = clip_box(target_track.tlbr, ew, eh)
+
+        # 7) Update Kalman / FSM using the chosen track
+        if chosen is not None:
+            if not kf.initialized:
+                kf.init_from_box(chosen)
+            else:
+                cx, cy = box_center(chosen)
+                bw, bh = box_wh(chosen)
+                kf.update([cx, cy, bw, bh])
+
+            locked_box_eff = chosen
+            hit_streak += 1
+            miss_streak = 0
+
+            if not confirmed and hit_streak >= CONFIRM_HITS:
+                confirmed = True
+                klt.init(frame_eff, locked_box_eff)
+                ref_hist = compute_hsv_hist(frame_eff, locked_box_eff) if USE_HSV_GATE else None
+
+        else:
+            if not (confirmed and klt_valid):
+                miss_streak += 1
+                hit_streak = 0
+
+            if confirmed and (yolo_no_det >= YOLO_NO_DET_LIMIT) and (not klt_valid):
+                confirmed = False
+                locked_box_eff = None
+                ref_hist = None
+                target_id = None
+                klt.reset()
+                miss_streak = 0
+                hit_streak = 0
+                yolo_no_det = 0
+
+            elif miss_streak >= MAX_MISSES:
+                confirmed = False
+                locked_box_eff = None
+                ref_hist = None
+                target_id = None
+                klt.reset()
+
+        # 8) Draw on ORIGINAL frame (map boxes back)
+        locked_box_orig = None
+        if locked_box_eff is not None:
+            locked_box_orig = unscale_box(locked_box_eff, sx, sy)
+            locked_box_orig = clip_box(locked_box_orig, frame_orig.shape[1], frame_orig.shape[0])
+
+        # 8.1 Draw YOLO dets (green)
+        if DRAW_ALL_BOXES and dets_eff:
+            for d in dets_eff:
+                b_eff = d[:4]
+                b_orig = unscale_box(b_eff, sx, sy)
+                b_orig = clip_box(b_orig, frame_orig.shape[1], frame_orig.shape[0])
+                x1, y1, x2, y2 = map(int, b_orig)
+                cv2.rectangle(frame_orig, (x1, y1), (x2, y2), (0, 255, 0), 1)
+                cv2.putText(frame_orig, f"{float(d[4]):.2f}", (x1, max(0, y1 - 6)),
+                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
+
+        # 8.2 Draw ByteTrack tracks (cyan)
+        if DRAW_BT_TRACKS and tracks:
+            for t in tracks:
+                if BT_DRAW_ONLY_CONFIRMED and (t.hits < BT_MIN_HITS):
+                    continue
+                b_eff = clip_box(t.tlbr, ew, eh)
+                b_orig = unscale_box(b_eff, sx, sy)
+                b_orig = clip_box(b_orig, frame_orig.shape[1], frame_orig.shape[0])
+                x1, y1, x2, y2 = map(int, b_orig)
+                cv2.rectangle(frame_orig, (x1, y1), (x2, y2), (255, 255, 0), 2)
+                cv2.putText(frame_orig, 
f"T{t.track_id}:{t.score:.2f}", + (x1, max(0, y1 - 8)), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2) + + # 8.3 Draw locked target (red) + if DRAW_LOCK_BOX and locked_box_orig is not None: + x1, y1, x2, y2 = map(int, locked_box_orig) + cv2.rectangle(frame_orig, (x1, y1), (x2, y2), (0, 0, 255), 2) + cv2.putText(frame_orig, f"ID={target_id}", (x1, max(0, y1 - 10)), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) + + # 8.4 Draw Kalman point (blue) + if DRAW_KALMAN and kf.initialized: + pb_eff = clip_box(kf.to_box(), ew, eh) + pb_orig = unscale_box(pb_eff, sx, sy) + cx, cy = box_center(pb_orig) + cv2.circle(frame_orig, (int(cx), int(cy)), 5, (255, 0, 0), -1) + + + if DRAW_ALL_BOXES and dets_eff: + for d in dets_eff: + b = d[:4] + bo = unscale_box(b, sx, sy) + x1, y1, x2, y2 = map(int, bo) + cv2.rectangle(frame_orig, (x1, y1), (x2, y2), (0, 255, 0), 1) + + status = "CONFIRMED" if confirmed else "RECOVER" + cv2.putText(frame_orig, status, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 255), 2) + + if DEBUG: + cv2.putText(frame_orig, f"eff={ew}x{eh} miss={miss_streak} hit={hit_streak} detEvery={det_every} yNoDet={yolo_no_det}", + (20, 75), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + cv2.putText(frame_orig, f"YOLO dets={len(dets_eff)} infer={infer_ms:.1f}ms mode={'ROI' if used_roi else 'FULL'}", + (20, 105), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + cv2.putText(frame_orig, f"KLT valid={int(klt_valid)} q={klt.quality:.2f} pts={klt.good_count} speed={speed:.1f} dt={dt*1000:.1f}ms", + (20, 135), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + # show / pace + if SHOW_OUTPUT: + cv2.imshow(WINDOW_NAME, frame_orig) + + iter_ms = (time.perf_counter() - iter_start) * 1000.0 + perf_hist.append(iter_ms) + + target_dt = 1.0 / float(max(1, TARGET_OUT_FPS)) + elapsed = time.perf_counter() - iter_start + wait_ms = 1 + if VIDEO_REALTIME and elapsed < target_dt: + wait_ms = int((target_dt - elapsed) * 1000.0) + wait_ms = clamp(wait_ms, 1, 50) + + if cv2.waitKey(int(wait_ms)) == 27: + break + + if frame_id % 60 == 0 and perf_hist: + p50 = np.percentile(perf_hist, 50) + p95 = np.percentile(perf_hist, 95) + fps = 1000.0 / max(np.mean(perf_hist), 1e-6) + if yolo_hist: + yp50 = np.percentile(yolo_hist, 50) + yp95 = np.percentile(yolo_hist, 95) + else: + yp50, yp95 = 0.0, 0.0 + print(f"[perf] fps~{fps:.1f} iter p50={p50:.1f} p95={p95:.1f} | yolo p50={yp50:.1f} p95={yp95:.1f}") + + frame_id += 1 + + yolo_worker.stop() + cap.release() + cv2.destroyAllWindows() + +if __name__ == "__main__": + main()