You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

123 lines
4.3 KiB
Python

from __future__ import annotations
import argparse
import json
import random
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Dict, List
def _build_payload(receiver_id: str, base_rssi: float) -> Dict[str, object]:
noise_a = random.uniform(-1.2, 1.2)
noise_b = random.uniform(-1.2, 1.2)
rows: List[Dict[str, float]] = [
{"f_mhz": 433.920, "rssi": base_rssi + noise_a},
{"f_mhz": 868.100, "rssi": base_rssi - 4.0 + noise_b},
]
return {
"receiver_id": receiver_id,
"timestamp_unix": time.time(),
"samples": rows,
}
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--receiver-id", required=True)
parser.add_argument("--port", type=int, default=9000)
parser.add_argument("--base-rssi", type=float, default=-62.0)
args = parser.parse_args()
state = {"enabled": True}
class Handler(BaseHTTPRequestHandler):
def log_message(self, format: str, *args2) -> None:
return
def do_GET(self) -> None:
if self.path == "/status":
payload = {
"receiver_id": args.receiver_id,
"enabled": bool(state["enabled"]),
"status": "ok",
}
raw = json.dumps(payload).encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(raw)))
self.end_headers()
self.wfile.write(raw)
return
if self.path != "/measurements":
self.send_response(404)
self.end_headers()
return
if not bool(state["enabled"]):
raw = json.dumps(
{
"status": "error",
"error": "receiver transmission is paused",
"receiver_id": args.receiver_id,
}
).encode("utf-8")
self.send_response(503)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(raw)))
self.end_headers()
self.wfile.write(raw)
return
payload = _build_payload(args.receiver_id, args.base_rssi)
raw = json.dumps(payload).encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(raw)))
self.end_headers()
self.wfile.write(raw)
def do_POST(self) -> None:
if self.path != "/control":
self.send_response(404)
self.end_headers()
return
content_length = int(self.headers.get("Content-Length", "0"))
body = self.rfile.read(content_length) if content_length > 0 else b"{}"
try:
payload = json.loads(body.decode("utf-8"))
except json.JSONDecodeError:
payload = {}
enabled = payload.get("enabled")
if not isinstance(enabled, bool):
raw = json.dumps({"status": "error", "error": "field 'enabled' must be boolean"}).encode("utf-8")
self.send_response(400)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(raw)))
self.end_headers()
self.wfile.write(raw)
return
state["enabled"] = enabled
raw = json.dumps(
{
"status": "ok",
"receiver_id": args.receiver_id,
"enabled": bool(state["enabled"]),
}
).encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(raw)))
self.end_headers()
self.wfile.write(raw)
server = ThreadingHTTPServer(("0.0.0.0", args.port), Handler)
print(f"mock_receiver({args.receiver_id}) listening on :{args.port}")
server.serve_forever()
return 0
if __name__ == "__main__":
raise SystemExit(main())