From 0d23ffd0455050a9fb70bf36e61fd657b891d40a Mon Sep 17 00:00:00 2001 From: Sergey Revyakin Date: Mon, 27 Apr 2026 12:14:32 +0700 Subject: [PATCH] =?UTF-8?q?=D0=B4=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D0=BB=2091?= =?UTF-8?q?5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- orange_scripts/main_915.py | 174 +++++++++++++++++++++++++++---------- 1 file changed, 127 insertions(+), 47 deletions(-) diff --git a/orange_scripts/main_915.py b/orange_scripts/main_915.py index 39e8577..1d2ba95 100644 --- a/orange_scripts/main_915.py +++ b/orange_scripts/main_915.py @@ -1,88 +1,168 @@ -from gnuradio import blocks, gr, zeromq -import signal +from gnuradio import blocks, gr import sys -import threading -import time - +import signal import compose_send_data_915 as my_freq - -from common.runtime import load_root_env -from common.shared_stream_addrs import SHARED_915_ADDR, SHARED_VECTOR_LEN +import osmosdr +import time +import threading +import subprocess +import os +from common.runtime import load_root_env, resolve_hackrf_index load_root_env(__file__) +def get_hack_id(): + return resolve_hackrf_index('HACKID_915', 'orange_scripts/main_915.py') + serial_number = os.getenv('HACKID_1200') + pos = None + output = [] + try: + command = 'lsusb -v -d 1d50:6089 | grep iSerial' + output.append(subprocess.check_output(command, shell=True, text=True)) + except subprocess.CalledProcessError as e: + print(f"Команда завершилась с кодом возврата {e.returncode}") + print(e) + print(output) + output_lines = output[0].strip().split('\n') + print(output_lines) + serial_numbers = [line.split()[-1] for line in output_lines] + print(serial_numbers) + for i, number in enumerate(serial_numbers): + if number == serial_number: + id = i + break + if id is not None: + print('HackId is: {0}'.format(id)) + return str(id) + else: + print('Такого хака нет!') + class get_center_freq(gr.top_block): + def __init__(self): - gr.top_block.__init__(self, 'get_center_freq') - - self.prob_freq = 0 - self.poll_rate = 10000 - self.vector_len = SHARED_VECTOR_LEN - self.center_freq = 0 - self.shared_addr = SHARED_915_ADDR - self._stop_polling = threading.Event() - self._prob_freq_thread = None - - self.probSigVec = blocks.probe_signal_vc(self.vector_len) - self.shared_source_0 = zeromq.pull_source( - gr.sizeof_gr_complex, - self.vector_len, - self.shared_addr, - 100, - False, - -1, - False, + gr.top_block.__init__(self, "get_center_freq") + + ################################################## + # Variables + ################################################## + self.prob_freq = prob_freq = 0 + self.top_peaks_amount = top_peaks_amount = 20 + self.samp_rate = samp_rate = 20e6 + self.poll_rate = poll_rate = 10000 + self.num_points = num_points = 8192 + self.flag = flag = 1 + self.decimation = decimation = 1 + self.center_freq = center_freq = my_freq.work(prob_freq)[0] + + ################################################## + # Blocks + ################################################## + self.probSigVec = blocks.probe_signal_vc(4096) + self.rtlsdr_source_0 = osmosdr.source( + args="numchan=" + str(1) + " " + 'hackrf=' + get_hack_id() ) - self.connect((self.shared_source_0, 0), (self.probSigVec, 0)) - - def start_polling(self): - if self._prob_freq_thread is not None: - return + self.rtlsdr_source_0.set_time_unknown_pps(osmosdr.time_spec_t()) + self.rtlsdr_source_0.set_sample_rate(samp_rate) + self.rtlsdr_source_0.set_center_freq(center_freq, 0) + self.rtlsdr_source_0.set_freq_corr(0, 0) + self.rtlsdr_source_0.set_gain(16, 0) + self.rtlsdr_source_0.set_if_gain(16, 0) + self.rtlsdr_source_0.set_bb_gain(0, 0) + self.rtlsdr_source_0.set_antenna('', 0) + self.rtlsdr_source_0.set_bandwidth(0, 0) + self.rtlsdr_source_0.set_min_output_buffer(4096) def _prob_freq_probe(): - while not self._stop_polling.is_set(): - self.set_prob_freq(self.probSigVec.level()) - time.sleep(1.0 / self.poll_rate) + while True: + + val = self.probSigVec.level() + try: + self.set_prob_freq(val) + except AttributeError: + pass + time.sleep(1.0 / (poll_rate)) + _prob_freq_thread = threading.Thread(target=_prob_freq_probe) + _prob_freq_thread.daemon = True + _prob_freq_thread.start() + + self.blocks_stream_to_vector_1 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, 4096) + - self._prob_freq_thread = threading.Thread(target=_prob_freq_probe, daemon=True) - self._prob_freq_thread.start() + + ################################################## + # Connections + ################################################## + self.connect((self.blocks_stream_to_vector_1, 0), (self.probSigVec, 0)) + self.connect((self.rtlsdr_source_0, 0), (self.blocks_stream_to_vector_1, 0)) def get_prob_freq(self): return self.prob_freq def set_prob_freq(self, prob_freq): self.prob_freq = prob_freq - self.center_freq = my_freq.work(self.prob_freq)[0] + self.set_center_freq(my_freq.work(self.prob_freq)[0]) + + def get_top_peaks_amount(self): + return self.top_peaks_amount + + def set_top_peaks_amount(self, top_peaks_amount): + self.top_peaks_amount = top_peaks_amount + + def get_samp_rate(self): + return self.samp_rate + + def set_samp_rate(self, samp_rate): + self.samp_rate = samp_rate + self.rtlsdr_source_0.set_sample_rate(self.samp_rate) + + def get_poll_rate(self): + return self.poll_rate + + def set_poll_rate(self, poll_rate): + self.poll_rate = poll_rate + + def get_num_points(self): + return self.num_points + + def set_num_points(self, num_points): + self.num_points = num_points + + def get_flag(self): + return self.flag + + def set_flag(self, flag): + self.flag = flag + + def get_decimation(self): + return self.decimation + + def set_decimation(self, decimation): + self.decimation = decimation def get_center_freq(self): return self.center_freq def set_center_freq(self, center_freq): self.center_freq = center_freq - - def close(self): - self._stop_polling.set() - self.stop() - self.wait() + self.rtlsdr_source_0.set_center_freq(self.center_freq, 0) def main(top_block_cls=get_center_freq, options=None): time.sleep(3) tb = top_block_cls() - def sig_handler(sig=None, frame=None): - tb.close() + tb.stop() + tb.wait() sys.exit(0) signal.signal(signal.SIGINT, sig_handler) signal.signal(signal.SIGTERM, sig_handler) tb.start() - tb.start_polling() try: - print('shared_pull_addr:', SHARED_915_ADDR) + input('Press Enter to quit: ') except EOFError: pass tb.wait()