You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
DroneDetector/src/main_915.py

105 lines
3.1 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from gnuradio import blocks
from gnuradio import gr
import signal
import sys
import threading
import time
import osmosdr
import embedded_915 as my_freq
from common.runtime import load_root_env, resolve_hackrf_index
# Executed at import time, before any of the definitions below are used.
# NOTE(review): judging by its name, this helper loads environment settings
# from the project root, located relative to this file — confirm in
# common.runtime.
load_root_env(__file__)
def get_hack_id():
    """Return the HackRF identifier used by this script.

    Delegates to ``resolve_hackrf_index`` with the arguments ``'hack_915'``
    and ``'src/main_915.py'`` and hands its result back unchanged.  The
    caller in this file appends the result to the ``hackrf=`` device
    argument of the osmosdr source.
    """
    hackrf_index = resolve_hackrf_index('hack_915', 'src/main_915.py')
    return hackrf_index
class get_center_freq(gr.top_block):
    """Flowgraph that probes received samples and retunes the source.

    An osmosdr source (opened with a ``hackrf=`` device argument) streams
    complex samples through a stream-to-vector block into a vector probe.
    A daemon thread started by the constructor repeatedly reads the probe
    level, passes it to ``my_freq.work`` and applies the returned value as
    the new centre frequency of the source.
    """

    def __init__(self):
        super().__init__('get_center_freq')

        # --- Variables -----------------------------------------------------
        self.prob_freq = 0
        self.samp_rate = 20e6
        self.poll_rate = 10000
        self.vector_len = 4096
        # Initial tuning comes from my_freq.work() applied to the initial
        # probe value (0).
        self.center_freq = my_freq.work(self.prob_freq)
        self._prob_freq_thread = None

        # --- Blocks --------------------------------------------------------
        self.probSigVec = blocks.probe_signal_vc(self.vector_len)

        # Despite the attribute name "rtlsdr_source_0", the device argument
        # selects a HackRF.
        device_args = ' '.join(['numchan=' + str(1), 'hackrf=' + get_hack_id()])
        source = osmosdr.source(args=device_args)
        self.rtlsdr_source_0 = source

        # Source configuration; the call order matches the original sequence.
        source.set_time_unknown_pps(osmosdr.time_spec_t())
        source.set_sample_rate(self.samp_rate)
        source.set_center_freq(self.center_freq, 0)
        source.set_freq_corr(0, 0)
        source.set_gain(24, 0)
        source.set_if_gain(24, 0)
        source.set_bb_gain(100, 0)
        source.set_antenna('', 0)
        source.set_bandwidth(0, 0)
        source.set_min_output_buffer(self.vector_len)

        vectorizer = blocks.stream_to_vector(gr.sizeof_gr_complex, self.vector_len)
        self.blocks_stream_to_vector_1 = vectorizer

        # --- Connections: source -> stream_to_vector -> probe ---------------
        self.connect((vectorizer, 0), (self.probSigVec, 0))
        self.connect((source, 0), (vectorizer, 0))

        # --- Probe polling thread -------------------------------------------
        def _poll_probe():
            # Endless loop: read the probe, feed it to set_prob_freq(), then
            # sleep 1 / poll_rate seconds before the next read.
            while True:
                level = self.probSigVec.level()
                try:
                    self.set_prob_freq(level)
                except AttributeError:
                    # Deliberately ignored, as in the original probe loop.
                    pass
                time.sleep(1.0 / self.poll_rate)

        # Daemon thread, so it does not keep the interpreter alive on exit.
        poller = threading.Thread(target=_poll_probe, daemon=True)
        self._prob_freq_thread = poller
        poller.start()

    def get_prob_freq(self):
        """Return the most recently stored probe value."""
        return self.prob_freq

    def set_prob_freq(self, prob_freq):
        """Store a new probe value and retune the source from it.

        The new centre frequency is whatever ``my_freq.work`` returns for
        the stored probe value.
        """
        self.prob_freq = prob_freq
        retuned = my_freq.work(self.prob_freq)
        self.set_center_freq(retuned)

    def get_center_freq(self):
        """Return the centre frequency currently stored on the flowgraph."""
        return self.center_freq

    def set_center_freq(self, center_freq):
        """Store ``center_freq`` and apply it to channel 0 of the source."""
        self.center_freq = center_freq
        self.rtlsdr_source_0.set_center_freq(self.center_freq, 0)
def main(top_block_cls=get_center_freq, options=None):
    """Create the flowgraph, start it, and block until it finishes.

    Args:
        top_block_cls: Class called with no arguments to build the
            flowgraph; the instance must provide ``start()``, ``stop()``
            and ``wait()``.
        options: Not used by this function.

    SIGINT and SIGTERM are handled by stopping the flowgraph, waiting for
    it, and exiting the process with status 0.
    """
    flowgraph = top_block_cls()

    def _shutdown(sig=None, frame=None):
        # Stop the flowgraph and wait for it before leaving the process.
        flowgraph.stop()
        flowgraph.wait()
        sys.exit(0)

    # Same handler for both termination signals.
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, _shutdown)

    flowgraph.start()

    try:
        # Startup banner (the first literal is Russian for
        # "SERVICE INFORMATION") followed by the my_freq module flags.
        print('СЕРВИСНАЯ ИНФОРМАЦИЯ: ')
        print('debug_flag: ', my_freq.debug_flag)
        print('save_data_flag: ', my_freq.save_data_flag)
        print('send_to_module_flag: ', my_freq.send_to_module_flag)
    except EOFError:
        pass

    # Block here until the flowgraph finishes or a signal handler exits.
    flowgraph.wait()
# Run the flowgraph only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()