You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

850 lines
26 KiB
Python

import cv2
import numpy as np
import time
import threading
from collections import deque
import torch
from ultralytics import YOLO
from bytetrack_min import BYTETracker # <-- add this file nearby
# USER SETTINGS
MODEL_PATH = r"C:\Users\Legion\PycharmProjects\YOLOTrain\.venv\runs\detect\train30\weights\best.pt"
DEVICE = 0  # CUDA device index (model is moved to f"cuda:{DEVICE}" when available)
USE_HALF = True  # try FP16 weights on GPU (ignored if model.half() raises)
USE_VIDEO_FILE = True  # True: read VIDEO_PATH; False: open camera CAM_INDEX
VIDEO_PATH = r"C:\Users\Legion\PycharmProjects\YOLOTrain\videoMAI\Высший пилотаж.mp4"
VIDEO_REALTIME = True  # pace display towards TARGET_OUT_FPS
CAM_INDEX = 0
CAP_BACKEND = cv2.CAP_DSHOW # or cv2.CAP_MSMF
SHOW_OUTPUT = True  # open an OpenCV window and draw overlays
WINDOW_NAME = "FPV LOCK PAL + ByteTrack"
TARGET_OUT_FPS = 20 # best-effort pacing
# Effective resolution policy (for analog PAL)
EFFECTIVE_W = 720
EFFECTIVE_H = 576
FORCE_EFFECTIVE_PAL = True  # resize every frame to EFFECTIVE_W x EFFECTIVE_H before processing
# YOLO inference settings
CONF = 0.22 # confidence used INSIDE YOLO inference call
IMG_SIZE_ROI = 384  # imgsz for ROI-crop inference
IMG_SIZE_FULL = 640  # imgsz for full-frame scans
MAX_DET = 60
TARGET_CLASS_ID = 0 # set int if you want only one class; None disables the class filter
# Filters (small drones far away)
MIN_BOX_AREA = 30  # px^2 in effective coords; smaller boxes are rejected
MAX_BOX_AREA = 0  # 0 disables the upper-area gate
ASPECT_RANGE = (0.15, 6.0)  # accepted w/h ratio range
# FSM
CONFIRM_HITS = 2  # consecutive hits needed to enter CONFIRMED
MAX_MISSES = 10  # consecutive misses before the lock is dropped
RECOVER_FULLSCAN_EVERY = 18  # frames between forced full-frame scans while recovering
RECOVER_FORCED_DET_EVERY = 2  # base detect period (frames) while not confirmed
# ROI sizing (in effective pixels)
BASE_RADIUS = 150
MAX_RADIUS = 520
SPEED_RADIUS_FACTOR = 18.0  # ROI radius grows with Kalman speed
FORWARD_BIAS = 1.8  # stretch ROI along the motion direction
SIDE_BIAS = 1.2  # stretch ROI perpendicular to motion
# KLT optical flow tracker (effective)
KLT_MAX_CORNERS = 140
KLT_QUALITY = 0.01
KLT_MIN_DIST = 6
KLT_WIN = (21, 21)
KLT_MAX_LEVEL = 3
KLT_OUTLIER_THRESH = 10.0  # max px deviation from median flow to still count as inlier
# KLT validity gates (anti-drift)
KLT_OK_Q = 0.35  # minimum KLT quality to trust the flow update
KLT_OK_PTS = 30  # minimum surviving points to trust the flow update
KLT_REINIT_MIN_POINTS = 30  # re-seed corners when fewer points remain
# YOLO safety
YOLO_NO_DET_LIMIT = 3  # empty YOLO results in a row before the lock is questioned
YOLO_FRESH_SEC = 0.50  # a detection newer than this counts as "fresh"
YOLO_FORCE_DET_WHEN_WEAK = True  # force per-frame detection when KLT is weak and YOLO is stale
# Appearance gate (HSV hist) in effective image
USE_HSV_GATE = False  # NOTE(review): reference hist is computed, but the gate is never applied in main — confirm intent
HSV_HIST_BINS = (16, 16)
HSV_GATE_MIN_SIM = 0.25  # NOTE(review): unused in the visible code
HSV_UPDATE_EVERY = 25  # NOTE(review): unused in the visible code
# Async YOLO worker
YOLO_QUEUE_MAX = 1  # pending requests kept; the newest request always wins
# Debug / draw
DEBUG = True
DRAW_ALL_BOXES = True
DRAW_LOCK_BOX = True
DRAW_KALMAN = True
DRAW_KLT_POINTS = False  # NOTE(review): declared but not used by the visible drawing code
DRAW_BT_TRACKS = True
BT_DRAW_ONLY_CONFIRMED = False
# ByteTrack SETTINGS (important!)
BT_HIGH = 0.35  # high-confidence threshold passed to BYTETracker
BT_LOW = 0.12  # low threshold; also the min_conf used when filtering YOLO boxes
BT_NEW = 0.35  # threshold to spawn new tracks
BT_MATCH_IOU = 0.70  # association threshold (exact semantics defined in bytetrack_min)
BT_BUFFER = 50  # frames a lost track is kept alive
BT_MIN_HITS = 1
# HELPERS
def clamp(x, a, b):
    """Limit x to the interval [a, b] (the lower bound wins if a > b)."""
    return max(a, min(x, b))
def box_center(box):
    """Center point (cx, cy) of an xyxy box as a float32 array."""
    x1, y1, x2, y2 = box
    return np.array([0.5 * (x1 + x2), 0.5 * (y1 + y2)], dtype=np.float32)
def box_wh(box):
    """Width and height of an xyxy box, each floored at 1.0, as float32 array."""
    x1, y1, x2, y2 = box
    w = x2 - x1
    h = y2 - y1
    return np.array([w if w > 1.0 else 1.0, h if h > 1.0 else 1.0], dtype=np.float32)
def box_area(box):
    """Area of an xyxy box, with each side floored at 1 px (as in box_wh)."""
    x1, y1, x2, y2 = box
    return float(max(1.0, x2 - x1) * max(1.0, y2 - y1))
def iou(a, b):
    """Intersection-over-union of two xyxy boxes.

    Areas use the same >=1 px per-side floor as box_area; returns 0.0 when
    the boxes do not overlap or the union is degenerate.
    """
    ax1, ay1, ax2, ay2 = a
    bx1, by1, bx2, by2 = b
    iw = min(ax2, bx2) - max(ax1, bx1)
    ih = min(ay2, by2) - max(ay1, by1)
    if iw <= 0 or ih <= 0:
        return 0.0
    inter = iw * ih
    area_a = max(1.0, ax2 - ax1) * max(1.0, ay2 - ay1)
    area_b = max(1.0, bx2 - bx1) * max(1.0, by2 - by1)
    union = area_a + area_b - inter
    return 0.0 if union <= 0 else float(inter / union)
def clip_box(box, w, h):
    """Clip an xyxy box into a w*h image, guaranteeing at least 2 px per side."""
    x1, y1, x2, y2 = box
    # same clamp order as clamp(): the lower bound wins for degenerate ranges
    x1 = max(0.0, min(float(w - 2), float(x1)))
    y1 = max(0.0, min(float(h - 2), float(y1)))
    x2 = max(1.0, min(float(w - 1), float(x2)))
    y2 = max(1.0, min(float(h - 1), float(y2)))
    if x2 <= x1 + 1:
        x2 = x1 + 2
    if y2 <= y1 + 1:
        y2 = y1 + 2
    return np.array([x1, y1, x2, y2], dtype=np.float32)
def crop_roi(frame, roi_box):
    """Return (crop, x_offset, y_offset) for an integer-truncated xyxy ROI."""
    ix1, iy1, ix2, iy2 = (int(v) for v in roi_box)
    return frame[iy1:iy2, ix1:ix2], ix1, iy1
def compute_hsv_hist(frame, box):
    """H/S histogram (min-max normalized to [0, 1]) of the box patch.

    Returns None when the integer-truncated patch is empty.
    """
    px1, py1, px2, py2 = map(int, box)
    roi = frame[py1:py2, px1:px2]
    if roi.size == 0:
        return None
    hsv_patch = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
    hist = cv2.calcHist([hsv_patch], [0, 1], None, list(HSV_HIST_BINS), [0, 180, 0, 256])
    cv2.normalize(hist, hist, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX)
    return hist
def hsv_sim(h1, h2):
    """Histogram similarity in [0, 1] (1 - Bhattacharyya distance).

    Returns 0.0 if either histogram is missing.
    """
    if h1 is None or h2 is None:
        return 0.0
    dist = cv2.compareHist(h1, h2, cv2.HISTCMP_BHATTACHARYYA)
    return float(1.0 - dist)
def filter_yolo_boxes_with_scores(result, offset_x=0, offset_y=0, min_conf=0.12):
    """Convert a YOLO result into a list of [x1, y1, x2, y2, score] float32 arrays.

    Applies confidence, class, minimum side, min/max area and aspect-ratio
    gates, then shifts the surviving boxes by (offset_x, offset_y) back into
    full-frame coordinates.
    """
    out = []
    boxes = result.boxes
    if boxes is None or len(boxes) == 0:
        return out
    xyxy = boxes.xyxy.detach().cpu().numpy()
    scores = boxes.conf.detach().cpu().numpy()
    classes = boxes.cls.detach().cpu().numpy().astype(int)
    for bb, sc, cid in zip(xyxy, scores, classes):
        score = float(sc)
        if score < float(min_conf):
            continue
        if TARGET_CLASS_ID is not None and cid != TARGET_CLASS_ID:
            continue
        x1, y1, x2, y2 = map(float, bb)
        bw = max(0.0, x2 - x1)
        bh = max(0.0, y2 - y1)
        if bw <= 1.0 or bh <= 1.0:
            continue  # degenerate / sub-pixel boxes
        area = bw * bh
        if area < MIN_BOX_AREA:
            continue
        if MAX_BOX_AREA > 0 and area > MAX_BOX_AREA:
            continue
        aspect = bw / bh
        if not (ASPECT_RANGE[0] <= aspect <= ASPECT_RANGE[1]):
            continue
        out.append(np.array([x1 + offset_x, y1 + offset_y,
                             x2 + offset_x, y2 + offset_y, score],
                            dtype=np.float32))
    return out
# --- scale mapping between original and effective ---
def get_effective_frame(orig_bgr):
    """Resize a frame to the effective PAL resolution.

    Returns (frame, sx, sy) where sx/sy map original coords to effective
    coords. When FORCE_EFFECTIVE_PAL is off the frame passes through with
    unit scale factors.
    """
    oh, ow = orig_bgr.shape[:2]
    if not FORCE_EFFECTIVE_PAL:
        return orig_bgr, 1.0, 1.0
    eff = cv2.resize(orig_bgr, (EFFECTIVE_W, EFFECTIVE_H), interpolation=cv2.INTER_AREA)
    return eff, EFFECTIVE_W / float(ow), EFFECTIVE_H / float(oh)
def unscale_box(box_eff, sx, sy):
    """Map an effective-frame xyxy box back to original-frame coordinates."""
    ex1, ey1, ex2, ey2 = box_eff
    return np.array([ex1 / sx, ey1 / sy, ex2 / sx, ey2 / sy], dtype=np.float32)
# =========================
# KALMAN 8D (cx,cy,w,h + velocities) with dt
# =========================
class Kalman8D:
    """Constant-velocity Kalman filter over (cx, cy, w, h) and their velocities.

    State vector: [cx, cy, w, h, vcx, vcy, vw, vh]^T. Measurements are the
    four box parameters; velocities are inferred by the filter.
    """

    def __init__(self):
        self.x = np.zeros((8, 1), dtype=np.float32)   # state estimate
        self.P = np.eye(8, dtype=np.float32) * 50.0   # state covariance
        H = np.zeros((4, 8), dtype=np.float32)
        H[0, 0] = H[1, 1] = H[2, 2] = H[3, 3] = 1.0
        self.H = H                                    # measurement matrix
        R = np.eye(4, dtype=np.float32)
        R[0, 0] = R[1, 1] = 16.0                      # position measurement noise
        R[2, 2] = R[3, 3] = 25.0                      # size measurement noise
        self.R = R
        self.initialized = False
        # process-noise intensities
        self.q_pos = 2.0
        self.q_size = 3.0
        self.q_vel = 4.0

    def F(self, dt):
        """State-transition matrix for a step of dt seconds."""
        F = np.eye(8, dtype=np.float32)
        for i in range(4):
            F[i, i + 4] = dt
        return F

    def Q(self, dt):
        """Diagonal process noise: dt^2-scaled for pos/size, dt-scaled for velocities."""
        q = np.zeros((8, 8), dtype=np.float32)
        q[0, 0] = q[1, 1] = self.q_pos * dt * dt
        q[2, 2] = q[3, 3] = self.q_size * dt * dt
        for i in range(4, 8):
            q[i, i] = self.q_vel * dt
        return q

    def predict(self, dt):
        """Advance the state by dt (floored at 1 ms); return predicted (cx, cy, w, h)."""
        dt = float(max(1e-3, dt))
        F = self.F(dt)
        self.x = F @ self.x
        self.P = F @ self.P @ F.T + self.Q(dt)
        return self.x[:4].flatten()

    def update(self, z):
        """Fuse a (cx, cy, w, h) measurement into the state."""
        z = np.array(z, dtype=np.float32).reshape(4, 1)
        innov = z - self.H @ self.x
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ np.linalg.inv(S)
        self.x = self.x + K @ innov
        self.P = (np.eye(8, dtype=np.float32) - K @ self.H) @ self.P

    def init_from_box(self, box):
        """Reset the filter from an xyxy box with zero velocities."""
        cx, cy = box_center(box)
        w, h = box_wh(box)
        self.x[:] = 0
        self.x[0, 0], self.x[1, 0] = cx, cy
        self.x[2, 0], self.x[3, 0] = w, h
        self.P = np.eye(8, dtype=np.float32) * 50.0
        self.initialized = True

    def to_box(self):
        """Current state as an xyxy box with each side floored at 2 px."""
        cx, cy = float(self.x[0, 0]), float(self.x[1, 0])
        w = float(max(2.0, self.x[2, 0]))
        h = float(max(2.0, self.x[3, 0]))
        half_w, half_h = 0.5 * w, 0.5 * h
        return np.array([cx - half_w, cy - half_h, cx + half_w, cy + half_h],
                        dtype=np.float32)

    def uncertainty(self):
        """Sum of the pos/size covariance diagonal — a scalar confidence proxy."""
        return float(self.P[0, 0] + self.P[1, 1] + self.P[2, 2] + self.P[3, 3])
# =========================
# KLT TRACKER (effective)
# =========================
class KLTTracker:
    """Lucas-Kanade point tracker that shifts a box by the median point motion."""

    def __init__(self):
        self.prev_gray = None   # previous grayscale frame
        self.pts = None         # tracked corner points, shape (N, 1, 2)
        self.box = None         # current box in effective coords
        self.good_count = 0     # points surviving the last flow step
        self.quality = 0.0      # 0..1 confidence of the last update

    def reset(self):
        """Drop all tracking state."""
        self.prev_gray = None
        self.pts = None
        self.box = None
        self.good_count = 0
        self.quality = 0.0

    def init(self, frame_bgr, box):
        """(Re)seed corner features inside the given box."""
        gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
        x1, y1, x2, y2 = map(int, box)
        mask = np.zeros_like(gray)
        cv2.rectangle(mask, (x1, y1), (x2, y2), 255, -1)
        self.pts = cv2.goodFeaturesToTrack(
            gray,
            maxCorners=KLT_MAX_CORNERS,
            qualityLevel=KLT_QUALITY,
            minDistance=KLT_MIN_DIST,
            mask=mask,
        )
        self.prev_gray = gray
        self.box = box.copy()
        self.good_count = 0
        self.quality = 0.0

    def update(self, frame_bgr):
        """Advance the box by the robust median flow; return the new box or None.

        Returns None (with quality 0) when flow fails, too few points survive,
        or too few displacement inliers remain; the previous frame and point
        state are still advanced so the caller can re-init or retry.
        """
        if self.prev_gray is None or self.pts is None or len(self.pts) == 0 or self.box is None:
            self.quality = 0.0
            self.good_count = 0
            return None
        gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
        nxt, status, _err = cv2.calcOpticalFlowPyrLK(
            self.prev_gray, gray, self.pts, None,
            winSize=KLT_WIN,
            maxLevel=KLT_MAX_LEVEL,
            criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01),
        )
        if nxt is None or status is None:
            self.quality = 0.0
            self.good_count = 0
            self.prev_gray = gray
            return None
        keep = status.reshape(-1) == 1
        old_pts = self.pts[keep].reshape(-1, 2)
        new_pts = nxt[keep].reshape(-1, 2)
        self.good_count = int(len(new_pts))
        if self.good_count < 6:
            self.quality = 0.0
            self.prev_gray = gray
            self.pts = new_pts.reshape(-1, 1, 2) if self.good_count > 0 else None
            return None
        disp = new_pts - old_pts
        med = np.median(disp, axis=0)
        dist_from_med = np.linalg.norm(disp - med[None, :], axis=1)
        inliers = dist_from_med < KLT_OUTLIER_THRESH
        in_cnt = int(np.count_nonzero(inliers))
        if in_cnt < 6:
            self.quality = 0.0
            self.prev_gray = gray
            self.pts = new_pts.reshape(-1, 1, 2)
            return None
        dx, dy = (float(v) for v in np.median(disp[inliers], axis=0))
        x1, y1, x2, y2 = self.box
        shifted = np.array([x1 + dx, y1 + dy, x2 + dx, y2 + dy], dtype=np.float32)
        # quality: inlier fraction scaled by how many corners survived
        inlier_frac = float(in_cnt) / float(len(inliers))
        self.quality = clamp(inlier_frac * (self.good_count / max(1, KLT_MAX_CORNERS)), 0.0, 1.0)
        self.prev_gray = gray
        self.pts = new_pts[inliers].reshape(-1, 1, 2)
        self.box = shifted
        return shifted
# =========================
# ASYNC YOLO WORKER (effective)
# =========================
class YOLOWorker:
    """Background thread running YOLO inference on the most recent frame only.

    Only the newest submitted request is processed (older ones are dropped)
    and only the newest result is retained, so the main loop never blocks on
    inference and never consumes stale backlog.
    """

    def __init__(self, model):
        self.model = model
        self.req = deque(maxlen=YOLO_QUEUE_MAX)   # pending requests (latest wins)
        self.res = deque(maxlen=1)                # latest finished result
        self.lock = threading.Lock()
        self.running = False
        self.thread = threading.Thread(target=self._loop, daemon=True)

    def start(self):
        """Launch the worker thread."""
        self.running = True
        self.thread.start()

    def stop(self):
        """Signal the loop to exit and wait briefly for the thread."""
        self.running = False
        self.thread.join(timeout=1.0)

    def submit(self, frame_eff_bgr, roi_box_eff, mode, ts):
        """Queue a frame for detection; replaces any not-yet-processed request."""
        with self.lock:
            self.req.append((frame_eff_bgr, roi_box_eff, mode, ts))

    def try_get(self):
        """Pop the newest (dets, ts, mode, infer_ms, used_roi) tuple, or None."""
        with self.lock:
            return self.res.pop() if self.res else None

    def _loop(self):
        """Worker loop: take the freshest request, run ROI or full-frame inference."""
        while self.running:
            job = None
            with self.lock:
                if self.req:
                    job = self.req.pop()
                    self.req.clear()
            if job is None:
                time.sleep(0.001)
                continue
            frame, roi_box, mode, ts = job
            h, w = frame.shape[:2]
            dets = []
            infer_ms = 0.0
            used_roi = False
            try:
                if roi_box is not None:
                    roi_box = clip_box(roi_box, w, h)
                    crop, ox, oy = crop_roi(frame, roi_box)
                    if crop.size > 0:
                        used_roi = True
                        t0 = time.perf_counter()
                        with torch.inference_mode():
                            r = self.model(crop, conf=CONF, imgsz=IMG_SIZE_ROI,
                                           verbose=False, max_det=MAX_DET)[0]
                        infer_ms = (time.perf_counter() - t0) * 1000.0
                        dets = filter_yolo_boxes_with_scores(r, offset_x=ox, offset_y=oy,
                                                             min_conf=BT_LOW)
                else:
                    # full-frame path: downscale so the short side fits IMG_SIZE_FULL
                    sh, sw = frame.shape[:2]
                    short = min(sh, sw)
                    scale = 1.0
                    if short > IMG_SIZE_FULL:
                        scale = IMG_SIZE_FULL / float(short)
                        small = cv2.resize(frame, (int(sw * scale), int(sh * scale)),
                                           interpolation=cv2.INTER_AREA)
                    else:
                        small = frame
                    t0 = time.perf_counter()
                    with torch.inference_mode():
                        r = self.model(small, conf=CONF, imgsz=IMG_SIZE_FULL,
                                       verbose=False, max_det=MAX_DET)[0]
                    infer_ms = (time.perf_counter() - t0) * 1000.0
                    raw = filter_yolo_boxes_with_scores(r, 0, 0, min_conf=BT_LOW)
                    if scale != 1.0:
                        inv = 1.0 / scale  # map boxes back to full resolution
                        dets = [np.array([d[0] * inv, d[1] * inv, d[2] * inv,
                                          d[3] * inv, d[4]], dtype=np.float32)
                                for d in raw]
                    else:
                        dets = raw
            except Exception:
                # keep the worker alive on any inference failure; report no detections
                dets = []
                infer_ms = 0.0
            with self.lock:
                self.res.append((dets, ts, mode, infer_ms, used_roi))
# =========================
# MAIN
# =========================
def main():
    """FPV single-target lock: YOLO detections -> ByteTrack -> Kalman + KLT fusion.

    Per-frame pipeline:
      1) Kalman predict, 2) KLT optical-flow update with validity gates,
      3) adaptive detection scheduling, 4) async YOLO submit (ROI or full),
      5) consume detections, 6) ByteTrack association + single-target choice,
      7) confirm/recover FSM, 8) drawing, then display pacing and perf stats.

    Fixes vs. the previous revision: the duplicate green-detection drawing
    loop was removed (boxes were painted twice per frame), FP16 conversion is
    only attempted on CUDA, and pacing works headless (cv2.waitKey returns
    immediately when no window exists).
    """
    print("Loading YOLO...")
    model = YOLO(MODEL_PATH)
    if torch.cuda.is_available():
        model.to(f"cuda:{DEVICE}")
        # FIX: half precision only makes sense on GPU; on CPU it would fail later
        if USE_HALF:
            try:
                model.model.half()
            except Exception:
                pass  # best effort — some exported models reject .half()
        # warm-up so the first real frame does not pay CUDA init / autotune cost
        dummy = np.zeros((IMG_SIZE_ROI, IMG_SIZE_ROI, 3), dtype=np.uint8)
        with torch.inference_mode():
            for _ in range(3):
                _ = model(dummy, conf=CONF, imgsz=IMG_SIZE_ROI, verbose=False, max_det=MAX_DET)
    cap = cv2.VideoCapture(VIDEO_PATH) if USE_VIDEO_FILE else cv2.VideoCapture(CAM_INDEX, CAP_BACKEND)
    if not cap.isOpened():
        print("Capture open failed")
        return
    if SHOW_OUTPUT:
        cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL)
    yolo_worker = YOLOWorker(model)
    yolo_worker.start()
    kf = Kalman8D()
    klt = KLTTracker()
    bt = BYTETracker(
        track_high_thresh=BT_HIGH,
        track_low_thresh=BT_LOW,
        new_track_thresh=BT_NEW,
        match_thresh=BT_MATCH_IOU,
        track_buffer=BT_BUFFER,
        min_hits=BT_MIN_HITS,
    )
    target_id = None          # ByteTrack id of the single locked target
    locked_box_eff = None     # current lock box (effective coords)
    confirmed = False         # FSM state: CONFIRMED vs RECOVER
    hit_streak = 0
    miss_streak = 0
    ref_hist = None           # HSV appearance reference (optional gate)
    prev_ts = None
    frame_id = 0
    last_det_frame = -999
    last_fullscan_frame = -999
    yolo_no_det = 0           # consecutive YOLO results with zero detections
    last_yolo_ok_ts = -1e9
    perf_hist = deque(maxlen=120)
    yolo_hist = deque(maxlen=120)
    print("Start")
    while True:
        ret, frame_orig = cap.read()
        ts = time.perf_counter()
        if not ret:
            break
        frame_eff, sx, sy = get_effective_frame(frame_orig)
        eh, ew = frame_eff.shape[:2]
        # frame-to-frame dt (assume 25 fps for the very first frame)
        if prev_ts is None:
            dt = 1.0 / 25.0
        else:
            dt = float(max(1e-3, ts - prev_ts))
        prev_ts = ts
        iter_start = time.perf_counter()
        # 1) Kalman predict
        pred_box_eff = None
        if kf.initialized:
            kf.predict(dt)
            pred_box_eff = clip_box(kf.to_box(), ew, eh)
        # speed / uncertainty in effective px (drive the ROI and the schedule)
        speed = 0.0
        unc = 0.0
        if kf.initialized:
            vx = float(kf.x[4, 0]); vy = float(kf.x[5, 0])
            speed = (vx * vx + vy * vy) ** 0.5
            unc = kf.uncertainty()
        # 2) KLT update with validity gates (anti-drift)
        klt_valid = False
        if confirmed and locked_box_eff is not None:
            if klt.prev_gray is None or klt.pts is None or len(klt.pts) < KLT_REINIT_MIN_POINTS:
                klt.init(frame_eff, locked_box_eff)
            klt_box = klt.update(frame_eff)
            if klt_box is not None:
                klt_box = clip_box(klt_box, ew, eh)
                klt_valid = (klt.quality >= KLT_OK_Q) and (klt.good_count >= KLT_OK_PTS)
                if klt_valid:
                    cx, cy = box_center(klt_box)
                    bw, bh = box_wh(klt_box)
                    kf.update([cx, cy, bw, bh])
                    locked_box_eff = klt_box
        # 3) Adaptive detect schedule: detect more often when fast/uncertain/missing
        base = 6 if confirmed else RECOVER_FORCED_DET_EVERY
        fast_bonus = int(clamp(speed / 28.0, 0, 4))
        unc_bonus = int(clamp(unc / 160.0, 0, 4))
        miss_bonus = int(clamp(miss_streak, 0, 3))
        klt_bonus = 2 if (confirmed and not klt_valid) else 0
        det_every = clamp(base - (fast_bonus + unc_bonus + miss_bonus + klt_bonus), 1, 10)
        yolo_fresh = (ts - last_yolo_ok_ts) <= YOLO_FRESH_SEC
        if YOLO_FORCE_DET_WHEN_WEAK and confirmed and (not klt_valid) and (not yolo_fresh):
            det_every = 1
        run_det = (frame_id - last_det_frame) >= det_every
        need_fullscan = (not confirmed) and (frame_id - last_fullscan_frame) >= RECOVER_FULLSCAN_EVERY
        # 4) Submit async YOLO request (velocity-biased ROI around the prediction)
        if run_det:
            roi_box = None
            if (not need_fullscan) and kf.initialized:
                cx = float(kf.x[0, 0]); cy = float(kf.x[1, 0])
                vx = float(kf.x[4, 0]); vy = float(kf.x[5, 0])
                vnorm = (vx * vx + vy * vy) ** 0.5 + 1e-6
                ux, uy = vx / vnorm, vy / vnorm
                radius = BASE_RADIUS + SPEED_RADIUS_FACTOR * vnorm
                radius = clamp(radius, BASE_RADIUS, MAX_RADIUS)
                # shift the ROI center towards the motion direction
                fx = ux * radius * (FORWARD_BIAS - 1.0)
                fy = uy * radius * (FORWARD_BIAS - 1.0)
                scx = cx + fx * 0.35
                scy = cy + fy * 0.35
                rx = radius * FORWARD_BIAS
                ry = radius * SIDE_BIAS
                roi_box = clip_box([scx - rx, scy - ry, scx + rx, scy + ry], ew, eh)
            mode = "ROI" if roi_box is not None else "FULL"
            if need_fullscan:
                roi_box = None
                mode = "FULL"
                last_fullscan_frame = frame_id
            yolo_worker.submit(frame_eff, roi_box, mode, ts)
            last_det_frame = frame_id
        # 5) Consume the latest YOLO result, if the worker finished one
        yolo = yolo_worker.try_get()
        dets_eff = []
        infer_ms = 0.0
        used_roi = False
        if yolo is not None:
            dets_eff, ts_det, mode, infer_ms, used_roi = yolo
            yolo_hist.append(infer_ms)
            if len(dets_eff) == 0:
                yolo_no_det += 1
            else:
                yolo_no_det = 0
                last_yolo_ok_ts = ts
        # DEBUG: print one detection sample sometimes
        if DEBUG and dets_eff and (frame_id % 30 == 0):
            print("sample det:", dets_eff[0])
        # 6) ByteTrack update + choose single target
        dets_np = np.array(dets_eff, dtype=np.float32) if dets_eff else np.zeros((0, 5), dtype=np.float32)
        tracks = bt.update(dets_np, dt=dt)
        chosen = None
        target_track = None
        # keep the current target_id if its track is still alive
        if target_id is not None:
            for t in tracks:
                if t.track_id == target_id:
                    target_track = t
                    break
        # reacquire: prefer the track closest to the Kalman prediction
        if target_track is None and len(tracks) > 0:
            if kf.initialized:
                pred = clip_box(kf.to_box(), ew, eh)
                pc = box_center(pred)
                best = None
                best_score = -1e9
                for t in tracks:
                    b = clip_box(t.tlbr, ew, eh)
                    c = box_center(b)
                    dist = float(np.linalg.norm(c - pc))
                    i = iou(b, pred)
                    s = (2.0 * i) + (1.0 / (1.0 + dist)) + 0.2 * float(t.score)
                    if s > best_score:
                        best_score = s
                        best = t
                target_track = best
            else:
                # no motion prior yet: take the largest track
                target_track = max(tracks, key=lambda tt: box_area(tt.tlbr))
        if target_track is not None:
            target_id = target_track.track_id
            chosen = clip_box(target_track.tlbr, ew, eh)
        # 7) Kalman / FSM update from the chosen track
        if chosen is not None:
            if not kf.initialized:
                kf.init_from_box(chosen)
            else:
                cx, cy = box_center(chosen)
                bw, bh = box_wh(chosen)
                kf.update([cx, cy, bw, bh])
            locked_box_eff = chosen
            hit_streak += 1
            miss_streak = 0
            if not confirmed and hit_streak >= CONFIRM_HITS:
                confirmed = True
                klt.init(frame_eff, locked_box_eff)
                ref_hist = compute_hsv_hist(frame_eff, locked_box_eff) if USE_HSV_GATE else None
        else:
            # a valid KLT update keeps the lock alive without a detection
            if not (confirmed and klt_valid):
                miss_streak += 1
                hit_streak = 0
            if confirmed and (yolo_no_det >= YOLO_NO_DET_LIMIT) and (not klt_valid):
                # both the detector and KLT are weak -> drop the lock entirely
                confirmed = False
                locked_box_eff = None
                ref_hist = None
                target_id = None
                klt.reset()
                miss_streak = 0
                hit_streak = 0
                yolo_no_det = 0
            elif miss_streak >= MAX_MISSES:
                confirmed = False
                locked_box_eff = None
                ref_hist = None
                target_id = None
                klt.reset()
        # 8) Draw on the ORIGINAL frame (map effective boxes back)
        locked_box_orig = None
        if locked_box_eff is not None:
            locked_box_orig = unscale_box(locked_box_eff, sx, sy)
            locked_box_orig = clip_box(locked_box_orig, frame_orig.shape[1], frame_orig.shape[0])
        # 8.1 YOLO detections (green) — drawn once (a duplicate loop was removed)
        if DRAW_ALL_BOXES and dets_eff:
            for d in dets_eff:
                b_orig = clip_box(unscale_box(d[:4], sx, sy),
                                  frame_orig.shape[1], frame_orig.shape[0])
                x1, y1, x2, y2 = map(int, b_orig)
                cv2.rectangle(frame_orig, (x1, y1), (x2, y2), (0, 255, 0), 1)
                cv2.putText(frame_orig, f"{float(d[4]):.2f}", (x1, max(0, y1 - 6)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        # 8.2 ByteTrack tracks (cyan)
        if DRAW_BT_TRACKS and tracks:
            for t in tracks:
                if BT_DRAW_ONLY_CONFIRMED and (t.hits < BT_MIN_HITS):
                    continue
                b_eff = clip_box(t.tlbr, ew, eh)
                b_orig = clip_box(unscale_box(b_eff, sx, sy),
                                  frame_orig.shape[1], frame_orig.shape[0])
                x1, y1, x2, y2 = map(int, b_orig)
                cv2.rectangle(frame_orig, (x1, y1), (x2, y2), (255, 255, 0), 2)
                cv2.putText(frame_orig, f"T{t.track_id}:{t.score:.2f}",
                            (x1, max(0, y1 - 8)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
        # 8.3 Locked target (red)
        if DRAW_LOCK_BOX and locked_box_orig is not None:
            x1, y1, x2, y2 = map(int, locked_box_orig)
            cv2.rectangle(frame_orig, (x1, y1), (x2, y2), (0, 0, 255), 2)
            cv2.putText(frame_orig, f"ID={target_id}", (x1, max(0, y1 - 10)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
        # 8.4 Kalman center (blue)
        if DRAW_KALMAN and kf.initialized:
            pb_orig = unscale_box(clip_box(kf.to_box(), ew, eh), sx, sy)
            cx, cy = box_center(pb_orig)
            cv2.circle(frame_orig, (int(cx), int(cy)), 5, (255, 0, 0), -1)
        status = "CONFIRMED" if confirmed else "RECOVER"
        cv2.putText(frame_orig, status, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 255), 2)
        if DEBUG:
            cv2.putText(frame_orig, f"eff={ew}x{eh} miss={miss_streak} hit={hit_streak} detEvery={det_every} yNoDet={yolo_no_det}",
                        (20, 75), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
            cv2.putText(frame_orig, f"YOLO dets={len(dets_eff)} infer={infer_ms:.1f}ms mode={'ROI' if used_roi else 'FULL'}",
                        (20, 105), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
            cv2.putText(frame_orig, f"KLT valid={int(klt_valid)} q={klt.quality:.2f} pts={klt.good_count} speed={speed:.1f} dt={dt*1000:.1f}ms",
                        (20, 135), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        # show / pace
        if SHOW_OUTPUT:
            cv2.imshow(WINDOW_NAME, frame_orig)
        iter_ms = (time.perf_counter() - iter_start) * 1000.0
        perf_hist.append(iter_ms)
        target_dt = 1.0 / float(max(1, TARGET_OUT_FPS))
        elapsed = time.perf_counter() - iter_start
        wait_ms = 1
        if VIDEO_REALTIME and elapsed < target_dt:
            wait_ms = int((target_dt - elapsed) * 1000.0)
            wait_ms = clamp(wait_ms, 1, 50)
        if SHOW_OUTPUT:
            if cv2.waitKey(int(wait_ms)) == 27:  # ESC quits
                break
        else:
            # FIX: without a window cv2.waitKey returns at once, so sleep to pace
            time.sleep(wait_ms / 1000.0)
        if frame_id % 60 == 0 and perf_hist:
            p50 = np.percentile(perf_hist, 50)
            p95 = np.percentile(perf_hist, 95)
            fps = 1000.0 / max(np.mean(perf_hist), 1e-6)
            if yolo_hist:
                yp50 = np.percentile(yolo_hist, 50)
                yp95 = np.percentile(yolo_hist, 95)
            else:
                yp50, yp95 = 0.0, 0.0
            print(f"[perf] fps~{fps:.1f} iter p50={p50:.1f} p95={p95:.1f} | yolo p50={yp50:.1f} p95={yp95:.1f}")
        frame_id += 1
    yolo_worker.stop()
    cap.release()
    cv2.destroyAllWindows()
if __name__ == "__main__":
    main()