#!/usr/bin/env python3
r"""Sweep one HackRF across a frequency range and show per-window energy.

The range ``--base`` -> ``--roof`` (MHz) is cut into windows of ``--step`` MHz.
The device is retuned to the centre of each window, a few probe vectors are
read and averaged, and a live table of relative power (RMS / linear / dBFS)
is redrawn on the terminal after every window.

Example::

    ./.venv-sdr/bin/python read_energy_wide.py \
        --serial 0000000000000000a18c63dc2a83b813 \
        --sample-rate 20000000 \
        --base 6000 \
        --roof 5700 \
        --step 20

Exit codes: 0 normal stop, 1 missing Python dependency, 2 invalid arguments,
3 ``hackrf_info`` discovery failed, 4 serial not found, 5 scanner failure.
"""
import argparse
import math
import re
import signal
import subprocess
import sys
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

try:
    import numpy as np
except Exception as exc:
    print(f"numpy import failed: {exc}", file=sys.stderr)
    sys.exit(1)

try:
    from gnuradio import blocks, gr
    import osmosdr
except Exception as exc:
    print(f"gnuradio/osmosdr import failed: {exc}", file=sys.stderr)
    print("Run with the SDR venv, e.g. .venv-sdr/bin/python read_energy_wide.py", file=sys.stderr)
    sys.exit(1)


# Floor applied to linear power before log10() so an all-zero read yields a
# finite (very negative) dB value instead of a math domain error.
EPS = 1e-20

# Patterns for the two hackrf_info lines this script cares about.
_INDEX_RE = re.compile(r"^Index:\s*(\d+)")
_SERIAL_RE = re.compile(r"^Serial number:\s*([0-9a-fA-F]+)")

# How long read_window() keeps retrying a single probe read before giving up.
_READ_TIMEOUT_S = 1.0
_READ_RETRY_SLEEP_S = 0.02


@dataclass
class ScanWindow:
    """One retune window of the sweep together with its latest measurement."""

    seq: int  # 1-based position in sweep order
    start_mhz: float  # edge the sweep enters the window from
    end_mhz: float  # edge the sweep leaves the window at
    low_mhz: float  # min(start_mhz, end_mhz)
    high_mhz: float  # max(start_mhz, end_mhz)
    center_mhz: float  # tuning frequency for this window
    status: str = "INIT"  # "INIT" until measured, then "OK" or "ERR"
    rms: Optional[float] = None
    power_lin: Optional[float] = None
    dbfs: Optional[float] = None
    samples: int = 0  # mean probe-vector length of the last measurement
    updated_at: float = 0.0  # time.time() of the last attempt, 0 if never
    error: str = ""  # message of the last failure, "" after a success
    pass_no: int = 0  # sweep pass of the last successful measurement


class WideProbeTop(gr.top_block):
    """Flowgraph: osmosdr HackRF source -> stream_to_vector -> probe_signal_vc."""

    def __init__(
        self,
        index: int,
        center_freq_hz: float,
        sample_rate: float,
        vec_len: int,
        gain: float,
        if_gain: float,
        bb_gain: float,
    ):
        """Build (but do not start) the flowgraph for HackRF number ``index``.

        Sample rate and centre frequency are mandatory and may raise; all the
        other device settings are applied on a best-effort basis.
        """
        super().__init__("hackrf_energy_wide_probe")

        self.probe = blocks.probe_signal_vc(vec_len)
        self.stream_to_vec = blocks.stream_to_vector(gr.sizeof_gr_complex * 1, vec_len)

        self.src = osmosdr.source(args=f"numchan=1 hackrf={index}")
        self.src.set_time_unknown_pps(osmosdr.time_spec_t())
        self.src.set_sample_rate(sample_rate)
        self.src.set_center_freq(center_freq_hz, 0)

        self._best_effort("set_freq_corr", 0, 0)
        self._best_effort("set_gain_mode", False, 0)
        self._best_effort("set_gain", gain, 0)
        self._best_effort("set_if_gain", if_gain, 0)
        self._best_effort("set_bb_gain", bb_gain, 0)
        # NOTE(review): bandwidth 0 presumably asks the driver to choose the
        # filter automatically -- confirm against the gr-osmosdr docs.
        self._best_effort("set_bandwidth", 0, 0)
        self._best_effort("set_antenna", "", 0)

        self.connect((self.src, 0), (self.stream_to_vec, 0))
        self.connect((self.stream_to_vec, 0), (self.probe, 0))

    def _best_effort(self, setter_name: str, *values) -> None:
        """Call ``self.src.<setter_name>(*values)`` and ignore any failure.

        These settings are optional tuning knobs; a source that lacks one of
        them (or rejects the value) must not abort the scan.
        """
        try:
            getattr(self.src, setter_name)(*values)
        except Exception:
            pass

    def tune(self, freq_hz: float) -> None:
        """Retune the source to ``freq_hz`` (Hz)."""
        self.src.set_center_freq(freq_hz, 0)

    def read_metrics(self) -> Tuple[float, float, float, int]:
        """Measure the vector currently held by the probe.

        Returns:
            ``(rms, power_lin, dbfs, n)`` where ``power_lin`` is the mean of
            ``|x|^2``, ``rms`` its square root, ``dbfs`` is
            ``10*log10(power_lin)`` (floored via ``EPS``) and ``n`` the number
            of samples in the vector.

        Raises:
            RuntimeError: the probe returned no samples.
        """
        arr = np.asarray(self.probe.level(), dtype=np.complex64)
        if arr.size == 0:
            raise RuntimeError("no samples")
        power_lin = float(np.mean(arr.real * arr.real + arr.imag * arr.imag))
        rms = math.sqrt(max(power_lin, 0.0))
        dbfs = 10.0 * math.log10(max(power_lin, EPS))
        return rms, power_lin, dbfs, int(arr.size)

    def read_window(self, settle: float, avg_reads: int, pause_between_reads: float) -> Tuple[float, float, float, int]:
        """Average several probe reads for the currently tuned window.

        Args:
            settle: seconds to wait before the first read (after a retune).
            avg_reads: number of reads to average; values below 1 mean 1.
            pause_between_reads: seconds to wait between consecutive reads.

        Returns:
            ``(rms, power_lin, dbfs, samples)``. Power is averaged in the
            linear domain and ``rms``/``dbfs`` are derived from that average;
            ``samples`` is the (truncated) mean vector length.

        Raises:
            RuntimeError: a single read kept failing for about one second.
        """
        if settle > 0:
            time.sleep(settle)

        read_count = max(1, avg_reads)
        powers: List[float] = []
        sample_sizes: List[int] = []

        for idx in range(read_count):
            # Monotonic clock: the timeout must not be affected by wall-clock
            # adjustments (NTP step, manual change) during a long scan.
            deadline = time.monotonic() + _READ_TIMEOUT_S
            while True:
                try:
                    _, power_lin, _, samples = self.read_metrics()
                except Exception as exc:
                    if time.monotonic() >= deadline:
                        raise RuntimeError(str(exc)) from exc
                    time.sleep(_READ_RETRY_SLEEP_S)
                    continue
                powers.append(power_lin)
                sample_sizes.append(samples)
                break
            if idx + 1 < read_count and pause_between_reads > 0:
                time.sleep(pause_between_reads)

        power_lin = float(sum(powers) / len(powers))
        rms = math.sqrt(max(power_lin, 0.0))
        dbfs = 10.0 * math.log10(max(power_lin, EPS))
        samples = int(sum(sample_sizes) / len(sample_sizes))
        return rms, power_lin, dbfs, samples


def parse_hackrf_info() -> Dict[str, int]:
    """Run ``hackrf_info`` and map each lower-cased serial to its device index.

    Both stdout and stderr are scanned. A serial line is attributed to the
    most recent ``Index:`` line that preceded it.

    Raises:
        RuntimeError: the tool is missing, timed out (15 s), or no
            index/serial pair could be parsed from its output.
    """
    try:
        proc = subprocess.run(["hackrf_info"], capture_output=True, text=True, timeout=15)
    except FileNotFoundError as exc:
        raise RuntimeError("hackrf_info not found") from exc
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError("hackrf_info timeout") from exc

    text = (proc.stdout or "") + "\n" + (proc.stderr or "")
    out: Dict[str, int] = {}
    cur_idx: Optional[int] = None
    for line in text.splitlines():
        m = _INDEX_RE.search(line)
        if m:
            cur_idx = int(m.group(1))
            continue
        m = _SERIAL_RE.search(line)
        if m and cur_idx is not None:
            out[m.group(1).lower()] = cur_idx

    if not out:
        raise RuntimeError("no devices parsed from hackrf_info")
    return out


def fmt(value: Optional[float], spec: str) -> str:
    """Format ``value`` with ``spec``; ``None`` is shown as ``"-"``."""
    return "-" if value is None else format(value, spec)


def build_windows(base_mhz: float, roof_mhz: float, step_mhz: float) -> List[ScanWindow]:
    """Split ``base_mhz -> roof_mhz`` into consecutive windows of ``step_mhz``.

    The sweep runs from ``base_mhz`` towards ``roof_mhz`` in either direction.
    Every window is ``step_mhz`` wide except possibly the last one, which ends
    exactly at ``roof_mhz``.

    Window edges are computed as ``base + i * step`` rather than by repeated
    addition, so floating-point drift cannot create a spurious near-zero-width
    final window.

    Raises:
        ValueError: an argument is not finite, ``step_mhz <= 0`` or
            ``base_mhz == roof_mhz``.
    """
    if not all(math.isfinite(v) for v in (base_mhz, roof_mhz, step_mhz)):
        raise ValueError("base, roof and step must be finite numbers")
    if step_mhz <= 0:
        raise ValueError("step must be > 0")
    if base_mhz == roof_mhz:
        raise ValueError("base and roof must be different")

    direction = -1.0 if roof_mhz < base_mhz else 1.0
    span_mhz = abs(roof_mhz - base_mhz)
    # The small tolerance keeps a span that is an exact multiple of the step
    # (up to rounding error in the division) from gaining an extra window.
    count = max(1, math.ceil(span_mhz / step_mhz - 1e-9))

    windows: List[ScanWindow] = []
    for i in range(count):
        edge = base_mhz + direction * i * step_mhz
        is_last = i == count - 1
        next_edge = roof_mhz if is_last else base_mhz + direction * (i + 1) * step_mhz
        low_mhz = min(edge, next_edge)
        high_mhz = max(edge, next_edge)
        windows.append(
            ScanWindow(
                seq=i + 1,
                start_mhz=edge,
                end_mhz=next_edge,
                low_mhz=low_mhz,
                high_mhz=high_mhz,
                center_mhz=(low_mhz + high_mhz) / 2.0,
            )
        )
    return windows


def _age_text(now: float, updated_at: float) -> str:
    """Seconds since ``updated_at`` with one decimal, or ``"-"`` if never set."""
    return "-" if updated_at <= 0 else f"{(now - updated_at):.1f}"


def _table_row(
    marker: str,
    label,
    row: ScanWindow,
    status: str,
    dbfs: Optional[float],
    rms: Optional[float],
    power_lin: Optional[float],
    samples: int,
    age: str,
    tail: str,
) -> str:
    """Format one table line; column widths match the header in render()."""
    window = f"{row.high_mhz:.3f}-{row.low_mhz:.3f}"
    return (
        f"{marker:>3} {label:>3} "
        f"{window:>23} {row.center_mhz:>9.3f} {status:>8} "
        f"{fmt(dbfs, '.2f'):>9} {fmt(rms, '.6f'):>10} {fmt(power_lin, '.8f'):>12} "
        f"{samples:>5} {age:>5} {tail}"
    )


def render(
    windows: List[ScanWindow],
    serial: str,
    index: int,
    sample_rate: float,
    base_mhz: float,
    roof_mhz: float,
    step_mhz: float,
    started_at: float,
    pass_no: int,
    current_seq: int,
) -> None:
    """Clear the terminal and redraw the status table.

    The table has one line per window (``>>>`` marks ``current_seq``) followed
    by a ``MAX`` line for the ``OK`` window with the highest dBFS. While no
    window has succeeded yet, the ``MAX`` line is a placeholder based on the
    current window. ``started_at`` and each row's ``updated_at`` are expected
    to be ``time.time()`` values.
    """
    now = time.time()
    capture_bw_mhz = sample_rate / 1e6
    current_row = next((row for row in windows if row.seq == current_seq), None)
    best_row = max(
        (row for row in windows if row.status == "OK" and row.dbfs is not None),
        key=lambda row: row.dbfs if row.dbfs is not None else float("-inf"),
        default=None,
    )

    header = (
        f"{'cur':>3} {'seq':>3} {'window MHz':>23} {'center':>9} {'status':>8} "
        f"{'dBFS':>9} {'rms':>10} {'power':>12} {'N':>5} {'age':>5} error"
    )
    lines: List[str] = [
        "HackRF Wide Energy Monitor (relative power: RMS / linear / dBFS)",
        (
            f"serial: {serial} | idx: {index} | sample-rate: {capture_bw_mhz:.3f} MHz | "
            f"scan: {base_mhz:.3f}->{roof_mhz:.3f} MHz step {step_mhz:.3f} MHz | "
            f"pass: {pass_no} | uptime: {int(now-started_at)}s | {time.strftime('%Y-%m-%d %H:%M:%S')}"
        ),
        "",
        header,
        "-" * len(header),
    ]

    for row in windows:
        err = row.error
        if len(err) > 50:
            # Keep the table to one line per window.
            err = err[:47] + "..."
        marker = ">>>" if row.seq == current_seq else ""
        lines.append(
            _table_row(
                marker, row.seq, row, row.status,
                row.dbfs, row.rms, row.power_lin,
                row.samples, _age_text(now, row.updated_at), err,
            )
        )

    lines.append("")
    if best_row is not None:
        lines.append(
            _table_row(
                "", "MAX", best_row, best_row.status,
                best_row.dbfs, best_row.rms, best_row.power_lin,
                best_row.samples, _age_text(now, best_row.updated_at),
                f"pass={best_row.pass_no}",
            )
        )
    elif current_row is not None:
        lines.append(
            _table_row(
                "", "MAX", current_row, "INIT",
                None, None, None,
                0, _age_text(now, current_row.updated_at),
                "no successful windows yet",
            )
        )
    lines.append("Ctrl+C to stop. Window width equals step; sample-rate must be >= step to cover each window.")

    # ESC[2J clears the screen, ESC[H homes the cursor. Writing the whole
    # frame in one call keeps the redraw from visibly tearing.
    sys.stdout.write("\x1b[2J\x1b[H" + "\n".join(lines) + "\n")
    sys.stdout.flush()


def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the scanner."""
    parser = argparse.ArgumentParser(description="Retune one HackRF across a wide frequency range and measure energy")
    parser.add_argument("--serial", required=True, help="HackRF serial number from hackrf_info")
    parser.add_argument("--sample-rate", type=float, required=True, help="Sample rate in Hz")
    parser.add_argument("--base", type=float, required=True, help="Scan start edge in MHz")
    parser.add_argument("--roof", type=float, required=True, help="Scan end edge in MHz")
    parser.add_argument("--step", type=float, required=True, help="Window width / retune step in MHz")
    parser.add_argument("--vec-len", type=int, default=4096, help="Probe vector length")
    parser.add_argument("--settle", type=float, default=0.12, help="Wait time after retune before reading (s)")
    parser.add_argument("--avg-reads", type=int, default=3, help="How many probe reads to average per window")
    parser.add_argument("--pause-between-reads", type=float, default=0.02, help="Pause between averaged reads (s)")
    parser.add_argument("--passes", type=int, default=0, help="Number of sweep passes, 0 means infinite")
    parser.add_argument("--gain", type=float, default=16.0, help="General gain")
    parser.add_argument("--if-gain", type=float, default=16.0, help="IF gain")
    parser.add_argument("--bb-gain", type=float, default=16.0, help="BB gain")
    return parser


def main() -> int:
    """Parse arguments, locate the HackRF and run the sweep loop.

    Returns:
        The process exit code (see the module docstring).
    """
    args = build_parser().parse_args()
    serial = args.serial.lower()

    try:
        windows = build_windows(args.base, args.roof, args.step)
    except ValueError as exc:
        print(f"invalid scan range: {exc}", file=sys.stderr)
        return 2

    # Reject values that would otherwise only fail (or misbehave) once the
    # device has already been opened.
    if not math.isfinite(args.sample_rate) or args.sample_rate <= 0:
        print(f"sample-rate must be a positive number of Hz, got {args.sample_rate}", file=sys.stderr)
        return 2
    if args.vec_len < 1:
        print(f"vec-len must be >= 1, got {args.vec_len}", file=sys.stderr)
        return 2

    step_hz = args.step * 1e6
    if args.sample_rate < step_hz:
        print(
            f"sample-rate {args.sample_rate:.0f} Hz is smaller than step window {step_hz:.0f} Hz; "
            "this would leave gaps in the scan",
            file=sys.stderr,
        )
        return 2

    try:
        serial_to_index = parse_hackrf_info()
    except Exception as exc:
        print(f"hackrf discovery failed: {exc}", file=sys.stderr)
        return 3

    index = serial_to_index.get(serial)
    if index is None:
        print(f"serial {serial} not found in hackrf_info", file=sys.stderr)
        print("available serials:", file=sys.stderr)
        for item_serial, item_index in sorted(serial_to_index.items(), key=lambda item: item[1]):
            print(f"  idx={item_index} serial={item_serial}", file=sys.stderr)
        return 4

    stop_requested = False

    def on_signal(signum, frame):
        # Only set a flag: the loop finishes the current window and then the
        # finally-clause below shuts the flowgraph down cleanly.
        nonlocal stop_requested
        stop_requested = True

    signal.signal(signal.SIGINT, on_signal)
    signal.signal(signal.SIGTERM, on_signal)

    probe: Optional[WideProbeTop] = None
    started_at = time.time()
    pass_no = 0
    current_seq = windows[0].seq

    try:
        probe = WideProbeTop(
            index=index,
            center_freq_hz=windows[0].center_mhz * 1e6,
            sample_rate=args.sample_rate,
            vec_len=args.vec_len,
            gain=args.gain,
            if_gain=args.if_gain,
            bb_gain=args.bb_gain,
        )
        probe.start()
        # Give the freshly started flowgraph time to produce a first vector.
        time.sleep(max(args.settle, 0.12))

        while not stop_requested:
            pass_no += 1
            for row in windows:
                if stop_requested:
                    break
                current_seq = row.seq
                try:
                    probe.tune(row.center_mhz * 1e6)
                    rms, power_lin, dbfs, samples = probe.read_window(
                        settle=args.settle,
                        avg_reads=args.avg_reads,
                        pause_between_reads=args.pause_between_reads,
                    )
                except Exception as exc:
                    # A failed window is shown in the table; the sweep goes on.
                    row.status = "ERR"
                    row.error = str(exc)
                    row.updated_at = time.time()
                else:
                    row.status = "OK"
                    row.rms = rms
                    row.power_lin = power_lin
                    row.dbfs = dbfs
                    row.samples = samples
                    row.error = ""
                    row.updated_at = time.time()
                    row.pass_no = pass_no

                render(
                    windows=windows,
                    serial=serial,
                    index=index,
                    sample_rate=args.sample_rate,
                    base_mhz=args.base,
                    roof_mhz=args.roof,
                    step_mhz=args.step,
                    started_at=started_at,
                    pass_no=pass_no,
                    current_seq=current_seq,
                )

            if args.passes > 0 and pass_no >= args.passes:
                break
    except Exception as exc:
        print(f"scanner failed: {exc}", file=sys.stderr)
        return 5
    finally:
        if probe is not None:
            try:
                probe.stop()
                probe.wait()
            except Exception:
                # Shutdown errors must not mask the real exit code.
                pass

    return 0


if __name__ == "__main__":
    raise SystemExit(main())