You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
341 lines
9.9 KiB
Python
341 lines
9.9 KiB
Python
# bytetrack_min.py
|
|
import numpy as np
|
|
|
|
def tlbr_to_xyah(tlbr):
    """Convert a ``(x1, y1, x2, y2)`` box to ``(cx, cy, aspect, height)``.

    Width and height are clamped to at least 1.0 before the centre and the
    aspect ratio (width / height) are derived, so the centre is measured from
    the top-left corner using the clamped size.

    Returns:
        A float32 array ``[cx, cy, a, h]``.
    """
    left, top, right, bottom = tlbr
    width = max(1.0, right - left)
    height = max(1.0, bottom - top)
    center_x = left + width * 0.5
    center_y = top + height * 0.5
    return np.array(
        [center_x, center_y, width / height, height],
        dtype=np.float32,
    )
|
|
|
|
def xyah_to_tlbr(xyah):
    """Convert a ``(cx, cy, aspect, height)`` box to ``(x1, y1, x2, y2)``.

    Height and the derived width (aspect * height) are each clamped to at
    least 2.0 so the resulting box never collapses.

    Returns:
        A float32 array ``[x1, y1, x2, y2]``.
    """
    center_x, center_y, aspect, height = (float(v) for v in xyah)
    height = max(2.0, height)
    width = max(2.0, aspect * height)
    half_w = width * 0.5
    half_h = height * 0.5
    corners = [
        center_x - half_w,
        center_y - half_h,
        center_x + half_w,
        center_y + half_h,
    ]
    return np.array(corners, dtype=np.float32)
|
|
|
|
def iou_tlbr(a, b):
    """Return the intersection-over-union of two ``(x1, y1, x2, y2)`` boxes.

    The intersection uses the raw coordinates; each box's own area clamps its
    width and height to at least 1.0. Boxes that do not overlap (or only
    touch) yield 0.0.
    """
    a_left, a_top, a_right, a_bottom = a
    b_left, b_top, b_right, b_bottom = b

    overlap_w = max(0.0, min(a_right, b_right) - max(a_left, b_left))
    overlap_h = max(0.0, min(a_bottom, b_bottom) - max(a_top, b_top))
    inter = overlap_w * overlap_h
    if inter <= 0:
        return 0.0

    areas = [
        max(1.0, right - left) * max(1.0, bottom - top)
        for left, top, right, bottom in (a, b)
    ]
    union = areas[0] + areas[1] - inter
    if union > 0:
        return float(inter / union)
    return 0.0
|
|
|
|
def iou_matrix(tracks_tlbr, dets_tlbr):
    """Build the pairwise IoU table between track boxes and detection boxes.

    Returns:
        A float32 array of shape ``(len(tracks_tlbr), len(dets_tlbr))`` whose
        entry ``[i, j]`` is ``iou_tlbr(tracks_tlbr[i], dets_tlbr[j])``. When
        either side is empty the (empty) zero matrix is returned as is.
    """
    shape = (len(tracks_tlbr), len(dets_tlbr))
    table = np.zeros(shape, dtype=np.float32)
    if 0 in shape:
        return table
    for row, track_box in enumerate(tracks_tlbr):
        table[row] = [iou_tlbr(track_box, det_box) for det_box in dets_tlbr]
    return table
|
|
|
|
# Hungarian assignment (min-cost). We pass cost = 1 - IoU.
|
|
def hungarian(cost):
    """Solve the min-cost assignment problem for a (possibly rectangular) matrix.

    Uses the O(N^3) potentials formulation of the Hungarian algorithm. The
    bookkeeping arrays are 1-indexed: index 0 is a sentinel meaning
    "no row" / "virtual start column", so real rows and columns occupy
    indices ``1..N`` and the cost is read as ``padded[i - 1, j - 1]``.
    (Sizing those arrays ``N`` instead of ``N + 1`` silently excludes row 0
    and column 0 from every assignment.)

    Args:
        cost: 2-D array-like of shape ``(n, m)``; lower is better.

    Returns:
        A list of ``(row, col)`` pairs, ordered by row, pairing
        ``min(n, m)`` real rows with real columns. Rows that end up on a
        padding column are omitted.
    """
    # float64 keeps the accumulated potentials exact enough for comparisons.
    cost = np.asarray(cost, dtype=np.float64)
    n, m = cost.shape
    if n == 0 or m == 0:
        return []

    # Pad to a square matrix; dummy cells cost more than any real cell so
    # they are only used when a row/column has no real partner left.
    size = max(n, m)
    big = float(cost.max()) + 1.0
    padded = np.full((size, size), big, dtype=np.float64)
    padded[:n, :m] = cost

    u = np.zeros(size + 1, dtype=np.float64)    # row potentials
    v = np.zeros(size + 1, dtype=np.float64)    # column potentials
    p = np.zeros(size + 1, dtype=np.int64)      # p[j] = row assigned to column j (0 = none)
    way = np.zeros(size + 1, dtype=np.int64)    # predecessor column on the augmenting path

    for i in range(1, size + 1):
        # Insert row i via the virtual column 0 and grow an augmenting path.
        p[0] = i
        j0 = 0
        minv = np.full(size + 1, np.inf, dtype=np.float64)
        used = np.zeros(size + 1, dtype=bool)

        while True:
            used[j0] = True
            i0 = p[j0]
            delta = np.inf
            j1 = 0
            for j in range(1, size + 1):
                if used[j]:
                    continue
                cur = padded[i0 - 1, j - 1] - u[i0] - v[j]
                if cur < minv[j]:
                    minv[j] = cur
                    way[j] = j0
                if minv[j] < delta:
                    delta = minv[j]
                    j1 = j
            # Shift potentials so the cheapest frontier edge becomes tight.
            for j in range(size + 1):
                if used[j]:
                    u[p[j]] += delta
                    v[j] -= delta
                else:
                    minv[j] -= delta
            j0 = j1
            if p[j0] == 0:
                break  # reached a free column: path complete

        # Flip the assignments along the path back to the virtual column.
        while True:
            j1 = way[j0]
            p[j0] = p[j1]
            j0 = j1
            if j0 == 0:
                break

    col_of_row = np.full(size, -1, dtype=np.int64)
    for j in range(1, size + 1):
        if p[j] != 0:
            col_of_row[p[j] - 1] = j - 1

    # Keep only real-row -> real-column pairs.
    return [
        (row, int(col))
        for row, col in enumerate(col_of_row[:n])
        if 0 <= col < m
    ]
|
|
|
|
class KalmanXYAH:
    """Constant-velocity Kalman filter over boxes in (cx, cy, a, h) form.

    The 8-element state is ``[cx, cy, a, h, vcx, vcy, va, vh]``: the four
    observed box parameters followed by their per-step velocities.
    """

    def __init__(self):
        self.ndim = 4
        self.dim_x = 8

        # Unit-step transition: each observed component gains its velocity.
        transition = np.eye(self.dim_x, dtype=np.float32)
        for axis in range(self.ndim):
            transition[axis, axis + self.ndim] = 1.0
        self._motion_mat = transition

        # Observation matrix picks the first ``ndim`` state components.
        self._update_mat = np.eye(self.ndim, self.dim_x, dtype=np.float32)

        # Noise standard deviations (tunable), ordered as (cx, cy, a, h).
        self.std_pos = np.array([2.0, 2.0, 1e-2, 2.5], dtype=np.float32)
        self.std_vel = np.array([6.0, 6.0, 1e-2, 6.0], dtype=np.float32)

    def initiate(self, measurement_xyah):
        """Create the initial ``(mean, cov)`` for a new track.

        The mean holds the measurement with zero velocity; the covariance is
        diagonal with 50 on the four box components and 200 on the velocities.
        """
        mean = np.zeros((self.dim_x,), dtype=np.float32)
        mean[:4] = measurement_xyah

        variances = np.concatenate([
            np.full(4, 50.0, dtype=np.float32),
            np.full(self.dim_x - 4, 200.0, dtype=np.float32),
        ])
        return mean, np.diag(variances)

    def predict(self, mean, cov, dt=1.0):
        """Propagate ``(mean, cov)`` forward by ``dt`` and add process noise.

        The process noise is built from ``std_pos`` / ``std_vel`` and is not
        scaled by ``dt``.
        """
        step = float(dt)
        transition = self._motion_mat.copy()
        for axis in range(self.ndim):
            transition[axis, axis + self.ndim] = step

        noise_var = np.concatenate([self.std_pos ** 2, self.std_vel ** 2])
        process_noise = np.diag(noise_var.astype(np.float32))

        predicted_mean = transition @ mean
        predicted_cov = transition @ cov @ transition.T + process_noise
        return predicted_mean, predicted_cov

    def update(self, mean, cov, measurement_xyah):
        """Correct ``(mean, cov)`` with an observed ``(cx, cy, a, h)`` box.

        Measurement noise is the diagonal of ``std_pos ** 2``.
        """
        observe = self._update_mat
        meas_noise = np.diag((self.std_pos ** 2).astype(np.float32))

        innovation_cov = observe @ cov @ observe.T + meas_noise
        gain = cov @ observe.T @ np.linalg.inv(innovation_cov)
        residual = measurement_xyah - (observe @ mean)

        identity = np.eye(self.dim_x, dtype=np.float32)
        corrected_mean = mean + gain @ residual
        corrected_cov = (identity - gain @ observe) @ cov
        return corrected_mean, corrected_cov
|
|
|
|
class TrackState:
    """Integer codes for the state a track can be in."""

    Tracked, Lost, Removed = 1, 2, 3
|
|
|
|
class STrack:
    """One tracked object: its box, score, identity and Kalman state."""

    # Shared counter; every constructed STrack takes the next id.
    _next_id = 1

    def __init__(self, tlbr, score, kf: KalmanXYAH):
        self.kf = kf
        # Kalman state is created lazily by ``activate``.
        self.mean = self.cov = None
        self.tlbr = np.array(tlbr, dtype=np.float32)
        self.score = float(score)

        self.track_id = STrack._next_id
        STrack._next_id += 1

        self.state = TrackState.Tracked
        self.is_activated = False

        self.frame_id = self.start_frame = 0
        self.time_since_update = self.hits = 0

    def activate(self, frame_id):
        """Start the track at ``frame_id`` and seed the filter from ``tlbr``."""
        self.frame_id = self.start_frame = frame_id
        self.time_since_update = 0
        self.mean, self.cov = self.kf.initiate(tlbr_to_xyah(self.tlbr))
        self.is_activated = True
        self.hits = 1
        self.state = TrackState.Tracked

    def predict(self, dt=1.0):
        """Advance the filter by ``dt`` and refresh ``tlbr``; no-op before activation."""
        if self.mean is not None:
            self.mean, self.cov = self.kf.predict(self.mean, self.cov, dt=dt)
            self.tlbr = xyah_to_tlbr(self.mean[:4])
            self.time_since_update += 1

    def update(self, tlbr, score, frame_id):
        """Fold a matched detection into the track and mark it as tracked."""
        self.frame_id = frame_id
        self.time_since_update = 0
        self.score = float(score)
        self.hits += 1

        measurement = tlbr_to_xyah(tlbr)
        self.mean, self.cov = self.kf.update(self.mean, self.cov, measurement)
        # The reported box is the filtered estimate, not the raw detection.
        self.tlbr = xyah_to_tlbr(self.mean[:4])

        self.state = TrackState.Tracked
        self.is_activated = True

    def mark_lost(self):
        """Flag the track as lost."""
        self.state = TrackState.Lost

    def mark_removed(self):
        """Flag the track as removed."""
        self.state = TrackState.Removed
|
|
|
|
class BYTETracker:
    """Minimal BYTE-style multi-object tracker using IoU association.

    Each call to ``update`` consumes one frame of detections given as rows
    ``[x1, y1, x2, y2, score, ...]`` and proceeds in stages:

    1. Tracked and lost tracks are predicted forward and matched against
       high-score detections (``score >= track_high_thresh``).
    2. Tracks that were being tracked but missed stage 1 are matched against
       low-score detections (``track_low_thresh <= score < track_high_thresh``)
       with a looser IoU threshold.
    3. Tracked tracks still unmatched become lost; unmatched high-score
       detections with ``score >= new_track_thresh`` start new tracks.
    4. Lost tracks not updated for more than ``track_buffer`` frames are
       moved to ``removed``.

    Args:
        track_high_thresh: Minimum score for a first-stage detection.
        track_low_thresh: Minimum score for a second-stage detection.
        new_track_thresh: Minimum score for spawning a new track.
        match_thresh: Minimum IoU for a first-stage match.
        track_buffer: Frames a lost track is kept before removal.
        min_hits: Minimum ``hits`` before a track is reported.
    """

    def __init__(
        self,
        track_high_thresh=0.35,
        track_low_thresh=0.12,
        new_track_thresh=0.35,
        match_thresh=0.70,
        track_buffer=50,
        min_hits=2,
    ):
        self.high = float(track_high_thresh)
        self.low = float(track_low_thresh)
        self.new = float(new_track_thresh)
        self.match_thresh = float(match_thresh)
        self.track_buffer = int(track_buffer)
        self.min_hits = int(min_hits)

        # One filter object is shared by every track; per-track state lives
        # on the STrack itself.
        self.kf = KalmanXYAH()
        self.frame_id = 0

        self.tracked = []
        self.lost = []
        # NOTE: ``removed`` is only ever appended to; callers that run for a
        # long time may want to clear it periodically.
        self.removed = []

    @staticmethod
    def _as_detections(dets_tlbr_score):
        """Normalise detections to a 2-D float32 array (``(0, 5)`` when empty)."""
        if dets_tlbr_score is None:
            return np.zeros((0, 5), dtype=np.float32)
        dets = np.asarray(dets_tlbr_score, dtype=np.float32)
        if dets.size == 0:
            return np.zeros((0, 5), dtype=np.float32)
        # A single detection passed as a flat row becomes a 1-row matrix.
        return np.atleast_2d(dets)

    def update(self, dets_tlbr_score, dt=1.0):
        """Advance the tracker by one frame.

        Args:
            dets_tlbr_score: Array-like of rows ``[x1, y1, x2, y2, score, ...]``
                (or ``None`` / empty for a frame without detections).
            dt: Time step handed to each track's Kalman prediction.

        Returns:
            The list of currently tracked ``STrack`` objects whose ``hits``
            is at least ``min_hits``.
        """
        self.frame_id += 1
        frame_id = self.frame_id

        dets = self._as_detections(dets_tlbr_score)
        scores = dets[:, 4]
        dets_high = dets[scores >= self.high]
        dets_low = dets[(scores >= self.low) & (scores < self.high)]

        # Lost tracks keep coasting on their motion model so that they can be
        # re-found; without this they could only age out and every occlusion
        # would produce a fresh track id.
        pool = self.tracked + self.lost
        for trk in pool:
            trk.predict(dt=dt)

        # Stage 1: every live track competes for the high-score detections.
        matches_a, u_trk_a, u_det_a = self._associate(
            pool, dets_high, iou_thresh=self.match_thresh
        )
        for ti, di in matches_a:
            det = dets_high[di]
            pool[ti].update(det[:4], det[4], frame_id)

        # Stage 2: only tracks that were actively tracked may latch onto a
        # low-score detection; already-lost tracks need a confident one.
        remaining_tracks = [
            pool[i] for i in u_trk_a if pool[i].state == TrackState.Tracked
        ]
        matches_b, u_trk_b, _u_det_b = self._associate(
            remaining_tracks,
            dets_low,
            iou_thresh=max(0.10, self.match_thresh - 0.15),
        )
        for ti, di in matches_b:
            det = dets_low[di]
            remaining_tracks[ti].update(det[:4], det[4], frame_id)

        for i in u_trk_b:
            remaining_tracks[i].mark_lost()

        # Re-partition the pool by the state each track ended up in.
        self.tracked = [t for t in pool if t.state == TrackState.Tracked]
        still_lost = [t for t in pool if t.state == TrackState.Lost]

        # Unmatched confident detections start new tracks.
        for di in u_det_a:
            det = dets_high[di]
            if float(det[4]) >= self.new:
                new_track = STrack(det[:4], det[4], self.kf)
                new_track.activate(frame_id)
                self.tracked.append(new_track)

        # Retire lost tracks whose last update is older than the buffer.
        kept_lost = []
        for trk in still_lost:
            if (frame_id - trk.frame_id) <= self.track_buffer:
                kept_lost.append(trk)
            else:
                trk.mark_removed()
                self.removed.append(trk)
        self.lost = kept_lost

        return [t for t in self.tracked if t.hits >= self.min_hits]

    def _associate(self, tracks, dets, iou_thresh):
        """Match tracks to detections by IoU via min-cost assignment.

        Args:
            tracks: Sequence of objects exposing a ``tlbr`` box.
            dets: Sequence of detection rows whose first four values are a box.
            iou_thresh: Assigned pairs with IoU below this are rejected.

        Returns:
            ``(matches, unmatched_track_indices, unmatched_det_indices)`` where
            ``matches`` is a list of ``(track_index, det_index)`` and both
            index lists are sorted ascending.
        """
        if len(tracks) == 0 or len(dets) == 0:
            return [], list(range(len(tracks))), list(range(len(dets)))

        trk_boxes = [t.tlbr for t in tracks]
        det_boxes = [d[:4] for d in dets]

        ious = iou_matrix(trk_boxes, det_boxes)
        # The solver minimises cost, so high IoU must map to low cost.
        candidates = hungarian(1.0 - ious)

        thr = float(iou_thresh)
        matched = []
        u_trk = set(range(len(tracks)))
        u_det = set(range(len(dets)))
        for ti, di in candidates:
            if ious[ti, di] >= thr:
                matched.append((ti, di))
                u_trk.discard(ti)
                u_det.discard(di)

        return matched, sorted(u_trk), sorted(u_det)
|