You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
DroneDetector/orange_scripts/main_915.py

173 lines
5.3 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from gnuradio import blocks, gr
import sys
import signal
import compose_send_data_915 as my_freq
import osmosdr
import time
import threading
import subprocess
import os
from common.runtime import load_root_env, resolve_hackrf_index
# NOTE(review): presumably loads the repository-level environment settings,
# located relative to this script, so that values such as HACKID_915 are
# available below -- confirm against common.runtime.load_root_env.
load_root_env(__file__)
def get_hack_id():
    """Return the HackRF device index to use for this script.

    Delegates to ``common.runtime.resolve_hackrf_index`` with the
    ``HACKID_915`` key; the second argument appears to identify the calling
    script (confirm against that helper).

    Returns:
        Whatever ``resolve_hackrf_index`` returns.  The flowgraph appends it
        to the osmosdr ``hackrf=`` argument with ``+``, so it is expected to
        be a string.
    """
    return resolve_hackrf_index('HACKID_915', 'orange_scripts/main_915.py')
class get_center_freq(gr.top_block):
    """Flowgraph that feeds one HackRF channel into a vector probe and retunes from it.

    Samples from an osmosdr source are grouped into 4096-item complex
    vectors and handed to a ``probe_signal_vc`` block.  A daemon thread
    polls that probe and passes every reading to ``set_prob_freq``, which
    asks ``compose_send_data_915.work`` for a result whose first element is
    applied as the new centre frequency of the source.
    """

    def __init__(self):
        gr.top_block.__init__(self, "get_center_freq")

        ##################################################
        # Variables
        ##################################################
        initial_reading = 0
        sample_rate = 20e6
        polls_per_second = 10000

        self.prob_freq = initial_reading
        # The next four values are only stored and exposed through their
        # accessors; no block in this class reads them.
        self.top_peaks_amount = 20
        self.samp_rate = sample_rate
        self.poll_rate = polls_per_second
        self.num_points = 8192
        self.flag = 1
        self.decimation = 1
        # The first element of work()'s result is used as the tuning frequency.
        start_freq = my_freq.work(initial_reading)[0]
        self.center_freq = start_freq

        ##################################################
        # Blocks
        ##################################################
        self.probSigVec = blocks.probe_signal_vc(4096)

        self.rtlsdr_source_0 = osmosdr.source(
            args="numchan=1 hackrf=" + get_hack_id()
        )
        source = self.rtlsdr_source_0
        source.set_time_unknown_pps(osmosdr.time_spec_t())
        source.set_sample_rate(sample_rate)
        source.set_center_freq(start_freq, 0)
        source.set_freq_corr(0, 0)
        source.set_gain(16, 0)
        source.set_if_gain(16, 0)
        source.set_bb_gain(0, 0)
        source.set_antenna('', 0)
        source.set_bandwidth(0, 0)
        source.set_min_output_buffer(4096)

        def _prob_freq_probe():
            # The interval is taken from the construction-time poll rate;
            # later set_poll_rate() calls do not reach this closure.
            pause = 1.0 / polls_per_second
            while True:
                reading = self.probSigVec.level()
                try:
                    self.set_prob_freq(reading)
                except AttributeError:
                    # Deliberately ignored: polling continues regardless.
                    pass
                time.sleep(pause)

        # Daemon thread so that it never keeps the interpreter alive on exit.
        threading.Thread(target=_prob_freq_probe, daemon=True).start()

        self.blocks_stream_to_vector_1 = blocks.stream_to_vector(
            gr.sizeof_gr_complex, 4096
        )

        ##################################################
        # Connections
        ##################################################
        self.connect((self.blocks_stream_to_vector_1, 0), (self.probSigVec, 0))
        self.connect((source, 0), (self.blocks_stream_to_vector_1, 0))

    def get_prob_freq(self):
        """Return the most recent probe reading stored on the flowgraph."""
        return self.prob_freq

    def set_prob_freq(self, prob_freq):
        """Store a probe reading and retune to the frequency derived from it."""
        self.prob_freq = prob_freq
        next_freq = my_freq.work(self.prob_freq)[0]
        self.set_center_freq(next_freq)

    def get_top_peaks_amount(self):
        """Return the stored ``top_peaks_amount`` value."""
        return self.top_peaks_amount

    def set_top_peaks_amount(self, top_peaks_amount):
        """Store ``top_peaks_amount``; nothing else is updated."""
        self.top_peaks_amount = top_peaks_amount

    def get_samp_rate(self):
        """Return the stored sample rate."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Store the sample rate and apply it to the osmosdr source."""
        self.samp_rate = samp_rate
        self.rtlsdr_source_0.set_sample_rate(self.samp_rate)

    def get_poll_rate(self):
        """Return the stored poll rate."""
        return self.poll_rate

    def set_poll_rate(self, poll_rate):
        """Store the poll rate.

        The probe thread keeps using the rate it captured at construction.
        """
        self.poll_rate = poll_rate

    def get_num_points(self):
        """Return the stored ``num_points`` value."""
        return self.num_points

    def set_num_points(self, num_points):
        """Store ``num_points``; nothing else is updated."""
        self.num_points = num_points

    def get_flag(self):
        """Return the stored ``flag`` value."""
        return self.flag

    def set_flag(self, flag):
        """Store ``flag``; nothing else is updated."""
        self.flag = flag

    def get_decimation(self):
        """Return the stored ``decimation`` value."""
        return self.decimation

    def set_decimation(self, decimation):
        """Store ``decimation``; nothing else is updated."""
        self.decimation = decimation

    def get_center_freq(self):
        """Return the centre frequency last applied through this object."""
        return self.center_freq

    def set_center_freq(self, center_freq):
        """Store the centre frequency and tune channel 0 of the source to it."""
        self.center_freq = center_freq
        self.rtlsdr_source_0.set_center_freq(self.center_freq, 0)
def main(top_block_cls=get_center_freq, options=None):
    """Build the flowgraph, run it, and block until it is stopped.

    Args:
        top_block_cls: Flowgraph class to instantiate; called with no
            arguments.
        options: Unused; kept so existing callers keep working.

    SIGINT and SIGTERM stop the flowgraph and exit the process with status 0.
    When stdin is interactive, pressing Enter stops the flowgraph.  When
    stdin is closed (``EOFError``), the flowgraph keeps running until one of
    those signals arrives.
    """
    # NOTE(review): fixed start-up delay; presumably gives the device or
    # other services time to come up -- confirm whether it is still needed.
    time.sleep(3)
    tb = top_block_cls()

    def sig_handler(sig=None, frame=None):
        tb.stop()
        tb.wait()
        sys.exit(0)

    signal.signal(signal.SIGINT, sig_handler)
    signal.signal(signal.SIGTERM, sig_handler)

    tb.start()
    try:
        input('Press Enter to quit: ')
    except EOFError:
        # No usable stdin: keep the flowgraph running until a signal arrives.
        pass
    else:
        # Enter was pressed: honour the prompt.  Without this stop() the
        # wait() below never returns, because the source never finishes.
        tb.stop()
    tb.wait()
# Run the flowgraph only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()