You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
58 lines
1.8 KiB
Python
58 lines
1.8 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import random
|
|
import time
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from typing import Dict, List
|
|
|
|
|
|
def _build_payload(receiver_id: str, base_rssi: float) -> Dict[str, object]:
|
|
noise_a = random.uniform(-1.2, 1.2)
|
|
noise_b = random.uniform(-1.2, 1.2)
|
|
rows: List[Dict[str, float]] = [
|
|
{"f_mhz": 433.920, "rssi": base_rssi + noise_a},
|
|
{"f_mhz": 868.100, "rssi": base_rssi - 4.0 + noise_b},
|
|
]
|
|
return {
|
|
"receiver_id": receiver_id,
|
|
"timestamp_unix": time.time(),
|
|
"samples": rows,
|
|
}
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--receiver-id", required=True)
|
|
parser.add_argument("--port", type=int, default=9000)
|
|
parser.add_argument("--base-rssi", type=float, default=-62.0)
|
|
args = parser.parse_args()
|
|
|
|
class Handler(BaseHTTPRequestHandler):
|
|
def log_message(self, format: str, *args2) -> None:
|
|
return
|
|
|
|
def do_GET(self) -> None:
|
|
if self.path != "/measurements":
|
|
self.send_response(404)
|
|
self.end_headers()
|
|
return
|
|
|
|
payload = _build_payload(args.receiver_id, args.base_rssi)
|
|
raw = json.dumps(payload).encode("utf-8")
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.send_header("Content-Length", str(len(raw)))
|
|
self.end_headers()
|
|
self.wfile.write(raw)
|
|
|
|
server = ThreadingHTTPServer(("0.0.0.0", args.port), Handler)
|
|
print(f"mock_receiver({args.receiver_id}) listening on :{args.port}")
|
|
server.serve_forever()
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|