You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

341 lines
9.9 KiB
Python

# bytetrack_min.py
import numpy as np
def tlbr_to_xyah(tlbr):
    """Convert a corner-format box into centre / aspect-ratio / height form.

    Args:
        tlbr: Iterable of four numbers ``(x1, y1, x2, y2)``.

    Returns:
        ``np.float32`` array ``[cx, cy, a, h]`` with ``a = width / height``.
        Width and height are clamped to at least 1.0 before they are used,
        so the centre of a degenerate box lies 0.5 away from its top-left
        corner.
    """
    left, top, right, bottom = tlbr
    # Clamp the extents so the aspect ratio below never divides by zero.
    width = max(1.0, right - left)
    height = max(1.0, bottom - top)
    center_x = left + width * 0.5
    center_y = top + height * 0.5
    aspect = width / height
    return np.array([center_x, center_y, aspect, height], dtype=np.float32)
def xyah_to_tlbr(xyah):
    """Convert a centre / aspect-ratio / height box into corner format.

    Args:
        xyah: Iterable of four numbers ``(cx, cy, a, h)`` where ``a`` is
            width divided by height.

    Returns:
        ``np.float32`` array ``[x1, y1, x2, y2]``. Height and the derived
        width are each clamped to at least 2.0 so the box never collapses.
    """
    center_x, center_y, aspect, height = map(float, xyah)
    height = max(2.0, height)
    width = max(2.0, aspect * height)
    half_w = width * 0.5
    half_h = height * 0.5
    corners = [
        center_x - half_w,
        center_y - half_h,
        center_x + half_w,
        center_y + half_h,
    ]
    return np.array(corners, dtype=np.float32)
def iou_tlbr(a, b):
    """Return the intersection-over-union of two corner-format boxes.

    Args:
        a: Box ``(x1, y1, x2, y2)``.
        b: Box ``(x1, y1, x2, y2)``.

    Returns:
        IoU as a Python ``float``; 0.0 when the boxes do not overlap. Each
        side length is clamped to at least 1.0 when the box areas are
        computed, so boxes thinner than one unit count as one unit wide.
    """
    ax1, ay1, ax2, ay2 = a
    bx1, by1, bx2, by2 = b

    # Overlap rectangle: the larger of the mins and the smaller of the maxes.
    overlap_w = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    overlap_h = max(0.0, min(ay2, by2) - max(ay1, by1))
    inter = overlap_w * overlap_h
    if inter <= 0:
        return 0.0

    area_a = max(1.0, (ax2 - ax1)) * max(1.0, (ay2 - ay1))
    area_b = max(1.0, (bx2 - bx1)) * max(1.0, (by2 - by1))
    union = area_a + area_b - inter
    if union > 0:
        return float(inter / union)
    return 0.0
def iou_matrix(tracks_tlbr, dets_tlbr):
    """Build the pairwise IoU table between track boxes and detection boxes.

    Args:
        tracks_tlbr: Sequence of track boxes in ``(x1, y1, x2, y2)`` form.
        dets_tlbr: Sequence of detection boxes in ``(x1, y1, x2, y2)`` form.

    Returns:
        ``np.float32`` array of shape ``(len(tracks_tlbr), len(dets_tlbr))``
        whose ``[i, j]`` entry is ``iou_tlbr(tracks_tlbr[i], dets_tlbr[j])``.
        When either sequence is empty the (empty) zero matrix is returned.
    """
    n_tracks, n_dets = len(tracks_tlbr), len(dets_tlbr)
    table = np.zeros((n_tracks, n_dets), dtype=np.float32)
    if n_tracks and n_dets:
        for row, track_box in enumerate(tracks_tlbr):
            for col, det_box in enumerate(dets_tlbr):
                table[row, col] = iou_tlbr(track_box, det_box)
    return table
# Hungarian assignment (min-cost). We pass cost = 1 - IoU.
def hungarian(cost):
    """Solve the rectangular min-cost assignment problem.

    Uses the O(N^3) potentials ("Hungarian") method with 1-based internal
    arrays whose index 0 is a sentinel row/column. The sentinel must not
    overlap real data, so every working array has ``N + 1`` entries and the
    cost matrix is read at ``[i - 1][j - 1]``. Sizing them ``N`` would
    silently exclude row 0 and column 0 from the assignment.

    Args:
        cost: 2-D array-like of shape ``(n, m)`` with finite costs.

    Returns:
        List of ``(row, col)`` pairs of Python ints, ordered by row, covering
        ``min(n, m)`` rows/columns. Empty when ``n == 0`` or ``m == 0``.

    Raises:
        ValueError: If ``cost`` contains NaN or infinite entries (the
            augmenting-path search cannot terminate on them).
    """
    cost = np.asarray(cost, dtype=np.float64)
    n, m = cost.shape
    if n == 0 or m == 0:
        return []
    if not np.isfinite(cost).all():
        raise ValueError("hungarian() requires a finite cost matrix")

    # Pad to a square problem; padded cells cost more than any real cell so
    # real pairings are always preferred over dummy ones.
    N = max(n, m)
    big = float(cost.max() + 1.0)
    pad = np.full((N, N), big, dtype=np.float64)
    pad[:n, :m] = cost
    rows = pad.tolist()  # plain Python floats: faster scalar access

    inf = float("inf")
    u = [0.0] * (N + 1)      # row potentials (1-based)
    v = [0.0] * (N + 1)      # column potentials (1-based)
    p = [0] * (N + 1)        # p[j] = row currently assigned to column j
    way = [0] * (N + 1)      # back-pointers along the augmenting path

    for i in range(1, N + 1):
        p[0] = i
        j0 = 0
        minv = [inf] * (N + 1)
        used = [False] * (N + 1)
        while True:
            used[j0] = True
            i0 = p[j0]
            delta = inf
            j1 = 0
            row = rows[i0 - 1]
            for j in range(1, N + 1):
                if not used[j]:
                    cur = row[j - 1] - u[i0] - v[j]
                    if cur < minv[j]:
                        minv[j] = cur
                        way[j] = j0
                    if minv[j] < delta:
                        delta = minv[j]
                        j1 = j
            for j in range(N + 1):
                if used[j]:
                    u[p[j]] += delta
                    v[j] -= delta
                else:
                    minv[j] -= delta
            j0 = j1
            if p[j0] == 0:
                break
        # Flip the assignments along the augmenting path back to the sentinel.
        while True:
            j1 = way[j0]
            p[j0] = p[j1]
            j0 = j1
            if j0 == 0:
                break

    row_to_col = [-1] * N
    for j in range(1, N + 1):
        row_to_col[p[j] - 1] = j - 1

    # Drop pairings that involve a padded (dummy) row or column.
    return [(r, c) for r, c in enumerate(row_to_col[:n]) if 0 <= c < m]
class KalmanXYAH:
    """Constant-velocity Kalman filter for boxes in ``(cx, cy, a, h)`` form.

    State: [cx,cy,a,h, vcx,vcy,va,vh]. Only the first four components are
    observed; the last four are their per-step rates of change.
    """

    def __init__(self):
        self.ndim = 4
        self.dim_x = 8

        # Transition matrix: identity plus a unit coupling from every
        # velocity component to its matching position component.
        transition = np.eye(self.dim_x, dtype=np.float32)
        pos_idx = np.arange(self.ndim)
        transition[pos_idx, pos_idx + self.ndim] = 1.0
        self._motion_mat = transition

        # Observation matrix selects the four position components.
        self._update_mat = np.eye(self.ndim, self.dim_x, dtype=np.float32)

        # Noise (tunable)
        self.std_pos = np.array([2.0, 2.0, 1e-2, 2.5], dtype=np.float32)
        self.std_vel = np.array([6.0, 6.0, 1e-2, 6.0], dtype=np.float32)

    def initiate(self, measurement_xyah):
        """Create the initial state from a single ``(cx, cy, a, h)`` reading.

        Returns:
            ``(mean, cov)``: the mean holds the measurement with zero
            velocities; the covariance is diagonal with 50.0 on the first
            four entries and 200.0 on the remaining ones.
        """
        mean = np.zeros((self.dim_x,), dtype=np.float32)
        mean[:4] = measurement_xyah

        variances = np.ones((self.dim_x,), dtype=np.float32)
        variances[:4] = 50.0
        variances[4:] = 200.0
        return mean, np.diag(variances)

    def predict(self, mean, cov, dt=1.0):
        """Propagate ``(mean, cov)`` forward by ``dt`` time steps.

        The process noise is the fixed diagonal built from ``std_pos`` and
        ``std_vel``; it is not scaled by ``dt``.
        """
        transition = self._motion_mat.copy()
        pos_idx = np.arange(self.ndim)
        transition[pos_idx, pos_idx + self.ndim] = float(dt)

        process_var = np.concatenate([self.std_pos ** 2, self.std_vel ** 2])
        process_noise = np.diag(process_var.astype(np.float32))

        predicted_mean = transition @ mean
        predicted_cov = transition @ cov @ transition.T + process_noise
        return predicted_mean, predicted_cov

    def update(self, mean, cov, measurement_xyah):
        """Correct ``(mean, cov)`` with a ``(cx, cy, a, h)`` measurement.

        Returns:
            The posterior ``(mean, cov)`` pair.
        """
        observe = self._update_mat
        meas_noise = np.diag((self.std_pos ** 2).astype(np.float32))

        innovation_cov = observe @ cov @ observe.T + meas_noise
        gain = cov @ observe.T @ np.linalg.inv(innovation_cov)
        residual = measurement_xyah - (observe @ mean)

        posterior_mean = mean + gain @ residual
        identity = np.eye(self.dim_x, dtype=np.float32)
        posterior_cov = (identity - gain @ observe) @ cov
        return posterior_mean, posterior_cov
class TrackState:
    """Integer codes for the lifecycle stage of a track."""

    # Tracked: matched recently; Lost: currently unmatched but retained;
    # Removed: dropped for good.
    Tracked, Lost, Removed = 1, 2, 3
class STrack:
    """A single tracked object: box, score, Kalman state and bookkeeping."""

    # Class-wide counter that hands out the next unique track id.
    _next_id = 1

    def __init__(self, tlbr, score, kf: KalmanXYAH):
        """Create an inactive track from one detection.

        Args:
            tlbr: Box ``(x1, y1, x2, y2)`` of the seeding detection.
            score: Confidence of the seeding detection.
            kf: Kalman filter used for this track's predict/update steps.
        """
        self.kf = kf
        # The filter state stays unset until activate() is called.
        self.mean, self.cov = None, None
        self.tlbr = np.array(tlbr, dtype=np.float32)
        self.score = float(score)

        assigned_id = STrack._next_id
        STrack._next_id = assigned_id + 1
        self.track_id = assigned_id

        self.state = TrackState.Tracked
        self.is_activated = False
        self.frame_id = self.start_frame = 0
        self.time_since_update = self.hits = 0

    def activate(self, frame_id):
        """Initialise the Kalman state from the current box at ``frame_id``."""
        self.frame_id = self.start_frame = frame_id
        self.time_since_update = 0
        self.mean, self.cov = self.kf.initiate(tlbr_to_xyah(self.tlbr))
        self.is_activated = True
        self.hits = 1
        self.state = TrackState.Tracked

    def predict(self, dt=1.0):
        """Advance the filter by ``dt`` and refresh ``tlbr``; no-op if inactive."""
        if self.mean is not None:
            self.mean, self.cov = self.kf.predict(self.mean, self.cov, dt=dt)
            self.tlbr = xyah_to_tlbr(self.mean[:4])
            self.time_since_update += 1

    def update(self, tlbr, score, frame_id):
        """Fold a matched detection into the track and mark it Tracked."""
        self.frame_id = frame_id
        self.time_since_update = 0
        self.score = float(score)
        self.hits += 1
        measurement = tlbr_to_xyah(tlbr)
        self.mean, self.cov = self.kf.update(self.mean, self.cov, measurement)
        self.tlbr = xyah_to_tlbr(self.mean[:4])
        self.state = TrackState.Tracked
        self.is_activated = True

    def mark_lost(self):
        """Flag the track as Lost."""
        self.state = TrackState.Lost

    def mark_removed(self):
        """Flag the track as Removed."""
        self.state = TrackState.Removed
class BYTETracker:
    """Minimal BYTE-style multi-object tracker with two-stage IoU association.

    Detections are split by score into a high and a low band. High-score
    detections are matched first against every live track (tracked and
    lost); low-score detections are then offered only to tracks that were
    tracked in the previous frame and are still unmatched.
    """

    def __init__(
        self,
        track_high_thresh=0.35,
        track_low_thresh=0.12,
        new_track_thresh=0.35,
        match_thresh=0.70,
        track_buffer=50,
        min_hits=2,
    ):
        """Configure the tracker.

        Args:
            track_high_thresh: Minimum score for the first association stage.
            track_low_thresh: Minimum score for the second association stage.
            new_track_thresh: Minimum score for an unmatched high-score
                detection to start a new track.
            match_thresh: Minimum IoU accepted in the first stage; the second
                stage uses ``max(0.10, match_thresh - 0.15)``.
            track_buffer: Number of frames a lost track is retained (and may
                be re-identified) before it is removed.
            min_hits: Number of hits a track needs before it is reported.
        """
        self.high = float(track_high_thresh)
        self.low = float(track_low_thresh)
        self.new = float(new_track_thresh)
        self.match_thresh = float(match_thresh)
        self.track_buffer = int(track_buffer)
        self.min_hits = int(min_hits)
        self.kf = KalmanXYAH()
        self.frame_id = 0
        self.tracked = []
        self.lost = []
        # NOTE: this list is append-only and grows for the tracker's lifetime.
        self.removed = []

    @staticmethod
    def _as_detections(dets_tlbr_score):
        """Normalise the detections argument to a 2-D float32 array."""
        if dets_tlbr_score is None:
            return np.zeros((0, 5), dtype=np.float32)
        dets = np.asarray(dets_tlbr_score, dtype=np.float32)
        if dets.size == 0:
            return np.zeros((0, 5), dtype=np.float32)
        if dets.ndim == 1:
            # A single detection passed as a flat row.
            dets = dets[np.newaxis, :]
        return dets

    def update(self, dets_tlbr_score, dt=1.0):
        """Advance the tracker by one frame.

        Args:
            dets_tlbr_score: ``None``, or an array-like of rows
                ``[x1, y1, x2, y2, score, ...]``. Lists, empty inputs and a
                single flat row are accepted.
            dt: Time step handed to each track's Kalman prediction.

        Returns:
            List of currently tracked ``STrack`` objects whose ``hits`` is at
            least ``min_hits``.
        """
        self.frame_id += 1
        frame_id = self.frame_id

        dets = self._as_detections(dets_tlbr_score)
        scores = dets[:, 4]
        high_mask = scores >= self.high
        low_mask = (scores >= self.low) & (scores < self.high)
        dets_high = dets[high_mask]
        dets_low = dets[low_mask]

        # Lost tracks are predicted and pooled with the tracked ones so that
        # a track which missed a few frames can be re-identified instead of
        # being replaced by a brand-new id.
        pool = self.tracked + self.lost
        for trk in pool:
            trk.predict(dt=dt)

        # Stage 1: high-score detections against every live track.
        matches_a, u_trk_a, u_det_a = self._associate(
            pool, dets_high, iou_thresh=self.match_thresh
        )
        for ti, di in matches_a:
            det = dets_high[di]
            pool[ti].update(det[:4], det[4], frame_id)

        # Stage 2: low-score detections, only for tracks that were tracked in
        # the previous frame (still-lost tracks keep their Lost state here).
        remaining_tracks = [
            pool[i] for i in u_trk_a if pool[i].state == TrackState.Tracked
        ]
        matches_b, u_trk_b, _u_det_b = self._associate(
            remaining_tracks, dets_low, iou_thresh=max(0.10, self.match_thresh - 0.15)
        )
        for ti, di in matches_b:
            det = dets_low[di]
            remaining_tracks[ti].update(det[:4], det[4], frame_id)
        for i in u_trk_b:
            remaining_tracks[i].mark_lost()

        self.tracked = [t for t in pool if t.state == TrackState.Tracked]
        self.lost = [t for t in pool if t.state == TrackState.Lost]

        # Unmatched high-score detections start new tracks.
        for di in u_det_a:
            det = dets_high[di]
            if float(det[4]) >= self.new:
                new_track = STrack(det[:4], det[4], self.kf)
                new_track.activate(frame_id)
                self.tracked.append(new_track)

        # Retire lost tracks that have not been updated within the buffer.
        kept_lost = []
        for trk in self.lost:
            if (frame_id - trk.frame_id) <= self.track_buffer:
                kept_lost.append(trk)
            else:
                trk.mark_removed()
                self.removed.append(trk)
        self.lost = kept_lost

        return [t for t in self.tracked if t.hits >= self.min_hits]

    def _associate(self, tracks, dets, iou_thresh):
        """Match tracks to detections by IoU with a min-cost assignment.

        Args:
            tracks: Sequence of tracks exposing a ``tlbr`` box.
            dets: Sequence of detection rows whose first four values are the
                box.
            iou_thresh: Minimum IoU for an assigned pair to be accepted.

        Returns:
            ``(matched, unmatched_track_idx, unmatched_det_idx)`` where
            ``matched`` is a list of ``(track_idx, det_idx)`` pairs and both
            index lists are sorted.
        """
        if len(tracks) == 0 or len(dets) == 0:
            return [], list(range(len(tracks))), list(range(len(dets)))

        trk_boxes = [t.tlbr for t in tracks]
        det_boxes = [d[:4] for d in dets]
        ious = iou_matrix(trk_boxes, det_boxes)
        assigned = hungarian(1.0 - ious)

        matched = []
        u_trk = set(range(len(tracks)))
        u_det = set(range(len(dets)))
        thr = float(iou_thresh)
        for ti, di in assigned:
            # The assignment is global; pairs below the IoU gate are rejected.
            if ious[ti, di] >= thr:
                matched.append((ti, di))
                u_trk.discard(ti)
                u_det.discard(di)
        return matched, sorted(u_trk), sorted(u_det)