from __future__ import annotations import argparse import json import statistics from pathlib import Path from typing import Dict, List, Sequence, Tuple from urllib import error, request from triangulation import ( PropagationModel, ReceiverSignal, Sphere, rssi_to_distance_m, send_payload_to_server, solve_and_prepare_payload, solve_three_sphere_intersection, ) Point3D = Tuple[float, float, float] def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="3D trilateration from config file or direct CLI parameters." ) parser.add_argument( "--config", type=str, default="config.json", help="Path to JSON config. Used when --receiver is not provided.", ) parser.add_argument( "--receiver", action="append", nargs=6, metavar=("ID", "X", "Y", "Z", "AMPLITUDE_DBM", "FREQ_HZ"), help="Direct receiver measurement. Provide exactly 3 times for CLI mode.", ) parser.add_argument("--tx-power-dbm", type=float) parser.add_argument("--tx-gain-dbi", type=float, default=0.0) parser.add_argument("--rx-gain-dbi", type=float, default=0.0) parser.add_argument("--path-loss-exponent", type=float, default=2.0) parser.add_argument("--reference-distance-m", type=float, default=1.0) parser.add_argument("--min-distance-m", type=float, default=1e-3) parser.add_argument("--tolerance", type=float, default=1e-3) parser.add_argument( "--z-preference", choices=("positive", "negative"), default="positive", ) parser.add_argument("--server-ip", type=str, default="") parser.add_argument("--server-port", type=int, default=8080) parser.add_argument("--server-path", type=str, default="/triangulation") parser.add_argument("--timeout-s", type=float, default=3.0) parser.add_argument("--no-print-json", action="store_true") return parser.parse_args() def _load_json(path: str) -> Dict[str, object]: config_path = Path(path) if not config_path.exists(): raise SystemExit( f"Config file '{path}' not found. Create it from config.template.json or pass CLI receivers." ) with config_path.open("r", encoding="utf-8") as file: loaded = json.load(file) if not isinstance(loaded, dict): raise SystemExit("Config root must be a JSON object.") return loaded def _center_from_obj(obj: Dict[str, object]) -> Point3D: center = obj.get("center") if not isinstance(center, dict): raise ValueError("Receiver must contain 'center' object.") return (float(center["x"]), float(center["y"]), float(center["z"])) def _model_from_obj(obj: Dict[str, object]) -> PropagationModel: return PropagationModel( tx_power_dbm=float(obj["tx_power_dbm"]), tx_gain_dbi=float(obj.get("tx_gain_dbi", 0.0)), rx_gain_dbi=float(obj.get("rx_gain_dbi", 0.0)), path_loss_exponent=float(obj.get("path_loss_exponent", 2.0)), reference_distance_m=float(obj.get("reference_distance_m", 1.0)), min_distance_m=float(obj.get("min_distance_m", 1e-3)), ) def _float_from_measurement(item: Dict[str, object], keys: Sequence[str], name: str) -> float: for key in keys: if key in item: return float(item[key]) raise ValueError(f"Missing '{name}' in source measurement.") def _parse_source_payload(payload: object) -> List[Tuple[float, float]]: if isinstance(payload, dict): raw_measurements = payload.get("measurements") if raw_measurements is None: raw_measurements = payload.get("samples") if raw_measurements is None: raw_measurements = payload.get("data") elif isinstance(payload, list): raw_measurements = payload else: raise ValueError("Source payload must be array or object with measurements.") if not isinstance(raw_measurements, list) or not raw_measurements: raise ValueError("Source payload has no measurements.") parsed: List[Tuple[float, float]] = [] for item in raw_measurements: if not isinstance(item, dict): raise ValueError("Each measurement item must be an object.") frequency_hz = _float_from_measurement( item, keys=("frequency_hz", "freq_hz", "frequency", "freq"), name="frequency_hz", ) amplitude_dbm = _float_from_measurement( item, keys=("amplitude_dbm", "rssi_dbm", "amplitude", "rssi"), name="amplitude_dbm", ) parsed.append((frequency_hz, amplitude_dbm)) return parsed def _fetch_source_measurements(url: str, timeout_s: float) -> List[Tuple[float, float]]: req = request.Request(url=url, method="GET", headers={"Accept": "application/json"}) try: with request.urlopen(req, timeout=timeout_s) as response: payload = json.loads(response.read().decode("utf-8")) except error.HTTPError as exc: body = exc.read().decode("utf-8", errors="replace") raise RuntimeError(f"HTTP {exc.code} for '{url}': {body}") except error.URLError as exc: raise RuntimeError(f"Cannot reach '{url}': {exc.reason}") except TimeoutError: raise RuntimeError(f"Timeout while reading '{url}'") except json.JSONDecodeError as exc: raise RuntimeError(f"Invalid JSON from '{url}': {exc}") return _parse_source_payload(payload) def _aggregate_radius( measurements: Sequence[Tuple[float, float]], model: PropagationModel, method: str, ) -> float: distances = [ rssi_to_distance_m(amplitude_dbm=amplitude, frequency_hz=frequency, model=model) for frequency, amplitude in measurements ] if method == "mean": return sum(distances) / len(distances) if method == "median": return float(statistics.median(distances)) raise ValueError("aggregation must be 'median' or 'mean'") def _run_direct_cli_mode(args: argparse.Namespace) -> int: if not args.receiver or len(args.receiver) != 3: raise SystemExit( "CLI mode requires exactly 3 --receiver entries. Otherwise use --config." ) if args.tx_power_dbm is None: raise SystemExit("CLI mode requires --tx-power-dbm.") signals: List[ReceiverSignal] = [] for receiver_id, x, y, z, amplitude_dbm, frequency_hz in args.receiver: signals.append( ReceiverSignal( receiver_id=receiver_id, center=(float(x), float(y), float(z)), amplitude_dbm=float(amplitude_dbm), frequency_hz=float(frequency_hz), ) ) model = PropagationModel( tx_power_dbm=args.tx_power_dbm, tx_gain_dbi=args.tx_gain_dbi, rx_gain_dbi=args.rx_gain_dbi, path_loss_exponent=args.path_loss_exponent, reference_distance_m=args.reference_distance_m, min_distance_m=args.min_distance_m, ) result, payload = solve_and_prepare_payload( signals=signals, model=model, tolerance=args.tolerance, z_preference=args.z_preference, ) _print_result(result, payload, print_json=not args.no_print_json) return _send_if_needed( payload=payload, server_ip=args.server_ip, port=args.server_port, path=args.server_path, timeout_s=args.timeout_s, ) def _run_config_mode(config: Dict[str, object]) -> int: model_obj = config.get("model") solver_obj = config.get("solver", {}) output_obj = config.get("output", {}) input_obj = config.get("input") if not isinstance(model_obj, dict): raise SystemExit("Config must contain object 'model'.") if not isinstance(input_obj, dict): raise SystemExit("Config must contain object 'input'.") if not isinstance(solver_obj, dict): raise SystemExit("'solver' must be object.") if not isinstance(output_obj, dict): raise SystemExit("'output' must be object.") model = _model_from_obj(model_obj) tolerance = float(solver_obj.get("tolerance", 1e-3)) z_preference = str(solver_obj.get("z_preference", "positive")) if z_preference not in ("positive", "negative"): raise SystemExit("solver.z_preference must be 'positive' or 'negative'.") input_mode = str(input_obj.get("mode", "manual")) if input_mode == "manual": payload, result = _solve_from_manual_config(input_obj, model, tolerance, z_preference) elif input_mode == "http_sources": payload, result = _solve_from_sources_config(input_obj, model, tolerance, z_preference) else: raise SystemExit("input.mode must be 'manual' or 'http_sources'.") print_json = bool(output_obj.get("print_json", True)) _print_result(result, payload, print_json=print_json) server_ip = str(output_obj.get("server_ip", "")) return _send_if_needed( payload=payload, server_ip=server_ip, port=int(output_obj.get("server_port", 8080)), path=str(output_obj.get("server_path", "/triangulation")), timeout_s=float(output_obj.get("timeout_s", 3.0)), ) def _solve_from_manual_config( input_obj: Dict[str, object], model: PropagationModel, tolerance: float, z_preference: str, ): receivers = input_obj.get("receivers") if not isinstance(receivers, list) or len(receivers) != 3: raise SystemExit("input.receivers must contain exactly 3 receivers.") signals: List[ReceiverSignal] = [] for receiver in receivers: if not isinstance(receiver, dict): raise SystemExit("Each receiver in input.receivers must be an object.") signals.append( ReceiverSignal( receiver_id=str(receiver["receiver_id"]), center=_center_from_obj(receiver), amplitude_dbm=float(receiver["amplitude_dbm"]), frequency_hz=float(receiver["frequency_hz"]), ) ) result, payload = solve_and_prepare_payload( signals=signals, model=model, tolerance=tolerance, z_preference=z_preference, # type: ignore[arg-type] ) return payload, result def _solve_from_sources_config( input_obj: Dict[str, object], model: PropagationModel, tolerance: float, z_preference: str, ): receivers = input_obj.get("receivers") if not isinstance(receivers, list) or len(receivers) != 3: raise SystemExit("input.receivers must contain exactly 3 receivers.") timeout_s = float(input_obj.get("source_timeout_s", 3.0)) aggregation = str(input_obj.get("aggregation", "median")) if aggregation not in ("median", "mean"): raise SystemExit("input.aggregation must be 'median' or 'mean'.") spheres: List[Sphere] = [] receiver_payloads: List[Dict[str, object]] = [] for receiver in receivers: if not isinstance(receiver, dict): raise SystemExit("Each receiver in input.receivers must be an object.") receiver_id = str(receiver["receiver_id"]) center = _center_from_obj(receiver) source_url = str(receiver["source_url"]) measurements = _fetch_source_measurements(source_url, timeout_s=timeout_s) radius = _aggregate_radius(measurements, model=model, method=aggregation) spheres.append(Sphere(center=center, radius=radius)) sample_payload = [] for frequency_hz, amplitude_dbm in measurements: sample_payload.append( { "frequency_hz": frequency_hz, "amplitude_dbm": amplitude_dbm, "distance_m": rssi_to_distance_m( amplitude_dbm=amplitude_dbm, frequency_hz=frequency_hz, model=model, ), } ) receiver_payloads.append( { "receiver_id": receiver_id, "center": {"x": center[0], "y": center[1], "z": center[2]}, "source_url": source_url, "aggregation": aggregation, "radius_m": radius, "samples": sample_payload, } ) result = solve_three_sphere_intersection( spheres=spheres, tolerance=tolerance, z_preference=z_preference, # type: ignore[arg-type] ) for idx, residual in enumerate(result.residuals): receiver_payloads[idx]["residual_m"] = residual payload = { "position": {"x": result.point[0], "y": result.point[1], "z": result.point[2]}, "exact": result.exact, "rmse_m": result.rmse, "model": { "tx_power_dbm": model.tx_power_dbm, "tx_gain_dbi": model.tx_gain_dbi, "rx_gain_dbi": model.rx_gain_dbi, "path_loss_exponent": model.path_loss_exponent, "reference_distance_m": model.reference_distance_m, }, "receivers": receiver_payloads, } return payload, result def _print_result(result, payload: Dict[str, object], print_json: bool) -> None: x, y, z = result.point print(f"point: ({x:.6f}, {y:.6f}, {z:.6f})") print( "residuals: " f"({result.residuals[0]:.6e}, {result.residuals[1]:.6e}, {result.residuals[2]:.6e})" ) print(f"rmse: {result.rmse:.6e}") print(f"exact: {result.exact}") if print_json: print(json.dumps(payload, indent=2, ensure_ascii=False)) def _send_if_needed( payload: Dict[str, object], server_ip: str, port: int, path: str, timeout_s: float, ) -> int: if not server_ip: return 0 status, body = send_payload_to_server( server_ip=server_ip, payload=payload, port=port, path=path, timeout_s=timeout_s, ) print(f"server_status: {status}") if body: print(f"server_response: {body}") if status == 0 or status >= 400: return 2 return 0 def main() -> int: args = parse_args() try: if args.receiver: return _run_direct_cli_mode(args) config = _load_json(args.config) return _run_config_mode(config) except (RuntimeError, ValueError, KeyError) as exc: raise SystemExit(str(exc)) if __name__ == "__main__": raise SystemExit(main())