You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1284 lines
50 KiB
Python

from __future__ import annotations
import argparse
import hmac
import itertools
import json
import math
import mimetypes
import statistics
import threading
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple
from urllib import error, parse, request
from triangulation import (
PropagationModel,
Sphere,
rssi_to_distance_m,
send_payload_to_server,
solve_three_sphere_intersection,
)
# A receiver position expressed as an (x, y, z) tuple of floats.
Point3D = Tuple[float, float, float]
MAX_CONFIG_BODY_BYTES = 1_000_000 # 1 MB guardrail for /config POST.
# Number of hertz in one megahertz; used to convert between the two units.
HZ_IN_MHZ = 1_000_000.0
def _utc_now_iso_seconds() -> str:
return datetime.now(timezone.utc).isoformat(timespec="seconds")
def _load_json(path: str) -> Dict[str, object]:
file_path = Path(path)
if not file_path.exists():
raise SystemExit(f"Config file not found: {path}")
# Accept optional UTF-8 BOM to avoid startup failures with edited JSON files.
with file_path.open("r", encoding="utf-8-sig") as fh:
data = json.load(fh)
if not isinstance(data, dict):
raise SystemExit("Config root must be a JSON object.")
return data
def _center_from_obj(obj: Dict[str, object]) -> Point3D:
center = obj.get("center")
if not isinstance(center, dict):
raise ValueError("Receiver center must be an object.")
return (float(center["x"]), float(center["y"]), float(center["z"]))
def _parse_model(config: Dict[str, object]) -> PropagationModel:
model_obj = config.get("model")
if not isinstance(model_obj, dict):
raise ValueError("Config must contain object 'model'.")
return PropagationModel(
tx_power_dbm=float(model_obj["tx_power_dbm"]),
tx_gain_dbi=float(model_obj.get("tx_gain_dbi", 0.0)),
rx_gain_dbi=float(model_obj.get("rx_gain_dbi", 0.0)),
path_loss_exponent=float(model_obj.get("path_loss_exponent", 2.0)),
reference_distance_m=float(model_obj.get("reference_distance_m", 1.0)),
min_distance_m=float(model_obj.get("min_distance_m", 1e-3)),
)
def _float_from_measurement(
item: Dict[str, object],
keys: Sequence[str],
field_name: str,
source_label: str,
row_index: int,
) -> float:
for key in keys:
if key in item:
value = item[key]
try:
parsed = float(value)
except (TypeError, ValueError):
raise ValueError(
f"{source_label}: row #{row_index} field '{key}' must be numeric, got {value!r}."
) from None
if not math.isfinite(parsed):
raise ValueError(
f"{source_label}: row #{row_index} field '{key}' must be finite, got {value!r}."
)
return parsed
raise ValueError(f"{source_label}: row #{row_index} missing field '{field_name}'.")
def _float_with_key_from_measurement(
item: Dict[str, object],
keys: Sequence[str],
field_name: str,
source_label: str,
row_index: int,
) -> Tuple[str, float]:
for key in keys:
if key in item:
value = _float_from_measurement(
item=item,
keys=(key,),
field_name=field_name,
source_label=source_label,
row_index=row_index,
)
return key, value
raise ValueError(f"{source_label}: row #{row_index} missing field '{field_name}'.")
def _parse_frequency_hz_from_measurement(
    row: Dict[str, object],
    source_label: str,
    row_index: int,
) -> float:
    """Read a measurement row's frequency and return it in hertz.

    Keys with an explicit ``_hz`` or ``_mhz`` suffix are taken at face value.
    The unit-less keys (``frequency``, ``freq``, ``f``) default to MHz, except
    that values of 10,000,000 or more are treated as Hz for backward
    compatibility.

    Raises:
        ValueError: propagated from ``_float_with_key_from_measurement`` when no
            frequency key is present or the value is invalid.
    """
    hz_keys = ("frequency_hz", "freq_hz", "f_hz")
    mhz_keys = ("frequency_mhz", "freq_mhz", "f_mhz")
    unitless_keys = ("frequency", "freq", "f")
    matched_key, number = _float_with_key_from_measurement(
        row,
        keys=hz_keys + mhz_keys + unitless_keys,
        field_name="frequency",
        source_label=source_label,
        row_index=row_index,
    )
    if matched_key in hz_keys:
        is_megahertz = False
    elif matched_key in mhz_keys:
        is_megahertz = True
    else:
        # Unit-less field: MHz by project convention, unless the magnitude
        # makes it clear the sender meant Hz.
        is_megahertz = not (number >= 10_000_000.0)
    return number * HZ_IN_MHZ if is_megahertz else number
def _parse_receiver_input_filter(
receiver_obj: Dict[str, object],
receiver_id: str,
default_filter_obj: Optional[Dict[str, object]] = None,
) -> Dict[str, object]:
raw_receiver_filter = receiver_obj.get("input_filter")
if raw_receiver_filter is None:
raw_receiver_filter = {}
if not isinstance(raw_receiver_filter, dict):
raise ValueError(f"receiver '{receiver_id}': input_filter must be an object.")
merged_filter: Dict[str, object] = {}
if isinstance(default_filter_obj, dict):
merged_filter.update(default_filter_obj)
merged_filter.update(raw_receiver_filter)
filter_obj = merged_filter
if not isinstance(filter_obj, dict):
raise ValueError(f"receiver '{receiver_id}': input_filter must be an object.")
min_freq_mhz_raw = filter_obj.get("min_frequency_mhz")
max_freq_mhz_raw = filter_obj.get("max_frequency_mhz")
if min_freq_mhz_raw is None and "min_frequency_hz" in filter_obj:
min_freq_mhz_raw = float(filter_obj["min_frequency_hz"]) / HZ_IN_MHZ
if max_freq_mhz_raw is None and "max_frequency_hz" in filter_obj:
max_freq_mhz_raw = float(filter_obj["max_frequency_hz"]) / HZ_IN_MHZ
parsed = {
"enabled": bool(filter_obj.get("enabled", False)),
"min_frequency_mhz": float(min_freq_mhz_raw if min_freq_mhz_raw is not None else 0.0),
"max_frequency_mhz": float(max_freq_mhz_raw if max_freq_mhz_raw is not None else 1_000_000_000.0),
"min_rssi_dbm": float(filter_obj.get("min_rssi_dbm", -200.0)),
"max_rssi_dbm": float(filter_obj.get("max_rssi_dbm", 50.0)),
}
if parsed["max_frequency_mhz"] < parsed["min_frequency_mhz"]:
raise ValueError(
f"receiver '{receiver_id}': input_filter.max_frequency_mhz must be >= min_frequency_mhz."
)
if parsed["max_rssi_dbm"] < parsed["min_rssi_dbm"]:
raise ValueError(
f"receiver '{receiver_id}': input_filter.max_rssi_dbm must be >= min_rssi_dbm."
)
return parsed
def _apply_receiver_input_filter(
measurements: Sequence[Tuple[float, float]],
receiver_filter: Dict[str, object],
) -> List[Tuple[float, float]]:
if not bool(receiver_filter.get("enabled", False)):
return list(measurements)
min_frequency_mhz = float(receiver_filter["min_frequency_mhz"])
max_frequency_mhz = float(receiver_filter["max_frequency_mhz"])
min_rssi_dbm = float(receiver_filter["min_rssi_dbm"])
max_rssi_dbm = float(receiver_filter["max_rssi_dbm"])
filtered = []
for frequency_hz, rssi_dbm in measurements:
frequency_mhz = frequency_hz / HZ_IN_MHZ
if not (min_frequency_mhz <= frequency_mhz <= max_frequency_mhz):
continue
if not (min_rssi_dbm <= rssi_dbm <= max_rssi_dbm):
continue
filtered.append((frequency_hz, rssi_dbm))
return filtered
def _parse_receiver_configured_frequencies(
receiver_obj: Dict[str, object],
receiver_id: str,
) -> List[int]:
raw_frequencies = receiver_obj.get("frequencies_mhz")
if raw_frequencies is None:
return []
if not isinstance(raw_frequencies, list):
raise ValueError(
f"receiver '{receiver_id}': frequencies_mhz must be an array of numbers."
)
parsed_hz: List[int] = []
for index, value in enumerate(raw_frequencies, start=1):
try:
frequency_mhz = float(value)
except (TypeError, ValueError):
raise ValueError(
f"receiver '{receiver_id}': frequencies_mhz[{index}] must be numeric."
) from None
if not math.isfinite(frequency_mhz) or frequency_mhz <= 0.0:
raise ValueError(
f"receiver '{receiver_id}': frequencies_mhz[{index}] must be > 0."
)
parsed_hz.append(int(round(frequency_mhz * HZ_IN_MHZ)))
return sorted(set(parsed_hz))
def _apply_receiver_configured_frequencies(
measurements: Sequence[Tuple[float, float]],
configured_frequencies_hz: Sequence[int],
) -> List[Tuple[float, float]]:
if not configured_frequencies_hz:
return list(measurements)
allowed = set(int(value) for value in configured_frequencies_hz)
filtered: List[Tuple[float, float]] = []
for frequency_hz, rssi_dbm in measurements:
rounded_hz = int(round(frequency_hz))
if rounded_hz in allowed:
filtered.append((float(rounded_hz), rssi_dbm))
return filtered
def _parse_output_server_config(
output_obj: Dict[str, object],
default_name: str,
) -> Dict[str, object]:
name = str(output_obj.get("name", default_name)).strip() or default_name
ip = str(output_obj.get("ip", "")).strip()
# Keep backward compatibility for explicit enabled flag, but allow simplified config:
# if enabled is omitted, non-empty IP means enabled output target.
if "enabled" in output_obj:
enabled = bool(output_obj.get("enabled"))
else:
enabled = bool(ip)
port = int(output_obj.get("port", 8080))
path = str(output_obj.get("path", "/triangulation"))
timeout_s = float(output_obj.get("timeout_s", 3.0))
frequency_filter_enabled = bool(output_obj.get("frequency_filter_enabled", False))
min_frequency_mhz_raw = output_obj.get("min_frequency_mhz")
max_frequency_mhz_raw = output_obj.get("max_frequency_mhz")
if min_frequency_mhz_raw is None and "min_frequency_hz" in output_obj:
min_frequency_mhz_raw = float(output_obj["min_frequency_hz"]) / HZ_IN_MHZ
if max_frequency_mhz_raw is None and "max_frequency_hz" in output_obj:
max_frequency_mhz_raw = float(output_obj["max_frequency_hz"]) / HZ_IN_MHZ
min_frequency_mhz = float(min_frequency_mhz_raw or 0.0)
max_frequency_mhz = float(max_frequency_mhz_raw or 0.0)
min_frequency_hz = min_frequency_mhz * HZ_IN_MHZ
max_frequency_hz = max_frequency_mhz * HZ_IN_MHZ
if enabled and not ip:
raise ValueError(f"runtime output '{name}': ip must be non-empty when enabled=true.")
if frequency_filter_enabled:
if min_frequency_mhz <= 0.0:
raise ValueError(
f"runtime output '{name}': min_frequency_mhz must be > 0 when frequency filter is enabled."
)
if max_frequency_mhz <= 0.0:
raise ValueError(
f"runtime output '{name}': max_frequency_mhz must be > 0 when frequency filter is enabled."
)
if max_frequency_mhz < min_frequency_mhz:
raise ValueError(
f"runtime output '{name}': max_frequency_mhz must be >= min_frequency_mhz."
)
return {
"name": name,
"enabled": enabled,
"ip": ip,
"port": port,
"path": path,
"timeout_s": timeout_s,
"frequency_filter_enabled": frequency_filter_enabled,
"min_frequency_mhz": min_frequency_mhz,
"max_frequency_mhz": max_frequency_mhz,
"min_frequency_hz": min_frequency_hz,
"max_frequency_hz": max_frequency_hz,
}
def parse_source_payload(
    payload: object,
    source_label: str,
    expected_receiver_id: Optional[str] = None,
) -> List[Tuple[float, float]]:
    """Convert a decoded source payload into ``(frequency_hz, amplitude_dbm)`` pairs.

    *payload* is either a list of measurement rows, or an object holding that
    list under ``measurements``, ``samples``, ``data`` or ``m`` (first non-null
    wins). When the object carries a ``receiver_id`` and *expected_receiver_id*
    is given, the two must match.

    Raises:
        ValueError: on a wrong payload shape, a receiver id mismatch, an empty
            measurement list, a row that is not an object, an invalid field, or
            a non-positive frequency.
    """
    if isinstance(payload, list):
        raw_items = payload
    elif isinstance(payload, dict):
        if expected_receiver_id is not None and "receiver_id" in payload:
            reported_id = str(payload["receiver_id"])
            if reported_id != expected_receiver_id:
                raise ValueError(
                    f"{source_label}: payload receiver_id '{reported_id}' "
                    f"does not match expected '{expected_receiver_id}'."
                )
        raw_items = None
        for container_key in ("measurements", "samples", "data", "m"):
            raw_items = payload.get(container_key)
            if raw_items is not None:
                break
    else:
        raise ValueError(f"{source_label}: payload must be list or object.")

    if not (isinstance(raw_items, list) and raw_items):
        raise ValueError(f"{source_label}: payload contains no measurements.")

    pairs: List[Tuple[float, float]] = []
    for row_number, entry in enumerate(raw_items, start=1):
        if not isinstance(entry, dict):
            raise ValueError(f"{source_label}: row #{row_number} must be an object.")
        hz = _parse_frequency_hz_from_measurement(
            row=entry,
            source_label=source_label,
            row_index=row_number,
        )
        dbm = _float_from_measurement(
            entry,
            keys=("amplitude_dbm", "rssi_dbm", "dbm", "amplitude", "rssi"),
            field_name="amplitude_dbm",
            source_label=source_label,
            row_index=row_number,
        )
        if hz <= 0.0:
            raise ValueError(
                f"{source_label}: row #{row_number} field 'frequency_hz' must be > 0."
            )
        pairs.append((hz, dbm))
    return pairs
def aggregate_radius(
    measurements: Sequence[Tuple[float, float]],
    model: PropagationModel,
    method: str,
) -> float:
    """Collapse ``(frequency_hz, amplitude_dbm)`` measurements into one distance.

    Each measurement is converted with ``rssi_to_distance_m`` and the resulting
    distances are combined with the median or the arithmetic mean.

    Raises:
        ValueError: if *method* is not ``"median"`` or ``"mean"``, or if
            *measurements* is empty. (Without the explicit check, empty input
            surfaced as ZeroDivisionError for "mean" but StatisticsError for
            "median".)
    """
    # Validate the method first so a bad config is reported before any work is done.
    if method not in ("median", "mean"):
        raise ValueError("aggregation must be 'median' or 'mean'")
    distances = [
        rssi_to_distance_m(amplitude_dbm=amplitude_dbm, frequency_hz=frequency_hz, model=model)
        for frequency_hz, amplitude_dbm in measurements
    ]
    if not distances:
        raise ValueError("aggregate_radius requires at least one measurement.")
    if method == "median":
        return float(statistics.median(distances))
    return float(sum(distances) / len(distances))
def _group_by_frequency(
measurements: Sequence[Tuple[float, float]],
) -> Dict[float, List[Tuple[float, float]]]:
grouped: Dict[float, List[Tuple[float, float]]] = {}
for frequency_hz, amplitude_dbm in measurements:
if frequency_hz not in grouped:
grouped[frequency_hz] = []
grouped[frequency_hz].append((frequency_hz, amplitude_dbm))
return grouped
def _fetch_measurements(
url: str,
timeout_s: float,
expected_receiver_id: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
) -> List[Tuple[float, float]]:
source_label = f"source_url={url}"
request_headers = {"Accept": "application/json"}
if headers:
request_headers.update(headers)
req = request.Request(url=url, method="GET", headers=request_headers)
try:
with request.urlopen(req, timeout=timeout_s) as response:
payload = json.loads(response.read().decode("utf-8"))
except error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
raise RuntimeError(f"HTTP {exc.code} for '{url}': {body}")
except error.URLError as exc:
raise RuntimeError(f"Cannot reach '{url}': {exc.reason}")
except TimeoutError:
raise RuntimeError(f"Timeout while reading '{url}'")
except json.JSONDecodeError as exc:
raise RuntimeError(f"Invalid JSON from '{url}': {exc}")
try:
return parse_source_payload(
payload=payload,
source_label=source_label,
expected_receiver_id=expected_receiver_id,
)
except ValueError as exc:
raise RuntimeError(str(exc)) from None
class AutoService:
    """Poll receiver HTTP sources, trilaterate per frequency and publish the best fix.

    A background thread repeatedly calls ``refresh_once``; the latest payload,
    last error and last output-delivery report are kept under ``state_lock``
    and exposed through ``snapshot``.
    """

    def __init__(self, config: Dict[str, object], config_path: Optional[str] = None) -> None:
        """Validate *config* and prepare (but do not start) the polling thread.

        Sections read: ``model``, ``solver``, ``runtime`` and ``input``.

        Raises:
            ValueError: for any structural or value problem in the config,
                including a receiver without ``receiver_id``.
        """
        self.config = config
        self.config_path = config_path
        self.model = _parse_model(config)
        solver_obj = config.get("solver", {})
        runtime_obj = config.get("runtime", {})
        input_obj = config.get("input")
        if not isinstance(solver_obj, dict):
            raise ValueError("solver must be object.")
        if not isinstance(runtime_obj, dict):
            raise ValueError("runtime must be object.")
        if not isinstance(input_obj, dict):
            raise ValueError("input must be object.")

        self.tolerance = float(solver_obj.get("tolerance", 1e-3))
        self.z_preference = str(solver_obj.get("z_preference", "positive"))
        if self.z_preference not in ("positive", "negative"):
            raise ValueError("solver.z_preference must be 'positive' or 'negative'.")
        self.poll_interval_s = float(runtime_obj.get("poll_interval_s", 1.0))
        # A JSON null must mean "no token", not the literal token "None".
        self.write_api_token = self._text_or_empty(runtime_obj.get("write_api_token"))

        # Output targets: either the list form or the legacy single object.
        parsed_output_servers: List[Dict[str, object]] = []
        output_servers_obj = runtime_obj.get("output_servers")
        if output_servers_obj is not None:
            if not isinstance(output_servers_obj, list):
                raise ValueError("runtime.output_servers must be list.")
            for index, output_obj in enumerate(output_servers_obj, start=1):
                if not isinstance(output_obj, dict):
                    raise ValueError("runtime.output_servers[] must be object.")
                parsed_output_servers.append(
                    _parse_output_server_config(
                        output_obj=output_obj,
                        default_name=f"output_{index}",
                    )
                )
        else:
            output_obj = runtime_obj.get("output_server", {})
            if output_obj is None:
                output_obj = {}
            if not isinstance(output_obj, dict):
                raise ValueError("runtime.output_server must be object.")
            parsed_output_servers.append(
                _parse_output_server_config(
                    output_obj=output_obj,
                    default_name="output_1",
                )
            )
        self.output_servers = parsed_output_servers
        self.output_enabled = any(bool(server.get("enabled")) for server in self.output_servers)

        self.source_timeout_s = float(input_obj.get("source_timeout_s", 3.0))
        self.aggregation = str(input_obj.get("aggregation", "median"))
        if self.aggregation not in ("median", "mean"):
            raise ValueError("input.aggregation must be 'median' or 'mean'.")
        input_mode = str(input_obj.get("mode", "http_sources"))
        if input_mode != "http_sources":
            raise ValueError("Automatic service requires input.mode = 'http_sources'.")
        raw_default_filter = input_obj.get("default_input_filter")
        default_filter_obj: Optional[Dict[str, object]] = None
        if raw_default_filter is not None:
            if not isinstance(raw_default_filter, dict):
                raise ValueError("input.default_input_filter must be object.")
            default_filter_obj = raw_default_filter

        receivers = input_obj.get("receivers")
        if not isinstance(receivers, list) or len(receivers) < 3:
            raise ValueError("input.receivers must contain at least 3 objects.")
        parsed_receivers: List[Dict[str, object]] = []
        for receiver in receivers:
            if not isinstance(receiver, dict):
                raise ValueError("Each receiver must be object.")
            access_obj = receiver.get("access", {})
            if access_obj is None:
                access_obj = {}
            if not isinstance(access_obj, dict):
                raise ValueError("receiver.access must be object.")
            source_url = str(
                receiver.get("source_url")
                or access_obj.get("url")
                or access_obj.get("source_url")
                or ""
            ).strip()
            if not source_url:
                raise ValueError(
                    f"receiver '{receiver.get('receiver_id', '')}': source_url/access.url must be non-empty."
                )
            # Report a missing id as a config error rather than a bare KeyError.
            if "receiver_id" not in receiver:
                raise ValueError("Each receiver must define 'receiver_id'.")
            receiver_id = str(receiver["receiver_id"])
            source_headers: Dict[str, str] = {}
            source_api_token = str(
                receiver.get("source_api_token") or access_obj.get("api_token") or ""
            ).strip()
            if source_api_token:
                source_headers["Authorization"] = f"Bearer {source_api_token}"
            parsed_receivers.append(
                {
                    "receiver_id": receiver_id,
                    "center": _center_from_obj(receiver),
                    "source_url": source_url,
                    "source_headers": source_headers,
                    "configured_frequencies_hz": _parse_receiver_configured_frequencies(
                        receiver_obj=receiver,
                        receiver_id=receiver_id,
                    ),
                    "input_filter": _parse_receiver_input_filter(
                        receiver_obj=receiver,
                        receiver_id=receiver_id,
                        default_filter_obj=default_filter_obj,
                    ),
                }
            )
        self.receivers = parsed_receivers

        # Shared state, guarded by state_lock.
        self.state_lock = threading.Lock()
        self.latest_payload: Optional[Dict[str, object]] = None
        self.last_error: str = "no data yet"
        self.updated_at_utc: Optional[str] = None
        self.last_output_delivery: Dict[str, object] = {
            "enabled": self.output_enabled,
            "status": "disabled" if not self.output_enabled else "pending",
            "http_status": None,
            "response_body": "",
            "sent_at_utc": None,
            "servers": [
                {
                    "name": server["name"],
                    "enabled": bool(server["enabled"]),
                    "status": "disabled" if not bool(server["enabled"]) else "pending",
                    "http_status": None,
                    "response_body": "",
                    "sent_at_utc": None,
                    "target": {
                        "ip": server["ip"],
                        "port": server["port"],
                        "path": server["path"],
                    },
                    "frequency_filter": {
                        "enabled": server["frequency_filter_enabled"],
                        "min_frequency_mhz": server["min_frequency_mhz"],
                        "max_frequency_mhz": server["max_frequency_mhz"],
                    },
                }
                for server in self.output_servers
            ],
        }
        self.stop_event = threading.Event()
        self.poll_thread = threading.Thread(target=self._poll_loop, daemon=True)

    @staticmethod
    def _text_or_empty(value: object) -> str:
        """Return ``str(value).strip()``, mapping ``None`` to the empty string."""
        if value is None:
            return ""
        return str(value).strip()

    def start(self) -> None:
        """Start the background polling thread."""
        self.poll_thread.start()

    def stop(self) -> None:
        """Signal the polling loop to stop and wait up to 2 seconds for it."""
        self.stop_event.set()
        if self.poll_thread.is_alive():
            self.poll_thread.join(timeout=2.0)

    def refresh_once(self) -> None:
        """Run one full cycle: fetch, filter, trilaterate, store and deliver.

        Raises:
            RuntimeError: if a source cannot be fetched/parsed, a receiver has
                no measurements left after filtering, no frequency is shared
                by at least three receivers, or an output server answers with
                a non-2xx status (the computed payload is stored before this
                last error is raised).
        """
        receiver_payloads: List[Dict[str, object]] = []
        grouped_by_receiver: List[Dict[float, List[Tuple[float, float]]]] = []
        for receiver in self.receivers:
            receiver_id = str(receiver["receiver_id"])
            center = receiver["center"]
            source_url = str(receiver["source_url"])
            source_headers = receiver.get("source_headers")
            raw_measurements = _fetch_measurements(
                source_url,
                timeout_s=self.source_timeout_s,
                expected_receiver_id=receiver_id,
                headers=source_headers if isinstance(source_headers, dict) else None,
            )
            receiver_filter = receiver["input_filter"]
            measurements = _apply_receiver_input_filter(
                raw_measurements, receiver_filter=receiver_filter
            )
            configured_frequencies_hz = receiver.get("configured_frequencies_hz", [])
            if isinstance(configured_frequencies_hz, list):
                measurements = _apply_receiver_configured_frequencies(
                    measurements,
                    configured_frequencies_hz=configured_frequencies_hz,
                )
            if not measurements:
                raise RuntimeError(
                    f"receiver '{receiver_id}': no measurements left after configured filters."
                )
            grouped = _group_by_frequency(measurements)
            grouped_by_receiver.append(grouped)
            radius_m = aggregate_radius(measurements, model=self.model, method=self.aggregation)
            samples = [
                {
                    "frequency_hz": frequency_hz,
                    "frequency_mhz": frequency_hz / HZ_IN_MHZ,
                    "amplitude_dbm": amplitude_dbm,
                    "distance_m": rssi_to_distance_m(
                        amplitude_dbm=amplitude_dbm,
                        frequency_hz=frequency_hz,
                        model=self.model,
                    ),
                }
                for frequency_hz, amplitude_dbm in measurements
            ]
            receiver_payloads.append(
                {
                    "receiver_id": receiver_id,
                    "center": {"x": center[0], "y": center[1], "z": center[2]},
                    "source_url": source_url,
                    "aggregation": self.aggregation,
                    "input_filter": receiver_filter,
                    "configured_frequencies_mhz": [
                        float(int(value)) / HZ_IN_MHZ
                        for value in (
                            configured_frequencies_hz
                            if isinstance(configured_frequencies_hz, list)
                            else []
                        )
                    ],
                    "raw_samples_count": len(raw_measurements),
                    "filtered_samples_count": len(measurements),
                    "radius_m_all_freq": radius_m,
                    "samples": samples,
                }
            )

        frequency_rows: List[Dict[str, object]] = []
        best_row: Optional[Dict[str, object]] = None
        all_frequencies = sorted(
            {frequency for grouped in grouped_by_receiver for frequency in grouped.keys()}
        )
        for frequency_hz in all_frequencies:
            # A frequency is usable only if at least three receivers reported it.
            available_indices = [
                idx for idx, grouped in enumerate(grouped_by_receiver) if frequency_hz in grouped
            ]
            if len(available_indices) < 3:
                continue
            best_combo_row: Optional[Dict[str, object]] = None
            best_combo_result = None
            best_combo_indices: Optional[Tuple[int, int, int]] = None
            best_combo_spheres: Optional[List[Sphere]] = None
            # Try every receiver triple and keep the one with the lowest RMSE.
            for combo in itertools.combinations(available_indices, 3):
                spheres_for_frequency: List[Sphere] = []
                row_receivers: List[Dict[str, object]] = []
                for receiver_index in combo:
                    receiver = self.receivers[receiver_index]
                    measurement_subset = grouped_by_receiver[receiver_index][frequency_hz]
                    radius_m = aggregate_radius(
                        measurement_subset, model=self.model, method=self.aggregation
                    )
                    spheres_for_frequency.append(
                        Sphere(center=receiver["center"], radius=radius_m)
                    )
                    row_receivers.append(
                        {
                            "receiver_id": str(receiver["receiver_id"]),
                            "radius_m": radius_m,
                            "samples_count": len(measurement_subset),
                        }
                    )
                result = solve_three_sphere_intersection(
                    spheres=spheres_for_frequency,
                    tolerance=self.tolerance,
                    z_preference=self.z_preference,  # type: ignore[arg-type]
                )
                candidate_row = {
                    "frequency_hz": frequency_hz,
                    "frequency_mhz": frequency_hz / HZ_IN_MHZ,
                    "position": {
                        "x": result.point[0],
                        "y": result.point[1],
                        "z": result.point[2],
                    },
                    "exact": result.exact,
                    "rmse_m": result.rmse,
                    "receivers": row_receivers,
                    "used_receivers_count": 3,
                    "available_receivers_count": len(available_indices),
                }
                if (
                    best_combo_row is None
                    or float(candidate_row["rmse_m"]) < float(best_combo_row["rmse_m"])
                ):
                    best_combo_row = candidate_row
                    best_combo_result = result
                    best_combo_indices = combo
                    best_combo_spheres = spheres_for_frequency
            if (
                best_combo_row is None
                or best_combo_result is None
                or best_combo_indices is None
                or best_combo_spheres is None
            ):
                continue
            # Attach residuals to the winning row and to each receiver's summary.
            row_receivers = best_combo_row["receivers"]
            for local_index, receiver_index in enumerate(best_combo_indices):
                residual = best_combo_result.residuals[local_index]
                row_receivers[local_index]["residual_m"] = residual
                receiver_payloads[receiver_index].setdefault("per_frequency", []).append(
                    {
                        "frequency_hz": frequency_hz,
                        "frequency_mhz": frequency_hz / HZ_IN_MHZ,
                        "radius_m": best_combo_spheres[local_index].radius,
                        "residual_m": residual,
                        "samples_count": len(grouped_by_receiver[receiver_index][frequency_hz]),
                    }
                )
            frequency_rows.append(best_combo_row)
            if best_row is None or float(best_combo_row["rmse_m"]) < float(best_row["rmse_m"]):
                best_row = best_combo_row

        if best_row is None:
            if len(self.receivers) == 3:
                raise RuntimeError("No common frequencies across all 3 receivers.")
            raise RuntimeError("Cannot build frequency table for trilateration.")

        payload = {
            "timestamp_utc": _utc_now_iso_seconds(),
            "selected_frequency_hz": best_row["frequency_hz"],
            "selected_frequency_mhz": float(best_row["frequency_hz"]) / HZ_IN_MHZ,
            "position": best_row["position"],
            "exact": best_row["exact"],
            "rmse_m": best_row["rmse_m"],
            "frequency_table": frequency_rows,
            "model": {
                "tx_power_dbm": self.model.tx_power_dbm,
                "tx_gain_dbi": self.model.tx_gain_dbi,
                "rx_gain_dbi": self.model.rx_gain_dbi,
                "path_loss_exponent": self.model.path_loss_exponent,
                "reference_distance_m": self.model.reference_distance_m,
            },
            "receivers": receiver_payloads,
        }
        with self.state_lock:
            self.latest_payload = payload
            self.updated_at_utc = payload["timestamp_utc"]  # type: ignore[index]
            self.last_error = ""
        # Delivery happens outside the lock so readers are not blocked on the network.
        delivery = self._deliver_to_output_servers(payload)
        with self.state_lock:
            self.last_output_delivery = delivery
        if delivery["status"] in ("error", "partial"):
            failed_servers = [
                row["name"]
                for row in delivery.get("servers", [])
                if isinstance(row, dict) and row.get("status") == "error"
            ]
            raise RuntimeError(
                "Output server(s) rejected payload: "
                + ", ".join(str(name) for name in failed_servers)
            )

    @staticmethod
    def _row_frequency_mhz(row: Dict[str, object]) -> Optional[float]:
        """Return the row's frequency in MHz, derived from Hz if needed, else None."""
        mhz = row.get("frequency_mhz")
        if isinstance(mhz, (int, float)):
            return float(mhz)
        hz = row.get("frequency_hz")
        if isinstance(hz, (int, float)):
            return float(hz) / HZ_IN_MHZ
        return None

    @staticmethod
    def _position_from_row(row: Dict[str, object]) -> Optional[Dict[str, float]]:
        """Return ``row["position"]`` as a dict of float x/y/z, or None if unusable."""
        position_obj = row.get("position")
        if not isinstance(position_obj, dict):
            return None
        try:
            return {
                "x": float(position_obj["x"]),
                "y": float(position_obj["y"]),
                "z": float(position_obj["z"]),
            }
        except (TypeError, ValueError, KeyError):
            return None

    def _build_output_payload(
        self,
        payload: Dict[str, object],
        output_server: Dict[str, object],
    ) -> Optional[Dict[str, object]]:
        """Pick the position to send to *output_server*.

        Rows of ``payload["frequency_table"]`` are restricted to the server's
        frequency range when its filter is enabled; the row with the lowest
        ``rmse_m`` wins.

        Returns:
            The winning row's ``{"x", "y", "z"}`` position, or None when no row
            qualifies.
        """
        table_obj = payload.get("frequency_table")
        if not isinstance(table_obj, list):
            return None
        rows: List[Dict[str, object]] = []
        for row in table_obj:
            if not isinstance(row, dict):
                continue
            frequency_hz = row.get("frequency_hz")
            if not isinstance(frequency_hz, (int, float)):
                continue
            if self._position_from_row(row) is None:
                continue
            if bool(output_server.get("frequency_filter_enabled", False)):
                if not (
                    float(output_server.get("min_frequency_hz", 0.0))
                    <= float(frequency_hz)
                    <= float(output_server.get("max_frequency_hz", 0.0))
                ):
                    continue
            rows.append(row)
        if not rows:
            return None
        best_row = min(
            rows,
            key=lambda row: float(row.get("rmse_m", float("inf"))),
        )
        best_position = self._position_from_row(best_row)
        if best_position is None:
            return None
        # Minimal transport payload for final server integration: coordinates only.
        return best_position

    def _deliver_to_output_servers(self, payload: Dict[str, object]) -> Dict[str, object]:
        """Send the computed position to every enabled output server.

        Returns:
            A delivery report with an overall ``status`` ("disabled", "ok",
            "partial", "error" or "skipped"), ok/error/skipped counters, a
            per-server list under ``servers``, and top-level fields mirroring
            the first enabled server (or the first server if none is enabled).
        """
        now = _utc_now_iso_seconds()
        servers_delivery: List[Dict[str, object]] = []
        enabled_targets = [server for server in self.output_servers if bool(server.get("enabled"))]
        for server in self.output_servers:
            server_delivery = {
                "name": server["name"],
                "enabled": bool(server["enabled"]),
                "status": "disabled",
                "http_status": None,
                "response_body": "",
                "sent_at_utc": now,
                "target": {
                    "ip": server["ip"],
                    "port": server["port"],
                    "path": server["path"],
                },
                "frequency_filter": {
                    "enabled": server["frequency_filter_enabled"],
                    "min_frequency_mhz": server["min_frequency_mhz"],
                    "max_frequency_mhz": server["max_frequency_mhz"],
                },
            }
            if not bool(server["enabled"]):
                servers_delivery.append(server_delivery)
                continue
            output_payload = self._build_output_payload(payload=payload, output_server=server)
            if output_payload is None:
                server_delivery["status"] = "skipped"
                server_delivery["response_body"] = "No frequencies in configured output range"
                servers_delivery.append(server_delivery)
                continue
            status_code, response_body = send_payload_to_server(
                server_ip=str(server["ip"]),
                payload=output_payload,
                port=int(server["port"]),
                path=str(server["path"]),
                timeout_s=float(server["timeout_s"]),
            )
            server_delivery["http_status"] = status_code
            server_delivery["response_body"] = response_body
            server_delivery["status"] = "ok" if 200 <= status_code < 300 else "error"
            servers_delivery.append(server_delivery)

        ok_count = sum(1 for row in servers_delivery if row["status"] == "ok")
        error_count = sum(1 for row in servers_delivery if row["status"] == "error")
        skipped_count = sum(1 for row in servers_delivery if row["status"] == "skipped")
        if not enabled_targets:
            status = "disabled"
        elif error_count > 0 and ok_count > 0:
            status = "partial"
        elif error_count > 0:
            status = "error"
        elif ok_count == 0 and skipped_count > 0:
            status = "skipped"
        else:
            status = "ok"
        primary = next((row for row in servers_delivery if row["enabled"]), None)
        if primary is None and servers_delivery:
            primary = servers_delivery[0]
        return {
            "enabled": bool(enabled_targets),
            "status": status,
            "http_status": None if primary is None else primary["http_status"],
            "response_body": "" if primary is None else primary["response_body"],
            "sent_at_utc": now,
            "target": None if primary is None else primary["target"],
            "frequency_filter": None if primary is None else primary["frequency_filter"],
            "ok_count": ok_count,
            "error_count": error_count,
            "skipped_count": skipped_count,
            "servers": servers_delivery,
        }

    def _poll_loop(self) -> None:
        """Thread body: call ``refresh_once`` every ``poll_interval_s`` until stopped."""
        while not self.stop_event.is_set():
            try:
                self.refresh_once()
            except Exception as exc:
                # Top-level boundary of the worker thread: record the failure
                # for /health-style readers and keep polling.
                with self.state_lock:
                    self.last_error = str(exc)
            self.stop_event.wait(self.poll_interval_s)

    def snapshot(self) -> Dict[str, object]:
        """Return the current shared state, read under ``state_lock``."""
        with self.state_lock:
            return {
                "updated_at_utc": self.updated_at_utc,
                "last_error": self.last_error,
                "payload": self.latest_payload,
                "output_delivery": self.last_output_delivery,
            }
def _make_handler(service: AutoService):
service_holder = {"current": service}
service_swap_lock = threading.Lock()
class ServiceHandler(BaseHTTPRequestHandler):
    """HTTP handler exposing the service's read and write endpoints.

    GET routes: ``/`` and ``/ui`` (index page), ``/static/...`` (files from
    the ``web`` directory next to this module), ``/health``, ``/result``,
    ``/frequencies`` and ``/config``.

    POST routes: ``/config`` (validate, persist and hot-swap the
    configuration) and ``/refresh`` (run one refresh cycle). Every POST is
    checked with ``_is_write_authorized`` before it is routed.
    """

    @staticmethod
    def _current_service() -> "AutoService":
        """Return the service instance currently stored in ``service_holder``."""
        return service_holder["current"]

    def _is_write_authorized(self) -> bool:
        """Check the request's API token against the configured write token.

        The token is read from the ``X-API-Token`` header first and then from
        an ``Authorization: Bearer <token>`` header. When the current service
        has no write token configured, every request is authorized.

        Returns:
            True when the request may perform write operations.
        """
        expected_token = self._current_service().write_api_token
        if not expected_token:
            return True
        # hmac.compare_digest() raises TypeError for str arguments that contain
        # non-ASCII characters. Comparing encoded bytes keeps the constant-time
        # comparison while making a crafted header a plain mismatch instead of
        # an unhandled exception.
        expected_bytes = expected_token.encode("utf-8")
        header_token = self.headers.get("X-API-Token", "")
        if hmac.compare_digest(header_token.encode("utf-8"), expected_bytes):
            return True
        authorization = self.headers.get("Authorization", "")
        if authorization.lower().startswith("bearer "):
            # len("bearer ") == 7: everything after the scheme is the token.
            bearer_token = authorization[7:].strip()
            if hmac.compare_digest(bearer_token.encode("utf-8"), expected_bytes):
                return True
        return False

    def _write_bytes(
        self,
        status_code: int,
        content: bytes,
        content_type: str,
    ) -> None:
        """Send a complete response: status line, headers and ``content``.

        Args:
            status_code: HTTP status code of the response.
            content: Raw response body.
            content_type: Value of the ``Content-Type`` header.
        """
        self.send_response(status_code)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(content)))
        self.end_headers()
        self.wfile.write(content)

    def _write_json(self, status_code: int, payload: Dict[str, object]) -> None:
        """Serialize ``payload`` as UTF-8 JSON and send it with ``status_code``."""
        raw = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self._write_bytes(
            status_code=status_code,
            content=raw,
            content_type="application/json; charset=utf-8",
        )

    def _write_warming_up(self, snapshot: Dict[str, object]) -> None:
        """Send the 503 reply used while the snapshot has no payload yet."""
        self._write_json(
            503,
            {
                "status": "warming_up",
                "updated_at_utc": snapshot["updated_at_utc"],
                "error": snapshot["last_error"],
            },
        )

    def _write_static(self, relative_path: str) -> None:
        """Serve a file from the ``web`` directory next to this module.

        Replies with a JSON 404 when the resolved path leaves the ``web``
        directory or does not point to an existing regular file.

        Args:
            relative_path: Path of the asset relative to the ``web`` directory.
        """
        web_root = (Path(__file__).resolve().parent / "web").resolve()
        file_path = (web_root / relative_path).resolve()
        # Protect against path traversal outside /web.
        try:
            file_path.relative_to(web_root)
        except ValueError:
            self._write_json(404, {"error": "not_found"})
            return
        if not file_path.exists() or not file_path.is_file():
            self._write_json(404, {"error": "not_found"})
            return
        mime_type, _ = mimetypes.guess_type(str(file_path))
        if mime_type is None:
            mime_type = "application/octet-stream"
        # Force UTF-8 for text assets to avoid mojibake in browsers.
        if mime_type.startswith("text/") or mime_type in (
            "application/javascript",
            "application/x-javascript",
        ):
            mime_type = f"{mime_type}; charset=utf-8"
        self._write_bytes(200, file_path.read_bytes(), mime_type)

    def log_message(self, format: str, *args) -> None:
        """Discard the per-request log lines of the base handler."""
        return

    def do_GET(self) -> None:
        """Route GET requests to the UI, static assets and read-only JSON APIs."""
        path = parse.urlparse(self.path).path
        service_obj = self._current_service()
        snapshot = service_obj.snapshot()
        if path == "/" or path == "/ui":
            self._write_static("index.html")
            return
        if path.startswith("/static/"):
            self._write_static(path.removeprefix("/static/"))
            return
        if path == "/health":
            status = "ok" if snapshot["payload"] else "warming_up"
            http_code = 200 if status == "ok" else 503
            self._write_json(
                http_code,
                {
                    "status": status,
                    "updated_at_utc": snapshot["updated_at_utc"],
                    "error": snapshot["last_error"],
                },
            )
            return
        if path == "/result":
            payload = snapshot["payload"]
            if payload is None:
                self._write_warming_up(snapshot)
                return
            self._write_json(
                200,
                {
                    "status": "ok",
                    "updated_at_utc": snapshot["updated_at_utc"],
                    "data": payload,
                    "output_delivery": snapshot["output_delivery"],
                },
            )
            return
        if path == "/frequencies":
            payload = snapshot["payload"]
            if payload is None:
                self._write_warming_up(snapshot)
                return
            self._write_json(
                200,
                {
                    "status": "ok",
                    "updated_at_utc": snapshot["updated_at_utc"],
                    "selected_frequency_hz": payload.get("selected_frequency_hz"),
                    "selected_frequency_mhz": payload.get("selected_frequency_mhz"),
                    "frequency_table": payload.get("frequency_table", []),
                    "output_delivery": snapshot["output_delivery"],
                },
            )
            return
        if path == "/config":
            # JSON round-trip gives a deep copy, so redaction below never
            # touches the live configuration object.
            public_config = json.loads(json.dumps(service_obj.config))
            runtime_obj = public_config.get("runtime")
            if isinstance(runtime_obj, dict):
                if "write_api_token" in runtime_obj:
                    runtime_obj["write_api_token"] = ""
                runtime_obj["write_api_token_set"] = bool(service_obj.write_api_token)
            self._write_json(
                200,
                {
                    "status": "ok",
                    "config_path": service_obj.config_path,
                    "config": public_config,
                },
            )
            return
        self._write_json(404, {"error": "not_found"})

    def do_POST(self) -> None:
        """Authorize the request, then route it to ``/config`` or ``/refresh``."""
        path = parse.urlparse(self.path).path
        if not self._is_write_authorized():
            self._write_json(
                401,
                {"status": "error", "error": "unauthorized: missing or invalid API token"},
            )
            return
        if path == "/config":
            self._handle_config_update()
            return
        if path != "/refresh":
            self._write_json(404, {"error": "not_found"})
            return
        self._handle_refresh()

    def _handle_config_update(self) -> None:
        """Validate the posted configuration, persist it and swap the service.

        Replies 400 for a missing, undecodable or invalid body, 413 for a body
        above ``MAX_CONFIG_BODY_BYTES``, 500 when the new service fails to
        start, and 200 once the new service has replaced the old one.
        """
        service_obj = self._current_service()
        try:
            content_length = int(self.headers.get("Content-Length", "0"))
        except ValueError:
            self._write_json(400, {"status": "error", "error": "Invalid Content-Length"})
            return
        if content_length <= 0:
            self._write_json(400, {"status": "error", "error": "Empty request body"})
            return
        if content_length > MAX_CONFIG_BODY_BYTES:
            self._write_json(
                413,
                {
                    "status": "error",
                    "error": (
                        f"Config payload too large: {content_length} bytes, "
                        f"max is {MAX_CONFIG_BODY_BYTES}"
                    ),
                },
            )
            return
        body = self.rfile.read(content_length)
        try:
            new_config = json.loads(body.decode("utf-8"))
        except UnicodeDecodeError as exc:
            # A body that is not valid UTF-8 is a client error, not a crash.
            self._write_json(
                400,
                {"status": "error", "error": f"Request body is not valid UTF-8: {exc}"},
            )
            return
        except json.JSONDecodeError as exc:
            self._write_json(400, {"status": "error", "error": f"Invalid JSON: {exc}"})
            return
        if not isinstance(new_config, dict):
            self._write_json(400, {"status": "error", "error": "Config must be JSON object"})
            return
        # Avoid accidental token wipe when /config GET response is redacted in clients.
        runtime_obj = new_config.get("runtime")
        if isinstance(runtime_obj, dict) and service_obj.write_api_token:
            incoming_token = str(runtime_obj.get("write_api_token", "")).strip()
            if not incoming_token:
                runtime_obj["write_api_token"] = service_obj.write_api_token
        try:
            new_service = AutoService(new_config, config_path=service_obj.config_path)
        except Exception as exc:
            self._write_json(
                400,
                {"status": "error", "error": f"Config validation failed: {exc}"},
            )
            return
        # NOTE(review): the file is written before the new service is started,
        # so a start failure below leaves the new config on disk while the old
        # service keeps running.
        save_error = ""
        if service_obj.config_path:
            try:
                Path(service_obj.config_path).write_text(
                    json.dumps(new_config, ensure_ascii=False, indent=2),
                    encoding="utf-8",
                )
            except OSError as exc:
                # A failed save is reported in the response but does not
                # prevent the new configuration from being applied.
                save_error = str(exc)
        try:
            new_service.start()
        except Exception as exc:
            self._write_json(
                500,
                {
                    "status": "error",
                    "error": f"Failed to start service with new config: {exc}",
                },
            )
            return
        with service_swap_lock:
            old_service = service_holder["current"]
            service_holder["current"] = new_service
        old_service.stop()
        self._write_json(
            200,
            {
                "status": "ok",
                "saved": bool(service_obj.config_path) and not bool(save_error),
                "save_error": save_error,
                "restart_required": False,
                "applied": True,
                "config_path": service_obj.config_path,
            },
        )

    def _handle_refresh(self) -> None:
        """Run one refresh cycle on the current service and report the result."""
        try:
            self._current_service().refresh_once()
        except Exception as exc:
            self._write_json(500, {"status": "error", "error": str(exc)})
            return
        snapshot = self._current_service().snapshot()
        self._write_json(
            200,
            {
                "status": "ok",
                "updated_at_utc": snapshot["updated_at_utc"],
            },
        )
ServiceHandler.service_holder = service_holder # type: ignore[attr-defined]
return ServiceHandler
def parse_args() -> argparse.Namespace:
    """Parse the service's command-line options from ``sys.argv``.

    Returns:
        Namespace with ``config`` (default ``"config.json"``), ``host``
        (default ``""``) and ``port`` (default ``0``).
    """
    cli = argparse.ArgumentParser(
        description="Automatic trilateration service: polls 3 receiver servers and exposes result API."
    )
    # (flag, value type, default) for every supported option.
    option_specs = (
        ("--config", str, "config.json"),
        ("--host", str, ""),
        ("--port", int, 0),
    )
    for flag, value_type, fallback in option_specs:
        cli.add_argument(flag, type=value_type, default=fallback)
    return cli.parse_args()
def main() -> int:
    """Load the configuration, start the service and serve HTTP until interrupted.

    The listen address comes from ``--host`` / ``--port`` when given,
    otherwise from ``runtime.listen_host`` / ``runtime.listen_port`` in the
    config, falling back to ``0.0.0.0:8081``.

    Returns:
        Process exit status; ``0`` after the server loop ends.

    Raises:
        SystemExit: When ``runtime`` in the config is not an object.
    """
    args = parse_args()
    config = _load_json(args.config)
    runtime = config.get("runtime", {})
    if not isinstance(runtime, dict):
        raise SystemExit("runtime must be object.")
    host = args.host or str(runtime.get("listen_host", "0.0.0.0"))
    port = args.port or int(runtime.get("listen_port", 8081))
    service = AutoService(config, config_path=args.config)
    service.start()
    handler = _make_handler(service)
    try:
        server = ThreadingHTTPServer((host, port), handler)
    except Exception:
        # Creating the server can fail (e.g. the port is already in use);
        # stop the already-started service instead of leaving it running.
        service.stop()
        raise
    print(f"service_listen: http://{host}:{port}")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.server_close()
        # The handler may have swapped in a new service via POST /config, so
        # stop whichever instance is current rather than the initial one.
        current_service = handler.service_holder["current"]  # type: ignore[attr-defined]
        current_service.stop()
    return 0
if __name__ == "__main__":
    # Run the service and use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)