from gnuradio import blocks, gr, zeromq import signal import sys import threading import time import compose_send_data_915 as my_freq from common.runtime import load_root_env from common.shared_stream_addrs import SHARED_915_ADDR, SHARED_VECTOR_LEN load_root_env(__file__) class get_center_freq(gr.top_block): def __init__(self): gr.top_block.__init__(self, 'get_center_freq') self.prob_freq = 0 self.poll_rate = 10000 self.vector_len = SHARED_VECTOR_LEN self.center_freq = 0 self.shared_addr = SHARED_915_ADDR self._stop_polling = threading.Event() self._prob_freq_thread = None self.probSigVec = blocks.probe_signal_vc(self.vector_len) self.shared_source_0 = zeromq.pull_source( gr.sizeof_gr_complex, self.vector_len, self.shared_addr, 100, False, -1, False, ) self.connect((self.shared_source_0, 0), (self.probSigVec, 0)) def start_polling(self): if self._prob_freq_thread is not None: return def _prob_freq_probe(): while not self._stop_polling.is_set(): self.set_prob_freq(self.probSigVec.level()) time.sleep(1.0 / self.poll_rate) self._prob_freq_thread = threading.Thread(target=_prob_freq_probe, daemon=True) self._prob_freq_thread.start() def get_prob_freq(self): return self.prob_freq def set_prob_freq(self, prob_freq): self.prob_freq = prob_freq self.center_freq = my_freq.work(self.prob_freq)[0] def get_center_freq(self): return self.center_freq def set_center_freq(self, center_freq): self.center_freq = center_freq def close(self): self._stop_polling.set() self.stop() self.wait() def main(top_block_cls=get_center_freq, options=None): time.sleep(3) tb = top_block_cls() def sig_handler(sig=None, frame=None): tb.close() sys.exit(0) signal.signal(signal.SIGINT, sig_handler) signal.signal(signal.SIGTERM, sig_handler) tb.start() tb.start_polling() try: print('shared_pull_addr:', SHARED_915_ADDR) except EOFError: pass tb.wait() if __name__ == '__main__': main()