You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
DroneDetector/orange_scripts/compose_send_data_915.py

120 lines
3.1 KiB
Python

from common.runtime import load_root_env, validate_env, as_float, as_int, as_str
import json
import os
import numpy as np
import requests
load_root_env(__file__)
validate_env('orange_scripts/compose_send_data_915.py', {
'POROG_915': as_float,
'SERVER_IP_1': as_str,
'SERVER_PORT_1': as_int,
})
porog = float(os.getenv('POROG_915'))
server_ip_1 = os.getenv('SERVER_IP_1')
server_port_1 = os.getenv('SERVER_PORT_1')
server_ip_2 = os.getenv('SERVER_IP_2')
server_port_2 = os.getenv('SERVER_PORT_2')
PARAMS = {'split_size': 400_000, 'point_amount': 100_000}
PARAMS['show_amount'] = int(0.8 * PARAMS['point_amount'])
token = 0
channel = 1
flag = 0
f_base = 0.91e9
f_step = 20e6
f_roof = 0.98e9
f = f_base
EOCF = 0
signal_arr = np.array([], dtype=np.complex64)
class NumpyArrayEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
if isinstance(obj, np.ndarray):
return obj.tolist()
return super().default(obj)
def send_data(sig):
try:
global token
print('#' * 10)
print('\nОтправка пакета ' + str(token + 1))
data_to_send = {
'freq': 915,
'channel': int(channel),
'token': int(token + 1),
'data_real': np.asarray(np.array(sig, dtype=np.complex64).real, dtype=np.float32),
'data_imag': np.asarray(np.array(sig, dtype=np.complex64).imag, dtype=np.float32),
}
mod_data_to_send = json.dumps(data_to_send, cls=NumpyArrayEncoder)
response = requests.post(
'http://{0}:{1}/receive_data'.format(server_ip_1, server_port_1),
json=mod_data_to_send,
)
if response.status_code == 200:
token += 1
print(response.text)
print('#' * 10)
else:
print('Ошибка при отправке данных: ', response.status_code)
print('#' * 10)
except Exception as exc:
print(str(exc))
def median(sig):
global flag
samples = np.asarray(np.abs(np.array(sig, dtype=np.complex64)), dtype=np.float32)
med = abs(float(np.median(sorted(samples)[int(PARAMS['show_amount']):])))
flag = 0 if porog > med else 1
print(channel, med, flag)
def advance_freq():
global channel
global f
next_freq = f + f_step
if next_freq >= f_roof:
f = f_base
channel = 1
return f, True
f = next_freq
channel += 1
return f, False
def work(lvl):
global flag
global f
global EOCF
global signal_arr
y = np.asarray(lvl, dtype=np.complex64).ravel()
signal_arr = np.concatenate((signal_arr, y), axis=None)
if flag == 0 and len(signal_arr) >= PARAMS['point_amount']:
median(signal_arr[:PARAMS['point_amount']])
signal_arr = np.array([], dtype=np.complex64)
if flag == 0:
f, _ = advance_freq()
return f, EOCF
if len(signal_arr) >= PARAMS['split_size']:
send_data(signal_arr[:PARAMS['split_size']])
flag = 0
signal_arr = np.array([], dtype=np.complex64)
f, _ = advance_freq()
return f, EOCF
return f, EOCF