You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
192 lines
6.0 KiB
Python
192 lines
6.0 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import re
|
|
from typing import Mapping
|
|
|
|
|
|
# Profile name used as the fallback when no other profile name is given.
DEFAULT_PROFILE = "DEFAULT"
# Substring a configuration value must contain to count as a model entry.
MODEL_VALUE_SEPARATOR = " && "

# Matches default-profile model keys of the form ``NN_<id>``.
_DEFAULT_MODEL_KEY_RE = re.compile(r"^NN_(?P<id>\d+)$")
# Matches per-profile model keys of the form ``NN_PROFILE_<profile>_<id>``.
# The profile group is greedy, so the id is the final ``_<digits>`` suffix.
_PROFILE_MODEL_KEY_RE = re.compile(
    r"^NN_PROFILE_(?P<profile>[A-Za-z0-9_]+)_(?P<id>\d+)$"
)
# Matches one schedule rule, ``HH:MM-HH:MM=PROFILE``, with optional
# whitespace around the times, the dash and the equals sign.
_SCHEDULE_RULE_RE = re.compile(
    r"^\s*(?P<start>\d{2}:\d{2})"
    r"\s*-\s*(?P<end>\d{2}:\d{2})"
    r"\s*=\s*(?P<profile>[A-Za-z0-9_]+)\s*$"
)
|
|
|
|
|
|
@dataclass(frozen=True)
class ScheduleRule:
    """Immutable record describing one parsed schedule rule.

    Instances carry the two bounds of a time window, the profile bound to
    that window, and the rule text the record was created from.
    """

    # Window start; parse_schedule stores minutes since midnight here.
    start_minutes: int
    # Window end; parse_schedule stores minutes since midnight here.
    end_minutes: int
    # Profile name associated with the window.
    profile: str
    # Rule text this record was built from.
    raw_rule: str
|
|
|
|
|
|
def normalize_profile_name(value: str | None) -> str:
|
|
normalized = str(value or "").strip().upper()
|
|
return normalized or DEFAULT_PROFILE
|
|
|
|
|
|
def load_simple_env_file(path: str | Path) -> dict[str, str]:
|
|
env_path = Path(path)
|
|
if not env_path.exists():
|
|
return {}
|
|
|
|
values: dict[str, str] = {}
|
|
for line in env_path.read_text(encoding="utf-8").splitlines():
|
|
stripped = line.strip()
|
|
if not stripped or stripped.startswith("#"):
|
|
continue
|
|
if stripped.startswith("export "):
|
|
stripped = stripped[len("export ") :].strip()
|
|
if "=" not in stripped:
|
|
continue
|
|
|
|
key, raw_value = stripped.split("=", 1)
|
|
key = key.strip()
|
|
raw_value = raw_value.strip()
|
|
if (
|
|
len(raw_value) >= 2
|
|
and raw_value[0] == raw_value[-1]
|
|
and raw_value[0] in {"'", '"'}
|
|
):
|
|
raw_value = raw_value[1:-1]
|
|
values[key] = raw_value
|
|
return values
|
|
|
|
|
|
def parse_schedule(schedule_raw: str | None) -> list[ScheduleRule]:
    """Turn a ``;``-separated schedule string into ``ScheduleRule`` objects.

    Each non-empty segment must match ``HH:MM-HH:MM=PROFILE``. Rules are
    returned in the order they appear in the text.

    Args:
        schedule_raw: Raw schedule text; a falsy or blank value means that
            no schedule is configured.

    Returns:
        The parsed rules, or an empty list when no schedule is configured.

    Raises:
        ValueError: If a segment is malformed, holds an out-of-range time,
            has identical start and end times, or the text contains only
            separators.
    """
    text = str(schedule_raw or "").strip()
    if text == "":
        return []

    parsed: list[ScheduleRule] = []
    for piece in text.split(";"):
        segment = piece.strip()
        # Empty segments (for example after a trailing ";") are ignored.
        if segment == "":
            continue

        found = _SCHEDULE_RULE_RE.fullmatch(segment)
        if found is None:
            raise ValueError(f"invalid NN_SCHEDULE rule: {segment!r}")

        begin = _parse_hhmm(found.group("start"))
        finish = _parse_hhmm(found.group("end"))
        if begin == finish:
            raise ValueError(f"invalid NN_SCHEDULE rule with zero-length interval: {segment!r}")

        rule = ScheduleRule(
            start_minutes=begin,
            end_minutes=finish,
            profile=normalize_profile_name(found.group("profile")),
            raw_rule=segment,
        )
        parsed.append(rule)

    if parsed:
        return parsed
    # Non-blank text that held nothing but separators.
    raise ValueError("NN_SCHEDULE does not contain any usable rules")
|
|
|
|
|
|
def get_requested_active_profile(
    config: Mapping[str, str | None], now: datetime | None = None
) -> str:
    """Return the profile the configuration asks for at a given moment.

    The normalized ``NN_ACTIVE_PROFILE`` value is the fallback. If
    ``NN_SCHEDULE`` defines rules, the first rule whose window contains the
    current time of day decides; when none matches, the fallback is used.

    Args:
        config: Configuration mapping to read the two settings from.
        now: Moment to evaluate; defaults to ``datetime.now()``.

    Returns:
        The requested profile name, normalized.

    Raises:
        ValueError: Propagated from ``parse_schedule`` for a bad schedule.
    """
    fallback = normalize_profile_name(config.get("NN_ACTIVE_PROFILE"))
    schedule = parse_schedule(config.get("NN_SCHEDULE"))
    if not schedule:
        return fallback

    moment = now or datetime.now()
    # Rules are compared at minute resolution; seconds are ignored.
    minute_of_day = moment.hour * 60 + moment.minute
    first_hit = next(
        (rule for rule in schedule if _rule_matches(rule, minute_of_day)),
        None,
    )
    return fallback if first_hit is None else first_hit.profile
|
|
|
|
|
|
def resolve_active_profile(config: Mapping[str, str | None], now: datetime | None = None) -> str:
    """Pick the profile to use, restricted to profiles that have models.

    The requested profile is used when it has at least one model entry;
    otherwise ``DEFAULT_PROFILE`` is used if it has one.

    Args:
        config: Configuration mapping holding model entries and settings.
        now: Moment forwarded to ``get_requested_active_profile``.

    Returns:
        Name of a profile that has model entries in ``config``.

    Raises:
        ValueError: If no model entries exist at all, or neither the
            requested profile nor the default profile has any.
    """
    known = get_available_profiles(config)
    if len(known) == 0:
        raise ValueError("no NN_* model entries configured")

    wanted = get_requested_active_profile(config, now=now)
    # Preference order: the requested profile first, then the default.
    for candidate in (wanted, DEFAULT_PROFILE):
        if candidate in known:
            return candidate

    raise ValueError(
        f"requested NN profile {wanted!r} is not configured and DEFAULT profile is unavailable"
    )
|
|
|
|
|
|
def get_available_profiles(config: Mapping[str, str | None]) -> set[str]:
    """Collect the names of all profiles that have at least one model entry.

    Only entries whose value passes ``_is_model_value`` are considered.
    ``NN_<id>`` keys contribute ``DEFAULT_PROFILE``; ``NN_PROFILE_<name>_<id>``
    keys contribute the normalized ``<name>``.

    NOTE(review): a key such as ``NN_PROFILE_DEFAULT_1`` also registers the
    default profile here, while ``get_profile_model_entries`` only reads
    ``NN_<id>`` keys for that profile; confirm this is intended.
    """
    found: set[str] = set()
    for key, value in config.items():
        if not _is_model_value(value):
            continue

        if _DEFAULT_MODEL_KEY_RE.fullmatch(key):
            found.add(DEFAULT_PROFILE)
        else:
            profile_hit = _PROFILE_MODEL_KEY_RE.fullmatch(key)
            if profile_hit is not None:
                found.add(normalize_profile_name(profile_hit.group("profile")))
    return found
|
|
|
|
|
|
def get_profile_model_entries(
    config: Mapping[str, str | None], profile: str
) -> list[tuple[str, str]]:
    """List the model entries of one profile, ordered by numeric id.

    For ``DEFAULT_PROFILE`` the ``NN_<id>`` keys are returned unchanged. For
    any other profile the ``NN_PROFILE_<name>_<id>`` keys are rewritten to
    their logical form ``NN_<id>``. Entries whose value fails
    ``_is_model_value`` are ignored.

    Args:
        config: Configuration mapping to scan.
        profile: Profile name; it is normalized before comparison.

    Returns:
        ``(logical_key, value)`` pairs sorted by integer id; entries with
        equal ids keep the iteration order of ``config``.
    """
    target = normalize_profile_name(profile)
    wants_default = target == DEFAULT_PROFILE
    collected: list[tuple[int, str, str]] = []

    for key, value in config.items():
        if not _is_model_value(value):
            continue

        if wants_default:
            hit = _DEFAULT_MODEL_KEY_RE.fullmatch(key)
            if hit is not None:
                collected.append((int(hit.group("id")), key, value))
            continue

        hit = _PROFILE_MODEL_KEY_RE.fullmatch(key)
        if hit is None or normalize_profile_name(hit.group("profile")) != target:
            continue
        model_id = hit.group("id")
        # The logical key keeps the id text exactly as written in the key.
        collected.append((int(model_id), f"NN_{model_id}", value))

    # Sort on the id alone so that ties keep their original relative order.
    ordered = sorted(collected, key=lambda entry: entry[0])
    return [(name, model) for _, name, model in ordered]
|
|
|
|
|
|
def format_active_profile_env(profile: str) -> str:
    """Render an ``NN_ACTIVE_PROFILE=<NAME>`` env line ending in a newline.

    The profile name is normalized before it is written into the line.
    """
    name = normalize_profile_name(profile)
    return f"NN_ACTIVE_PROFILE={name}\n"
|
|
|
|
|
|
def _parse_hhmm(token: str) -> int:
    """Convert an ``HH:MM`` token into minutes since midnight.

    Raises:
        ValueError: If the token lacks a colon, a part is not an integer,
            the hour is outside 0-23, or the minute is outside 0-59.
    """
    hour_text, minute_text = token.split(":", 1)
    hour_value, minute_value = int(hour_text), int(minute_text)
    if hour_value not in range(24) or minute_value not in range(60):
        raise ValueError(f"invalid time value {token!r}")
    return 60 * hour_value + minute_value
|
|
|
|
|
|
def _rule_matches(rule: ScheduleRule, current_minutes: int) -> bool:
    """Tell whether ``current_minutes`` lies inside the rule's window.

    The start bound is inclusive and the end bound exclusive. When the start
    is not below the end, the window is treated as wrapping past midnight.
    """
    start, end = rule.start_minutes, rule.end_minutes
    at_or_after_start = current_minutes >= start
    before_end = current_minutes < end
    if start < end:
        # Plain window within one day: both bounds must hold.
        return at_or_after_start and before_end
    # Wrapping window: late part of one day or early part of the next.
    return at_or_after_start or before_end
|
|
|
|
|
|
def _is_model_value(value: str | None) -> bool:
|
|
return isinstance(value, str) and MODEL_VALUE_SEPARATOR in value
|