#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # SPDX-License-Identifier: GPL-3.0 # # GNU Radio Python Flow Graph # Title: get_center_freq # GNU Radio version: 3.8.1.0 from gnuradio import blocks from gnuradio import gr import sys import signal import embedded_3300 as my_freq # embedded python module import osmosdr import time import threading import subprocess import os from common.runtime import load_root_env, resolve_hackrf_index load_root_env(__file__) def get_hack_id(): return resolve_hackrf_index('hack_3300', 'src/main_3300.py') serial_number = os.getenv('hack_3300') pos = None output = [] try: # command = '/home/orangepi/hackrf/host/build/hackrf-tools/src/hackrf_info' command = 'lsusb -v -d 1d50:6089 | grep iSerial' output.append(subprocess.check_output(command, shell=True, text=True)) # indexes = [line.split(":")[1].strip() for line in output_lines if "Index" in line] # serial_numbers = [line.split(":")[1].strip() for line in output_lines if "Serial number" in line] # print(indexes) # print(serial_numbers) # for i, number in enumerate(serial_numbers): # if number == serial_number: # pos = i # break # if pos is not None: # id = indexes[pos] # else: # print('Такого хака нет!') except subprocess.CalledProcessError as e: print(f"Команда завершилась с кодом возврата {e.returncode}") print(e) print(output) output_lines = output[0].strip().split('\n') print(output_lines) serial_numbers = [line.split()[-1] for line in output_lines] print(serial_numbers) for i, number in enumerate(serial_numbers): if number == serial_number: id = i break if id is not None: print('HackId is: {0}'.format(id)) return str(id) else: print('Такого хака нет!') class get_center_freq(gr.top_block): def __init__(self): gr.top_block.__init__(self, "get_center_freq") ################################################## # Variables ################################################## self.prob_freq = prob_freq = 0 self.top_peaks_amount = top_peaks_amount = 20 self.samp_rate = samp_rate = 20e6 self.poll_rate = poll_rate = 10000 self.num_points = num_points = 8192 self.flag = flag = 1 self.decimation = decimation = 1 self.center_freq = center_freq = my_freq.work(prob_freq) ################################################## # Blocks ################################################## self.probSigVec = blocks.probe_signal_vc(4096) self.rtlsdr_source_0 = osmosdr.source( args="numchan=" + str(1) + " " + 'hackrf=' + get_hack_id() ) self.rtlsdr_source_0.set_time_unknown_pps(osmosdr.time_spec_t()) self.rtlsdr_source_0.set_sample_rate(samp_rate) self.rtlsdr_source_0.set_center_freq(center_freq, 0) self.rtlsdr_source_0.set_freq_corr(0, 0) self.rtlsdr_source_0.set_gain(100, 0) self.rtlsdr_source_0.set_if_gain(100, 0) self.rtlsdr_source_0.set_bb_gain(100, 0) self.rtlsdr_source_0.set_antenna('', 0) self.rtlsdr_source_0.set_bandwidth(0, 0) self.rtlsdr_source_0.set_min_output_buffer(4096) def _prob_freq_probe(): while True: val = self.probSigVec.level() try: self.set_prob_freq(val) except AttributeError: pass time.sleep(1.0 / (poll_rate)) _prob_freq_thread = threading.Thread(target=_prob_freq_probe) _prob_freq_thread.daemon = True _prob_freq_thread.start() self.blocks_stream_to_vector_1 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, 4096) ################################################## # Connections ################################################## self.connect((self.blocks_stream_to_vector_1, 0), (self.probSigVec, 0)) self.connect((self.rtlsdr_source_0, 0), (self.blocks_stream_to_vector_1, 0)) def get_prob_freq(self): return self.prob_freq def set_prob_freq(self, prob_freq): self.prob_freq = prob_freq self.set_center_freq(my_freq.work(self.prob_freq)) def get_top_peaks_amount(self): return self.top_peaks_amount def set_top_peaks_amount(self, top_peaks_amount): self.top_peaks_amount = top_peaks_amount def get_samp_rate(self): return self.samp_rate def set_samp_rate(self, samp_rate): self.samp_rate = samp_rate self.rtlsdr_source_0.set_sample_rate(self.samp_rate) def get_poll_rate(self): return self.poll_rate def set_poll_rate(self, poll_rate): self.poll_rate = poll_rate def get_num_points(self): return self.num_points def set_num_points(self, num_points): self.num_points = num_points def get_flag(self): return self.flag def set_flag(self, flag): self.flag = flag def get_decimation(self): return self.decimation def set_decimation(self, decimation): self.decimation = decimation def get_center_freq(self): return self.center_freq def set_center_freq(self, center_freq): self.center_freq = center_freq self.rtlsdr_source_0.set_center_freq(self.center_freq, 0) def main(top_block_cls=get_center_freq, options=None): #for k in range(0, 3): # light_diods_on_boot() tb = top_block_cls() def sig_handler(sig=None, frame=None): tb.stop() tb.wait() sys.exit(0) signal.signal(signal.SIGINT, sig_handler) signal.signal(signal.SIGTERM, sig_handler) tb.start() try: print('СЕРВИСНАЯ ИНФОРМАЦИЯ: ') print('debug_flag: ', my_freq.debug_flag) print('save_data_flag: ', my_freq.save_data_flag) print('send_to_module_flag: ', my_freq.send_to_module_flag) #print('multiply_factor: ', float(os.getenv('multiply_factor_' + '1200'))) #print('multiply_factor: ', float(os.getenv('multiply_factor_' + '715'))) except EOFError: pass #tb.stop() tb.wait() if __name__ == '__main__': main()