#!/usr/bin/env python3 # -*- coding: utf-8 -*- from gnuradio import blocks from gnuradio import gr import signal import sys import threading import time import osmosdr from common.runtime import load_root_env, resolve_hackrf_index from common.shared_stream_addrs import SHARED_VECTOR_LEN from shared_router_868_915 import SharedRouter868915 load_root_env(__file__) def get_hack_id(): return resolve_hackrf_index('hack_868', 'src/main_868_915_router.py') class get_center_freq(gr.top_block): def __init__(self): gr.top_block.__init__(self, 'get_center_freq') self.prob_freq = 0 self.poll_rate = 10000 self.vector_len = SHARED_VECTOR_LEN self.router = SharedRouter868915() self.active_lane = self.router.get_active_name() self.center_freq = self.router.get_start_freq() self._stop_polling = threading.Event() self._prob_freq_thread = None self.probSigVec = blocks.probe_signal_vc(self.vector_len) self.rtlsdr_source_0 = osmosdr.source( args='numchan=' + str(1) + ' ' + 'hackrf=' + get_hack_id() ) self.rtlsdr_source_0.set_time_unknown_pps(osmosdr.time_spec_t()) self.rtlsdr_source_0.set_freq_corr(0, 0) self.rtlsdr_source_0.set_antenna('', 0) self.rtlsdr_source_0.set_min_output_buffer(self.vector_len) self.apply_active_frontend() self.rtlsdr_source_0.set_center_freq(self.center_freq, 0) self.blocks_stream_to_vector_1 = blocks.stream_to_vector(gr.sizeof_gr_complex * 1, self.vector_len) self.connect((self.rtlsdr_source_0, 0), (self.blocks_stream_to_vector_1, 0)) self.connect((self.blocks_stream_to_vector_1, 0), (self.probSigVec, 0)) def start_polling(self): if self._prob_freq_thread is not None: return def _prob_freq_probe(): while not self._stop_polling.is_set(): self.set_prob_freq(self.probSigVec.level()) time.sleep(1.0 / self.poll_rate) self._prob_freq_thread = threading.Thread(target=_prob_freq_probe, daemon=True) self._prob_freq_thread.start() def apply_active_frontend(self): frontend = self.router.get_active_frontend() self.rtlsdr_source_0.set_sample_rate(frontend['sample_rate']) self.rtlsdr_source_0.set_gain(frontend['gain'], 0) self.rtlsdr_source_0.set_if_gain(frontend['if_gain'], 0) self.rtlsdr_source_0.set_bb_gain(frontend['bb_gain'], 0) self.rtlsdr_source_0.set_bandwidth(frontend['bandwidth'], 0) def get_prob_freq(self): return self.prob_freq def set_prob_freq(self, prob_freq): self.prob_freq = prob_freq next_center, lane_switched = self.router.route_vector(self.prob_freq) if lane_switched: self.active_lane = self.router.get_active_name() self.apply_active_frontend() if next_center != self.center_freq: self.set_center_freq(next_center) def get_center_freq(self): return self.center_freq def set_center_freq(self, center_freq): self.center_freq = center_freq self.rtlsdr_source_0.set_center_freq(self.center_freq, 0) def close(self): self._stop_polling.set() try: self.router.close() finally: self.stop() self.wait() def main(top_block_cls=get_center_freq, options=None): tb = top_block_cls() def sig_handler(sig=None, frame=None): tb.close() sys.exit(0) signal.signal(signal.SIGINT, sig_handler) signal.signal(signal.SIGTERM, sig_handler) tb.start() tb.start_polling() try: print('shared_router_active_lane:', tb.router.get_active_name()) print('shared_router_start_freq:', tb.get_center_freq()) except EOFError: pass tb.wait() if __name__ == '__main__': main()