# bytetrack_min.py
import numpy as np


def tlbr_to_xyah(tlbr):
    """Convert a ``(x1, y1, x2, y2)`` box to ``(cx, cy, aspect, height)``.

    Width and height are clamped to at least 1.0 so the aspect ratio
    ``w / h`` is always defined, even for degenerate boxes. The centre is
    derived from the clamped size.

    Returns:
        float32 array ``[cx, cy, w / h, h]``.
    """
    x1, y1, x2, y2 = tlbr
    w = max(1.0, x2 - x1)
    h = max(1.0, y2 - y1)
    cx = x1 + w * 0.5
    cy = y1 + h * 0.5
    return np.array([cx, cy, w / h, h], dtype=np.float32)


def xyah_to_tlbr(xyah):
    """Convert ``(cx, cy, aspect, height)`` back to ``(x1, y1, x2, y2)``.

    Height and the derived width are clamped to at least 2.0, so a state
    whose size collapsed or went negative still yields a valid box.

    Returns:
        float32 array ``[x1, y1, x2, y2]``.
    """
    cx, cy, a, h = [float(v) for v in xyah]
    h = max(2.0, h)
    w = max(2.0, a * h)
    half_w, half_h = w * 0.5, h * 0.5
    return np.array(
        [cx - half_w, cy - half_h, cx + half_w, cy + half_h],
        dtype=np.float32,
    )


def iou_tlbr(a, b):
    """Return the intersection-over-union of two ``(x1, y1, x2, y2)`` boxes.

    Each box's width and height are clamped to at least 1.0 when computing
    its area. Boxes that do not overlap give 0.0.
    """
    ax1, ay1, ax2, ay2 = a
    bx1, by1, bx2, by2 = b
    iw = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    ih = max(0.0, min(ay2, by2) - max(ay1, by1))
    inter = iw * ih
    if inter <= 0:
        return 0.0
    area_a = max(1.0, ax2 - ax1) * max(1.0, ay2 - ay1)
    area_b = max(1.0, bx2 - bx1) * max(1.0, by2 - by1)
    union = area_a + area_b - inter
    return float(inter / union) if union > 0 else 0.0


def iou_matrix(tracks_tlbr, dets_tlbr):
    """Return the pairwise IoU between two sequences of tlbr boxes.

    Uses the same rules as ``iou_tlbr`` (areas clamped to a side of at least
    1.0, zero when there is no overlap) but evaluates every pair in a single
    broadcast expression instead of a Python double loop.

    Args:
        tracks_tlbr: sequence of ``(x1, y1, x2, y2)`` boxes (rows).
        dets_tlbr: sequence of ``(x1, y1, x2, y2)`` boxes (columns).

    Returns:
        float32 array of shape ``(len(tracks_tlbr), len(dets_tlbr))``.
    """
    n_trk, n_det = len(tracks_tlbr), len(dets_tlbr)
    if n_trk == 0 or n_det == 0:
        return np.zeros((n_trk, n_det), dtype=np.float32)

    trk = np.asarray(tracks_tlbr, dtype=np.float32).reshape(n_trk, 4)
    det = np.asarray(dets_tlbr, dtype=np.float32).reshape(n_det, 4)

    # Broadcast (n_trk, 1, 2) against (1, n_det, 2) to get every pair's overlap.
    top_left = np.maximum(trk[:, None, :2], det[None, :, :2])
    bottom_right = np.minimum(trk[:, None, 2:], det[None, :, 2:])
    overlap = np.maximum(bottom_right - top_left, 0.0)
    inter = overlap[..., 0] * overlap[..., 1]

    area_trk = (np.maximum(trk[:, 2] - trk[:, 0], 1.0)
                * np.maximum(trk[:, 3] - trk[:, 1], 1.0))
    area_det = (np.maximum(det[:, 2] - det[:, 0], 1.0)
                * np.maximum(det[:, 3] - det[:, 1], 1.0))
    union = area_trk[:, None] + area_det[None, :] - inter

    # Pairs with no overlap (or NaN coordinates) score 0, as in iou_tlbr.
    with np.errstate(divide="ignore", invalid="ignore"):
        ious = np.where((inter > 0) & (union > 0), inter / union, 0.0)
    return ious.astype(np.float32, copy=False)


# Hungarian assignment (min-cost). We pass cost = 1 - IoU.
def hungarian(cost):
    """Solve the min-cost assignment problem for a rectangular cost matrix.

    The matrix is padded to a square with a cost above every real entry, so
    surplus rows or columns pair with padding and are left out of the result.
    Non-finite entries are treated as that same maximal cost.

    The solver is the O(N^3) potentials formulation. Its working arrays are
    1-indexed: index 0 is a sentinel meaning "no row / no column", so real
    row ``r`` and column ``c`` live at ``r + 1`` and ``c + 1``. Without that
    offset the first row and first column could never be assigned.

    Args:
        cost: 2-D array-like of shape ``(n, m)``.

    Returns:
        List of ``(row, col)`` pairs sorted by row, containing
        ``min(n, m)`` entries.

    Raises:
        ValueError: if ``cost`` is not two-dimensional.
    """
    cost = np.asarray(cost, dtype=np.float64)
    if cost.ndim != 2:
        raise ValueError("cost must be a 2-D matrix")
    n_rows, n_cols = cost.shape
    if n_rows == 0 or n_cols == 0:
        return []

    finite = np.isfinite(cost)
    pad_cost = float(cost[finite].max()) + 1.0 if finite.any() else 1.0
    size = max(n_rows, n_cols)
    square = np.full((size, size), pad_cost, dtype=np.float64)
    square[:n_rows, :n_cols] = np.where(finite, cost, pad_cost)
    rows = square.tolist()  # plain lists are faster for scalar indexing

    inf = float("inf")
    u = [0.0] * (size + 1)        # row potentials
    v = [0.0] * (size + 1)        # column potentials
    row_of_col = [0] * (size + 1)  # row matched to each column, 0 = free
    way = [0] * (size + 1)        # previous column on the augmenting path

    for i in range(1, size + 1):
        row_of_col[0] = i
        j0 = 0
        minv = [inf] * (size + 1)
        used = [False] * (size + 1)
        while True:
            used[j0] = True
            i0 = row_of_col[j0]
            row_cost = rows[i0 - 1]
            delta = inf
            j1 = 0
            for j in range(1, size + 1):
                if used[j]:
                    continue
                cur = row_cost[j - 1] - u[i0] - v[j]
                if cur < minv[j]:
                    minv[j] = cur
                    way[j] = j0
                if minv[j] < delta:
                    delta = minv[j]
                    j1 = j
            for j in range(size + 1):
                if used[j]:
                    u[row_of_col[j]] += delta
                    v[j] -= delta
                else:
                    minv[j] -= delta
            j0 = j1
            if row_of_col[j0] == 0:
                break
        # Flip the matching along the augmenting path back to the sentinel.
        while True:
            j1 = way[j0]
            row_of_col[j0] = row_of_col[j1]
            j0 = j1
            if j0 == 0:
                break

    matches = []
    for j in range(1, size + 1):
        r, c = row_of_col[j] - 1, j - 1
        if r < n_rows and c < n_cols:  # skip pairs that involve padding
            matches.append((r, c))
    matches.sort()
    return matches


class KalmanXYAH:
    """Constant-velocity Kalman filter over ``(cx, cy, aspect, height)``.

    State: [cx,cy,a,h, vcx,vcy,va,vh]. Only the first four components are
    observed.
    """

    def __init__(self):
        self.ndim = 4
        self.dim_x = 8
        # Transition matrix for dt = 1; predict() rewrites the dt entries.
        self._motion_mat = np.eye(self.dim_x, dtype=np.float32)
        for i in range(self.ndim):
            self._motion_mat[i, self.ndim + i] = 1.0
        # Observation matrix: selects the four position components.
        self._update_mat = np.eye(self.ndim, self.dim_x, dtype=np.float32)
        # Noise (tunable)
        self.std_pos = np.array([2.0, 2.0, 1e-2, 2.5], dtype=np.float32)
        self.std_vel = np.array([6.0, 6.0, 1e-2, 6.0], dtype=np.float32)

    def initiate(self, measurement_xyah):
        """Create the initial state from one measurement.

        Velocities start at zero. The covariance is diagonal: 50 on the
        position block and 200 on the velocity block.

        Returns:
            ``(mean, cov)`` with shapes ``(8,)`` and ``(8, 8)``, float32.
        """
        mean = np.zeros((self.dim_x,), dtype=np.float32)
        mean[:4] = measurement_xyah
        cov = np.eye(self.dim_x, dtype=np.float32)
        cov[:4, :4] *= 50.0
        cov[4:, 4:] *= 200.0
        return mean, cov

    def predict(self, mean, cov, dt=1.0):
        """Propagate the state ``dt`` time units ahead.

        The process noise is built from ``std_pos`` and ``std_vel`` on every
        call and is not scaled by ``dt``.

        Returns:
            The predicted ``(mean, cov)``.
        """
        motion_mat = self._motion_mat.copy()
        idx = np.arange(self.ndim)
        motion_mat[idx, self.ndim + idx] = float(dt)
        process_var = np.concatenate([self.std_pos ** 2, self.std_vel ** 2])
        Q = np.diag(process_var.astype(np.float32))
        mean = motion_mat @ mean
        cov = motion_mat @ cov @ motion_mat.T + Q
        return mean, cov

    def update(self, mean, cov, measurement_xyah):
        """Correct the state with an ``(cx, cy, aspect, height)`` measurement.

        The measurement noise is ``diag(std_pos ** 2)``.

        Returns:
            The corrected ``(mean, cov)``.
        """
        H = self._update_mat
        R = np.diag((self.std_pos ** 2).astype(np.float32))
        S = H @ cov @ H.T + R
        K = cov @ H.T @ np.linalg.inv(S)
        innovation = np.asarray(measurement_xyah, dtype=np.float32) - (H @ mean)
        mean = mean + K @ innovation
        cov = (np.eye(self.dim_x, dtype=np.float32) - K @ H) @ cov
        return mean, cov


class TrackState:
    """Lifecycle states of a track."""

    Tracked = 1
    Lost = 2
    Removed = 3


class STrack:
    """One tracked object: Kalman state, current box and bookkeeping."""

    # Shared across all instances so every track gets a distinct id.
    _next_id = 1

    def __init__(self, tlbr, score, kf: KalmanXYAH):
        self.kf = kf
        self.mean = None
        self.cov = None
        self.tlbr = np.array(tlbr, dtype=np.float32)
        self.score = float(score)

        self.track_id = STrack._next_id
        STrack._next_id += 1

        self.state = TrackState.Tracked
        self.is_activated = False
        self.frame_id = 0           # frame of the most recent detection match
        self.start_frame = 0
        self.time_since_update = 0  # predict() calls since the last match
        self.hits = 0               # number of detections absorbed so far

    def activate(self, frame_id):
        """Initialise the Kalman state from the constructor box."""
        self.frame_id = frame_id
        self.start_frame = frame_id
        self.time_since_update = 0
        self.mean, self.cov = self.kf.initiate(tlbr_to_xyah(self.tlbr))
        self.is_activated = True
        self.hits = 1
        self.state = TrackState.Tracked

    def predict(self, dt=1.0):
        """Advance the Kalman state by ``dt``; no-op before activation."""
        if self.mean is None:
            return
        self.mean, self.cov = self.kf.predict(self.mean, self.cov, dt=dt)
        self.tlbr = xyah_to_tlbr(self.mean[:4])
        self.time_since_update += 1

    def update(self, tlbr, score, frame_id):
        """Absorb a matched detection and mark the track as tracked."""
        self.frame_id = frame_id
        self.time_since_update = 0
        self.score = float(score)
        self.hits += 1
        self.mean, self.cov = self.kf.update(
            self.mean, self.cov, tlbr_to_xyah(tlbr)
        )
        self.tlbr = xyah_to_tlbr(self.mean[:4])
        self.state = TrackState.Tracked
        self.is_activated = True

    def mark_lost(self):
        """Flag the track as lost."""
        self.state = TrackState.Lost

    def mark_removed(self):
        """Flag the track as removed."""
        self.state = TrackState.Removed


class BYTETracker:
    """Two-stage IoU tracker that also uses low-score detections.

    Args:
        track_high_thresh: minimum score for the first association stage.
        track_low_thresh: minimum score for the second association stage.
        new_track_thresh: minimum score for an unmatched high-score
            detection to start a new track.
        match_thresh: minimum IoU for a first-stage match. The second
            stage uses ``max(0.10, match_thresh - 0.15)``.
        track_buffer: number of frames a lost track is kept, and may be
            re-associated, before it is removed.
        min_hits: number of matched detections a track needs before
            ``update`` reports it.
    """

    def __init__(
        self,
        track_high_thresh=0.35,
        track_low_thresh=0.12,
        new_track_thresh=0.35,
        match_thresh=0.70,
        track_buffer=50,
        min_hits=2,
    ):
        self.high = float(track_high_thresh)
        self.low = float(track_low_thresh)
        self.new = float(new_track_thresh)
        self.match_thresh = float(match_thresh)
        self.track_buffer = int(track_buffer)
        self.min_hits = int(min_hits)

        self.kf = KalmanXYAH()
        self.frame_id = 0
        self.tracked = []
        self.lost = []
        # Every expired track is appended here and never dropped.
        self.removed = []

    def update(self, dets_tlbr_score, dt=1.0):
        """Advance the tracker by one frame.

        Args:
            dets_tlbr_score: array-like of rows ``[x1, y1, x2, y2, score, ...]``
                (extra columns are ignored), a single such row, an empty
                sequence, or ``None`` for a frame without detections.
            dt: time step passed to the Kalman prediction.

        Returns:
            The tracks in the tracked state that have at least ``min_hits``
            matched detections.
        """
        self.frame_id += 1
        frame_id = self.frame_id

        if dets_tlbr_score is None:
            dets = np.zeros((0, 5), dtype=np.float32)
        else:
            dets = np.asarray(dets_tlbr_score, dtype=np.float32)
            if dets.size == 0:
                dets = np.zeros((0, 5), dtype=np.float32)
            elif dets.ndim == 1:
                dets = dets[None, :]  # a single detection row

        scores = dets[:, 4]
        dets_high = dets[scores >= self.high]
        dets_low = dets[(scores >= self.low) & (scores < self.high)]

        # Lost tracks stay in the pool so an object that reappears within
        # track_buffer frames keeps its id instead of spawning a new track.
        pool = self.tracked + self.lost
        for trk in pool:
            trk.predict(dt=dt)

        # Stage 1: every live track against the high-score detections.
        matches_a, u_trk_a, u_det_a = self._associate(
            pool, dets_high, iou_thresh=self.match_thresh
        )
        for ti, di in matches_a:
            det = dets_high[di]
            pool[ti].update(det[:4], det[4], frame_id)

        # Stage 2: low-score detections may only extend tracks that were
        # still tracked on the previous frame, never revive lost ones.
        remaining_tracks = [
            pool[i] for i in u_trk_a if pool[i].state == TrackState.Tracked
        ]
        matches_b, u_trk_b, _u_det_b = self._associate(
            remaining_tracks,
            dets_low,
            iou_thresh=max(0.10, self.match_thresh - 0.15),
        )
        for ti, di in matches_b:
            det = dets_low[di]
            remaining_tracks[ti].update(det[:4], det[4], frame_id)
        for i in u_trk_b:
            remaining_tracks[i].mark_lost()

        # A matched track is in the Tracked state (update() sets it), so the
        # state alone decides which list each pooled track belongs to.
        self.tracked = [t for t in pool if t.state == TrackState.Tracked]
        lost_now = [t for t in pool if t.state == TrackState.Lost]

        # Unmatched high-score detections start new tracks.
        for di in u_det_a:
            det = dets_high[di]
            if float(det[4]) >= self.new:
                new_track = STrack(det[:4], det[4], self.kf)
                new_track.activate(frame_id)
                self.tracked.append(new_track)

        # Expire tracks whose last match is older than track_buffer frames.
        kept_lost = []
        for trk in lost_now:
            if (frame_id - trk.frame_id) <= self.track_buffer:
                kept_lost.append(trk)
            else:
                trk.mark_removed()
                self.removed.append(trk)
        self.lost = kept_lost

        return [t for t in self.tracked if t.hits >= self.min_hits]

    def _associate(self, tracks, dets, iou_thresh):
        """Match tracks to detections by IoU.

        Pairs below ``iou_thresh`` are priced as zero overlap before the
        assignment is solved. Filtering only afterwards would let a cheaper
        but below-threshold pairing displace a valid match.

        Returns:
            ``(matched, unmatched_track_indices, unmatched_det_indices)``
            where ``matched`` is a list of ``(track_idx, det_idx)`` pairs and
            both index lists are sorted.
        """
        if len(tracks) == 0 or len(dets) == 0:
            return [], list(range(len(tracks))), list(range(len(dets)))

        trk_boxes = [t.tlbr for t in tracks]
        det_boxes = [d[:4] for d in dets]
        ious = iou_matrix(trk_boxes, det_boxes)

        eligible = ious >= float(iou_thresh)
        cost = np.where(eligible, 1.0 - ious, 1.0)

        matched = []
        u_trk = set(range(len(tracks)))
        u_det = set(range(len(dets)))
        for ti, di in hungarian(cost):
            if eligible[ti, di]:
                matched.append((ti, di))
                u_trk.discard(ti)
                u_det.discard(di)
        return matched, sorted(u_trk), sorted(u_det)