"""Mock HTTP sink that records triangulation payloads for inspection.

Endpoints:
    POST /triangulation  -- store the JSON object body and bump a counter.
    GET  /latest         -- return the counter and the most recent payload.
"""

from __future__ import annotations

import argparse
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any


def make_handler(
    latest: dict[str, Any],
    lock: threading.Lock | None = None,
) -> type[BaseHTTPRequestHandler]:
    """Build a request-handler class bound to the shared ``latest`` state.

    Args:
        latest: Mutable dict with the keys ``"count"`` and ``"last_payload"``.
            It is updated in place on every accepted POST.
        lock: Lock guarding ``latest``. A fresh one is created when omitted.

    Returns:
        A ``BaseHTTPRequestHandler`` subclass suitable for ``HTTPServer``.
    """
    # ThreadingHTTPServer runs each request in its own thread, so every access
    # to the shared dict has to be serialised.
    state_lock = threading.Lock() if lock is None else lock

    class Handler(BaseHTTPRequestHandler):
        """Serve ``GET /latest`` and ``POST /triangulation``."""

        def log_message(self, format: str, *args: Any) -> None:  # noqa: A002
            """Silence the default per-request logging to stderr."""
            return

        def _send_json(self, status: int, document: Any) -> None:
            """Send ``document`` as a JSON response with the given status."""
            raw = json.dumps(document).encode("utf-8")
            self.send_response(status)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(raw)))
            self.end_headers()
            self.wfile.write(raw)

        def _send_not_found(self) -> None:
            """Send a bare 404 with no body."""
            self.send_response(404)
            self.end_headers()

        def _reject(self, detail: str) -> None:
            """Answer 400 with a short JSON explanation."""
            self._send_json(400, {"status": "error", "detail": detail})

        def do_GET(self) -> None:
            """Return the current counter and the last stored payload."""
            if self.path != "/latest":
                self._send_not_found()
                return
            # Copy under the lock so count and payload belong to the same POST;
            # stored payloads are replaced, never mutated, so a shallow copy
            # is enough.
            with state_lock:
                snapshot = dict(latest)
            self._send_json(200, snapshot)

        def do_POST(self) -> None:
            """Store the posted JSON object; answer 400 on a malformed request."""
            if self.path != "/triangulation":
                self._send_not_found()
                return

            try:
                content_length = int(self.headers.get("Content-Length", "0"))
            except ValueError:
                self._reject("invalid Content-Length header")
                return

            # A missing or non-positive length is treated as an empty object.
            body = self.rfile.read(content_length) if content_length > 0 else b"{}"
            try:
                payload = json.loads(body.decode("utf-8"))
            except (UnicodeDecodeError, json.JSONDecodeError):
                self._reject("body is not valid UTF-8 encoded JSON")
                return
            if not isinstance(payload, dict):
                self._reject("JSON body must be an object")
                return

            with state_lock:
                latest["count"] = int(latest["count"]) + 1
                latest["last_payload"] = payload

            x = payload.get("x")
            y = payload.get("y")
            z = payload.get("z")
            print(f"received payload, x={x}, y={y}, z={z}")
            self._send_json(200, {"status": "ok"})

    return Handler


def main(argv: list[str] | None = None) -> int:
    """Run the mock sink until interrupted.

    Args:
        argv: Command-line arguments without the program name. ``None`` means
            use ``sys.argv[1:]``, which is argparse's default.

    Returns:
        Process exit code; ``0`` after a clean shutdown.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--port", type=int, default=8080)
    args = parser.parse_args(argv)

    latest: dict[str, Any] = {"count": 0, "last_payload": None}
    handler = make_handler(latest)

    # The context manager closes the listening socket on the way out.
    with ThreadingHTTPServer(("0.0.0.0", args.port), handler) as server:
        print(f"mock_output_sink listening on :{args.port}")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the sink; exit quietly.
            pass
    return 0


if __name__ == "__main__":
    raise SystemExit(main())