"""Mock receiver that serves randomized RSSI samples as JSON over HTTP.

Run as a script.  ``GET /measurements`` answers with a freshly generated
payload on every request; any other path answers 404 with an empty body.
"""

from __future__ import annotations

import argparse
import json
import random
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Dict, List


def _build_payload(receiver_id: str, base_rssi: float) -> Dict[str, object]:
    """Build one measurement payload for *receiver_id*.

    Two sample rows are produced, each with an ``f_mhz`` and an ``rssi`` key:
    the first row's ``rssi`` is ``base_rssi`` plus noise, the second row's is
    ``base_rssi - 4.0`` plus independent noise.  Noise is drawn uniformly
    from ``[-1.2, 1.2]``.

    Args:
        receiver_id: Identifier echoed back under the ``receiver_id`` key.
        base_rssi: Centre value around which the ``rssi`` readings jitter.

    Returns:
        A JSON-serialisable dict with the keys ``receiver_id``,
        ``timestamp_unix`` (the current ``time.time()``) and ``samples``.
    """
    # Draw both noise terms up front, in this order, so output for a seeded
    # ``random`` stays reproducible.
    noise_a = random.uniform(-1.2, 1.2)
    noise_b = random.uniform(-1.2, 1.2)
    rows: List[Dict[str, float]] = [
        {"f_mhz": 433.920, "rssi": base_rssi + noise_a},
        {"f_mhz": 868.100, "rssi": base_rssi - 4.0 + noise_b},
    ]
    return {
        "receiver_id": receiver_id,
        "timestamp_unix": time.time(),
        "samples": rows,
    }


def main() -> int:
    """Parse command-line options and serve measurements until interrupted.

    Options:
        --receiver-id  Required; identifier reported in every payload.
        --port         TCP port to listen on (int, default 9000).
        --base-rssi    Centre of the generated readings (float, default -62.0).

    Returns:
        0 once the server has stopped (for example after Ctrl-C).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--receiver-id", required=True)
    parser.add_argument("--port", type=int, default=9000)
    parser.add_argument("--base-rssi", type=float, default=-62.0)
    args = parser.parse_args()

    class Handler(BaseHTTPRequestHandler):
        """Request handler bound to the parsed options through the closure."""

        def log_message(self, format: str, *args2) -> None:
            """Discard the base class's per-request log output."""
            return

        def do_GET(self) -> None:
            """Answer ``/measurements`` with a JSON payload, anything else with 404."""
            if self.path != "/measurements":
                self.send_response(404)
                # Explicit zero length tells the client the empty body is
                # complete without waiting for the connection to close.
                self.send_header("Content-Length", "0")
                self.end_headers()
                return

            payload = _build_payload(args.receiver_id, args.base_rssi)
            raw = json.dumps(payload).encode("utf-8")
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(raw)))
            self.end_headers()
            self.wfile.write(raw)

    # "0.0.0.0" binds every local interface.  The context manager guarantees
    # the listening socket is closed however the serve loop ends.
    with ThreadingHTTPServer(("0.0.0.0", args.port), Handler) as server:
        # Flush so the banner is visible immediately even when stdout is piped.
        print(
            f"mock_receiver({args.receiver_id}) listening on :{args.port}",
            flush=True,
        )
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the mock; exit quietly.
            pass
    return 0


if __name__ == "__main__":
    raise SystemExit(main())