You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

2440 lines
98 KiB
Python

from __future__ import annotations
import argparse
import base64
import hmac
import itertools
import json
import math
import mimetypes
import secrets
import statistics
import threading
import time
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple
from urllib import error, parse, request
from triangulation import (
PropagationModel,
Sphere,
rssi_to_distance_m,
send_payload_to_server,
solve_three_sphere_intersection,
)
# 3-D coordinate expressed as an (x, y, z) tuple of floats.
Point3D = Tuple[float, float, float]
MAX_CONFIG_BODY_BYTES = 1_000_000 # 1 MB guardrail for /config POST.
# Conversion factor: number of hertz in one megahertz.
HZ_IN_MHZ = 1_000_000.0
# Fallback for auth.session_ttl_s (the "_s" suffix suggests seconds; 43 200 s = 12 h).
DEFAULT_AUTH_SESSION_TTL_S = 43_200
# Fallback for auth.cookie_name.
DEFAULT_AUTH_COOKIE_NAME = "triangulation_session"
# Fallback for auth.provider; "keycloak" is the only value _parse_auth_config accepts.
DEFAULT_AUTH_PROVIDER = "keycloak"
def _utc_now_iso_seconds() -> str:
return datetime.now(timezone.utc).isoformat(timespec="seconds")
def _load_json(path: str) -> Dict[str, object]:
file_path = Path(path)
if not file_path.exists():
raise SystemExit(f"Config file not found: {path}")
# Accept optional UTF-8 BOM to avoid startup failures with edited JSON files.
with file_path.open("r", encoding="utf-8-sig") as fh:
data = json.load(fh)
if not isinstance(data, dict):
raise SystemExit("Config root must be a JSON object.")
return data
def _base64url_json(segment: str) -> Dict[str, object]:
raw = str(segment or "").strip()
if not raw:
return {}
padding = "=" * (-len(raw) % 4)
try:
decoded = base64.urlsafe_b64decode(raw + padding).decode("utf-8")
payload = json.loads(decoded)
except Exception:
return {}
return payload if isinstance(payload, dict) else {}
def _decode_jwt_payload_unverified(token: str) -> Dict[str, object]:
parts = str(token or "").split(".")
if len(parts) < 2:
return {}
return _base64url_json(parts[1])
def _auth_text(section: Dict[str, object], key: str, default: str = "") -> str:
    """Return ``section[key]`` as stripped text; a missing or null value yields *default*."""
    value = section.get(key)
    if value is None:
        # JSON null must not turn into the literal text "None".
        value = default
    return str(value).strip()


def _parse_auth_config(config: Dict[str, object]) -> Dict[str, object]:
    """Validate the ``auth`` section of *config* and return it in normalized form.

    A missing or null ``auth`` / ``auth.keycloak`` section is treated as empty,
    and null string fields are treated as absent (their default applies).

    Args:
        config: Full service configuration.

    Returns:
        Dict with keys ``enabled``, ``provider``, ``session_ttl_s``,
        ``cookie_name`` and a nested ``keycloak`` dict of stripped strings.

    Raises:
        ValueError: If a section has the wrong type, the provider is not
            ``keycloak``, the TTL is not a positive integer, the cookie name or
            a role name is empty, or a required Keycloak field is empty while
            ``auth.enabled`` is true.
    """
    auth_obj = config.get("auth", {})
    if auth_obj is None:
        auth_obj = {}
    if not isinstance(auth_obj, dict):
        raise ValueError("auth must be object.")

    enabled = bool(auth_obj.get("enabled", False))

    provider = _auth_text(auth_obj, "provider", DEFAULT_AUTH_PROVIDER).lower()
    if provider not in ("keycloak",):
        raise ValueError("auth.provider must be 'keycloak'.")

    raw_ttl = auth_obj.get("session_ttl_s")
    if raw_ttl is None:
        raw_ttl = DEFAULT_AUTH_SESSION_TTL_S
    try:
        session_ttl_s = int(raw_ttl)
    except (TypeError, ValueError, OverflowError):
        # Report config mistakes uniformly as ValueError, like the other checks here.
        raise ValueError("auth.session_ttl_s must be an integer.") from None
    if session_ttl_s <= 0:
        raise ValueError("auth.session_ttl_s must be > 0.")

    cookie_name = _auth_text(auth_obj, "cookie_name", DEFAULT_AUTH_COOKIE_NAME)
    if not cookie_name:
        raise ValueError("auth.cookie_name must be non-empty.")

    keycloak_obj = auth_obj.get("keycloak", {})
    if keycloak_obj is None:
        keycloak_obj = {}
    if not isinstance(keycloak_obj, dict):
        raise ValueError("auth.keycloak must be object.")

    base_url = _auth_text(keycloak_obj, "base_url").rstrip("/")
    realm = _auth_text(keycloak_obj, "realm")
    client_id = _auth_text(keycloak_obj, "client_id")
    client_secret = _auth_text(keycloak_obj, "client_secret")
    admin_client_id = _auth_text(keycloak_obj, "admin_client_id")
    admin_client_secret = _auth_text(keycloak_obj, "admin_client_secret")
    user_role = _auth_text(keycloak_obj, "user_role", "triangulation_user")
    admin_role = _auth_text(keycloak_obj, "admin_role", "triangulation_admin")
    admin_console_url = _auth_text(keycloak_obj, "admin_console_url")

    if enabled:
        # Checked in this order so the first missing field is the one reported.
        required_fields = (
            ("base_url", base_url),
            ("realm", realm),
            ("client_id", client_id),
            ("client_secret", client_secret),
            ("admin_client_id", admin_client_id),
            ("admin_client_secret", admin_client_secret),
        )
        for field_name, field_value in required_fields:
            if not field_value:
                raise ValueError(
                    f"auth.keycloak.{field_name} must be non-empty when auth.enabled=true."
                )
    if not user_role or not admin_role:
        raise ValueError("auth.keycloak.user_role/admin_role must be non-empty.")

    return {
        "enabled": enabled,
        "provider": provider,
        "session_ttl_s": session_ttl_s,
        "cookie_name": cookie_name,
        "keycloak": {
            "base_url": base_url,
            "realm": realm,
            "client_id": client_id,
            "client_secret": client_secret,
            "admin_client_id": admin_client_id,
            "admin_client_secret": admin_client_secret,
            "user_role": user_role,
            "admin_role": admin_role,
            "admin_console_url": admin_console_url,
        },
    }
def _public_config_view(config: Dict[str, object], write_api_token_set: bool) -> Dict[str, object]:
public_config = json.loads(json.dumps(config))
runtime_obj = public_config.get("runtime")
if isinstance(runtime_obj, dict):
if "write_api_token" in runtime_obj:
runtime_obj["write_api_token"] = ""
runtime_obj["write_api_token_set"] = bool(write_api_token_set)
auth_obj = public_config.get("auth")
if isinstance(auth_obj, dict):
keycloak_obj = auth_obj.get("keycloak")
if isinstance(keycloak_obj, dict):
source_auth_obj = config.get("auth", {})
source_keycloak_obj = (
source_auth_obj.get("keycloak", {})
if isinstance(source_auth_obj, dict)
else {}
)
if "client_secret" in keycloak_obj:
keycloak_obj["client_secret"] = ""
keycloak_obj["client_secret_set"] = bool(
str(
source_keycloak_obj.get("client_secret", "")
if isinstance(source_keycloak_obj, dict)
else ""
).strip()
)
if "admin_client_secret" in keycloak_obj:
keycloak_obj["admin_client_secret"] = ""
keycloak_obj["admin_client_secret_set"] = bool(
str(
source_keycloak_obj.get("admin_client_secret", "")
if isinstance(source_keycloak_obj, dict)
else ""
).strip()
)
return public_config
def _preserve_sensitive_config_values(
current_service: "AutoService",
new_config: Dict[str, object],
) -> None:
runtime_obj = new_config.get("runtime")
if isinstance(runtime_obj, dict) and current_service.write_api_token:
incoming_token = str(runtime_obj.get("write_api_token", "")).strip()
if not incoming_token:
runtime_obj["write_api_token"] = current_service.write_api_token
auth_obj = new_config.get("auth")
if not isinstance(auth_obj, dict):
return
keycloak_obj = auth_obj.get("keycloak")
if not isinstance(keycloak_obj, dict):
return
if current_service.keycloak_client_secret and not str(
keycloak_obj.get("client_secret", "")
).strip():
keycloak_obj["client_secret"] = current_service.keycloak_client_secret
if current_service.keycloak_admin_client_secret and not str(
keycloak_obj.get("admin_client_secret", "")
).strip():
keycloak_obj["admin_client_secret"] = current_service.keycloak_admin_client_secret
def _center_from_obj(obj: Dict[str, object]) -> Point3D:
center = obj.get("center")
if not isinstance(center, dict):
raise ValueError("Receiver center must be an object.")
return (float(center["x"]), float(center["y"]), float(center["z"]))
def _parse_model(config: Dict[str, object]) -> PropagationModel:
model_obj = config.get("model")
if not isinstance(model_obj, dict):
raise ValueError("Config must contain object 'model'.")
return PropagationModel(
tx_power_dbm=float(model_obj["tx_power_dbm"]),
tx_gain_dbi=float(model_obj.get("tx_gain_dbi", 0.0)),
rx_gain_dbi=float(model_obj.get("rx_gain_dbi", 0.0)),
path_loss_exponent=float(model_obj.get("path_loss_exponent", 2.0)),
reference_distance_m=float(model_obj.get("reference_distance_m", 1.0)),
min_distance_m=float(model_obj.get("min_distance_m", 1e-3)),
)
def _float_from_measurement(
item: Dict[str, object],
keys: Sequence[str],
field_name: str,
source_label: str,
row_index: int,
) -> float:
for key in keys:
if key in item:
value = item[key]
try:
parsed = float(value)
except (TypeError, ValueError):
raise ValueError(
f"{source_label}: row #{row_index} field '{key}' must be numeric, got {value!r}."
) from None
if not math.isfinite(parsed):
raise ValueError(
f"{source_label}: row #{row_index} field '{key}' must be finite, got {value!r}."
)
return parsed
raise ValueError(f"{source_label}: row #{row_index} missing field '{field_name}'.")
def _float_with_key_from_measurement(
item: Dict[str, object],
keys: Sequence[str],
field_name: str,
source_label: str,
row_index: int,
) -> Tuple[str, float]:
for key in keys:
if key in item:
value = _float_from_measurement(
item=item,
keys=(key,),
field_name=field_name,
source_label=source_label,
row_index=row_index,
)
return key, value
raise ValueError(f"{source_label}: row #{row_index} missing field '{field_name}'.")
def _parse_frequency_hz_from_measurement(
    row: Dict[str, object],
    source_label: str,
    row_index: int,
) -> float:
    """Return the frequency of a measurement row, normalized to hertz.

    Keys ending in ``_hz`` are taken as hertz and keys ending in ``_mhz`` as
    megahertz. The unit-less keys (``frequency``, ``freq``, ``f``) default to
    megahertz, except that values of 10 000 000 or more are taken as hertz.

    Raises:
        ValueError: If no frequency key is present or its value is invalid.
    """
    hz_keys = ("frequency_hz", "freq_hz", "f_hz")
    mhz_keys = ("frequency_mhz", "freq_mhz", "f_mhz")
    generic_keys = ("frequency", "freq", "f")
    matched_key, number = _float_with_key_from_measurement(
        row,
        keys=hz_keys + mhz_keys + generic_keys,
        field_name="frequency",
        source_label=source_label,
        row_index=row_index,
    )
    if matched_key in hz_keys:
        return number
    # Generic fields default to MHz in this project; for backward
    # compatibility very large generic values are treated as Hz.
    already_hz = matched_key not in mhz_keys and number >= 10_000_000.0
    return number if already_hz else number * HZ_IN_MHZ
def _parse_receiver_input_filter(
receiver_obj: Dict[str, object],
receiver_id: str,
default_filter_obj: Optional[Dict[str, object]] = None,
) -> Dict[str, object]:
raw_receiver_filter = receiver_obj.get("input_filter")
if raw_receiver_filter is None:
raw_receiver_filter = {}
if not isinstance(raw_receiver_filter, dict):
raise ValueError(f"receiver '{receiver_id}': input_filter must be an object.")
merged_filter: Dict[str, object] = {}
if isinstance(default_filter_obj, dict):
merged_filter.update(default_filter_obj)
merged_filter.update(raw_receiver_filter)
filter_obj = merged_filter
if not isinstance(filter_obj, dict):
raise ValueError(f"receiver '{receiver_id}': input_filter must be an object.")
min_freq_mhz_raw = filter_obj.get("min_frequency_mhz")
max_freq_mhz_raw = filter_obj.get("max_frequency_mhz")
if min_freq_mhz_raw is None and "min_frequency_hz" in filter_obj:
min_freq_mhz_raw = float(filter_obj["min_frequency_hz"]) / HZ_IN_MHZ
if max_freq_mhz_raw is None and "max_frequency_hz" in filter_obj:
max_freq_mhz_raw = float(filter_obj["max_frequency_hz"]) / HZ_IN_MHZ
parsed = {
"enabled": bool(filter_obj.get("enabled", False)),
"min_frequency_mhz": float(min_freq_mhz_raw if min_freq_mhz_raw is not None else 0.0),
"max_frequency_mhz": float(max_freq_mhz_raw if max_freq_mhz_raw is not None else 1_000_000_000.0),
"min_rssi_dbm": float(filter_obj.get("min_rssi_dbm", -200.0)),
"max_rssi_dbm": float(filter_obj.get("max_rssi_dbm", 50.0)),
}
if parsed["max_frequency_mhz"] < parsed["min_frequency_mhz"]:
raise ValueError(
f"receiver '{receiver_id}': input_filter.max_frequency_mhz must be >= min_frequency_mhz."
)
if parsed["max_rssi_dbm"] < parsed["min_rssi_dbm"]:
raise ValueError(
f"receiver '{receiver_id}': input_filter.max_rssi_dbm must be >= min_rssi_dbm."
)
return parsed
def _apply_receiver_input_filter(
measurements: Sequence[Tuple[float, float]],
receiver_filter: Dict[str, object],
) -> List[Tuple[float, float]]:
if not bool(receiver_filter.get("enabled", False)):
return list(measurements)
min_frequency_mhz = float(receiver_filter["min_frequency_mhz"])
max_frequency_mhz = float(receiver_filter["max_frequency_mhz"])
min_rssi_dbm = float(receiver_filter["min_rssi_dbm"])
max_rssi_dbm = float(receiver_filter["max_rssi_dbm"])
filtered = []
for frequency_hz, rssi_dbm in measurements:
frequency_mhz = frequency_hz / HZ_IN_MHZ
if not (min_frequency_mhz <= frequency_mhz <= max_frequency_mhz):
continue
if not (min_rssi_dbm <= rssi_dbm <= max_rssi_dbm):
continue
filtered.append((frequency_hz, rssi_dbm))
return filtered
def _parse_receiver_configured_frequencies(
receiver_obj: Dict[str, object],
receiver_id: str,
) -> List[int]:
raw_frequencies = receiver_obj.get("frequencies_mhz")
if raw_frequencies is None:
return []
if not isinstance(raw_frequencies, list):
raise ValueError(
f"receiver '{receiver_id}': frequencies_mhz must be an array of numbers."
)
parsed_hz: List[int] = []
for index, value in enumerate(raw_frequencies, start=1):
try:
frequency_mhz = float(value)
except (TypeError, ValueError):
raise ValueError(
f"receiver '{receiver_id}': frequencies_mhz[{index}] must be numeric."
) from None
if not math.isfinite(frequency_mhz) or frequency_mhz <= 0.0:
raise ValueError(
f"receiver '{receiver_id}': frequencies_mhz[{index}] must be > 0."
)
parsed_hz.append(int(round(frequency_mhz * HZ_IN_MHZ)))
return sorted(set(parsed_hz))
def _apply_receiver_configured_frequencies(
measurements: Sequence[Tuple[float, float]],
configured_frequencies_hz: Sequence[int],
) -> List[Tuple[float, float]]:
if not configured_frequencies_hz:
return list(measurements)
allowed = set(int(value) for value in configured_frequencies_hz)
filtered: List[Tuple[float, float]] = []
for frequency_hz, rssi_dbm in measurements:
rounded_hz = int(round(frequency_hz))
if rounded_hz in allowed:
filtered.append((float(rounded_hz), rssi_dbm))
return filtered
def _output_value(output_obj: Dict[str, object], key: str, default: object) -> object:
    """Return ``output_obj[key]``, substituting *default* when missing or null."""
    value = output_obj.get(key)
    return default if value is None else value


def _output_bound_mhz(output_obj: Dict[str, object], bound: str, name: str) -> float:
    """Read the ``min``/``max`` frequency bound in MHz, accepting an Hz fallback key."""
    raw_mhz = output_obj.get(f"{bound}_frequency_mhz")
    raw_hz = output_obj.get(f"{bound}_frequency_hz")
    try:
        if raw_mhz is None and raw_hz is not None:
            raw_mhz = float(raw_hz) / HZ_IN_MHZ
        # A missing, null or zero bound is reported as 0.0.
        return float(raw_mhz or 0.0)
    except (TypeError, ValueError):
        raise ValueError(
            f"runtime output '{name}': {bound}_frequency_mhz must be numeric."
        ) from None


def _parse_output_server_config(
    output_obj: Dict[str, object],
    default_name: str,
) -> Dict[str, object]:
    """Validate one output-server entry and return it in normalized form.

    Null values are treated as absent, so ``"ip": null`` means "no IP" rather
    than the literal host name "None".

    Args:
        output_obj: Raw output-server configuration.
        default_name: Name used when the entry has no (or a blank) ``name``.

    Returns:
        Dict with ``name``, ``enabled``, ``ip``, ``port``, ``path``,
        ``timeout_s``, ``frequency_filter_enabled`` and the frequency bounds in
        both MHz and Hz.

    Raises:
        ValueError: If a numeric field is not numeric, the output is enabled
            without an IP, or the frequency filter is enabled with invalid bounds.
    """
    name = str(_output_value(output_obj, "name", default_name)).strip() or default_name
    ip = str(_output_value(output_obj, "ip", "")).strip()
    # Keep backward compatibility for an explicit enabled flag, but allow the
    # simplified config: if enabled is omitted, a non-empty IP means enabled.
    if "enabled" in output_obj:
        enabled = bool(output_obj.get("enabled"))
    else:
        enabled = bool(ip)

    try:
        port = int(_output_value(output_obj, "port", 8080))
    except (TypeError, ValueError):
        raise ValueError(f"runtime output '{name}': port must be an integer.") from None
    path = str(_output_value(output_obj, "path", "/triangulation"))
    try:
        timeout_s = float(_output_value(output_obj, "timeout_s", 3.0))
    except (TypeError, ValueError):
        raise ValueError(f"runtime output '{name}': timeout_s must be numeric.") from None

    frequency_filter_enabled = bool(output_obj.get("frequency_filter_enabled", False))
    min_frequency_mhz = _output_bound_mhz(output_obj, "min", name)
    max_frequency_mhz = _output_bound_mhz(output_obj, "max", name)
    min_frequency_hz = min_frequency_mhz * HZ_IN_MHZ
    max_frequency_hz = max_frequency_mhz * HZ_IN_MHZ

    if enabled and not ip:
        raise ValueError(f"runtime output '{name}': ip must be non-empty when enabled=true.")
    if frequency_filter_enabled:
        if min_frequency_mhz <= 0.0:
            raise ValueError(
                f"runtime output '{name}': min_frequency_mhz must be > 0 when frequency filter is enabled."
            )
        if max_frequency_mhz <= 0.0:
            raise ValueError(
                f"runtime output '{name}': max_frequency_mhz must be > 0 when frequency filter is enabled."
            )
        if max_frequency_mhz < min_frequency_mhz:
            raise ValueError(
                f"runtime output '{name}': max_frequency_mhz must be >= min_frequency_mhz."
            )
    return {
        "name": name,
        "enabled": enabled,
        "ip": ip,
        "port": port,
        "path": path,
        "timeout_s": timeout_s,
        "frequency_filter_enabled": frequency_filter_enabled,
        "min_frequency_mhz": min_frequency_mhz,
        "max_frequency_mhz": max_frequency_mhz,
        "min_frequency_hz": min_frequency_hz,
        "max_frequency_hz": max_frequency_hz,
    }
def parse_source_payload(
    payload: object,
    source_label: str,
    expected_receiver_id: Optional[str] = None,
) -> List[Tuple[float, float]]:
    """Convert a receiver payload into ``(frequency_hz, amplitude_dbm)`` pairs.

    The payload is either a list of rows or an object holding the rows under
    ``measurements``, ``samples``, ``data`` or ``m`` (first non-null wins).

    Args:
        payload: Decoded JSON from the source.
        source_label: Prefix identifying the source in error messages.
        expected_receiver_id: Configured receiver id. It is accepted for
            interface compatibility but never enforced (see note below).

    Returns:
        One tuple per row, in input order.

    Raises:
        ValueError: If the payload shape is wrong, it has no rows, or a row is
            not an object, lacks a field or carries an invalid value.
    """
    if isinstance(payload, dict):
        # A payload "receiver_id" that differs from expected_receiver_id is
        # deliberately tolerated: measurements keep flowing after a receiver
        # is renamed in the UI while legacy sources still send the old id.
        raw_items = None
        for container_key in ("measurements", "samples", "data", "m"):
            raw_items = payload.get(container_key)
            if raw_items is not None:
                break
    elif isinstance(payload, list):
        raw_items = payload
    else:
        raise ValueError(f"{source_label}: payload must be list or object.")

    if not (isinstance(raw_items, list) and raw_items):
        raise ValueError(f"{source_label}: payload contains no measurements.")

    amplitude_keys = ("amplitude_dbm", "rssi_dbm", "dbm", "amplitude", "rssi")
    parsed_items: List[Tuple[float, float]] = []
    for row_index, row in enumerate(raw_items, start=1):
        if not isinstance(row, dict):
            raise ValueError(f"{source_label}: row #{row_index} must be an object.")
        frequency_hz = _parse_frequency_hz_from_measurement(
            row=row,
            source_label=source_label,
            row_index=row_index,
        )
        amplitude_dbm = _float_from_measurement(
            row,
            keys=amplitude_keys,
            field_name="amplitude_dbm",
            source_label=source_label,
            row_index=row_index,
        )
        if frequency_hz <= 0.0:
            raise ValueError(
                f"{source_label}: row #{row_index} field 'frequency_hz' must be > 0."
            )
        parsed_items.append((frequency_hz, amplitude_dbm))
    return parsed_items
def aggregate_radius(
    measurements: Sequence[Tuple[float, float]],
    model: PropagationModel,
    method: str,
) -> float:
    """Combine per-measurement distance estimates into a single radius.

    Each ``(frequency_hz, amplitude_dbm)`` pair is converted with
    ``rssi_to_distance_m`` and the results are aggregated.

    Args:
        measurements: Non-empty sequence of ``(frequency_hz, amplitude_dbm)`` pairs.
        model: Propagation model passed through to ``rssi_to_distance_m``.
        method: ``"median"`` or ``"mean"``.

    Returns:
        The aggregated distance as a float.

    Raises:
        ValueError: If *method* is unknown or *measurements* is empty.
    """
    # Validate up front: a bad method should not cost a full conversion pass,
    # and empty input should not surface as ZeroDivisionError/StatisticsError.
    if method not in ("median", "mean"):
        raise ValueError("aggregation must be 'median' or 'mean'")
    if not measurements:
        raise ValueError("aggregation requires at least one measurement")
    distances = [
        rssi_to_distance_m(amplitude_dbm=amplitude_dbm, frequency_hz=frequency_hz, model=model)
        for frequency_hz, amplitude_dbm in measurements
    ]
    if method == "median":
        return float(statistics.median(distances))
    return float(sum(distances) / len(distances))
def _group_by_frequency(
measurements: Sequence[Tuple[float, float]],
) -> Dict[float, List[Tuple[float, float]]]:
grouped: Dict[float, List[Tuple[float, float]]] = {}
for frequency_hz, amplitude_dbm in measurements:
if frequency_hz not in grouped:
grouped[frequency_hz] = []
grouped[frequency_hz].append((frequency_hz, amplitude_dbm))
return grouped
def _fetch_measurements(
url: str,
timeout_s: float,
expected_receiver_id: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
) -> List[Tuple[float, float]]:
source_label = f"source_url={url}"
request_headers = {"Accept": "application/json"}
if headers:
request_headers.update(headers)
req = request.Request(url=url, method="GET", headers=request_headers)
try:
with request.urlopen(req, timeout=timeout_s) as response:
payload = json.loads(response.read().decode("utf-8"))
except error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
raise RuntimeError(f"HTTP {exc.code} for '{url}': {body}")
except error.URLError as exc:
raise RuntimeError(f"Cannot reach '{url}': {exc.reason}")
except TimeoutError:
raise RuntimeError(f"Timeout while reading '{url}'")
except json.JSONDecodeError as exc:
raise RuntimeError(f"Invalid JSON from '{url}': {exc}")
try:
return parse_source_payload(
payload=payload,
source_label=source_label,
expected_receiver_id=expected_receiver_id,
)
except ValueError as exc:
raise RuntimeError(str(exc)) from None
def _parse_json_object(raw_text: str) -> Dict[str, object]:
if not raw_text.strip():
return {}
try:
parsed = json.loads(raw_text)
except json.JSONDecodeError:
return {"raw": raw_text}
if isinstance(parsed, dict):
return parsed
return {"value": parsed}
def _http_json_request(
url: str,
method: str = "GET",
payload: Optional[Dict[str, object]] = None,
form: Optional[Dict[str, object]] = None,
headers: Optional[Dict[str, str]] = None,
timeout_s: float = 2.0,
) -> Tuple[int, Dict[str, object], str]:
request_headers = {"Accept": "application/json"}
body: Optional[bytes] = None
if payload is not None:
request_headers["Content-Type"] = "application/json"
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
elif form is not None:
request_headers["Content-Type"] = "application/x-www-form-urlencoded"
body = parse.urlencode(
{str(key): str(value) for key, value in form.items()},
doseq=True,
).encode("utf-8")
if isinstance(headers, dict):
request_headers.update({str(key): str(value) for key, value in headers.items()})
req = request.Request(url=url, method=method, headers=request_headers, data=body)
try:
with request.urlopen(req, timeout=timeout_s) as response:
text = response.read().decode("utf-8", errors="replace")
return int(response.status), _parse_json_object(text), ""
except error.HTTPError as exc:
text = exc.read().decode("utf-8", errors="replace")
return int(exc.code), _parse_json_object(text), ""
except Exception as exc: # pragma: no cover - network/IO branches
return 0, {}, str(exc)
def _receiver_control_urls(source_url: str) -> Tuple[str, str]:
parts = parse.urlsplit(source_url)
if parts.scheme not in ("http", "https") or not parts.netloc:
raise ValueError(f"Unsupported source URL: {source_url}")
control_url = parse.urlunsplit((parts.scheme, parts.netloc, "/control", "", ""))
status_url = parse.urlunsplit((parts.scheme, parts.netloc, "/status", "", ""))
return control_url, status_url
def _output_control_urls(output_server: Dict[str, object]) -> Tuple[str, str]:
ip = str(output_server.get("ip", "")).strip()
port = int(output_server.get("port", 8080))
if not ip:
raise ValueError("Output server has empty ip.")
base = f"http://{ip}:{port}"
return f"{base}/control", f"{base}/status"
def _decode_keycloak_body(raw: str) -> object:
    """Decode a Keycloak response body; blank gives ``{}``, non-JSON gives ``{"raw": raw}``."""
    if not raw.strip():
        return {}
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return {"raw": raw}


def _keycloak_admin_request(
    service: "AutoService",
    path: str,
    method: str = "GET",
    payload: Optional[Dict[str, object]] = None,
    json_body: Optional[object] = None,
    timeout_s: float = 5.0,
) -> Tuple[int, object, str]:
    """Call the Keycloak admin API with the service's admin bearer token.

    Args:
        service: Provides ``get_admin_access_token()`` and
            ``keycloak_admin_api_base()``.
        path: Path (and query) appended to the admin API base URL.
        method: HTTP method.
        payload: JSON object body, used when *json_body* is None.
        json_body: Arbitrary JSON body (e.g. a list); takes precedence.
        timeout_s: Timeout passed to ``urlopen``.

    Returns:
        ``(status, decoded_body, "")`` for any HTTP response, including error
        statuses. When the request itself fails: ``(0, {}, <error text>)``.
    """
    access_token = service.get_admin_access_token()
    url = f"{service.keycloak_admin_api_base()}{path}"
    headers = {
        "Accept": "application/json",
        "Authorization": f"Bearer {access_token}",
    }
    body_obj = json_body if json_body is not None else payload
    body: Optional[bytes] = None
    if body_obj is not None:
        headers["Content-Type"] = "application/json"
        body = json.dumps(body_obj, ensure_ascii=False).encode("utf-8")
    req = request.Request(url=url, method=method, headers=headers, data=body)
    try:
        with request.urlopen(req, timeout=timeout_s) as response:
            status_code = int(response.status)
            raw = response.read().decode("utf-8", errors="replace")
    except error.HTTPError as exc:
        status_code = int(exc.code)
        raw = exc.read().decode("utf-8", errors="replace")
    except Exception as exc:  # pragma: no cover - network/IO branches
        return 0, {}, str(exc)
    # Decode success and error bodies the same way, so a 2xx reply with a
    # non-JSON body keeps its real status instead of looking like a transport failure.
    return status_code, _decode_keycloak_body(raw), ""
def _keycloak_get_user_id_by_username(service: "AutoService", username: str) -> Optional[str]:
    """Look up the Keycloak user id for *username* (compared case-insensitively).

    Returns:
        The id of the first returned user whose username matches and whose id
        is non-empty, or None when the request fails or nothing matches.
    """
    wanted = str(username or "").strip()
    status_code, payload, request_error = _keycloak_admin_request(
        service,
        f"/users?username={parse.quote(wanted)}&exact=true",
    )
    succeeded = not request_error and 200 <= status_code < 300 and isinstance(payload, list)
    if not succeeded:
        return None
    wanted_folded = wanted.casefold()
    for candidate in payload:
        if not isinstance(candidate, dict):
            continue
        if str(candidate.get("username", "")).strip().casefold() != wanted_folded:
            continue
        candidate_id = str(candidate.get("id", "")).strip()
        if candidate_id:
            return candidate_id
    return None
def _keycloak_get_role_representation(
    service: "AutoService",
    role_name: str,
) -> Dict[str, object]:
    """Fetch the Keycloak representation of the realm role *role_name*.

    Returns:
        The role object returned by the admin API.

    Raises:
        RuntimeError: If the request fails, the status is not 2xx, or the
            body is not a JSON object.
    """
    status_code, payload, request_error = _keycloak_admin_request(
        service,
        f"/roles/{parse.quote(role_name)}",
    )
    if request_error:
        raise RuntimeError(f"Keycloak role request failed: {request_error}")
    if 200 <= status_code < 300 and isinstance(payload, dict):
        return payload
    # Prefer Keycloak's own error text; fall back to the bare status code.
    detail = payload.get("errorMessage") if isinstance(payload, dict) else status_code
    raise RuntimeError(f"Keycloak role request failed: {detail}")
def _keycloak_list_users(service: "AutoService") -> List[Dict[str, object]]:
    """List Keycloak users (up to 200) together with their application role.

    Entries without a username or id, and usernames starting with
    ``service-account-``, are skipped. Each remaining user costs one extra
    request for its realm role mappings. A user holding the configured admin
    role is reported as ``"admin"``; everyone else as ``"user"``.

    Returns:
        Rows with ``id``, ``username``, ``enabled``, ``first_name``,
        ``last_name`` and ``role``, sorted by case-folded username.

    Raises:
        RuntimeError: If the user list or any role-mapping request fails.
    """
    status_code, payload, request_error = _keycloak_admin_request(service, "/users?max=200")
    if request_error:
        raise RuntimeError(f"Keycloak users request failed: {request_error}")
    if not (200 <= status_code < 300 and isinstance(payload, list)):
        detail = payload.get("errorMessage") if isinstance(payload, dict) else status_code
        raise RuntimeError(f"Keycloak users request failed: {detail}")

    users: List[Dict[str, object]] = []
    for entry in payload:
        if not isinstance(entry, dict):
            continue
        username = str(entry.get("username", "")).strip()
        user_id = str(entry.get("id", "")).strip()
        if not (username and user_id) or username.startswith("service-account-"):
            continue

        role_status, role_payload, role_error = _keycloak_admin_request(
            service,
            f"/users/{parse.quote(user_id)}/role-mappings/realm",
        )
        if role_error:
            raise RuntimeError(f"Keycloak role mappings request failed: {role_error}")
        if not (200 <= role_status < 300 and isinstance(role_payload, list)):
            raise RuntimeError("Keycloak role mappings request failed.")

        role_names = set()
        for role_entry in role_payload:
            if isinstance(role_entry, dict):
                role_names.add(str(role_entry.get("name", "")).strip())

        # Admin wins when both roles are mapped.
        if service.keycloak_admin_role in role_names:
            app_role = "admin"
        elif service.keycloak_user_role in role_names:
            app_role = "user"
        else:
            app_role = ""

        users.append(
            {
                "id": user_id,
                "username": username,
                "enabled": bool(entry.get("enabled", True)),
                "first_name": str(entry.get("firstName", "")).strip(),
                "last_name": str(entry.get("lastName", "")).strip(),
                # Users with neither role are presented as plain users.
                "role": app_role or "user",
            }
        )
    users.sort(key=lambda item: str(item.get("username", "")).casefold())
    return users
def _keycloak_apply_user_role(service: "AutoService", user_id: str, role: str) -> None:
    """Give *user_id* exactly one of the two application roles.

    The target role is assigned first and the opposite role removed afterwards.

    Args:
        service: Provides the configured admin/user role names.
        user_id: Keycloak user id.
        role: ``"admin"`` selects the admin role; any other value selects the
            user role.

    Raises:
        RuntimeError: If a role cannot be fetched, the assignment fails, or the
            opposite role cannot be removed.
    """
    if role == "admin":
        target_role, other_role = service.keycloak_admin_role, service.keycloak_user_role
    else:
        target_role, other_role = service.keycloak_user_role, service.keycloak_admin_role
    target_repr = _keycloak_get_role_representation(service, target_role)
    other_repr = _keycloak_get_role_representation(service, other_role)
    mappings_path = f"/users/{parse.quote(user_id)}/role-mappings/realm"

    status_code, _, request_error = _keycloak_admin_request(
        service,
        mappings_path,
        method="POST",
        json_body=[target_repr],
    )
    if request_error or status_code < 200 or status_code >= 300:
        raise RuntimeError("Failed to assign Keycloak role.")

    # The removal must be checked as well: an unnoticed failure would leave a
    # demoted user with the admin role still mapped.
    status_code, _, request_error = _keycloak_admin_request(
        service,
        mappings_path,
        method="DELETE",
        json_body=[other_repr],
    )
    if request_error or status_code < 200 or status_code >= 300:
        raise RuntimeError("Failed to remove previous Keycloak role.")
def _keycloak_reset_password(service: "AutoService", user_id: str, password: str) -> None:
    """Set a new non-temporary password for the Keycloak user *user_id*.

    Raises:
        RuntimeError: If the request fails or the status is not 2xx.
    """
    credential = {
        "type": "password",
        "temporary": False,
        "value": password,
    }
    status_code, _, request_error = _keycloak_admin_request(
        service,
        f"/users/{parse.quote(user_id)}/reset-password",
        method="PUT",
        json_body=credential,
    )
    succeeded = not request_error and 200 <= status_code < 300
    if not succeeded:
        raise RuntimeError("Failed to update Keycloak password.")
def _receiver_configured_frequencies_mhz(receiver: Dict[str, object]) -> List[float]:
configured_hz = receiver.get("configured_frequencies_hz")
if not isinstance(configured_hz, list):
return []
values: List[float] = []
seen = set()
for hz_value in configured_hz:
try:
mhz_value = float(hz_value) / HZ_IN_MHZ
except (TypeError, ValueError):
continue
if not math.isfinite(mhz_value) or mhz_value <= 0.0:
continue
normalized = round(mhz_value, 6)
if normalized in seen:
continue
seen.add(normalized)
values.append(normalized)
return values
def _collect_mock_controls(service: "AutoService") -> Dict[str, object]:
inputs: List[Dict[str, object]] = []
for receiver in service.receivers:
receiver_id = str(receiver.get("receiver_id", ""))
source_url = str(receiver.get("source_url", ""))
row: Dict[str, object] = {
"id": receiver_id,
"name": receiver_id,
"source_url": source_url,
"reachable": False,
"enabled": None,
"configured_frequencies_mhz": _receiver_configured_frequencies_mhz(receiver),
"frequencies_mhz": None,
"error": "",
}
try:
_, status_url = _receiver_control_urls(source_url)
status_code, payload, request_error = _http_json_request(status_url, timeout_s=1.5)
row["status_url"] = status_url
if request_error:
row["error"] = request_error
else:
row["reachable"] = status_code > 0
enabled_value = payload.get("enabled")
if isinstance(enabled_value, bool):
row["enabled"] = enabled_value
frequencies_value = payload.get("frequencies_mhz")
if isinstance(frequencies_value, list):
parsed_frequencies: List[float] = []
for value in frequencies_value:
try:
numeric = float(value)
except (TypeError, ValueError):
continue
if math.isfinite(numeric) and numeric > 0.0:
parsed_frequencies.append(round(numeric, 6))
if parsed_frequencies:
row["frequencies_mhz"] = parsed_frequencies
if status_code >= 400:
row["error"] = str(payload.get("error", f"HTTP {status_code}"))
except Exception as exc:
row["error"] = str(exc)
inputs.append(row)
outputs: List[Dict[str, object]] = []
for output_server in service.output_servers:
name = str(output_server.get("name", "output"))
row = {
"id": name,
"name": name,
"reachable": False,
"accept_writes": None,
"error": "",
}
try:
_, status_url = _output_control_urls(output_server)
status_code, payload, request_error = _http_json_request(status_url, timeout_s=1.5)
row["status_url"] = status_url
if request_error:
row["error"] = request_error
else:
row["reachable"] = status_code > 0
accept_value = payload.get("accept_writes")
if isinstance(accept_value, bool):
row["accept_writes"] = accept_value
if status_code >= 400:
row["error"] = str(payload.get("error", f"HTTP {status_code}"))
except Exception as exc:
row["error"] = str(exc)
outputs.append(row)
return {
"status": "ok",
"inputs": inputs,
"outputs": outputs,
}
def _set_mock_control(
service: "AutoService",
target: str,
target_id: str,
enabled: Optional[bool] = None,
frequencies_mhz: Optional[List[float]] = None,
) -> Dict[str, object]:
if target == "input":
receiver = next(
(
row
for row in service.receivers
if str(row.get("receiver_id", "")) == target_id
),
None,
)
if receiver is None:
raise ValueError(f"Input receiver '{target_id}' not found.")
control_url, _ = _receiver_control_urls(str(receiver.get("source_url", "")))
control_payload: Dict[str, object] = {}
if isinstance(enabled, bool):
control_payload["enabled"] = enabled
if isinstance(frequencies_mhz, list):
control_payload["frequencies_mhz"] = list(frequencies_mhz)
if not control_payload:
raise ValueError("Input control requires 'enabled' or 'frequencies_mhz'.")
status_code, payload, request_error = _http_json_request(
control_url,
method="POST",
payload=control_payload,
timeout_s=2.0,
)
if request_error:
raise RuntimeError(request_error)
if status_code < 200 or status_code >= 300:
raise RuntimeError(str(payload.get("error", f"HTTP {status_code}")))
message_parts: List[str] = []
if isinstance(enabled, bool):
action = "запущена" if enabled else "остановлена"
message_parts.append(f"Передача входных данных '{target_id}' {action}.")
if isinstance(frequencies_mhz, list):
frequencies_text = ", ".join(
f"{value:.6f}".rstrip("0").rstrip(".")
for value in frequencies_mhz
)
message_parts.append(
f"Частоты '{target_id}' обновлены: [{frequencies_text}] МГц."
)
message = " ".join(part for part in message_parts if part).strip() or "Настройки входа обновлены."
return {
"status": "ok",
"target": "input",
"id": target_id,
"enabled": enabled,
"frequencies_mhz": list(frequencies_mhz) if isinstance(frequencies_mhz, list) else None,
"message": message,
}
if target == "output":
if not isinstance(enabled, bool):
raise ValueError("Output control requires boolean 'enabled'.")
output_server = next(
(
row
for row in service.output_servers
if str(row.get("name", "")) == target_id
),
None,
)
if output_server is None:
raise ValueError(f"Output server '{target_id}' not found.")
control_url, _ = _output_control_urls(output_server)
status_code, payload, request_error = _http_json_request(
control_url,
method="POST",
payload={"accept_writes": enabled},
timeout_s=2.0,
)
if request_error:
raise RuntimeError(request_error)
if status_code < 200 or status_code >= 300:
raise RuntimeError(str(payload.get("error", f"HTTP {status_code}")))
action = "запущен" if enabled else "остановлен"
return {
"status": "ok",
"target": "output",
"id": target_id,
"enabled": enabled,
"message": f"Приём на выходе '{target_id}' {action}.",
}
raise ValueError("target must be 'input' or 'output'.")
def _sync_mock_input_frequencies(service: "AutoService") -> List[str]:
errors: List[str] = []
for receiver in service.receivers:
receiver_id = str(receiver.get("receiver_id", ""))
if not receiver_id:
continue
frequencies_mhz = _receiver_configured_frequencies_mhz(receiver)
if not frequencies_mhz:
continue
try:
_set_mock_control(
service=service,
target="input",
target_id=receiver_id,
enabled=None,
frequencies_mhz=frequencies_mhz,
)
except Exception as exc:
errors.append(f"{receiver_id}: {exc}")
return errors
class AutoService:
def __init__(self, config: Dict[str, object], config_path: Optional[str] = None) -> None:
# Build the service from an already-parsed config dict.
#
# Steps, in order: parse the propagation model and auth settings, validate the
# "solver" / "runtime" / "input" sections, normalise the output server list
# and the receiver list, initialise the shared state, and create (but do not
# start) the polling thread.
#
# Raises ValueError for the structural problems checked below. Exceptions
# raised by the parsing helpers (_parse_model, _parse_auth_config,
# _parse_output_server_config, _center_from_obj, ...) propagate unchanged.
self.config = config
self.config_path = config_path
self.model = _parse_model(config)
# Authentication settings (session cookie + Keycloak endpoints/credentials).
auth_config = _parse_auth_config(config)
self.auth_enabled = bool(auth_config["enabled"])
self.auth_provider = str(auth_config["provider"])
self.auth_session_ttl_s = int(auth_config["session_ttl_s"])
self.auth_cookie_name = str(auth_config["cookie_name"])
keycloak_config = auth_config["keycloak"]
self.keycloak_base_url = str(keycloak_config["base_url"])
self.keycloak_realm = str(keycloak_config["realm"])
self.keycloak_client_id = str(keycloak_config["client_id"])
self.keycloak_client_secret = str(keycloak_config["client_secret"])
self.keycloak_admin_client_id = str(keycloak_config["admin_client_id"])
self.keycloak_admin_client_secret = str(keycloak_config["admin_client_secret"])
self.keycloak_user_role = str(keycloak_config["user_role"])
self.keycloak_admin_role = str(keycloak_config["admin_role"])
self.keycloak_admin_console_url = str(keycloak_config["admin_console_url"])
# Cache for the admin (client-credentials) token; guarded by the lock below.
self.keycloak_admin_token_cache: Dict[str, object] = {
"access_token": "",
"expires_at": 0.0,
}
self.keycloak_admin_token_lock = threading.Lock()
# "solver" and "runtime" default to empty objects; "input" has no default,
# so a missing "input" section fails the isinstance check below.
solver_obj = config.get("solver", {})
runtime_obj = config.get("runtime", {})
input_obj = config.get("input")
if not isinstance(solver_obj, dict):
raise ValueError("solver must be object.")
if not isinstance(runtime_obj, dict):
raise ValueError("runtime must be object.")
if not isinstance(input_obj, dict):
raise ValueError("input must be object.")
self.tolerance = float(solver_obj.get("tolerance", 1e-3))
self.z_preference = str(solver_obj.get("z_preference", "positive"))
if self.z_preference not in ("positive", "negative"):
raise ValueError("solver.z_preference must be 'positive' or 'negative'.")
self.poll_interval_s = float(runtime_obj.get("poll_interval_s", 1.0))
self.write_api_token = str(runtime_obj.get("write_api_token", "")).strip()
self.mock_input_frequency_sync_enabled = bool(
runtime_obj.get("mock_input_frequency_sync", False)
)
self.last_mock_sync_errors: List[str] = []
# Output targets: the list form "runtime.output_servers" takes precedence;
# otherwise the single-object form "runtime.output_server" is used.
parsed_output_servers: List[Dict[str, object]] = []
output_servers_obj = runtime_obj.get("output_servers")
if output_servers_obj is not None:
if not isinstance(output_servers_obj, list):
raise ValueError("runtime.output_servers must be list.")
for index, output_obj in enumerate(output_servers_obj, start=1):
if not isinstance(output_obj, dict):
raise ValueError("runtime.output_servers[] must be object.")
parsed_output_servers.append(
_parse_output_server_config(
output_obj=output_obj,
default_name=f"output_{index}",
)
)
else:
output_obj = runtime_obj.get("output_server", {})
if output_obj is None:
output_obj = {}
if not isinstance(output_obj, dict):
raise ValueError("runtime.output_server must be object.")
parsed_output_servers.append(
_parse_output_server_config(
output_obj=output_obj,
default_name="output_1",
)
)
self.output_servers = parsed_output_servers
self.output_enabled = any(bool(server.get("enabled")) for server in self.output_servers)
self.source_timeout_s = float(input_obj.get("source_timeout_s", 3.0))
self.aggregation = str(input_obj.get("aggregation", "median"))
if self.aggregation not in ("median", "mean"):
raise ValueError("input.aggregation must be 'median' or 'mean'.")
input_mode = str(input_obj.get("mode", "http_sources"))
if input_mode != "http_sources":
raise ValueError("Automatic service requires input.mode = 'http_sources'.")
# Optional filter object passed to every receiver's filter parser as the default.
raw_default_filter = input_obj.get("default_input_filter")
default_filter_obj: Optional[Dict[str, object]] = None
if raw_default_filter is not None:
if not isinstance(raw_default_filter, dict):
raise ValueError("input.default_input_filter must be object.")
default_filter_obj = raw_default_filter
receivers = input_obj.get("receivers")
if not isinstance(receivers, list) or len(receivers) < 3:
raise ValueError("input.receivers must contain at least 3 objects.")
parsed_receivers: List[Dict[str, object]] = []
for receiver in receivers:
if not isinstance(receiver, dict):
raise ValueError("Each receiver must be object.")
access_obj = receiver.get("access", {})
if access_obj is None:
access_obj = {}
if not isinstance(access_obj, dict):
raise ValueError("receiver.access must be object.")
# The source URL may be given at top level or inside "access"; first non-empty wins.
source_url = str(
receiver.get("source_url")
or access_obj.get("url")
or access_obj.get("source_url")
or ""
).strip()
if not source_url:
raise ValueError(
f"receiver '{receiver.get('receiver_id', '')}': source_url/access.url must be non-empty."
)
# An optional API token becomes a Bearer Authorization header for source requests.
source_headers: Dict[str, str] = {}
source_api_token = str(
receiver.get("source_api_token") or access_obj.get("api_token") or ""
).strip()
if source_api_token:
source_headers["Authorization"] = f"Bearer {source_api_token}"
# NOTE(review): receiver["receiver_id"] is indexed directly, so a receiver
# without that key raises KeyError here rather than ValueError.
parsed_receivers.append(
{
"receiver_id": str(receiver["receiver_id"]),
"center": _center_from_obj(receiver),
"source_url": source_url,
"source_headers": source_headers,
"configured_frequencies_hz": _parse_receiver_configured_frequencies(
receiver_obj=receiver,
receiver_id=str(receiver["receiver_id"]),
),
"input_filter": _parse_receiver_input_filter(
receiver_obj=receiver,
receiver_id=str(receiver["receiver_id"]),
default_filter_obj=default_filter_obj,
),
}
)
self.receivers = parsed_receivers
# Shared state read by snapshot() and written by the polling loop under state_lock.
self.state_lock = threading.Lock()
self.latest_payload: Optional[Dict[str, object]] = None
self.last_error: str = "no data yet"
self.updated_at_utc: Optional[str] = None
# Initial delivery report: "pending" for enabled targets, "disabled" otherwise.
self.last_output_delivery: Dict[str, object] = {
"enabled": self.output_enabled,
"status": "disabled" if not self.output_enabled else "pending",
"http_status": None,
"response_body": "",
"sent_at_utc": None,
"servers": [
{
"name": server["name"],
"enabled": bool(server["enabled"]),
"status": "disabled" if not bool(server["enabled"]) else "pending",
"http_status": None,
"response_body": "",
"sent_at_utc": None,
"target": {
"ip": server["ip"],
"port": server["port"],
"path": server["path"],
},
"frequency_filter": {
"enabled": server["frequency_filter_enabled"],
"min_frequency_mhz": server["min_frequency_mhz"],
"max_frequency_mhz": server["max_frequency_mhz"],
},
}
for server in self.output_servers
],
}
# The polling thread is created here as a daemon thread; start() launches it.
self.stop_event = threading.Event()
self.poll_thread = threading.Thread(target=self._poll_loop, daemon=True)
def start(self) -> None:
if self.mock_input_frequency_sync_enabled:
self.last_mock_sync_errors = _sync_mock_input_frequencies(self)
else:
self.last_mock_sync_errors = []
self.poll_thread.start()
def stop(self) -> None:
self.stop_event.set()
if self.poll_thread.is_alive():
self.poll_thread.join(timeout=2.0)
def keycloak_openid_base(self) -> str:
return (
f"{self.keycloak_base_url}/realms/{self.keycloak_realm}"
"/protocol/openid-connect"
)
def keycloak_token_url(self) -> str:
return f"{self.keycloak_openid_base()}/token"
def keycloak_userinfo_url(self) -> str:
return f"{self.keycloak_openid_base()}/userinfo"
def keycloak_admin_api_base(self) -> str:
return f"{self.keycloak_base_url}/admin/realms/{self.keycloak_realm}"
def role_from_token(self, access_token: str) -> Optional[str]:
    """Map the realm roles carried by an access token to an application role.

    The token payload is decoded without signature verification.  When both
    configured roles are present, the admin role wins.

    Returns:
        ``"admin"``, ``"user"``, or ``None`` when neither configured role is
        present or the ``realm_access`` / ``roles`` claim has the wrong type.
    """
    claims = _decode_jwt_payload_unverified(access_token)
    realm_access = claims.get("realm_access", {})
    role_list = realm_access.get("roles", []) if isinstance(realm_access, dict) else None
    if not isinstance(role_list, list):
        return None
    granted = set()
    for entry in role_list:
        role_name = str(entry).strip()
        if role_name:
            granted.add(role_name)
    # Checked in priority order: admin first, then user.
    priority = (
        (self.keycloak_admin_role, "admin"),
        (self.keycloak_user_role, "user"),
    )
    for configured_role, label in priority:
        if configured_role in granted:
            return label
    return None
def get_admin_access_token(self) -> str:
if not self.auth_enabled:
raise RuntimeError("Authentication is disabled.")
with self.keycloak_admin_token_lock:
cached_token = str(self.keycloak_admin_token_cache.get("access_token", ""))
expires_at = float(self.keycloak_admin_token_cache.get("expires_at", 0.0) or 0.0)
if cached_token and expires_at > time.time() + 15.0:
return cached_token
status_code, payload, request_error = _http_json_request(
self.keycloak_token_url(),
method="POST",
form={
"grant_type": "client_credentials",
"client_id": self.keycloak_admin_client_id,
"client_secret": self.keycloak_admin_client_secret,
},
timeout_s=5.0,
)
if request_error:
raise RuntimeError(f"Keycloak admin token request failed: {request_error}")
if status_code < 200 or status_code >= 300:
raise RuntimeError(
f"Keycloak admin token request failed: {payload.get('error_description') or payload.get('error') or status_code}"
)
access_token = str(payload.get("access_token", "")).strip()
expires_in = float(payload.get("expires_in", 60.0) or 60.0)
if not access_token:
raise RuntimeError("Keycloak admin token response did not include access_token.")
self.keycloak_admin_token_cache = {
"access_token": access_token,
"expires_at": time.time() + max(30.0, expires_in),
}
return access_token
def refresh_once(self) -> None:
# Run one polling cycle: fetch and filter measurements from every receiver,
# solve a position per frequency, publish the resulting payload under
# state_lock, then forward it to the output servers.
#
# Raises RuntimeError when a receiver has no measurements left after
# filtering, when no frequency produced a solution, or when output delivery
# ends with status "error" or "partial". Exceptions from the fetch/solve
# helpers propagate unchanged; _poll_loop records them in last_error.
receiver_payloads: List[Dict[str, object]] = []
# One entry per receiver (same order as self.receivers): the result of
# _group_by_frequency for that receiver's filtered measurements.
grouped_by_receiver: List[Dict[float, List[Tuple[float, float]]]] = []
for receiver in self.receivers:
receiver_id = str(receiver["receiver_id"])
center = receiver["center"]
source_url = str(receiver["source_url"])
source_headers = receiver.get("source_headers")
raw_measurements = _fetch_measurements(
source_url,
timeout_s=self.source_timeout_s,
expected_receiver_id=receiver_id,
headers=source_headers if isinstance(source_headers, dict) else None,
)
# Apply the receiver's input filter first, then the configured-frequency filter.
receiver_filter = receiver["input_filter"]
measurements = _apply_receiver_input_filter(
raw_measurements, receiver_filter=receiver_filter
)
configured_frequencies_hz = receiver.get("configured_frequencies_hz", [])
if isinstance(configured_frequencies_hz, list):
measurements = _apply_receiver_configured_frequencies(
measurements,
configured_frequencies_hz=configured_frequencies_hz,
)
# A single receiver with nothing left aborts the whole cycle.
if not measurements:
raise RuntimeError(
f"receiver '{receiver_id}': no measurements left after configured filters."
)
grouped = _group_by_frequency(measurements)
grouped_by_receiver.append(grouped)
# Radius aggregated over all of this receiver's filtered measurements (reporting only).
radius_m = aggregate_radius(measurements, model=self.model, method=self.aggregation)
samples = []
for frequency_hz, amplitude_dbm in measurements:
samples.append(
{
"frequency_hz": frequency_hz,
"frequency_mhz": frequency_hz / HZ_IN_MHZ,
"amplitude_dbm": amplitude_dbm,
"distance_m": rssi_to_distance_m(
amplitude_dbm=amplitude_dbm,
frequency_hz=frequency_hz,
model=self.model,
),
}
)
receiver_payloads.append(
{
"receiver_id": receiver_id,
"center": {"x": center[0], "y": center[1], "z": center[2]},
"source_url": source_url,
"aggregation": self.aggregation,
"input_filter": receiver_filter,
"configured_frequencies_mhz": [
float(int(value)) / HZ_IN_MHZ
for value in (
configured_frequencies_hz
if isinstance(configured_frequencies_hz, list)
else []
)
],
"raw_samples_count": len(raw_measurements),
"filtered_samples_count": len(measurements),
"radius_m_all_freq": radius_m,
"samples": samples,
}
)
frequency_rows: List[Dict[str, object]] = []
# Row with the lowest RMSE across all frequencies; becomes the reported position.
best_row: Optional[Dict[str, object]] = None
all_frequencies = sorted(
{frequency for grouped in grouped_by_receiver for frequency in grouped.keys()}
)
for frequency_hz in all_frequencies:
available_indices = [
idx for idx, grouped in enumerate(grouped_by_receiver) if frequency_hz in grouped
]
# A frequency is only solved when at least three receivers reported it.
if len(available_indices) < 3:
continue
best_combo_row: Optional[Dict[str, object]] = None
best_combo_result = None
best_combo_indices: Optional[Tuple[int, int, int]] = None
best_combo_spheres: Optional[List[Sphere]] = None
# Try every 3-receiver combination and keep the one with the smallest RMSE.
for combo in itertools.combinations(available_indices, 3):
spheres_for_frequency: List[Sphere] = []
row_receivers: List[Dict[str, object]] = []
for receiver_index in combo:
receiver = self.receivers[receiver_index]
measurement_subset = grouped_by_receiver[receiver_index][frequency_hz]
radius_m = aggregate_radius(
measurement_subset, model=self.model, method=self.aggregation
)
spheres_for_frequency.append(
Sphere(center=receiver["center"], radius=radius_m)
)
row_receivers.append(
{
"receiver_id": str(receiver["receiver_id"]),
"radius_m": radius_m,
"samples_count": len(measurement_subset),
}
)
result = solve_three_sphere_intersection(
spheres=spheres_for_frequency,
tolerance=self.tolerance,
z_preference=self.z_preference, # type: ignore[arg-type]
)
candidate_row = {
"frequency_hz": frequency_hz,
"frequency_mhz": frequency_hz / HZ_IN_MHZ,
"position": {
"x": result.point[0],
"y": result.point[1],
"z": result.point[2],
},
"exact": result.exact,
"rmse_m": result.rmse,
"receivers": row_receivers,
"used_receivers_count": 3,
"available_receivers_count": len(available_indices),
}
# Strict "<" means the first combination wins on an RMSE tie.
if (
best_combo_row is None
or float(candidate_row["rmse_m"]) < float(best_combo_row["rmse_m"])
):
best_combo_row = candidate_row
best_combo_result = result
best_combo_indices = combo
best_combo_spheres = spheres_for_frequency
if (
best_combo_row is None
or best_combo_result is None
or best_combo_indices is None
or best_combo_spheres is None
):
continue
# Attach residuals of the winning combination to the frequency row and to
# the "per_frequency" list of each receiver that took part in it.
row_receivers = best_combo_row["receivers"]
for local_index, receiver_index in enumerate(best_combo_indices):
residual = best_combo_result.residuals[local_index]
row_receivers[local_index]["residual_m"] = residual
receiver_payloads[receiver_index].setdefault("per_frequency", []).append(
{
"frequency_hz": frequency_hz,
"frequency_mhz": frequency_hz / HZ_IN_MHZ,
"radius_m": best_combo_spheres[local_index].radius,
"residual_m": residual,
"samples_count": len(grouped_by_receiver[receiver_index][frequency_hz]),
}
)
frequency_rows.append(best_combo_row)
if best_row is None or float(best_combo_row["rmse_m"]) < float(best_row["rmse_m"]):
best_row = best_combo_row
if best_row is None:
if len(self.receivers) == 3:
raise RuntimeError("No common frequencies across all 3 receivers.")
raise RuntimeError("Cannot build frequency table for trilateration.")
payload = {
"timestamp_utc": _utc_now_iso_seconds(),
"selected_frequency_hz": best_row["frequency_hz"],
"selected_frequency_mhz": float(best_row["frequency_hz"]) / HZ_IN_MHZ,
"position": best_row["position"],
"exact": best_row["exact"],
"rmse_m": best_row["rmse_m"],
"frequency_table": frequency_rows,
"model": {
"tx_power_dbm": self.model.tx_power_dbm,
"tx_gain_dbi": self.model.tx_gain_dbi,
"rx_gain_dbi": self.model.rx_gain_dbi,
"path_loss_exponent": self.model.path_loss_exponent,
"reference_distance_m": self.model.reference_distance_m,
},
"receivers": receiver_payloads,
}
# Publish the payload before delivery, so readers see it even when the
# delivery below fails and this method raises.
with self.state_lock:
self.latest_payload = payload
self.updated_at_utc = payload["timestamp_utc"] # type: ignore[index]
self.last_error = ""
# Delivery runs outside the lock; only the report is stored under it.
delivery = self._deliver_to_output_servers(payload)
with self.state_lock:
self.last_output_delivery = delivery
if delivery["status"] in ("error", "partial"):
# Only servers whose own status is "error" are named in the message.
failed_servers = [
row["name"]
for row in delivery.get("servers", [])
if isinstance(row, dict) and row.get("status") == "error"
]
raise RuntimeError(
"Output server(s) rejected payload: "
+ ", ".join(str(name) for name in failed_servers)
)
@staticmethod
def _row_frequency_mhz(row: Dict[str, object]) -> Optional[float]:
mhz = row.get("frequency_mhz")
if isinstance(mhz, (int, float)):
return float(mhz)
hz = row.get("frequency_hz")
if isinstance(hz, (int, float)):
return float(hz) / HZ_IN_MHZ
return None
@staticmethod
def _position_from_row(row: Dict[str, object]) -> Optional[Dict[str, float]]:
position_obj = row.get("position")
if not isinstance(position_obj, dict):
return None
try:
return {
"x": float(position_obj["x"]),
"y": float(position_obj["y"]),
"z": float(position_obj["z"]),
}
except (TypeError, ValueError, KeyError):
return None
def _build_output_payload(
self,
payload: Dict[str, object],
output_server: Dict[str, object],
) -> Optional[Dict[str, object]]:
table_obj = payload.get("frequency_table")
if not isinstance(table_obj, list):
return None
rows: List[Dict[str, object]] = []
for row in table_obj:
if not isinstance(row, dict):
continue
frequency_hz = row.get("frequency_hz")
if not isinstance(frequency_hz, (int, float)):
continue
if self._position_from_row(row) is None:
continue
if bool(output_server.get("frequency_filter_enabled", False)):
if not (
float(output_server.get("min_frequency_hz", 0.0))
<= float(frequency_hz)
<= float(output_server.get("max_frequency_hz", 0.0))
):
continue
rows.append(row)
if not rows:
return None
best_row = min(
rows,
key=lambda row: float(row.get("rmse_m", float("inf"))),
)
best_position = self._position_from_row(best_row)
if best_position is None:
return None
# Minimal transport payload for final server integration: coordinates only.
return best_position
def _deliver_to_output_servers(self, payload: Dict[str, object]) -> Dict[str, object]:
    """Send the current result to every enabled output server and report.

    Each configured server gets a report row.  Disabled servers stay
    ``"disabled"``; enabled servers with no row in their frequency range are
    ``"skipped"``; the rest are ``"ok"`` for a 2xx response, else ``"error"``.

    The overall status is ``"disabled"`` (no enabled server), ``"partial"``
    (both failures and successes), ``"error"``, ``"skipped"`` or ``"ok"``.
    The top-level ``http_status`` / ``response_body`` / ``target`` /
    ``frequency_filter`` mirror the first enabled server, or the first
    server when none is enabled.
    """
    sent_at = _utc_now_iso_seconds()
    any_enabled = any(bool(server.get("enabled")) for server in self.output_servers)

    reports: List[Dict[str, object]] = []
    for server in self.output_servers:
        report: Dict[str, object] = {
            "name": server["name"],
            "enabled": bool(server["enabled"]),
            "status": "disabled",
            "http_status": None,
            "response_body": "",
            "sent_at_utc": sent_at,
            "target": {
                "ip": server["ip"],
                "port": server["port"],
                "path": server["path"],
            },
            "frequency_filter": {
                "enabled": server["frequency_filter_enabled"],
                "min_frequency_mhz": server["min_frequency_mhz"],
                "max_frequency_mhz": server["max_frequency_mhz"],
            },
        }
        if report["enabled"]:
            outgoing = self._build_output_payload(payload=payload, output_server=server)
            if outgoing is None:
                report["status"] = "skipped"
                report["response_body"] = "No frequencies in configured output range"
            else:
                http_status, body = send_payload_to_server(
                    server_ip=str(server["ip"]),
                    payload=outgoing,
                    port=int(server["port"]),
                    path=str(server["path"]),
                    timeout_s=float(server["timeout_s"]),
                )
                report["http_status"] = http_status
                report["response_body"] = body
                report["status"] = "ok" if 200 <= http_status < 300 else "error"
        reports.append(report)

    # Count the per-server outcomes that drive the overall status.
    tally = {"ok": 0, "error": 0, "skipped": 0}
    for report in reports:
        outcome = report["status"]
        if outcome in tally:
            tally[outcome] += 1
    ok_count, error_count, skipped_count = tally["ok"], tally["error"], tally["skipped"]

    if not any_enabled:
        overall = "disabled"
    elif error_count > 0:
        overall = "partial" if ok_count > 0 else "error"
    elif ok_count == 0 and skipped_count > 0:
        overall = "skipped"
    else:
        overall = "ok"

    primary = None
    for report in reports:
        if report["enabled"]:
            primary = report
            break
    if primary is None and reports:
        primary = reports[0]

    return {
        "enabled": any_enabled,
        "status": overall,
        "http_status": primary["http_status"] if primary is not None else None,
        "response_body": primary["response_body"] if primary is not None else "",
        "sent_at_utc": sent_at,
        "target": primary["target"] if primary is not None else None,
        "frequency_filter": primary["frequency_filter"] if primary is not None else None,
        "ok_count": ok_count,
        "error_count": error_count,
        "skipped_count": skipped_count,
        "servers": reports,
    }
def _poll_loop(self) -> None:
while not self.stop_event.is_set():
try:
self.refresh_once()
except Exception as exc:
with self.state_lock:
self.last_error = str(exc)
self.stop_event.wait(self.poll_interval_s)
def snapshot(self) -> Dict[str, object]:
with self.state_lock:
return {
"updated_at_utc": self.updated_at_utc,
"last_error": self.last_error,
"payload": self.latest_payload,
"output_delivery": self.last_output_delivery,
}
def _make_handler(service: AutoService):
service_holder = {"current": service}
service_swap_lock = threading.Lock()
auth_sessions: Dict[str, Dict[str, object]] = {}
auth_sessions_lock = threading.Lock()
class ServiceHandler(BaseHTTPRequestHandler):
@staticmethod
def _current_service() -> AutoService:
    """Return the service stored under ``"current"`` in the enclosing holder."""
    active_service = service_holder["current"]
    return active_service
def _api_token_authorized(self) -> bool:
service_obj = self._current_service()
expected_token = service_obj.write_api_token
if not expected_token:
return False
header_token = self.headers.get("X-API-Token", "")
if hmac.compare_digest(header_token, expected_token):
return True
authorization = self.headers.get("Authorization", "")
if authorization.lower().startswith("bearer "):
bearer_token = authorization[7:].strip()
if hmac.compare_digest(bearer_token, expected_token):
return True
return False
def _parse_cookies(self) -> Dict[str, str]:
raw_cookie = str(self.headers.get("Cookie", "")).strip()
cookies: Dict[str, str] = {}
if not raw_cookie:
return cookies
for part in raw_cookie.split(";"):
if "=" not in part:
continue
key, value = part.split("=", 1)
cookies[key.strip()] = value.strip()
return cookies
def _current_session(self) -> Optional[Dict[str, object]]:
service_obj = self._current_service()
if not service_obj.auth_enabled:
return {
"username": "local-admin",
"role": "admin",
"expires_at": None,
}
session_id = self._parse_cookies().get(service_obj.auth_cookie_name, "")
if not session_id:
return None
with auth_sessions_lock:
session_row = auth_sessions.get(session_id)
if not isinstance(session_row, dict):
return None
expires_at = float(session_row.get("expires_at", 0.0) or 0.0)
if expires_at <= time.time():
auth_sessions.pop(session_id, None)
return None
refreshed = dict(session_row)
refreshed["expires_at"] = time.time() + service_obj.auth_session_ttl_s
auth_sessions[session_id] = refreshed
return refreshed
def _auth_payload(self, session_row: Optional[Dict[str, object]]) -> Dict[str, object]:
service_obj = self._current_service()
authenticated = isinstance(session_row, dict)
role = str(session_row.get("role", "")) if authenticated else ""
is_admin = role == "admin"
if not authenticated and service_obj.auth_enabled:
visible_sections: List[str] = []
elif role == "user":
visible_sections = [
"overview",
"frequencies",
"io",
"history",
]
else:
visible_sections = [
"overview",
"frequencies",
"io",
"history",
"servers",
"json",
]
return {
"enabled": bool(service_obj.auth_enabled),
"provider": service_obj.auth_provider,
"authenticated": authenticated,
"username": "" if not authenticated else str(session_row.get("username", "")),
"role": role,
"capabilities": {
"view_result": authenticated or not service_obj.auth_enabled,
"view_frequencies": is_admin or role == "user" or not service_obj.auth_enabled,
"admin": is_admin or not service_obj.auth_enabled,
"manage_users": is_admin or not service_obj.auth_enabled,
"manage_system": is_admin or not service_obj.auth_enabled,
},
"visible_sections": visible_sections,
"admin_console_url": service_obj.keycloak_admin_console_url,
}
def _require_authenticated(self) -> bool:
service_obj = self._current_service()
if not service_obj.auth_enabled:
return True
if self._current_session() is not None:
return True
self._write_json(401, {"status": "error", "error": "authentication required"})
return False
def _require_admin(self) -> bool:
service_obj = self._current_service()
token_matches = self._api_token_authorized()
if token_matches:
return True
if service_obj.write_api_token and not service_obj.auth_enabled:
self._write_json(
401,
{"status": "error", "error": "unauthorized: missing or invalid API token"},
)
return False
if not service_obj.auth_enabled:
return True
session_row = self._current_session()
if session_row is None:
self._write_json(401, {"status": "error", "error": "authentication required"})
return False
if str(session_row.get("role", "")) != "admin":
self._write_json(403, {"status": "error", "error": "admin role required"})
return False
return True
def _issue_session(self, username: str, role: str) -> str:
    """Create a server-side session and return its random identifier.

    Before the new row is stored, rows that are already expired (or are not
    dicts) are purged.  Sessions are otherwise removed only on logout or
    when their own expired cookie is presented again, so abandoned sessions
    would accumulate for the lifetime of the process.

    Args:
        username: Name stored with the session.
        role: Application role stored with the session.

    Returns:
        The new URL-safe session id, to be placed in the auth cookie.
    """
    service_obj = self._current_service()
    session_id = secrets.token_urlsafe(32)
    now = time.time()
    with auth_sessions_lock:
        # Same expiry test as _current_session: a missing or falsy value counts as 0.
        stale_ids = [
            existing_id
            for existing_id, row in auth_sessions.items()
            if not isinstance(row, dict)
            or float(row.get("expires_at", 0.0) or 0.0) <= now
        ]
        for existing_id in stale_ids:
            auth_sessions.pop(existing_id, None)
        auth_sessions[session_id] = {
            "username": str(username),
            "role": str(role),
            "expires_at": now + service_obj.auth_session_ttl_s,
        }
    return session_id
def _drop_session(self) -> None:
service_obj = self._current_service()
session_id = self._parse_cookies().get(service_obj.auth_cookie_name, "")
if not session_id:
return
with auth_sessions_lock:
auth_sessions.pop(session_id, None)
def _session_cookie_headers(self, session_id: str = "", clear: bool = False) -> Dict[str, str]:
service_obj = self._current_service()
if clear:
return {
"Set-Cookie": (
f"{service_obj.auth_cookie_name}=; Path=/; HttpOnly; SameSite=Lax; Max-Age=0"
)
}
return {
"Set-Cookie": (
f"{service_obj.auth_cookie_name}={session_id}; Path=/; HttpOnly; SameSite=Lax; "
f"Max-Age={service_obj.auth_session_ttl_s}"
)
}
def _write_bytes(
self,
status_code: int,
content: bytes,
content_type: str,
extra_headers: Optional[Dict[str, str]] = None,
) -> None:
self.send_response(status_code)
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", str(len(content)))
if isinstance(extra_headers, dict):
for key, value in extra_headers.items():
self.send_header(str(key), str(value))
self.end_headers()
self.wfile.write(content)
def _write_json(
self,
status_code: int,
payload: Dict[str, object],
extra_headers: Optional[Dict[str, str]] = None,
) -> None:
raw = json.dumps(payload, ensure_ascii=False).encode("utf-8")
self._write_bytes(
status_code=status_code,
content=raw,
content_type="application/json; charset=utf-8",
extra_headers=extra_headers,
)
def _read_json_body(
    self,
    *,
    max_bytes: int = MAX_CONFIG_BODY_BYTES,
    empty_body_is_object: bool = False,
) -> Tuple[Optional[Dict[str, object]], Optional[str], int]:
    """Read and parse the request body as a JSON object.

    Malformed client input is always reported through the return value
    rather than raised.  This includes a body that is not valid UTF-8 and
    JSON nested deeply enough to exhaust the interpreter's recursion limit.

    Args:
        max_bytes: Largest accepted ``Content-Length``.
        empty_body_is_object: Treat a missing or empty body as ``{}``
            instead of an error.

    Returns:
        ``(parsed, error, status)``.  On success ``parsed`` is the dict,
        ``error`` is ``None`` and ``status`` is 200.  On failure ``parsed``
        is ``None``, ``error`` describes the problem and ``status`` is the
        HTTP status to reply with (400 or 413).
    """
    try:
        content_length = int(self.headers.get("Content-Length", "0"))
    except ValueError:
        return None, "Invalid Content-Length", 400
    if content_length <= 0:
        if empty_body_is_object:
            return {}, None, 200
        return None, "Empty request body", 400
    if content_length > max_bytes:
        return (
            None,
            f"Config payload too large: {content_length} bytes, max is {max_bytes}",
            413,
        )
    body = self.rfile.read(content_length)
    try:
        parsed = json.loads(body.decode("utf-8"))
    except UnicodeDecodeError as exc:
        # Bytes that are not UTF-8 are a client error, not a server crash.
        return None, f"Invalid JSON: body is not valid UTF-8 ({exc})", 400
    except json.JSONDecodeError as exc:
        return None, f"Invalid JSON: {exc}", 400
    except RecursionError:
        # Thousands of nested brackets fit easily within max_bytes.
        return None, "Invalid JSON: nesting too deep", 400
    if not isinstance(parsed, dict):
        return None, "JSON body must be object", 400
    return parsed, None, 200
def _write_static(self, relative_path: str) -> None:
web_root = Path(__file__).resolve().parent / "web"
file_path = (web_root / relative_path).resolve()
# Protect against path traversal outside /web.
try:
file_path.relative_to(web_root.resolve())
except ValueError:
self._write_json(404, {"error": "not_found"})
return
if not file_path.exists() or not file_path.is_file():
self._write_json(404, {"error": "not_found"})
return
mime_type, _ = mimetypes.guess_type(str(file_path))
if mime_type is None:
mime_type = "application/octet-stream"
# Force UTF-8 for text assets to avoid mojibake in browsers.
if mime_type.startswith("text/") or mime_type in (
"application/javascript",
"application/x-javascript",
):
mime_type = f"{mime_type}; charset=utf-8"
self._write_bytes(
200,
file_path.read_bytes(),
mime_type,
extra_headers={
"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0",
"Pragma": "no-cache",
"Expires": "0",
},
)
def log_message(self, format: str, *args) -> None:
    """Override the base handler's logging hook with a no-op."""
    return None
def do_GET(self) -> None:
    """Dispatch a GET request by URL path.

    Routes:
        ``/``, ``/ui``      static ``index.html``
        ``/static/...``     static asset
        ``/auth/session``   current auth description
        ``/health``         200 once a payload exists, else 503
        ``/result``         latest payload (authenticated)
        ``/frequencies``    latest frequency table (authenticated)
        ``/config``         public config view (admin when auth is enabled)
        ``/mock/controls``  mock control state (authenticated)
        ``/users``          Keycloak user list (admin)

    Any other path gets a 404 JSON response.  The service snapshot and the
    caller's session are fetched once, before routing.
    """
    route = parse.urlparse(self.path).path
    service_obj = self._current_service()
    state = service_obj.snapshot()
    session_row = self._current_session()

    def _reply_warming_up() -> None:
        # Shared 503 body for data routes that have no payload yet.
        self._write_json(
            503,
            {
                "status": "warming_up",
                "updated_at_utc": state["updated_at_utc"],
                "error": state["last_error"],
            },
        )

    if route in ("/", "/ui"):
        self._write_static("index.html")
        return

    if route.startswith("/static/"):
        self._write_static(route.removeprefix("/static/"))
        return

    if route == "/auth/session":
        self._write_json(200, {"status": "ok", "auth": self._auth_payload(session_row)})
        return

    if route == "/health":
        ready = bool(state["payload"])
        self._write_json(
            200 if ready else 503,
            {
                "status": "ok" if ready else "warming_up",
                "updated_at_utc": state["updated_at_utc"],
                "error": state["last_error"],
            },
        )
        return

    if route in ("/result", "/frequencies"):
        if not self._require_authenticated():
            return
        data = state["payload"]
        if data is None:
            _reply_warming_up()
            return
        if route == "/result":
            body = {
                "status": "ok",
                "updated_at_utc": state["updated_at_utc"],
                "data": data,
                "output_delivery": state["output_delivery"],
            }
        else:
            body = {
                "status": "ok",
                "updated_at_utc": state["updated_at_utc"],
                "selected_frequency_hz": data.get("selected_frequency_hz"),
                "selected_frequency_mhz": data.get("selected_frequency_mhz"),
                "frequency_table": data.get("frequency_table", []),
                "output_delivery": state["output_delivery"],
            }
        self._write_json(200, body)
        return

    if route == "/config":
        # The admin check applies only while authentication is enabled.
        if service_obj.auth_enabled and not self._require_admin():
            return
        visible_config = _public_config_view(
            config=service_obj.config,
            write_api_token_set=bool(service_obj.write_api_token),
        )
        self._write_json(
            200,
            {
                "status": "ok",
                "config_path": service_obj.config_path,
                "config": visible_config,
            },
        )
        return

    if route == "/mock/controls":
        if not self._require_authenticated():
            return
        self._write_json(200, _collect_mock_controls(service_obj))
        return

    if route == "/users":
        if not self._require_admin():
            return
        try:
            user_rows = _keycloak_list_users(service_obj)
        except Exception as exc:
            self._write_json(500, {"status": "error", "error": str(exc)})
            return
        self._write_json(200, {"status": "ok", "users": user_rows})
        return

    self._write_json(404, {"error": "not_found"})
def do_POST(self) -> None:
    """Dispatch a POST request to the handler for its URL path.

    Supported paths are ``/auth/login``, ``/auth/logout``, ``/config``,
    ``/mock/control``, ``/users`` and ``/refresh``. Only the path component
    of the request target is matched (the query string is ignored); any
    other path is answered with a 404 JSON body.
    """
    path = parse.urlparse(self.path).path
    if path == "/auth/login":
        self._post_auth_login()
    elif path == "/auth/logout":
        self._post_auth_logout()
    elif path == "/config":
        self._post_config()
    elif path == "/mock/control":
        self._post_mock_control()
    elif path == "/users":
        self._post_users()
    elif path == "/refresh":
        self._post_refresh()
    else:
        self._write_json(404, {"error": "not_found"})

def _post_auth_login(self) -> None:
    """Handle ``POST /auth/login``.

    Sends the submitted username/password to the Keycloak token endpoint
    (password grant), derives the role from the returned access token and,
    when a role is present, issues a session and sets the session cookie.

    Responses: 400 when authentication is disabled or credentials are
    missing, 502 when the token request itself failed, 401 when Keycloak
    answered with a non-2xx status, 403 when the token carries no accepted
    role, 200 on success.
    """
    service_obj = self._current_service()
    if not service_obj.auth_enabled:
        self._write_json(400, {"status": "error", "error": "authentication disabled"})
        return
    payload, read_error, read_status = self._read_json_body()
    if read_error:
        self._write_json(read_status, {"status": "error", "error": read_error})
        return
    username = str(payload.get("username", "")).strip() if payload else ""
    password = str(payload.get("password", "")).strip() if payload else ""
    if not username or not password:
        self._write_json(
            400,
            {"status": "error", "error": "username and password are required"},
        )
        return
    status_code, token_payload, request_error = _http_json_request(
        service_obj.keycloak_token_url(),
        method="POST",
        form={
            "grant_type": "password",
            "client_id": service_obj.keycloak_client_id,
            "client_secret": service_obj.keycloak_client_secret,
            "username": username,
            "password": password,
            "scope": "openid profile email",
        },
        timeout_s=5.0,
    )
    if request_error:
        self._write_json(
            502,
            {"status": "error", "error": f"Keycloak login failed: {request_error}"},
        )
        return
    # A non-JSON or empty upstream body must not crash the handler on .get().
    if not isinstance(token_payload, dict):
        token_payload = {}
    if status_code < 200 or status_code >= 300:
        self._write_json(
            401,
            {
                "status": "error",
                "error": str(
                    token_payload.get("error_description")
                    or token_payload.get("error")
                    or "login failed"
                ),
            },
        )
        return
    access_token = str(token_payload.get("access_token", "")).strip()
    role = service_obj.role_from_token(access_token)
    if not role:
        self._write_json(
            403,
            {
                "status": "error",
                "error": "user does not have required Keycloak role",
            },
        )
        return
    session_id = self._issue_session(username=username, role=role)
    session_row = {"username": username, "role": role}
    self._write_json(
        200,
        {"status": "ok", "auth": self._auth_payload(session_row)},
        extra_headers=self._session_cookie_headers(session_id=session_id),
    )

def _post_auth_logout(self) -> None:
    """Handle ``POST /auth/logout``: drop the session and clear its cookie."""
    self._drop_session()
    self._write_json(
        200,
        {"status": "ok"},
        extra_headers=self._session_cookie_headers(clear=True),
    )

def _post_config(self) -> None:
    """Handle ``POST /config``: validate, start and swap in a new service.

    The submitted config is used to build a new ``AutoService``. Only after
    that service has started successfully is the config written to
    ``config_path`` and the running service replaced, so a config that
    fails to start is never persisted. A failure to write the file does not
    abort the swap; it is reported through ``saved`` / ``save_error``.

    Responses: 400 when the service cannot be constructed from the config,
    500 when the new service fails to start, 200 once it has been applied.
    """
    # NOTE(review): _require_admin() presumably writes the rejection
    # response itself, since nothing is written here on failure.
    if not self._require_admin():
        return
    service_obj = self._current_service()
    new_config, read_error, read_status = self._read_json_body()
    if read_error:
        self._write_json(read_status, {"status": "error", "error": read_error})
        return
    _preserve_sensitive_config_values(service_obj, new_config)
    try:
        new_service = AutoService(new_config, config_path=service_obj.config_path)
    except Exception as exc:
        self._write_json(
            400,
            {"status": "error", "error": f"Config validation failed: {exc}"},
        )
        return
    # Serialize before start() so the file content is exactly what was
    # validated, even if starting the service touches the config dict.
    serialized_config = (
        json.dumps(new_config, ensure_ascii=False, indent=2)
        if service_obj.config_path
        else None
    )
    try:
        new_service.start()
    except Exception as exc:
        self._write_json(
            500,
            {
                "status": "error",
                "error": f"Failed to start service with new config: {exc}",
            },
        )
        return
    save_error = ""
    if serialized_config is not None:
        try:
            Path(service_obj.config_path).write_text(
                serialized_config,
                encoding="utf-8",
            )
        except OSError as exc:
            save_error = str(exc)
    # Swap under the lock; stop the previous instance outside of it so the
    # lock is not held while the old service shuts down.
    with service_swap_lock:
        old_service = service_holder["current"]
        service_holder["current"] = new_service
    old_service.stop()
    self._write_json(
        200,
        {
            "status": "ok",
            "saved": bool(service_obj.config_path) and not bool(save_error),
            "save_error": save_error,
            "restart_required": False,
            "applied": True,
            "config_path": service_obj.config_path,
            "mock_input_frequency_sync_enabled": bool(
                new_service.mock_input_frequency_sync_enabled
            ),
            "mock_input_frequency_sync_errors": list(
                new_service.last_mock_sync_errors
            ),
        },
    )

def _post_mock_control(self) -> None:
    """Handle ``POST /mock/control``: validate the body and apply it.

    Expected JSON fields: ``target`` (``"input"`` or ``"output"``), ``id``,
    optional boolean ``enabled`` and optional ``frequencies_mhz`` (array of
    finite numbers > 0, rounded to 6 decimals). ``output`` requires
    ``enabled``; ``input`` requires ``enabled`` or ``frequencies_mhz``.

    Responses: 400 on validation errors, 500 when ``_set_mock_control``
    raises, otherwise 200 with its return value as the body.
    """
    if not self._require_admin():
        return
    service_obj = self._current_service()
    payload, read_error, read_status = self._read_json_body(empty_body_is_object=True)
    if read_error:
        self._write_json(read_status, {"status": "error", "error": read_error})
        return
    target = str(payload.get("target", "")).strip().lower()
    target_id = str(payload.get("id", "")).strip()
    enabled_value = payload.get("enabled")
    frequencies_value = payload.get("frequencies_mhz")
    if target not in ("input", "output"):
        self._write_json(
            400,
            {"status": "error", "error": "target must be 'input' or 'output'"},
        )
        return
    if not target_id:
        self._write_json(400, {"status": "error", "error": "id is required"})
        return
    parsed_frequencies: Optional[List[float]] = None
    if frequencies_value is not None:
        if not isinstance(frequencies_value, list):
            self._write_json(
                400,
                {"status": "error", "error": "frequencies_mhz must be array"},
            )
            return
        parsed_frequencies = []
        # Positions in the error messages are 1-based.
        for index, value in enumerate(frequencies_value, start=1):
            try:
                numeric = float(value)
            except (TypeError, ValueError):
                self._write_json(
                    400,
                    {
                        "status": "error",
                        "error": f"frequencies_mhz[{index}] must be numeric",
                    },
                )
                return
            if not math.isfinite(numeric) or numeric <= 0.0:
                self._write_json(
                    400,
                    {
                        "status": "error",
                        "error": f"frequencies_mhz[{index}] must be > 0",
                    },
                )
                return
            parsed_frequencies.append(round(numeric, 6))
    enabled_is_bool = isinstance(enabled_value, bool)
    if target == "output" and not enabled_is_bool:
        self._write_json(400, {"status": "error", "error": "enabled must be boolean"})
        return
    if target == "input" and not enabled_is_bool and parsed_frequencies is None:
        self._write_json(
            400,
            {
                "status": "error",
                "error": "input control requires 'enabled' or 'frequencies_mhz'",
            },
        )
        return
    try:
        response = _set_mock_control(
            service=service_obj,
            target=target,
            target_id=target_id,
            enabled=enabled_value if enabled_is_bool else None,
            frequencies_mhz=parsed_frequencies,
        )
    except Exception as exc:
        self._write_json(500, {"status": "error", "error": str(exc)})
        return
    self._write_json(200, response)

def _post_users(self) -> None:
    """Handle ``POST /users``: create, update, re-password or delete a user.

    The JSON body selects the operation through ``action`` (``create``,
    ``update``, ``set_password`` or ``delete``). The target is identified by
    ``user_id`` or, when that is absent, resolved from ``username``.

    Responses: 400 for an unsupported action or when the operation fails
    (validation error or Keycloak failure); 200 with the refreshed user list
    on success. If only the final list refresh fails, the answer is still
    200 with an explanatory ``message`` and an empty ``users`` list.
    """
    if not self._require_admin():
        return
    service_obj = self._current_service()
    payload, read_error, read_status = self._read_json_body()
    if read_error:
        self._write_json(read_status, {"status": "error", "error": read_error})
        return
    action = str(payload.get("action", "")).strip().lower()
    username = str(payload.get("username", "")).strip()
    role = str(payload.get("role", "user")).strip().lower()
    # NOTE: plain truthiness coercion - any truthy JSON value (including the
    # string "false") counts as enabled.
    enabled = bool(payload.get("enabled", True))
    password = str(payload.get("password", "")).strip()
    first_name = str(payload.get("first_name", "")).strip()
    last_name = str(payload.get("last_name", "")).strip()
    user_id = str(payload.get("user_id", "")).strip()
    if action not in ("create", "update", "set_password", "delete"):
        self._write_json(400, {"status": "error", "error": "unsupported user action"})
        return
    try:
        # Resolve the id inside the try so a failing Keycloak lookup becomes
        # a JSON error response instead of an unhandled exception. "create"
        # looks the id up itself once the user exists.
        if action != "create" and not user_id and username:
            user_id = _keycloak_get_user_id_by_username(service_obj, username) or ""
        if action == "create":
            if not username or not password:
                raise ValueError("username and password are required")
            if role not in ("admin", "user"):
                raise ValueError("role must be admin or user")
            status_code, _, request_error = _keycloak_admin_request(
                service_obj,
                "/users",
                method="POST",
                json_body={
                    "username": username,
                    "enabled": enabled,
                    "firstName": first_name,
                    "lastName": last_name,
                },
            )
            if request_error:
                raise RuntimeError(request_error)
            if status_code < 200 or status_code >= 300:
                raise RuntimeError("failed to create user in Keycloak")
            user_id = _keycloak_get_user_id_by_username(service_obj, username) or ""
            if not user_id:
                raise RuntimeError("created user not found in Keycloak")
            _keycloak_reset_password(service_obj, user_id, password)
            _keycloak_apply_user_role(service_obj, user_id, role)
        elif action == "update":
            if not user_id:
                raise ValueError("user_id or username is required")
            if role not in ("admin", "user"):
                raise ValueError("role must be admin or user")
            status_code, _, request_error = _keycloak_admin_request(
                service_obj,
                f"/users/{parse.quote(user_id)}",
                method="PUT",
                json_body={
                    "enabled": enabled,
                    "firstName": first_name,
                    "lastName": last_name,
                },
            )
            if request_error or status_code < 200 or status_code >= 300:
                raise RuntimeError("failed to update user in Keycloak")
            _keycloak_apply_user_role(service_obj, user_id, role)
        elif action == "set_password":
            if not user_id:
                raise ValueError("user_id or username is required")
            if not password:
                raise ValueError("password is required")
            _keycloak_reset_password(service_obj, user_id, password)
        elif action == "delete":
            if not user_id:
                raise ValueError("user_id or username is required")
            status_code, _, request_error = _keycloak_admin_request(
                service_obj,
                f"/users/{parse.quote(user_id)}",
                method="DELETE",
            )
            if request_error or status_code < 200 or status_code >= 300:
                raise RuntimeError("failed to delete user in Keycloak")
    except Exception as exc:
        self._write_json(400, {"status": "error", "error": str(exc)})
        return
    try:
        users = _keycloak_list_users(service_obj)
    except Exception as exc:
        # The action itself succeeded; only the follow-up listing failed.
        self._write_json(
            200,
            {
                "status": "ok",
                "message": f"action completed, but refresh failed: {exc}",
                "users": [],
            },
        )
        return
    self._write_json(200, {"status": "ok", "users": users})

def _post_refresh(self) -> None:
    """Handle ``POST /refresh``: run one refresh cycle and report its timestamp.

    Responses: 500 when ``refresh_once`` raises, otherwise 200 with the
    snapshot's ``updated_at_utc``.
    """
    if not self._require_admin():
        return
    # Resolve the service once so the reported timestamp belongs to the
    # instance that was actually refreshed, even if a concurrent /config
    # request swaps the current service in between.
    service_obj = self._current_service()
    try:
        service_obj.refresh_once()
    except Exception as exc:
        self._write_json(500, {"status": "error", "error": str(exc)})
        return
    snapshot = service_obj.snapshot()
    self._write_json(
        200,
        {
            "status": "ok",
            "updated_at_utc": snapshot["updated_at_utc"],
        },
    )
ServiceHandler.service_holder = service_holder # type: ignore[attr-defined]
return ServiceHandler
def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the service's command-line options.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what existing callers get.

    Returns:
        Namespace with ``config`` (default ``"config.json"``), ``host``
        (default ``""``) and ``port`` (default ``0``).
    """
    parser = argparse.ArgumentParser(
        description="Automatic trilateration service: polls 3 receiver servers and exposes result API."
    )
    parser.add_argument("--config", type=str, default="config.json")
    parser.add_argument("--host", type=str, default="")
    parser.add_argument("--port", type=int, default=0)
    return parser.parse_args(argv)
def main() -> int:
    """Load the config, start the service and serve the HTTP API until interrupted.

    The listen address comes from ``--host`` / ``--port`` when given,
    otherwise from ``runtime.listen_host`` / ``runtime.listen_port`` in the
    config (defaults ``0.0.0.0`` and ``8081``).

    Returns:
        0 once the HTTP server has been closed and the service stopped.

    Raises:
        SystemExit: if ``runtime`` in the config is not a JSON object.
    """
    args = parse_args()
    config = _load_json(args.config)
    runtime = config.get("runtime", {})
    if not isinstance(runtime, dict):
        raise SystemExit("runtime must be object.")
    # An empty host / zero port on the command line means "not given".
    host = args.host or str(runtime.get("listen_host", "0.0.0.0"))
    port = args.port or int(runtime.get("listen_port", 8081))
    service = AutoService(config, config_path=args.config)
    service.start()
    try:
        handler = _make_handler(service)
        server = ThreadingHTTPServer((host, port), handler)
    except Exception:
        # The service is already running; do not leave it behind when the
        # HTTP server cannot be created (e.g. the port is already in use).
        service.stop()
        raise
    print(f"service_listen: http://{host}:{port}")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        try:
            server.server_close()
        finally:
            # A /config request may have swapped the service, so stop
            # whichever instance is current rather than the initial one.
            current_service = handler.service_holder["current"]  # type: ignore[attr-defined]
            current_service.stop()
    return 0


if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())