# DroneDetector/common/nn_profile_schedule.py

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import re
from typing import Mapping
# Profile name used whenever no (or an empty) profile name is supplied.
DEFAULT_PROFILE = "DEFAULT"
# A config value only counts as a model entry when it contains this separator
# (see _is_model_value). NOTE(review): presumably it separates the parts of a
# model definition -- confirm against the consumer of these values.
MODEL_VALUE_SEPARATOR = " && "
# Keys of the default profile: "NN_<id>", e.g. "NN_1".
_DEFAULT_MODEL_KEY_RE = re.compile(r"^NN_(?P<id>\d+)$")
# Keys of a named profile: "NN_PROFILE_<profile>_<id>", e.g. "NN_PROFILE_NIGHT_1".
# The profile group is greedy, so the id is always the last "_<digits>" suffix.
_PROFILE_MODEL_KEY_RE = re.compile(r"^NN_PROFILE_(?P<profile>[A-Za-z0-9_]+)_(?P<id>\d+)$")
# One schedule rule: "HH:MM-HH:MM=PROFILE", with optional whitespace around the
# times, the "-" and the "=". Only the shape is checked here; the hour/minute
# ranges are validated by _parse_hhmm.
_SCHEDULE_RULE_RE = re.compile(
r"^\s*(?P<start>\d{2}:\d{2})\s*-\s*(?P<end>\d{2}:\d{2})\s*=\s*(?P<profile>[A-Za-z0-9_]+)\s*$"
)
@dataclass(frozen=True)
class ScheduleRule:
    """A single parsed schedule rule: a daily time window bound to a profile.

    Times are stored as minutes since midnight. Instances are immutable.
    """

    # First minute of the window (inclusive when matched by _rule_matches).
    start_minutes: int
    # Minute at which the window ends (exclusive when matched by
    # _rule_matches); a value not greater than start_minutes means the window
    # wraps past midnight.
    end_minutes: int
    # Name of the profile selected while the window is active.
    profile: str
    # The rule text this instance was built from.
    raw_rule: str
def normalize_profile_name(value: str | None) -> str:
    """Return the canonical (trimmed, upper-case) form of a profile name.

    Args:
        value: Profile name as found in configuration; may be ``None``.

    Returns:
        The upper-cased name without surrounding whitespace, or
        ``DEFAULT_PROFILE`` when the input is missing, empty or blank.
    """
    if not value:
        return DEFAULT_PROFILE
    canonical = str(value).strip().upper()
    if canonical:
        return canonical
    return DEFAULT_PROFILE
def load_simple_env_file(path: str | Path) -> dict[str, str]:
    """Read ``KEY=VALUE`` pairs from a simple env-style file.

    Blank lines, ``#`` comment lines and lines without ``=`` are ignored. A
    leading ``export `` is dropped, whitespace around key and value is
    trimmed, and one pair of matching single or double quotes around the
    value is removed. Later occurrences of a key override earlier ones.

    Args:
        path: Location of the env file.

    Returns:
        Mapping of keys to values; empty when the file does not exist.
    """
    env_path = Path(path)
    if not env_path.exists():
        return {}

    values: dict[str, str] = {}
    # "utf-8-sig" reads plain UTF-8 unchanged but drops a leading BOM, which
    # would otherwise be glued onto the first key (str.strip() keeps U+FEFF).
    for line in env_path.read_text(encoding="utf-8-sig").splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        if stripped.startswith("export "):
            stripped = stripped[len("export "):].strip()
        key, separator, raw_value = stripped.partition("=")
        if not separator:
            continue
        key = key.strip()
        if not key:
            # "=value" has no usable name; do not store it under "".
            continue
        raw_value = raw_value.strip()
        if (
            len(raw_value) >= 2
            and raw_value[0] == raw_value[-1]
            and raw_value[0] in {"'", '"'}
        ):
            raw_value = raw_value[1:-1]
        values[key] = raw_value
    return values
def parse_schedule(schedule_raw: str | None) -> list[ScheduleRule]:
    """Parse an ``NN_SCHEDULE`` string into a list of schedule rules.

    The schedule is a ``;``-separated list of ``HH:MM-HH:MM=PROFILE`` rules.
    Empty segments (for example from a trailing ``;``) are skipped and the
    rules are returned in the order they were written.

    Args:
        schedule_raw: Raw schedule text; ``None`` or blank means "no schedule".

    Returns:
        The parsed rules, or an empty list when no schedule text was given.

    Raises:
        ValueError: If a rule is malformed, has an out-of-range time, has
            identical start and end times, or if the text is non-empty but
            contains no rule at all.
    """
    schedule_text = str(schedule_raw or "").strip()
    if not schedule_text:
        return []

    rules: list[ScheduleRule] = []
    for raw_rule in schedule_text.split(";"):
        candidate = raw_rule.strip()
        if not candidate:
            continue
        match = _SCHEDULE_RULE_RE.fullmatch(candidate)
        if match is None:
            raise ValueError(f"invalid NN_SCHEDULE rule: {candidate!r}")
        try:
            start_minutes = _parse_hhmm(match.group("start"))
            end_minutes = _parse_hhmm(match.group("end"))
        except ValueError as err:
            # Re-raise with the offending rule so the message points at the
            # NN_SCHEDULE entry, like the other errors raised here.
            raise ValueError(f"invalid NN_SCHEDULE rule: {candidate!r} ({err})") from err
        if start_minutes == end_minutes:
            raise ValueError(f"invalid NN_SCHEDULE rule with zero-length interval: {candidate!r}")
        rules.append(
            ScheduleRule(
                start_minutes=start_minutes,
                end_minutes=end_minutes,
                profile=normalize_profile_name(match.group("profile")),
                raw_rule=candidate,
            )
        )

    # Reached only when the text consisted solely of separators/whitespace.
    if not rules:
        raise ValueError("NN_SCHEDULE does not contain any usable rules")
    return rules
def get_requested_active_profile(
    config: Mapping[str, str | None], now: datetime | None = None
) -> str:
    """Return the profile the configuration asks for at a given moment.

    The first rule of ``NN_SCHEDULE`` whose window contains the current time
    of day wins. Without a schedule, or when no rule matches, the normalized
    ``NN_ACTIVE_PROFILE`` value is returned.

    Args:
        config: Configuration mapping; ``NN_ACTIVE_PROFILE`` and
            ``NN_SCHEDULE`` are read from it.
        now: Moment to evaluate the schedule at; defaults to
            ``datetime.now()``.

    Returns:
        The requested profile name. Whether that profile actually has model
        entries is not checked here.

    Raises:
        ValueError: If ``NN_SCHEDULE`` cannot be parsed.
    """
    fallback = normalize_profile_name(config.get("NN_ACTIVE_PROFILE"))
    schedule = parse_schedule(config.get("NN_SCHEDULE"))
    if not schedule:
        return fallback

    moment = now if now else datetime.now()
    minute_of_day = 60 * moment.hour + moment.minute
    return next(
        (entry.profile for entry in schedule if _rule_matches(entry, minute_of_day)),
        fallback,
    )
def resolve_active_profile(config: Mapping[str, str | None], now: datetime | None = None) -> str:
    """Pick the profile to activate, falling back to DEFAULT when needed.

    Args:
        config: Configuration mapping with ``NN_*`` entries.
        now: Moment to evaluate the schedule at; forwarded to
            ``get_requested_active_profile``.

    Returns:
        The requested profile when it has model entries, otherwise
        ``DEFAULT_PROFILE`` when that one has model entries.

    Raises:
        ValueError: If no model entries are configured at all, if the schedule
            is invalid, or if neither the requested profile nor the default
            profile is available.
    """
    known_profiles = get_available_profiles(config)
    if not known_profiles:
        raise ValueError("no NN_* model entries configured")

    requested_profile = get_requested_active_profile(config, now=now)
    # Preference order: what was asked for first, then the default profile.
    for candidate in (requested_profile, DEFAULT_PROFILE):
        if candidate in known_profiles:
            return candidate

    raise ValueError(
        f"requested NN profile {requested_profile!r} is not configured and DEFAULT profile is unavailable"
    )
def get_available_profiles(config: Mapping[str, str | None]) -> set[str]:
    """Collect the names of all profiles that have at least one model entry.

    Only entries whose value passes ``_is_model_value`` are considered.
    ``NN_<id>`` keys contribute ``DEFAULT_PROFILE``; ``NN_PROFILE_<name>_<id>``
    keys contribute the normalized ``<name>`` (which may itself normalize to
    ``DEFAULT_PROFILE``).

    Args:
        config: Configuration mapping to scan.

    Returns:
        The set of normalized profile names found; empty when there are none.
    """
    found: set[str] = set()
    for name, raw in config.items():
        if not _is_model_value(raw):
            continue
        if _DEFAULT_MODEL_KEY_RE.fullmatch(name):
            found.add(DEFAULT_PROFILE)
        else:
            profile_match = _PROFILE_MODEL_KEY_RE.fullmatch(name)
            if profile_match is not None:
                found.add(normalize_profile_name(profile_match.group("profile")))
    return found
def get_profile_model_entries(
    config: Mapping[str, str | None], profile: str
) -> list[tuple[str, str]]:
    """Return the model entries of one profile, ordered by numeric id.

    Keys are classified exactly as in ``get_available_profiles``: ``NN_<id>``
    belongs to ``DEFAULT_PROFILE`` and ``NN_PROFILE_<name>_<id>`` belongs to
    the normalized ``<name>``. That includes ``NN_PROFILE_DEFAULT_<id>``,
    which counts towards the default profile, so every profile reported as
    available also yields its entries here.

    Args:
        config: Configuration mapping to scan; values that do not pass
            ``_is_model_value`` are ignored.
        profile: Profile to select; normalized before comparison.

    Returns:
        ``(logical_key, value)`` pairs sorted by id, where ``logical_key`` is
        always of the form ``NN_<id>`` regardless of the profile.
    """
    selected_profile = normalize_profile_name(profile)
    entries: list[tuple[int, str, str]] = []
    for key, value in config.items():
        if not _is_model_value(value):
            continue
        match = _DEFAULT_MODEL_KEY_RE.fullmatch(key)
        if match is not None:
            key_profile = DEFAULT_PROFILE
        else:
            match = _PROFILE_MODEL_KEY_RE.fullmatch(key)
            if match is None:
                continue
            key_profile = normalize_profile_name(match.group("profile"))
        if key_profile != selected_profile:
            continue
        # For plain "NN_<id>" keys this reproduces the key itself; for profile
        # keys it maps the entry onto the same logical "NN_<id>" name.
        logical_key = f"NN_{match.group('id')}"
        entries.append((int(match.group("id")), logical_key, value))
    # Stable sort on the numeric id only (so "NN_10" comes after "NN_2").
    entries.sort(key=lambda item: item[0])
    return [(logical_key, value) for _, logical_key, value in entries]
def format_active_profile_env(profile: str) -> str:
    """Render a profile as a single ``NN_ACTIVE_PROFILE=<NAME>`` env line.

    Args:
        profile: Profile name; normalized before rendering.

    Returns:
        The assignment line, terminated by a newline.
    """
    canonical = normalize_profile_name(profile)
    return f"NN_ACTIVE_PROFILE={canonical}\n"
def _parse_hhmm(token: str) -> int:
hours_str, minutes_str = token.split(":", 1)
hours = int(hours_str)
minutes = int(minutes_str)
if not (0 <= hours <= 23 and 0 <= minutes <= 59):
raise ValueError(f"invalid time value {token!r}")
return hours * 60 + minutes
def _rule_matches(rule: ScheduleRule, current_minutes: int) -> bool:
if rule.start_minutes < rule.end_minutes:
return rule.start_minutes <= current_minutes < rule.end_minutes
return current_minutes >= rule.start_minutes or current_minutes < rule.end_minutes
def _is_model_value(value: str | None) -> bool:
    """Tell whether a config value looks like a model entry.

    Args:
        value: Raw configuration value; may be ``None``.

    Returns:
        ``True`` only for strings that contain ``MODEL_VALUE_SEPARATOR``.
    """
    if not isinstance(value, str):
        return False
    return MODEL_VALUE_SEPARATOR in value