You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

395 lines
12 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone
import math
from dataclasses import dataclass
from typing import Dict, Iterable, List, Literal, Optional, Sequence, Tuple
from urllib import error, request
import json
Point3D = Tuple[float, float, float]
SPEED_OF_LIGHT_M_S = 299_792_458.0
@dataclass(frozen=True)
class Sphere:
center: Point3D
radius: float
@dataclass(frozen=True)
class PropagationModel:
tx_power_dbm: float
tx_gain_dbi: float = 0.0
rx_gain_dbi: float = 0.0
path_loss_exponent: float = 2.0
reference_distance_m: float = 1.0
min_distance_m: float = 1e-3
@dataclass(frozen=True)
class ReceiverSignal:
receiver_id: str
center: Point3D
amplitude_dbm: float
frequency_hz: float
@dataclass(frozen=True)
class TrilaterationResult:
point: Point3D
residuals: Tuple[float, float, float]
rmse: float
exact: bool
candidate_points: Tuple[Point3D, ...]
def _validate(spheres: Sequence[Sphere]) -> None:
if len(spheres) != 3:
raise ValueError("Expected exactly 3 spheres.")
for idx, sphere in enumerate(spheres, start=1):
if sphere.radius < 0:
raise ValueError(f"Radius for sphere #{idx} must be non-negative.")
def _vec_sub(a: Point3D, b: Point3D) -> Point3D:
return (a[0] - b[0], a[1] - b[1], a[2] - b[2])
def _vec_add(a: Point3D, b: Point3D) -> Point3D:
return (a[0] + b[0], a[1] + b[1], a[2] + b[2])
def _vec_scale(a: Point3D, scale: float) -> Point3D:
return (a[0] * scale, a[1] * scale, a[2] * scale)
def _vec_dot(a: Point3D, b: Point3D) -> float:
return a[0] * b[0] + a[1] * b[1] + a[2] * b[2]
def _vec_cross(a: Point3D, b: Point3D) -> Point3D:
return (
a[1] * b[2] - a[2] * b[1],
a[2] * b[0] - a[0] * b[2],
a[0] * b[1] - a[1] * b[0],
)
def _vec_norm(a: Point3D) -> float:
return math.sqrt(_vec_dot(a, a))
def _vec_unit(a: Point3D) -> Point3D:
n = _vec_norm(a)
if math.isclose(n, 0.0, abs_tol=1e-12):
raise ValueError("Degenerate receiver geometry: duplicated centers.")
return _vec_scale(a, 1.0 / n)
def _distance(p1: Point3D, p2: Point3D) -> float:
d = _vec_sub(p1, p2)
return _vec_norm(d)
def rssi_to_distance_m(
amplitude_dbm: float, frequency_hz: float, model: PropagationModel
) -> float:
if frequency_hz <= 0.0:
raise ValueError("Frequency must be positive.")
if model.reference_distance_m <= 0.0:
raise ValueError("reference_distance_m must be positive.")
if model.path_loss_exponent <= 0.0:
raise ValueError("path_loss_exponent must be positive.")
fspl_ref_db = 20.0 * math.log10(
4.0 * math.pi * model.reference_distance_m * frequency_hz / SPEED_OF_LIGHT_M_S
)
rx_power_at_ref_dbm = (
model.tx_power_dbm + model.tx_gain_dbi + model.rx_gain_dbi - fspl_ref_db
)
distance = model.reference_distance_m * 10.0 ** (
(rx_power_at_ref_dbm - amplitude_dbm) / (10.0 * model.path_loss_exponent)
)
return max(distance, model.min_distance_m)
def _spheres_from_signals(
signals: Sequence[ReceiverSignal], model: PropagationModel
) -> List[Sphere]:
if len(signals) != 3:
raise ValueError("Expected exactly 3 receiver signals.")
spheres: List[Sphere] = []
for signal in signals:
radius = rssi_to_distance_m(
amplitude_dbm=signal.amplitude_dbm,
frequency_hz=signal.frequency_hz,
model=model,
)
spheres.append(Sphere(center=signal.center, radius=radius))
return spheres
def _sphere_intersection_candidates(spheres: Sequence[Sphere]) -> Tuple[Point3D, ...]:
p1, p2, p3 = spheres[0].center, spheres[1].center, spheres[2].center
r1, r2, r3 = spheres[0].radius, spheres[1].radius, spheres[2].radius
ex = _vec_unit(_vec_sub(p2, p1))
p3p1 = _vec_sub(p3, p1)
i = _vec_dot(ex, p3p1)
temp = _vec_sub(p3p1, _vec_scale(ex, i))
ey = _vec_unit(temp)
ez = _vec_cross(ex, ey)
d = _distance(p1, p2)
j = _vec_dot(ey, p3p1)
if math.isclose(j, 0.0, abs_tol=1e-12):
raise ValueError("Degenerate receiver geometry: centers are collinear.")
x = (r1 * r1 - r2 * r2 + d * d) / (2.0 * d)
y = (r1 * r1 - r3 * r3 + i * i + j * j) / (2.0 * j) - (i / j) * x
z_sq = r1 * r1 - x * x - y * y
base = _vec_add(p1, _vec_add(_vec_scale(ex, x), _vec_scale(ey, y)))
if z_sq < 0.0:
return (base,)
z = math.sqrt(z_sq)
return (
_vec_add(base, _vec_scale(ez, z)),
_vec_add(base, _vec_scale(ez, -z)),
)
def _solve_3x3(a: List[List[float]], b: List[float]) -> Optional[List[float]]:
m = [row[:] + [rhs] for row, rhs in zip(a, b)]
n = 3
for col in range(n):
pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
if math.isclose(m[pivot][col], 0.0, abs_tol=1e-12):
return None
if pivot != col:
m[col], m[pivot] = m[pivot], m[col]
pivot_value = m[col][col]
for k in range(col, n + 1):
m[col][k] /= pivot_value
for row in range(n):
if row == col:
continue
factor = m[row][col]
for k in range(col, n + 1):
m[row][k] -= factor * m[col][k]
return [m[0][n], m[1][n], m[2][n]]
def _gauss_newton_point(
spheres: Sequence[Sphere], initial_point: Point3D, iterations: int = 40
) -> Point3D:
x, y, z = initial_point
damping = 1e-6
for _ in range(iterations):
residuals: List[float] = []
j_rows: List[Point3D] = []
for sphere in spheres:
cx, cy, cz = sphere.center
dx = x - cx
dy = y - cy
dz = z - cz
dist = math.sqrt(dx * dx + dy * dy + dz * dz)
if dist < 1e-9:
dist = 1e-9
residuals.append(dist - sphere.radius)
j_rows.append((dx / dist, dy / dist, dz / dist))
jt_j = [[0.0, 0.0, 0.0] for _ in range(3)]
jt_r = [0.0, 0.0, 0.0]
for (jx, jy, jz), r in zip(j_rows, residuals):
jt_j[0][0] += jx * jx
jt_j[0][1] += jx * jy
jt_j[0][2] += jx * jz
jt_j[1][0] += jy * jx
jt_j[1][1] += jy * jy
jt_j[1][2] += jy * jz
jt_j[2][0] += jz * jx
jt_j[2][1] += jz * jy
jt_j[2][2] += jz * jz
jt_r[0] += jx * r
jt_r[1] += jy * r
jt_r[2] += jz * r
for i in range(3):
jt_j[i][i] += damping
delta = _solve_3x3(jt_j, [-jt_r[0], -jt_r[1], -jt_r[2]])
if delta is None:
break
x += delta[0]
y += delta[1]
z += delta[2]
if max(abs(delta[0]), abs(delta[1]), abs(delta[2])) < 1e-8:
break
return (x, y, z)
def _residuals(point: Point3D, spheres: Sequence[Sphere]) -> Tuple[float, float, float]:
values = tuple(_distance(point, sphere.center) - sphere.radius for sphere in spheres)
return (values[0], values[1], values[2])
def _rmse(residuals: Iterable[float]) -> float:
residual_list = list(residuals)
return math.sqrt(sum(r * r for r in residual_list) / len(residual_list))
def solve_three_sphere_intersection(
spheres: Sequence[Sphere],
tolerance: float = 1e-3,
z_preference: Literal["positive", "negative"] = "positive",
) -> TrilaterationResult:
_validate(spheres)
try:
analytic_candidates = list(_sphere_intersection_candidates(spheres))
except ValueError:
analytic_candidates = []
all_candidates = analytic_candidates[:]
center_guess = (
(spheres[0].center[0] + spheres[1].center[0] + spheres[2].center[0]) / 3.0,
(spheres[0].center[1] + spheres[1].center[1] + spheres[2].center[1]) / 3.0,
(spheres[0].center[2] + spheres[1].center[2] + spheres[2].center[2]) / 3.0,
)
all_candidates.append(_gauss_newton_point(spheres, center_guess))
if analytic_candidates:
all_candidates.append(_gauss_newton_point(spheres, analytic_candidates[0]))
if len(analytic_candidates) == 2:
all_candidates.append(_gauss_newton_point(spheres, analytic_candidates[1]))
scored: List[Tuple[float, float, Point3D, Tuple[float, float, float]]] = []
for point in all_candidates:
current_residuals = _residuals(point, spheres)
score_rmse = _rmse(current_residuals)
z_bias = -point[2] if z_preference == "positive" else point[2]
scored.append((score_rmse, z_bias, point, current_residuals))
scored.sort(key=lambda x: (x[0], x[1]))
best_rmse, _, best_point, best_residuals = scored[0]
exact = all(abs(r) <= tolerance for r in best_residuals)
return TrilaterationResult(
point=best_point,
residuals=best_residuals,
rmse=best_rmse,
exact=exact,
candidate_points=tuple(p for _, _, p, _ in scored),
)
def solve_from_signal_amplitudes(
signals: Sequence[ReceiverSignal],
model: PropagationModel,
tolerance: float = 1e-3,
z_preference: Literal["positive", "negative"] = "positive",
) -> Tuple[TrilaterationResult, List[Sphere]]:
spheres = _spheres_from_signals(signals, model)
result = solve_three_sphere_intersection(
spheres=spheres, tolerance=tolerance, z_preference=z_preference
)
return result, spheres
def build_result_payload(
signals: Sequence[ReceiverSignal],
spheres: Sequence[Sphere],
result: TrilaterationResult,
model: PropagationModel,
) -> Dict[str, object]:
receivers = []
for signal, sphere, residual in zip(signals, spheres, result.residuals):
receivers.append(
{
"receiver_id": signal.receiver_id,
"center": {
"x": signal.center[0],
"y": signal.center[1],
"z": signal.center[2],
},
"frequency_hz": signal.frequency_hz,
"amplitude_dbm": signal.amplitude_dbm,
"radius_m": sphere.radius,
"residual_m": residual,
}
)
return {
"timestamp_utc": datetime.now(timezone.utc).isoformat(),
"position": {"x": result.point[0], "y": result.point[1], "z": result.point[2]},
"exact": result.exact,
"rmse_m": result.rmse,
"model": {
"tx_power_dbm": model.tx_power_dbm,
"tx_gain_dbi": model.tx_gain_dbi,
"rx_gain_dbi": model.rx_gain_dbi,
"path_loss_exponent": model.path_loss_exponent,
"reference_distance_m": model.reference_distance_m,
},
"receivers": receivers,
}
def send_payload_to_server(
server_ip: str,
payload: Dict[str, object],
port: int = 8080,
path: str = "/triangulation",
timeout_s: float = 3.0,
) -> Tuple[int, str]:
if not path.startswith("/"):
path = "/" + path
url = f"http://{server_ip}:{port}{path}"
data = json.dumps(payload).encode("utf-8")
req = request.Request(
url=url,
data=data,
method="POST",
headers={"Content-Type": "application/json"},
)
try:
with request.urlopen(req, timeout=timeout_s) as response:
body = response.read().decode("utf-8", errors="replace")
return response.status, body
except error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
return exc.code, body
except error.URLError as exc:
return 0, str(exc.reason)
def solve_and_prepare_payload(
signals: Sequence[ReceiverSignal],
model: PropagationModel,
tolerance: float = 1e-3,
z_preference: Literal["positive", "negative"] = "positive",
) -> Tuple[TrilaterationResult, Dict[str, object]]:
result, spheres = solve_from_signal_amplitudes(
signals=signals,
model=model,
tolerance=tolerance,
z_preference=z_preference,
)
payload = build_result_payload(
signals=signals,
spheres=spheres,
result=result,
model=model,
)
return result, payload