"""Mock RF receiver that serves synthetic RSSI measurements over HTTP.

Endpoints:
    GET  /status        receiver id, enabled flag and configured frequencies
    GET  /measurements  one synthetic RSSI sample per configured frequency
                        (503 while transmission is paused)
    POST /control       JSON object with "enabled" (bool) and/or
                        "frequencies_mhz" (array) to reconfigure the receiver
"""

from __future__ import annotations

import argparse
import json
import math
import random
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Dict, List, Optional, Type


def _parse_frequency_list(raw_value: str) -> List[float]:
    """Parse a comma-separated frequency list into unique positive floats.

    Blank items are skipped, every value is rounded to 6 decimal places and
    duplicates (after rounding) are dropped while keeping first-seen order.

    Args:
        raw_value: Text such as ``"433.92,868.1"``.

    Returns:
        The rounded, de-duplicated frequencies in input order.

    Raises:
        ValueError: If an item is not a number, is NaN or infinite, is not
            greater than zero after rounding, or if no value remains.
    """
    frequencies: List[float] = []
    seen = set()
    for item in str(raw_value).split(","):
        text = item.strip()
        if not text:
            continue
        try:
            value = float(text)
        except ValueError as exc:
            raise ValueError(f"invalid frequency value '{text}'") from exc
        # float() accepts "nan"/"inf"; NaN also slips past a "<= 0" test and
        # neither value can be serialised as standard JSON.
        if not math.isfinite(value):
            raise ValueError(f"frequency must be a finite number, got {value}")
        rounded = round(value, 6)
        # Compare after rounding so a tiny positive input cannot end up as 0.0.
        if rounded <= 0.0:
            raise ValueError(f"frequency must be > 0, got {value}")
        if rounded in seen:
            continue
        seen.add(rounded)
        frequencies.append(rounded)
    if not frequencies:
        raise ValueError("at least one frequency is required")
    return frequencies


def _build_payload(
    receiver_id: str,
    base_rssi: float,
    frequencies_mhz: List[float],
) -> Dict[str, object]:
    """Build one synthetic measurement payload.

    Each frequency gets ``base_rssi`` lowered by 2.2 per list position plus
    uniform random noise in [-1.2, 1.2]; the payload is stamped with the
    current Unix time.
    """
    rows: List[Dict[str, float]] = []
    for index, frequency_mhz in enumerate(frequencies_mhz):
        noise = random.uniform(-1.2, 1.2)
        offset = -2.2 * index
        rows.append(
            {
                "f_mhz": round(float(frequency_mhz), 6),
                "rssi": base_rssi + offset + noise,
            }
        )
    return {
        "receiver_id": receiver_id,
        "timestamp_unix": time.time(),
        "samples": rows,
    }


def _decode_control_body(body: bytes) -> Dict[str, object]:
    """Decode a /control request body into a JSON object.

    Anything that is not a UTF-8 encoded JSON object (invalid UTF-8, invalid
    JSON, or a JSON array/number/string) yields an empty dict, so the caller
    reports it as a request that carries no usable fields.
    """
    try:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        payload = json.loads(body.decode("utf-8"))
    except ValueError:
        return {}
    return payload if isinstance(payload, dict) else {}


def _apply_control(state: Dict[str, object], payload: Dict[str, object]) -> Optional[str]:
    """Validate a /control payload and apply it to ``state``.

    ``state`` is modified only when the whole payload is valid, so a rejected
    request never leaves a partially applied change behind.

    Returns:
        ``None`` on success, otherwise the error message for a 400 response.
    """
    enabled = payload.get("enabled")
    frequencies_raw = payload.get("frequencies_mhz")
    has_enabled = isinstance(enabled, bool)
    has_frequencies = frequencies_raw is not None

    if not has_enabled and not has_frequencies:
        return "at least one of fields 'enabled' or 'frequencies_mhz' must be provided"

    parsed_frequencies: Optional[List[float]] = None
    if has_frequencies:
        if not isinstance(frequencies_raw, list):
            return "field 'frequencies_mhz' must be an array"
        try:
            parsed_frequencies = _parse_frequency_list(
                ",".join(str(x) for x in frequencies_raw)
            )
        except ValueError as exc:
            return str(exc)

    # Commit only after every field has been validated.
    if has_enabled:
        state["enabled"] = enabled
    if parsed_frequencies is not None:
        state["frequencies_mhz"] = parsed_frequencies
    return None


def _make_handler(
    receiver_id: str,
    base_rssi: float,
    state: Dict[str, object],
) -> Type[BaseHTTPRequestHandler]:
    """Create the request handler class bound to one receiver's settings.

    ``state`` is the shared mutable dict holding ``"enabled"`` and
    ``"frequencies_mhz"``; the handler reads it on GET and updates it on
    POST /control.
    """

    class Handler(BaseHTTPRequestHandler):
        def log_message(self, format: str, *args) -> None:  # noqa: A002
            """Suppress the default per-request logging."""

        def _send_json(self, status: int, payload: Dict[str, object]) -> None:
            """Send ``payload`` as a JSON response with the given status."""
            raw = json.dumps(payload).encode("utf-8")
            self.send_response(status)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(raw)))
            self.end_headers()
            self.wfile.write(raw)

        def _send_not_found(self) -> None:
            """Send an empty 404 response."""
            self.send_response(404)
            self.end_headers()

        def do_GET(self) -> None:
            if self.path == "/status":
                self._send_json(
                    200,
                    {
                        "receiver_id": receiver_id,
                        "enabled": bool(state["enabled"]),
                        "frequencies_mhz": list(state["frequencies_mhz"]),
                        "status": "ok",
                    },
                )
                return

            if self.path != "/measurements":
                self._send_not_found()
                return

            if not state["enabled"]:
                self._send_json(
                    503,
                    {
                        "status": "error",
                        "error": "receiver transmission is paused",
                        "receiver_id": receiver_id,
                    },
                )
                return

            self._send_json(
                200,
                _build_payload(
                    receiver_id=receiver_id,
                    base_rssi=base_rssi,
                    frequencies_mhz=list(state["frequencies_mhz"]),
                ),
            )

        def do_POST(self) -> None:
            if self.path != "/control":
                self._send_not_found()
                return

            try:
                content_length = int(self.headers.get("Content-Length", "0"))
            except ValueError:
                self._send_json(
                    400,
                    {"status": "error", "error": "invalid Content-Length header"},
                )
                return

            body = self.rfile.read(content_length) if content_length > 0 else b"{}"
            error = _apply_control(state, _decode_control_body(body))
            if error is not None:
                self._send_json(400, {"status": "error", "error": error})
                return

            self._send_json(
                200,
                {
                    "status": "ok",
                    "receiver_id": receiver_id,
                    "enabled": bool(state["enabled"]),
                    "frequencies_mhz": list(state["frequencies_mhz"]),
                },
            )

    return Handler


def main() -> int:
    """Parse command-line options and serve the mock receiver until stopped."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--receiver-id", required=True)
    parser.add_argument("--port", type=int, default=9000)
    parser.add_argument("--base-rssi", type=float, default=-62.0)
    parser.add_argument("--frequencies-mhz", type=str, default="433.92,868.1")
    args = parser.parse_args()

    state: Dict[str, object] = {
        "enabled": True,
        "frequencies_mhz": _parse_frequency_list(args.frequencies_mhz),
    }

    handler = _make_handler(args.receiver_id, args.base_rssi, state)
    server = ThreadingHTTPServer(("0.0.0.0", args.port), handler)
    print(f"mock_receiver({args.receiver_id}) listening on :{args.port}")
    try:
        server.serve_forever()
    finally:
        # Release the listening socket even when interrupted.
        server.server_close()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())