from __future__ import annotations from datetime import datetime, timezone import math from dataclasses import dataclass from typing import Dict, Iterable, List, Literal, Optional, Sequence, Tuple from urllib import error, request import json Point3D = Tuple[float, float, float] SPEED_OF_LIGHT_M_S = 299_792_458.0 def _utc_now_iso_seconds() -> str: return datetime.now(timezone.utc).isoformat(timespec="seconds") @dataclass(frozen=True) class Sphere: center: Point3D radius: float @dataclass(frozen=True) class PropagationModel: tx_power_dbm: float tx_gain_dbi: float = 0.0 rx_gain_dbi: float = 0.0 path_loss_exponent: float = 2.0 reference_distance_m: float = 1.0 min_distance_m: float = 1e-3 @dataclass(frozen=True) class ReceiverSignal: receiver_id: str center: Point3D amplitude_dbm: float frequency_hz: float @dataclass(frozen=True) class TrilaterationResult: point: Point3D residuals: Tuple[float, float, float] rmse: float exact: bool candidate_points: Tuple[Point3D, ...] def _validate(spheres: Sequence[Sphere]) -> None: if len(spheres) != 3: raise ValueError("Expected exactly 3 spheres.") for idx, sphere in enumerate(spheres, start=1): if sphere.radius < 0: raise ValueError(f"Radius for sphere #{idx} must be non-negative.") def _vec_sub(a: Point3D, b: Point3D) -> Point3D: return (a[0] - b[0], a[1] - b[1], a[2] - b[2]) def _vec_add(a: Point3D, b: Point3D) -> Point3D: return (a[0] + b[0], a[1] + b[1], a[2] + b[2]) def _vec_scale(a: Point3D, scale: float) -> Point3D: return (a[0] * scale, a[1] * scale, a[2] * scale) def _vec_dot(a: Point3D, b: Point3D) -> float: return a[0] * b[0] + a[1] * b[1] + a[2] * b[2] def _vec_cross(a: Point3D, b: Point3D) -> Point3D: return ( a[1] * b[2] - a[2] * b[1], a[2] * b[0] - a[0] * b[2], a[0] * b[1] - a[1] * b[0], ) def _vec_norm(a: Point3D) -> float: return math.sqrt(_vec_dot(a, a)) def _vec_unit(a: Point3D) -> Point3D: n = _vec_norm(a) if math.isclose(n, 0.0, abs_tol=1e-12): raise ValueError("Degenerate receiver geometry: duplicated centers.") return _vec_scale(a, 1.0 / n) def _distance(p1: Point3D, p2: Point3D) -> float: d = _vec_sub(p1, p2) return _vec_norm(d) def rssi_to_distance_m( amplitude_dbm: float, frequency_hz: float, model: PropagationModel ) -> float: if frequency_hz <= 0.0: raise ValueError("Frequency must be positive.") if model.reference_distance_m <= 0.0: raise ValueError("reference_distance_m must be positive.") if model.path_loss_exponent <= 0.0: raise ValueError("path_loss_exponent must be positive.") fspl_ref_db = 20.0 * math.log10( 4.0 * math.pi * model.reference_distance_m * frequency_hz / SPEED_OF_LIGHT_M_S ) rx_power_at_ref_dbm = ( model.tx_power_dbm + model.tx_gain_dbi + model.rx_gain_dbi - fspl_ref_db ) distance = model.reference_distance_m * 10.0 ** ( (rx_power_at_ref_dbm - amplitude_dbm) / (10.0 * model.path_loss_exponent) ) return max(distance, model.min_distance_m) def _spheres_from_signals( signals: Sequence[ReceiverSignal], model: PropagationModel ) -> List[Sphere]: if len(signals) != 3: raise ValueError("Expected exactly 3 receiver signals.") spheres: List[Sphere] = [] for signal in signals: radius = rssi_to_distance_m( amplitude_dbm=signal.amplitude_dbm, frequency_hz=signal.frequency_hz, model=model, ) spheres.append(Sphere(center=signal.center, radius=radius)) return spheres def _sphere_intersection_candidates(spheres: Sequence[Sphere]) -> Tuple[Point3D, ...]: p1, p2, p3 = spheres[0].center, spheres[1].center, spheres[2].center r1, r2, r3 = spheres[0].radius, spheres[1].radius, spheres[2].radius ex = _vec_unit(_vec_sub(p2, p1)) p3p1 = _vec_sub(p3, p1) i = _vec_dot(ex, p3p1) temp = _vec_sub(p3p1, _vec_scale(ex, i)) ey = _vec_unit(temp) ez = _vec_cross(ex, ey) d = _distance(p1, p2) j = _vec_dot(ey, p3p1) if math.isclose(j, 0.0, abs_tol=1e-12): raise ValueError("Degenerate receiver geometry: centers are collinear.") x = (r1 * r1 - r2 * r2 + d * d) / (2.0 * d) y = (r1 * r1 - r3 * r3 + i * i + j * j) / (2.0 * j) - (i / j) * x z_sq = r1 * r1 - x * x - y * y base = _vec_add(p1, _vec_add(_vec_scale(ex, x), _vec_scale(ey, y))) if z_sq < 0.0: return (base,) z = math.sqrt(z_sq) return ( _vec_add(base, _vec_scale(ez, z)), _vec_add(base, _vec_scale(ez, -z)), ) def _solve_3x3(a: List[List[float]], b: List[float]) -> Optional[List[float]]: m = [row[:] + [rhs] for row, rhs in zip(a, b)] n = 3 for col in range(n): pivot = max(range(col, n), key=lambda r: abs(m[r][col])) if math.isclose(m[pivot][col], 0.0, abs_tol=1e-12): return None if pivot != col: m[col], m[pivot] = m[pivot], m[col] pivot_value = m[col][col] for k in range(col, n + 1): m[col][k] /= pivot_value for row in range(n): if row == col: continue factor = m[row][col] for k in range(col, n + 1): m[row][k] -= factor * m[col][k] return [m[0][n], m[1][n], m[2][n]] def _gauss_newton_point( spheres: Sequence[Sphere], initial_point: Point3D, iterations: int = 40 ) -> Point3D: x, y, z = initial_point damping = 1e-6 for _ in range(iterations): residuals: List[float] = [] j_rows: List[Point3D] = [] for sphere in spheres: cx, cy, cz = sphere.center dx = x - cx dy = y - cy dz = z - cz dist = math.sqrt(dx * dx + dy * dy + dz * dz) if dist < 1e-9: dist = 1e-9 residuals.append(dist - sphere.radius) j_rows.append((dx / dist, dy / dist, dz / dist)) jt_j = [[0.0, 0.0, 0.0] for _ in range(3)] jt_r = [0.0, 0.0, 0.0] for (jx, jy, jz), r in zip(j_rows, residuals): jt_j[0][0] += jx * jx jt_j[0][1] += jx * jy jt_j[0][2] += jx * jz jt_j[1][0] += jy * jx jt_j[1][1] += jy * jy jt_j[1][2] += jy * jz jt_j[2][0] += jz * jx jt_j[2][1] += jz * jy jt_j[2][2] += jz * jz jt_r[0] += jx * r jt_r[1] += jy * r jt_r[2] += jz * r for i in range(3): jt_j[i][i] += damping delta = _solve_3x3(jt_j, [-jt_r[0], -jt_r[1], -jt_r[2]]) if delta is None: break x += delta[0] y += delta[1] z += delta[2] if max(abs(delta[0]), abs(delta[1]), abs(delta[2])) < 1e-8: break return (x, y, z) def _residuals(point: Point3D, spheres: Sequence[Sphere]) -> Tuple[float, float, float]: values = tuple(_distance(point, sphere.center) - sphere.radius for sphere in spheres) return (values[0], values[1], values[2]) def _rmse(residuals: Iterable[float]) -> float: residual_list = list(residuals) return math.sqrt(sum(r * r for r in residual_list) / len(residual_list)) def solve_three_sphere_intersection( spheres: Sequence[Sphere], tolerance: float = 1e-3, z_preference: Literal["positive", "negative"] = "positive", ) -> TrilaterationResult: _validate(spheres) try: analytic_candidates = list(_sphere_intersection_candidates(spheres)) except ValueError: analytic_candidates = [] all_candidates = analytic_candidates[:] center_guess = ( (spheres[0].center[0] + spheres[1].center[0] + spheres[2].center[0]) / 3.0, (spheres[0].center[1] + spheres[1].center[1] + spheres[2].center[1]) / 3.0, (spheres[0].center[2] + spheres[1].center[2] + spheres[2].center[2]) / 3.0, ) all_candidates.append(_gauss_newton_point(spheres, center_guess)) if analytic_candidates: all_candidates.append(_gauss_newton_point(spheres, analytic_candidates[0])) if len(analytic_candidates) == 2: all_candidates.append(_gauss_newton_point(spheres, analytic_candidates[1])) scored: List[Tuple[float, float, Point3D, Tuple[float, float, float]]] = [] for point in all_candidates: current_residuals = _residuals(point, spheres) score_rmse = _rmse(current_residuals) z_bias = -point[2] if z_preference == "positive" else point[2] scored.append((score_rmse, z_bias, point, current_residuals)) scored.sort(key=lambda x: (x[0], x[1])) best_rmse, _, best_point, best_residuals = scored[0] exact = all(abs(r) <= tolerance for r in best_residuals) return TrilaterationResult( point=best_point, residuals=best_residuals, rmse=best_rmse, exact=exact, candidate_points=tuple(p for _, _, p, _ in scored), ) def solve_from_signal_amplitudes( signals: Sequence[ReceiverSignal], model: PropagationModel, tolerance: float = 1e-3, z_preference: Literal["positive", "negative"] = "positive", ) -> Tuple[TrilaterationResult, List[Sphere]]: spheres = _spheres_from_signals(signals, model) result = solve_three_sphere_intersection( spheres=spheres, tolerance=tolerance, z_preference=z_preference ) return result, spheres def build_result_payload( signals: Sequence[ReceiverSignal], spheres: Sequence[Sphere], result: TrilaterationResult, model: PropagationModel, ) -> Dict[str, object]: receivers = [] for signal, sphere, residual in zip(signals, spheres, result.residuals): receivers.append( { "receiver_id": signal.receiver_id, "center": { "x": signal.center[0], "y": signal.center[1], "z": signal.center[2], }, "frequency_hz": signal.frequency_hz, "amplitude_dbm": signal.amplitude_dbm, "radius_m": sphere.radius, "residual_m": residual, } ) return { "timestamp_utc": _utc_now_iso_seconds(), "position": {"x": result.point[0], "y": result.point[1], "z": result.point[2]}, "exact": result.exact, "rmse_m": result.rmse, "model": { "tx_power_dbm": model.tx_power_dbm, "tx_gain_dbi": model.tx_gain_dbi, "rx_gain_dbi": model.rx_gain_dbi, "path_loss_exponent": model.path_loss_exponent, "reference_distance_m": model.reference_distance_m, }, "receivers": receivers, } def send_payload_to_server( server_ip: str, payload: Dict[str, object], port: int = 8080, path: str = "/triangulation", timeout_s: float = 3.0, ) -> Tuple[int, str]: if not path.startswith("/"): path = "/" + path url = f"http://{server_ip}:{port}{path}" data = json.dumps(payload).encode("utf-8") req = request.Request( url=url, data=data, method="POST", headers={"Content-Type": "application/json"}, ) try: with request.urlopen(req, timeout=timeout_s) as response: body = response.read().decode("utf-8", errors="replace") return response.status, body except error.HTTPError as exc: body = exc.read().decode("utf-8", errors="replace") return exc.code, body except error.URLError as exc: return 0, str(exc.reason) def solve_and_prepare_payload( signals: Sequence[ReceiverSignal], model: PropagationModel, tolerance: float = 1e-3, z_preference: Literal["positive", "negative"] = "positive", ) -> Tuple[TrilaterationResult, Dict[str, object]]: result, spheres = solve_from_signal_amplitudes( signals=signals, model=model, tolerance=tolerance, z_preference=z_preference, ) payload = build_result_payload( signals=signals, spheres=spheres, result=result, model=model, ) return result, payload