You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
DroneDetector/orange_scripts/compose_send_data_1200.py

115 lines
3.3 KiB
Python

from common.runtime import load_root_env, validate_env, as_float, as_int, as_str
import numpy as np
import requests
import os
import sys
import json
import time
# Load the environment first, then check the variables this script relies on.
# NOTE(review): load_root_env/validate_env come from common.runtime; presumably
# they read the project's root .env and fail on missing/ill-typed values -
# confirm against that module.
load_root_env(__file__)
validate_env(
    "orange_scripts/compose_send_data_1200.py",
    {
        "POROG_1200": as_float,
        "SERVER_IP_2": as_str,
        "SERVER_PORT_2": as_int,
    },
)

# Threshold that median() compares the measured level against.
porog = float(os.environ.get('POROG_1200'))

# Server endpoints. Only the *_2 pair is validated above and used by
# send_data(); the *_1 pair is read as-is and may be None.
server_ip_1 = os.environ.get('SERVER_IP_1')
server_port_1 = os.environ.get('SERVER_PORT_1')
server_ip_2 = os.environ.get('SERVER_IP_2')
server_port_2 = os.environ.get('SERVER_PORT_2')

# split_size   - number of samples shipped in one packet by send_data()
# point_amount - number of samples used for one threshold probe in median()
# show_amount  - index (kept as a float, cast with int() on use) where the
#                sorted-magnitude tail used for the probe begins
PARAMS = {'split_size': 400_000, 'point_amount': 100_000}
PARAMS.update(show_amount=0.8 * PARAMS['point_amount'])

token = 0    # count of packets the server has acknowledged so far
channel = 1  # 1-based index of the current frequency step
flag = 0     # 1 while a probe has crossed the threshold, otherwise 0

##############################
# HYPERPARAMETERS
##############################
# Sweep range: start, step and upper bound (presumably in Hz).
f_base = 1.1e9
f_step = 20e6
f_roof = 1.3e9

##############################
# Variables
##############################
f = f_base       # frequency currently being examined
EOCF = 0         # returned unchanged by work(); never modified in this file
signal_arr = []  # samples accumulated since the last reset
class NumpyArrayEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy arrays and scalars to plain Python.

    The stock encoder rejects NumPy containers and most NumPy scalar types.
    This subclass maps them to their built-in equivalents so they can be
    passed to ``json.dumps(..., cls=NumpyArrayEncoder)``.
    """

    def default(self, obj):
        """Return a JSON-serialisable replacement for *obj*.

        Args:
            obj: The value the base encoder could not serialise.

        Returns:
            A nested list for ``np.ndarray``; ``bool``, ``int`` or ``float``
            for the matching NumPy scalar types.

        Raises:
            TypeError: Raised by the base class for any other type.
        """
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        # np.float32 / np.float16 are not ``float`` subclasses, so the base
        # encoder hands them to default(); convert them explicitly.
        if isinstance(obj, np.floating):
            return float(obj)
        return super().default(obj)
def send_data(sig, timeout=30.0):
    """POST one packet of complex samples to the ``/receive_data`` endpoint.

    The samples are split into real and imaginary float32 parts and sent
    together with the fixed ``freq`` tag, the current ``channel`` and the
    next packet number. ``token`` is incremented only when the server
    answers with HTTP 200. Any failure is printed and swallowed so the
    caller keeps running.

    Args:
        sig: Array-like of samples convertible to ``complex64``.
        timeout: Seconds (or a ``(connect, read)`` tuple) passed to
            ``requests.post``. Without it an unresponsive server would
            block the caller indefinitely.
            NOTE(review): 30 s is a conservative guess - tune to the
            server's real processing time.

    Returns:
        None.
    """
    global token
    try:
        print('#' * 10)
        print('\nОтправка пакета ' + str(token+1))

        # Convert once; .real/.imag of a complex64 array are float32 views.
        samples = np.array(sig, dtype=np.complex64)
        data_to_send = {
            "freq": 1200,
            "channel": int(channel),
            "token": int(token + 1),
            "data_real": np.asarray(samples.real, dtype=np.float32),
            "data_imag": np.asarray(samples.imag, dtype=np.float32),
        }
        mod_data_to_send = json.dumps(data_to_send, cls=NumpyArrayEncoder)

        # NOTE(review): `json=` serialises its argument again, so the body is
        # a JSON string that itself contains JSON. Presumably the receiver
        # decodes twice; kept unchanged for wire compatibility - confirm.
        response = requests.post(
            "http://{0}:{1}/receive_data".format(server_ip_2, server_port_2),
            json=mod_data_to_send,
            timeout=timeout,
        )

        if response.status_code == 200:
            token += 1
            print(response.text)
            print('#' * 10)
        else:
            print("Ошибка при отправке данных: ", response.status_code)
            print('#' * 10)
    except Exception as exc:
        # Deliberate best effort: report the problem and let the caller go on.
        print(str(exc))
def median(sig):
    """Probe the signal level and update the global detection ``flag``.

    The magnitudes of *sig* are sorted in ascending order and only the tail
    starting at index ``int(PARAMS['show_amount'])`` is kept. The median of
    that tail is compared with ``porog``: ``flag`` becomes 0 when ``porog``
    is greater than the median, otherwise 1. The channel, the measured level
    and the resulting flag are printed.

    If *sig* is too short for the tail to contain any sample, there is
    nothing to measure. ``flag`` is then set to 0 instead of letting a NaN
    median be reported as a detection.

    Args:
        sig: Array-like of samples convertible to ``complex64``.

    Returns:
        None. The result is published through the global ``flag``.
    """
    global flag
    magnitudes = np.abs(np.asarray(sig, dtype=np.complex64))
    # np.sort gives the same ascending order as sorted() without boxing
    # every sample into a Python object.
    tail = np.sort(magnitudes)[int(PARAMS['show_amount']):]

    if tail.size == 0:
        level = float('nan')
        flag = 0
    else:
        level = float(np.median(tail))
        flag = 0 if porog > level else 1
    print(channel, level, flag)
def work(lvl):
    """Feed a chunk of samples into the sweep state machine.

    The chunk is flattened and appended to the global ``signal_arr``. Then:

    * if ``f`` has reached ``f_roof``, the sweep restarts at ``f_base`` on
      channel 1 and the buffer is dropped;
    * otherwise, while no detection is pending (``flag == 0``) and at least
      ``PARAMS['point_amount']`` samples are buffered, ``median()`` probes
      them and the buffer is cleared; if the probe leaves ``flag`` at 0 the
      sweep moves to the next frequency/channel;
    * once ``PARAMS['split_size']`` samples are buffered they are handed to
      ``send_data()``, ``flag`` is cleared, the buffer is dropped and the
      sweep moves to the next frequency/channel.

    NOTE(review): the "probe missed -> hop" step is taken to be nested inside
    the probe branch - confirm against the repository copy.

    Args:
        lvl: Array-like of samples; any shape, it is flattened.

    Returns:
        Tuple ``(f, EOCF)``: the frequency to use next and the unchanged
        ``EOCF`` value.
    """
    global flag, channel, f, signal_arr

    chunk = np.array(lvl).ravel()
    signal_arr = np.concatenate((signal_arr, chunk), axis=None)

    # End of the sweep range: start over from the first channel.
    if f >= f_roof:
        f, signal_arr, channel = f_base, [], 1
        return f, EOCF

    probe_len = PARAMS['point_amount']
    if flag == 0 and len(signal_arr) >= probe_len:
        median(signal_arr[:probe_len])
        signal_arr = []
        if flag == 0:
            # Nothing above the threshold here; try the next frequency.
            f += f_step
            channel += 1

    packet_len = PARAMS['split_size']
    if len(signal_arr) >= packet_len:
        send_data(signal_arr[:packet_len])
        flag = 0
        signal_arr = []
        channel += 1
        f += f_step

    return f, EOCF