You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
850 lines
26 KiB
Python
850 lines
26 KiB
Python
import cv2
|
|
import numpy as np
|
|
import time
|
|
import threading
|
|
from collections import deque
|
|
import torch
|
|
from ultralytics import YOLO
|
|
|
|
from bytetrack_min import BYTETracker # <-- add this file nearby
|
|
|
|
|
|
# USER SETTINGS
|
|
|
|
MODEL_PATH = r"C:\Users\Legion\PycharmProjects\YOLOTrain\.venv\runs\detect\train30\weights\best.pt"
|
|
DEVICE = 0
|
|
USE_HALF = True
|
|
|
|
USE_VIDEO_FILE = True
|
|
VIDEO_PATH = r"C:\Users\Legion\PycharmProjects\YOLOTrain\videoMAI\Высший пилотаж.mp4"
|
|
VIDEO_REALTIME = True
|
|
|
|
CAM_INDEX = 0
|
|
CAP_BACKEND = cv2.CAP_DSHOW # or cv2.CAP_MSMF
|
|
|
|
SHOW_OUTPUT = True
|
|
WINDOW_NAME = "FPV LOCK PAL + ByteTrack"
|
|
TARGET_OUT_FPS = 20 # best-effort pacing
|
|
|
|
# Effective resolution policy (for analog PAL)
|
|
EFFECTIVE_W = 720
|
|
EFFECTIVE_H = 576
|
|
FORCE_EFFECTIVE_PAL = True
|
|
|
|
# YOLO inference settings
|
|
CONF = 0.22 # confidence used INSIDE YOLO inference call
|
|
IMG_SIZE_ROI = 384
|
|
IMG_SIZE_FULL = 640
|
|
MAX_DET = 60
|
|
TARGET_CLASS_ID = 0 # set int if you want only one class
|
|
|
|
# Filters (small drones far away)
|
|
MIN_BOX_AREA = 30
|
|
MAX_BOX_AREA = 0
|
|
ASPECT_RANGE = (0.15, 6.0)
|
|
|
|
# FSM
|
|
CONFIRM_HITS = 2
|
|
MAX_MISSES = 10
|
|
RECOVER_FULLSCAN_EVERY = 18
|
|
RECOVER_FORCED_DET_EVERY = 2
|
|
|
|
# ROI sizing (in effective pixels)
|
|
BASE_RADIUS = 150
|
|
MAX_RADIUS = 520
|
|
SPEED_RADIUS_FACTOR = 18.0
|
|
FORWARD_BIAS = 1.8
|
|
SIDE_BIAS = 1.2
|
|
|
|
# KLT optical flow tracker (effective)
|
|
KLT_MAX_CORNERS = 140
|
|
KLT_QUALITY = 0.01
|
|
KLT_MIN_DIST = 6
|
|
KLT_WIN = (21, 21)
|
|
KLT_MAX_LEVEL = 3
|
|
KLT_OUTLIER_THRESH = 10.0
|
|
|
|
# KLT validity gates (anti-drift)
|
|
KLT_OK_Q = 0.35
|
|
KLT_OK_PTS = 30
|
|
KLT_REINIT_MIN_POINTS = 30
|
|
|
|
# YOLO safety
|
|
YOLO_NO_DET_LIMIT = 3
|
|
YOLO_FRESH_SEC = 0.50
|
|
YOLO_FORCE_DET_WHEN_WEAK = True
|
|
|
|
# Appearance gate (HSV hist) in effective image
|
|
USE_HSV_GATE = False
|
|
HSV_HIST_BINS = (16, 16)
|
|
HSV_GATE_MIN_SIM = 0.25
|
|
HSV_UPDATE_EVERY = 25
|
|
|
|
# Async YOLO worker
|
|
YOLO_QUEUE_MAX = 1
|
|
|
|
# Debug / draw
|
|
DEBUG = True
|
|
DRAW_ALL_BOXES = True
|
|
DRAW_LOCK_BOX = True
|
|
DRAW_KALMAN = True
|
|
DRAW_KLT_POINTS = False
|
|
DRAW_BT_TRACKS = True
|
|
BT_DRAW_ONLY_CONFIRMED = False
|
|
|
|
|
|
# ByteTrack SETTINGS (important!)
|
|
|
|
|
|
BT_HIGH = 0.35
|
|
BT_LOW = 0.12
|
|
BT_NEW = 0.35
|
|
BT_MATCH_IOU = 0.70
|
|
BT_BUFFER = 50
|
|
BT_MIN_HITS = 1
|
|
|
|
|
|
# HELPERS
|
|
|
|
|
|
def clamp(x, a, b):
|
|
return max(a, min(b, x))
|
|
|
|
def box_center(box):
|
|
x1, y1, x2, y2 = box
|
|
return np.array([(x1 + x2) * 0.5, (y1 + y2) * 0.5], dtype=np.float32)
|
|
|
|
def box_wh(box):
|
|
x1, y1, x2, y2 = box
|
|
return np.array([max(1.0, x2 - x1), max(1.0, y2 - y1)], dtype=np.float32)
|
|
|
|
def box_area(box):
|
|
w, h = box_wh(box)
|
|
return float(w * h)
|
|
|
|
def iou(a, b):
|
|
ax1, ay1, ax2, ay2 = a
|
|
bx1, by1, bx2, by2 = b
|
|
ix1, iy1 = max(ax1, bx1), max(ay1, by1)
|
|
ix2, iy2 = min(ax2, bx2), min(ay2, by2)
|
|
inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
|
|
if inter <= 0:
|
|
return 0.0
|
|
union = box_area(a) + box_area(b) - inter
|
|
return 0.0 if union <= 0 else float(inter / union)
|
|
|
|
def clip_box(box, w, h):
|
|
x1, y1, x2, y2 = box
|
|
x1 = clamp(float(x1), 0.0, float(w - 2))
|
|
y1 = clamp(float(y1), 0.0, float(h - 2))
|
|
x2 = clamp(float(x2), 1.0, float(w - 1))
|
|
y2 = clamp(float(y2), 1.0, float(h - 1))
|
|
if x2 <= x1 + 1:
|
|
x2 = x1 + 2
|
|
if y2 <= y1 + 1:
|
|
y2 = y1 + 2
|
|
return np.array([x1, y1, x2, y2], dtype=np.float32)
|
|
|
|
def crop_roi(frame, roi_box):
|
|
x1, y1, x2, y2 = map(int, roi_box)
|
|
return frame[y1:y2, x1:x2], x1, y1
|
|
|
|
def compute_hsv_hist(frame, box):
|
|
x1, y1, x2, y2 = map(int, box)
|
|
patch = frame[y1:y2, x1:x2]
|
|
if patch.size == 0:
|
|
return None
|
|
hsv = cv2.cvtColor(patch, cv2.COLOR_BGR2HSV)
|
|
hist = cv2.calcHist([hsv], [0, 1], None, list(HSV_HIST_BINS), [0, 180, 0, 256])
|
|
cv2.normalize(hist, hist, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX)
|
|
return hist
|
|
|
|
def hsv_sim(h1, h2):
|
|
if h1 is None or h2 is None:
|
|
return 0.0
|
|
d = cv2.compareHist(h1, h2, cv2.HISTCMP_BHATTACHARYYA)
|
|
return float(1.0 - d)
|
|
|
|
def filter_yolo_boxes_with_scores(result, offset_x=0, offset_y=0, min_conf=0.12):
|
|
dets_out = []
|
|
if result.boxes is None or len(result.boxes) == 0:
|
|
return dets_out
|
|
|
|
xyxy = result.boxes.xyxy.detach().cpu().numpy()
|
|
confs = result.boxes.conf.detach().cpu().numpy()
|
|
clss = result.boxes.cls.detach().cpu().numpy().astype(int)
|
|
|
|
for b, c, cls_id in zip(xyxy, confs, clss):
|
|
score = float(c)
|
|
if score < float(min_conf):
|
|
continue
|
|
if TARGET_CLASS_ID is not None and cls_id != TARGET_CLASS_ID:
|
|
continue
|
|
|
|
x1, y1, x2, y2 = map(float, b)
|
|
ww = max(0.0, x2 - x1)
|
|
hh = max(0.0, y2 - y1)
|
|
if ww <= 1.0 or hh <= 1.0:
|
|
continue
|
|
|
|
a = ww * hh
|
|
if a < MIN_BOX_AREA:
|
|
continue
|
|
if MAX_BOX_AREA > 0 and a > MAX_BOX_AREA:
|
|
continue
|
|
|
|
ar = ww / hh
|
|
if not (ASPECT_RANGE[0] <= ar <= ASPECT_RANGE[1]):
|
|
continue
|
|
|
|
dets_out.append(np.array([x1 + offset_x, y1 + offset_y, x2 + offset_x, y2 + offset_y, score], dtype=np.float32))
|
|
|
|
return dets_out
|
|
|
|
|
|
# --- scale mapping between original and effective ---
|
|
|
|
def get_effective_frame(orig_bgr):
|
|
oh, ow = orig_bgr.shape[:2]
|
|
if not FORCE_EFFECTIVE_PAL:
|
|
return orig_bgr, 1.0, 1.0
|
|
eff = cv2.resize(orig_bgr, (EFFECTIVE_W, EFFECTIVE_H), interpolation=cv2.INTER_AREA)
|
|
sx = EFFECTIVE_W / float(ow)
|
|
sy = EFFECTIVE_H / float(oh)
|
|
return eff, sx, sy
|
|
|
|
def unscale_box(box_eff, sx, sy):
|
|
x1, y1, x2, y2 = box_eff
|
|
return np.array([x1 / sx, y1 / sy, x2 / sx, y2 / sy], dtype=np.float32)
|
|
|
|
# =========================
|
|
# KALMAN 8D (cx,cy,w,h + velocities) with dt
|
|
# =========================
|
|
|
|
class Kalman8D:
|
|
def __init__(self):
|
|
self.x = np.zeros((8, 1), dtype=np.float32)
|
|
self.P = np.eye(8, dtype=np.float32) * 50.0
|
|
|
|
self.H = np.zeros((4, 8), dtype=np.float32)
|
|
self.H[0, 0] = 1.0
|
|
self.H[1, 1] = 1.0
|
|
self.H[2, 2] = 1.0
|
|
self.H[3, 3] = 1.0
|
|
|
|
self.R = np.eye(4, dtype=np.float32)
|
|
self.R[0, 0] = 16.0
|
|
self.R[1, 1] = 16.0
|
|
self.R[2, 2] = 25.0
|
|
self.R[3, 3] = 25.0
|
|
|
|
self.initialized = False
|
|
self.q_pos = 2.0
|
|
self.q_size = 3.0
|
|
self.q_vel = 4.0
|
|
|
|
def F(self, dt):
|
|
F = np.eye(8, dtype=np.float32)
|
|
F[0, 4] = dt
|
|
F[1, 5] = dt
|
|
F[2, 6] = dt
|
|
F[3, 7] = dt
|
|
return F
|
|
|
|
def Q(self, dt):
|
|
q = np.zeros((8, 8), dtype=np.float32)
|
|
s1 = dt * dt
|
|
s2 = dt
|
|
q[0, 0] = self.q_pos * s1
|
|
q[1, 1] = self.q_pos * s1
|
|
q[2, 2] = self.q_size * s1
|
|
q[3, 3] = self.q_size * s1
|
|
q[4, 4] = self.q_vel * s2
|
|
q[5, 5] = self.q_vel * s2
|
|
q[6, 6] = self.q_vel * s2
|
|
q[7, 7] = self.q_vel * s2
|
|
return q
|
|
|
|
def predict(self, dt):
|
|
dt = float(max(1e-3, dt))
|
|
F = self.F(dt)
|
|
self.x = F @ self.x
|
|
self.P = F @ self.P @ F.T + self.Q(dt)
|
|
return self.x[:4].flatten()
|
|
|
|
def update(self, z):
|
|
z = np.array(z, dtype=np.float32).reshape(4, 1)
|
|
y = z - self.H @ self.x
|
|
S = self.H @ self.P @ self.H.T + self.R
|
|
K = self.P @ self.H.T @ np.linalg.inv(S)
|
|
self.x = self.x + K @ y
|
|
I = np.eye(8, dtype=np.float32)
|
|
self.P = (I - K @ self.H) @ self.P
|
|
|
|
def init_from_box(self, box):
|
|
cx, cy = box_center(box)
|
|
w, h = box_wh(box)
|
|
self.x[:] = 0
|
|
self.x[0, 0] = cx
|
|
self.x[1, 0] = cy
|
|
self.x[2, 0] = w
|
|
self.x[3, 0] = h
|
|
self.P = np.eye(8, dtype=np.float32) * 50.0
|
|
self.initialized = True
|
|
|
|
def to_box(self):
|
|
cx = float(self.x[0, 0])
|
|
cy = float(self.x[1, 0])
|
|
w = float(max(2.0, self.x[2, 0]))
|
|
h = float(max(2.0, self.x[3, 0]))
|
|
return np.array([cx - w * 0.5, cy - h * 0.5, cx + w * 0.5, cy + h * 0.5], dtype=np.float32)
|
|
|
|
def uncertainty(self):
|
|
return float(self.P[0, 0] + self.P[1, 1] + self.P[2, 2] + self.P[3, 3])
|
|
|
|
# =========================
|
|
# KLT TRACKER (effective)
|
|
# =========================
|
|
|
|
class KLTTracker:
|
|
def __init__(self):
|
|
self.prev_gray = None
|
|
self.pts = None
|
|
self.box = None
|
|
self.good_count = 0
|
|
self.quality = 0.0
|
|
|
|
def reset(self):
|
|
self.prev_gray = None
|
|
self.pts = None
|
|
self.box = None
|
|
self.good_count = 0
|
|
self.quality = 0.0
|
|
|
|
def init(self, frame_bgr, box):
|
|
gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
|
|
x1, y1, x2, y2 = map(int, box)
|
|
mask = np.zeros_like(gray)
|
|
cv2.rectangle(mask, (x1, y1), (x2, y2), 255, -1)
|
|
|
|
pts = cv2.goodFeaturesToTrack(
|
|
gray,
|
|
maxCorners=KLT_MAX_CORNERS,
|
|
qualityLevel=KLT_QUALITY,
|
|
minDistance=KLT_MIN_DIST,
|
|
mask=mask
|
|
)
|
|
self.prev_gray = gray
|
|
self.pts = pts
|
|
self.box = box.copy()
|
|
self.good_count = 0
|
|
self.quality = 0.0
|
|
|
|
def update(self, frame_bgr):
|
|
if self.prev_gray is None or self.pts is None or len(self.pts) == 0 or self.box is None:
|
|
self.quality = 0.0
|
|
self.good_count = 0
|
|
return None
|
|
|
|
gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
|
|
next_pts, st, err = cv2.calcOpticalFlowPyrLK(
|
|
self.prev_gray, gray, self.pts, None,
|
|
winSize=KLT_WIN,
|
|
maxLevel=KLT_MAX_LEVEL,
|
|
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01)
|
|
)
|
|
|
|
if next_pts is None or st is None:
|
|
self.quality = 0.0
|
|
self.good_count = 0
|
|
self.prev_gray = gray
|
|
return None
|
|
|
|
st = st.reshape(-1)
|
|
good_old = self.pts[st == 1].reshape(-1, 2)
|
|
good_new = next_pts[st == 1].reshape(-1, 2)
|
|
|
|
self.good_count = int(len(good_new))
|
|
if self.good_count < 6:
|
|
self.quality = 0.0
|
|
self.prev_gray = gray
|
|
self.pts = good_new.reshape(-1, 1, 2) if self.good_count > 0 else None
|
|
return None
|
|
|
|
disp = good_new - good_old
|
|
med = np.median(disp, axis=0)
|
|
d = np.linalg.norm(disp - med[None, :], axis=1)
|
|
inliers = d < KLT_OUTLIER_THRESH
|
|
in_cnt = int(np.count_nonzero(inliers))
|
|
|
|
if in_cnt < 6:
|
|
self.quality = 0.0
|
|
self.prev_gray = gray
|
|
self.pts = good_new.reshape(-1, 1, 2)
|
|
return None
|
|
|
|
disp_in = disp[inliers]
|
|
med = np.median(disp_in, axis=0)
|
|
dx, dy = float(med[0]), float(med[1])
|
|
|
|
x1, y1, x2, y2 = self.box
|
|
new_box = np.array([x1 + dx, y1 + dy, x2 + dx, y2 + dy], dtype=np.float32)
|
|
|
|
frac = float(in_cnt) / float(len(inliers))
|
|
self.quality = clamp(frac * (self.good_count / max(1, KLT_MAX_CORNERS)), 0.0, 1.0)
|
|
|
|
self.prev_gray = gray
|
|
self.pts = good_new[inliers].reshape(-1, 1, 2)
|
|
self.box = new_box
|
|
return new_box
|
|
|
|
# =========================
|
|
# ASYNC YOLO WORKER (effective)
|
|
# =========================
|
|
|
|
class YOLOWorker:
|
|
def __init__(self, model):
|
|
self.model = model
|
|
self.req = deque(maxlen=YOLO_QUEUE_MAX)
|
|
self.res = deque(maxlen=1)
|
|
self.lock = threading.Lock()
|
|
self.running = False
|
|
self.thread = threading.Thread(target=self._loop, daemon=True)
|
|
|
|
def start(self):
|
|
self.running = True
|
|
self.thread.start()
|
|
|
|
def stop(self):
|
|
self.running = False
|
|
self.thread.join(timeout=1.0)
|
|
|
|
def submit(self, frame_eff_bgr, roi_box_eff, mode, ts):
|
|
with self.lock:
|
|
self.req.append((frame_eff_bgr, roi_box_eff, mode, ts))
|
|
|
|
def try_get(self):
|
|
with self.lock:
|
|
if not self.res:
|
|
return None
|
|
return self.res.pop()
|
|
|
|
def _loop(self):
|
|
while self.running:
|
|
item = None
|
|
with self.lock:
|
|
if self.req:
|
|
item = self.req.pop()
|
|
self.req.clear()
|
|
|
|
if item is None:
|
|
time.sleep(0.001)
|
|
continue
|
|
|
|
frame, roi_box, mode, ts = item
|
|
h, w = frame.shape[:2]
|
|
dets = []
|
|
infer_ms = 0.0
|
|
used_roi = False
|
|
|
|
try:
|
|
if roi_box is not None:
|
|
roi_box = clip_box(roi_box, w, h)
|
|
crop, ox, oy = crop_roi(frame, roi_box)
|
|
if crop.size > 0:
|
|
used_roi = True
|
|
t0 = time.perf_counter()
|
|
with torch.inference_mode():
|
|
r = self.model(crop, conf=CONF, imgsz=IMG_SIZE_ROI, verbose=False, max_det=MAX_DET)[0]
|
|
infer_ms = (time.perf_counter() - t0) * 1000.0
|
|
dets = filter_yolo_boxes_with_scores(r, offset_x=ox, offset_y=oy, min_conf=BT_LOW)
|
|
|
|
else:
|
|
sh, sw = frame.shape[:2]
|
|
short = min(sh, sw)
|
|
scale = 1.0
|
|
target = IMG_SIZE_FULL
|
|
if short > target:
|
|
scale = target / float(short)
|
|
small = cv2.resize(frame, (int(sw * scale), int(sh * scale)), interpolation=cv2.INTER_AREA)
|
|
else:
|
|
small = frame
|
|
|
|
t0 = time.perf_counter()
|
|
with torch.inference_mode():
|
|
r = self.model(small, conf=CONF, imgsz=IMG_SIZE_FULL, verbose=False, max_det=MAX_DET)[0]
|
|
infer_ms = (time.perf_counter() - t0) * 1000.0
|
|
|
|
dets_s = filter_yolo_boxes_with_scores(r, 0, 0, min_conf=BT_LOW)
|
|
if scale != 1.0:
|
|
inv = 1.0 / scale
|
|
dets = [np.array([d[0]*inv, d[1]*inv, d[2]*inv, d[3]*inv, d[4]], dtype=np.float32) for d in dets_s]
|
|
else:
|
|
dets = dets_s
|
|
|
|
except Exception:
|
|
dets = []
|
|
infer_ms = 0.0
|
|
|
|
with self.lock:
|
|
self.res.append((dets, ts, mode, infer_ms, used_roi))
|
|
|
|
# =========================
|
|
# MAIN
|
|
# =========================
|
|
|
|
def main():
|
|
print("Loading YOLO...")
|
|
model = YOLO(MODEL_PATH)
|
|
|
|
if torch.cuda.is_available():
|
|
model.to(f"cuda:{DEVICE}")
|
|
if USE_HALF:
|
|
try:
|
|
model.model.half()
|
|
except Exception:
|
|
pass
|
|
|
|
if torch.cuda.is_available():
|
|
dummy = np.zeros((IMG_SIZE_ROI, IMG_SIZE_ROI, 3), dtype=np.uint8)
|
|
with torch.inference_mode():
|
|
for _ in range(3):
|
|
_ = model(dummy, conf=CONF, imgsz=IMG_SIZE_ROI, verbose=False, max_det=MAX_DET)
|
|
|
|
cap = cv2.VideoCapture(VIDEO_PATH) if USE_VIDEO_FILE else cv2.VideoCapture(CAM_INDEX, CAP_BACKEND)
|
|
if not cap.isOpened():
|
|
print("Capture open failed")
|
|
return
|
|
|
|
if SHOW_OUTPUT:
|
|
cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL)
|
|
|
|
yolo_worker = YOLOWorker(model)
|
|
yolo_worker.start()
|
|
|
|
kf = Kalman8D()
|
|
klt = KLTTracker()
|
|
|
|
bt = BYTETracker(
|
|
track_high_thresh=BT_HIGH,
|
|
track_low_thresh=BT_LOW,
|
|
new_track_thresh=BT_NEW,
|
|
match_thresh=BT_MATCH_IOU,
|
|
track_buffer=BT_BUFFER,
|
|
min_hits=BT_MIN_HITS,
|
|
)
|
|
target_id = None # single-target lock over ByteTrack tracks
|
|
|
|
locked_box_eff = None
|
|
confirmed = False
|
|
hit_streak = 0
|
|
miss_streak = 0
|
|
ref_hist = None
|
|
|
|
prev_ts = None
|
|
frame_id = 0
|
|
last_det_frame = -999
|
|
last_fullscan_frame = -999
|
|
|
|
yolo_no_det = 0
|
|
last_yolo_ok_ts = -1e9
|
|
|
|
perf_hist = deque(maxlen=120)
|
|
yolo_hist = deque(maxlen=120)
|
|
|
|
print("Start")
|
|
|
|
while True:
|
|
ret, frame_orig = cap.read()
|
|
ts = time.perf_counter()
|
|
if not ret:
|
|
break
|
|
|
|
frame_eff, sx, sy = get_effective_frame(frame_orig)
|
|
eh, ew = frame_eff.shape[:2]
|
|
|
|
# dt
|
|
if prev_ts is None:
|
|
dt = 1.0 / 25.0
|
|
else:
|
|
dt = float(max(1e-3, ts - prev_ts))
|
|
prev_ts = ts
|
|
|
|
iter_start = time.perf_counter()
|
|
|
|
# 1) Kalman predict
|
|
pred_box_eff = None
|
|
if kf.initialized:
|
|
kf.predict(dt)
|
|
pred_box_eff = clip_box(kf.to_box(), ew, eh)
|
|
|
|
# speed/unc (effective px)
|
|
speed = 0.0
|
|
unc = 0.0
|
|
if kf.initialized:
|
|
vx = float(kf.x[4, 0]); vy = float(kf.x[5, 0])
|
|
speed = (vx * vx + vy * vy) ** 0.5
|
|
unc = kf.uncertainty()
|
|
|
|
# 2) KLT update (effective) with validity gates
|
|
klt_valid = False
|
|
if confirmed and locked_box_eff is not None:
|
|
if klt.prev_gray is None or klt.pts is None or len(klt.pts) < KLT_REINIT_MIN_POINTS:
|
|
klt.init(frame_eff, locked_box_eff)
|
|
|
|
klt_box = klt.update(frame_eff)
|
|
if klt_box is not None:
|
|
klt_box = clip_box(klt_box, ew, eh)
|
|
klt_valid = (klt.quality >= KLT_OK_Q) and (klt.good_count >= KLT_OK_PTS)
|
|
if klt_valid:
|
|
cx, cy = box_center(klt_box)
|
|
bw, bh = box_wh(klt_box)
|
|
kf.update([cx, cy, bw, bh])
|
|
locked_box_eff = klt_box
|
|
|
|
# 3) Adaptive detect schedule
|
|
base = 6 if confirmed else RECOVER_FORCED_DET_EVERY
|
|
fast_bonus = int(clamp(speed / 28.0, 0, 4))
|
|
unc_bonus = int(clamp(unc / 160.0, 0, 4))
|
|
miss_bonus = int(clamp(miss_streak, 0, 3))
|
|
klt_bonus = 2 if (confirmed and not klt_valid) else 0
|
|
det_every = clamp(base - (fast_bonus + unc_bonus + miss_bonus + klt_bonus), 1, 10)
|
|
|
|
yolo_fresh = (ts - last_yolo_ok_ts) <= YOLO_FRESH_SEC
|
|
if YOLO_FORCE_DET_WHEN_WEAK and confirmed and (not klt_valid) and (not yolo_fresh):
|
|
det_every = 1
|
|
|
|
run_det = (frame_id - last_det_frame) >= det_every
|
|
need_fullscan = (not confirmed) and (frame_id - last_fullscan_frame) >= RECOVER_FULLSCAN_EVERY
|
|
|
|
# 4) Submit YOLO (effective)
|
|
if run_det:
|
|
roi_box = None
|
|
if (not need_fullscan) and kf.initialized:
|
|
cx = float(kf.x[0, 0]); cy = float(kf.x[1, 0])
|
|
vx = float(kf.x[4, 0]); vy = float(kf.x[5, 0])
|
|
vnorm = (vx * vx + vy * vy) ** 0.5 + 1e-6
|
|
ux, uy = vx / vnorm, vy / vnorm
|
|
|
|
radius = BASE_RADIUS + SPEED_RADIUS_FACTOR * vnorm
|
|
radius = clamp(radius, BASE_RADIUS, MAX_RADIUS)
|
|
|
|
fx = ux * radius * (FORWARD_BIAS - 1.0)
|
|
fy = uy * radius * (FORWARD_BIAS - 1.0)
|
|
|
|
scx = cx + fx * 0.35
|
|
scy = cy + fy * 0.35
|
|
rx = radius * FORWARD_BIAS
|
|
ry = radius * SIDE_BIAS
|
|
|
|
roi_box = clip_box([scx - rx, scy - ry, scx + rx, scy + ry], ew, eh)
|
|
|
|
mode = "ROI" if roi_box is not None else "FULL"
|
|
if need_fullscan:
|
|
roi_box = None
|
|
mode = "FULL"
|
|
last_fullscan_frame = frame_id
|
|
|
|
yolo_worker.submit(frame_eff, roi_box, mode, ts)
|
|
last_det_frame = frame_id
|
|
|
|
# 5) Consume YOLO result
|
|
# 5) Consume YOLO result
|
|
yolo = yolo_worker.try_get()
|
|
dets_eff = []
|
|
infer_ms = 0.0
|
|
used_roi = False
|
|
if yolo is not None:
|
|
dets_eff, ts_det, mode, infer_ms, used_roi = yolo
|
|
yolo_hist.append(infer_ms)
|
|
|
|
if len(dets_eff) == 0:
|
|
yolo_no_det += 1
|
|
else:
|
|
yolo_no_det = 0
|
|
last_yolo_ok_ts = ts
|
|
|
|
# ---- DEBUG: print one detection sample sometimes ----
|
|
if DEBUG and dets_eff and (frame_id % 30 == 0):
|
|
print("sample det:", dets_eff[0])
|
|
|
|
# 6) ByteTrack update + choose single target
|
|
dets_np = np.array(dets_eff, dtype=np.float32) if dets_eff else np.zeros((0, 5), dtype=np.float32)
|
|
tracks = bt.update(dets_np, dt=dt)
|
|
|
|
chosen = None
|
|
target_track = None
|
|
|
|
# keep current target_id if possible
|
|
if target_id is not None:
|
|
for t in tracks:
|
|
if t.track_id == target_id:
|
|
target_track = t
|
|
break
|
|
|
|
# reacquire if needed
|
|
if target_track is None and len(tracks) > 0:
|
|
if kf.initialized:
|
|
pred = clip_box(kf.to_box(), ew, eh)
|
|
pc = box_center(pred)
|
|
best = None
|
|
best_score = -1e9
|
|
for t in tracks:
|
|
b = clip_box(t.tlbr, ew, eh)
|
|
c = box_center(b)
|
|
dist = float(np.linalg.norm(c - pc))
|
|
i = iou(b, pred)
|
|
s = (2.0 * i) + (1.0 / (1.0 + dist)) + 0.2 * float(t.score)
|
|
if s > best_score:
|
|
best_score = s
|
|
best = t
|
|
target_track = best
|
|
else:
|
|
target_track = max(tracks, key=lambda tt: box_area(tt.tlbr))
|
|
|
|
# apply chosen
|
|
if target_track is not None:
|
|
target_id = target_track.track_id
|
|
chosen = clip_box(target_track.tlbr, ew, eh)
|
|
|
|
# 7) Update Kalman / FSM using chosen (same logic)
|
|
if chosen is not None:
|
|
if not kf.initialized:
|
|
kf.init_from_box(chosen)
|
|
else:
|
|
cx, cy = box_center(chosen)
|
|
bw, bh = box_wh(chosen)
|
|
kf.update([cx, cy, bw, bh])
|
|
|
|
locked_box_eff = chosen
|
|
hit_streak += 1
|
|
miss_streak = 0
|
|
|
|
if not confirmed and hit_streak >= CONFIRM_HITS:
|
|
confirmed = True
|
|
klt.init(frame_eff, locked_box_eff)
|
|
ref_hist = compute_hsv_hist(frame_eff, locked_box_eff) if USE_HSV_GATE else None
|
|
|
|
else:
|
|
if not (confirmed and klt_valid):
|
|
miss_streak += 1
|
|
hit_streak = 0
|
|
|
|
if confirmed and (yolo_no_det >= YOLO_NO_DET_LIMIT) and (not klt_valid):
|
|
confirmed = False
|
|
locked_box_eff = None
|
|
ref_hist = None
|
|
target_id = None
|
|
klt.reset()
|
|
miss_streak = 0
|
|
hit_streak = 0
|
|
yolo_no_det = 0
|
|
|
|
elif miss_streak >= MAX_MISSES:
|
|
confirmed = False
|
|
locked_box_eff = None
|
|
ref_hist = None
|
|
target_id = None
|
|
klt.reset()
|
|
|
|
# 8) Draw on ORIGINAL frame (map boxes back)
|
|
locked_box_orig = None
|
|
if locked_box_eff is not None:
|
|
locked_box_orig = unscale_box(locked_box_eff, sx, sy)
|
|
locked_box_orig = clip_box(locked_box_orig, frame_orig.shape[1], frame_orig.shape[0])
|
|
|
|
# 8.1 Draw YOLO dets (green)
|
|
if DRAW_ALL_BOXES and dets_eff:
|
|
for d in dets_eff:
|
|
b_eff = d[:4]
|
|
b_orig = unscale_box(b_eff, sx, sy)
|
|
b_orig = clip_box(b_orig, frame_orig.shape[1], frame_orig.shape[0])
|
|
x1, y1, x2, y2 = map(int, b_orig)
|
|
cv2.rectangle(frame_orig, (x1, y1), (x2, y2), (0, 255, 0), 1)
|
|
cv2.putText(frame_orig, f"{float(d[4]):.2f}", (x1, max(0, y1 - 6)),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
|
|
|
|
# 8.2 Draw ByteTrack tracks (cyan)
|
|
if DRAW_BT_TRACKS and tracks:
|
|
for t in tracks:
|
|
if BT_DRAW_ONLY_CONFIRMED and (t.hits < BT_MIN_HITS):
|
|
continue
|
|
b_eff = clip_box(t.tlbr, ew, eh)
|
|
b_orig = unscale_box(b_eff, sx, sy)
|
|
b_orig = clip_box(b_orig, frame_orig.shape[1], frame_orig.shape[0])
|
|
x1, y1, x2, y2 = map(int, b_orig)
|
|
cv2.rectangle(frame_orig, (x1, y1), (x2, y2), (255, 255, 0), 2)
|
|
cv2.putText(frame_orig, f"T{t.track_id}:{t.score:.2f}",
|
|
(x1, max(0, y1 - 8)),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
|
|
|
|
# 8.3 Draw locked target (red)
|
|
if DRAW_LOCK_BOX and locked_box_orig is not None:
|
|
x1, y1, x2, y2 = map(int, locked_box_orig)
|
|
cv2.rectangle(frame_orig, (x1, y1), (x2, y2), (0, 0, 255), 2)
|
|
cv2.putText(frame_orig, f"ID={target_id}", (x1, max(0, y1 - 10)),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
|
|
|
|
# 8.4 Draw Kalman point (blue)
|
|
if DRAW_KALMAN and kf.initialized:
|
|
pb_eff = clip_box(kf.to_box(), ew, eh)
|
|
pb_orig = unscale_box(pb_eff, sx, sy)
|
|
cx, cy = box_center(pb_orig)
|
|
cv2.circle(frame_orig, (int(cx), int(cy)), 5, (255, 0, 0), -1)
|
|
|
|
|
|
if DRAW_ALL_BOXES and dets_eff:
|
|
for d in dets_eff:
|
|
b = d[:4]
|
|
bo = unscale_box(b, sx, sy)
|
|
x1, y1, x2, y2 = map(int, bo)
|
|
cv2.rectangle(frame_orig, (x1, y1), (x2, y2), (0, 255, 0), 1)
|
|
|
|
status = "CONFIRMED" if confirmed else "RECOVER"
|
|
cv2.putText(frame_orig, status, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 255), 2)
|
|
|
|
if DEBUG:
|
|
cv2.putText(frame_orig, f"eff={ew}x{eh} miss={miss_streak} hit={hit_streak} detEvery={det_every} yNoDet={yolo_no_det}",
|
|
(20, 75), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
|
|
cv2.putText(frame_orig, f"YOLO dets={len(dets_eff)} infer={infer_ms:.1f}ms mode={'ROI' if used_roi else 'FULL'}",
|
|
(20, 105), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
|
|
cv2.putText(frame_orig, f"KLT valid={int(klt_valid)} q={klt.quality:.2f} pts={klt.good_count} speed={speed:.1f} dt={dt*1000:.1f}ms",
|
|
(20, 135), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
|
|
|
|
# show / pace
|
|
if SHOW_OUTPUT:
|
|
cv2.imshow(WINDOW_NAME, frame_orig)
|
|
|
|
iter_ms = (time.perf_counter() - iter_start) * 1000.0
|
|
perf_hist.append(iter_ms)
|
|
|
|
target_dt = 1.0 / float(max(1, TARGET_OUT_FPS))
|
|
elapsed = time.perf_counter() - iter_start
|
|
wait_ms = 1
|
|
if VIDEO_REALTIME and elapsed < target_dt:
|
|
wait_ms = int((target_dt - elapsed) * 1000.0)
|
|
wait_ms = clamp(wait_ms, 1, 50)
|
|
|
|
if cv2.waitKey(int(wait_ms)) == 27:
|
|
break
|
|
|
|
if frame_id % 60 == 0 and perf_hist:
|
|
p50 = np.percentile(perf_hist, 50)
|
|
p95 = np.percentile(perf_hist, 95)
|
|
fps = 1000.0 / max(np.mean(perf_hist), 1e-6)
|
|
if yolo_hist:
|
|
yp50 = np.percentile(yolo_hist, 50)
|
|
yp95 = np.percentile(yolo_hist, 95)
|
|
else:
|
|
yp50, yp95 = 0.0, 0.0
|
|
print(f"[perf] fps~{fps:.1f} iter p50={p50:.1f} p95={p95:.1f} | yolo p50={yp50:.1f} p95={yp95:.1f}")
|
|
|
|
frame_id += 1
|
|
|
|
yolo_worker.stop()
|
|
cap.release()
|
|
cv2.destroyAllWindows()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|