sync
parent
4b185700bd
commit
815daf4be3
@ -0,0 +1,47 @@
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
_DEFAULT_FLAG_PATH = Path(__file__).resolve().parents[2] / "runtime" / "jammer_active.flag"
|
||||
_FLAG_PATH = Path(os.getenv("JAMMER_STATE_FILE", str(_DEFAULT_FLAG_PATH)))
|
||||
_CACHE_TTL_SEC = float(os.getenv("JAMMER_STATE_CACHE_TTL_SEC", "0.25"))
|
||||
_STALE_SEC = float(os.getenv("JAMMER_STATE_STALE_SEC", "5.0"))
|
||||
|
||||
_cached_value = False
|
||||
_cached_checked_monotonic = 0.0
|
||||
|
||||
|
||||
def _read_uncached() -> bool:
|
||||
try:
|
||||
stat = _FLAG_PATH.stat()
|
||||
if time.time() - stat.st_mtime > _STALE_SEC:
|
||||
return False
|
||||
return _FLAG_PATH.read_text(encoding="ascii").strip() == "1"
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
|
||||
def is_jammer_active() -> bool:
|
||||
global _cached_value
|
||||
global _cached_checked_monotonic
|
||||
|
||||
now = time.monotonic()
|
||||
if now - _cached_checked_monotonic < _CACHE_TTL_SEC:
|
||||
return _cached_value
|
||||
|
||||
_cached_value = _read_uncached()
|
||||
_cached_checked_monotonic = now
|
||||
return _cached_value
|
||||
|
||||
|
||||
def set_jammer_active(active: bool) -> None:
|
||||
global _cached_value
|
||||
global _cached_checked_monotonic
|
||||
|
||||
_FLAG_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
tmp_path = _FLAG_PATH.with_name(f"{_FLAG_PATH.name}.{os.getpid()}.tmp")
|
||||
tmp_path.write_text("1" if active else "0", encoding="ascii")
|
||||
os.replace(tmp_path, _FLAG_PATH)
|
||||
|
||||
_cached_value = bool(active)
|
||||
_cached_checked_monotonic = time.monotonic()
|
||||
Loading…
Reference in New Issue