"""Select the active NN model profile from env-style configuration.

Configuration is a flat mapping of string keys to string values:

* ``NN_<id>`` entries belong to the ``DEFAULT`` profile.
* ``NN_PROFILE_<name>_<id>`` entries belong to the named profile.
* ``NN_ACTIVE_PROFILE`` names the profile to use when no schedule rule matches.
* ``NN_SCHEDULE`` is a ``;``-separated list of ``HH:MM-HH:MM=<profile>`` rules.

Only values containing ``MODEL_VALUE_SEPARATOR`` are treated as model entries.
"""

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import re
from typing import Mapping

DEFAULT_PROFILE = "DEFAULT"
MODEL_VALUE_SEPARATOR = " && "

# The group names below are part of the contract with the rest of the module:
# the functions read them back via match.group("id"), match.group("profile"),
# match.group("start") and match.group("end").
_DEFAULT_MODEL_KEY_RE = re.compile(r"^NN_(?P<id>\d+)$")
_PROFILE_MODEL_KEY_RE = re.compile(r"^NN_PROFILE_(?P<profile>[A-Za-z0-9_]+)_(?P<id>\d+)$")
_SCHEDULE_RULE_RE = re.compile(
    r"^\s*(?P<start>\d{2}:\d{2})\s*-\s*(?P<end>\d{2}:\d{2})\s*=\s*(?P<profile>[A-Za-z0-9_]+)\s*$"
)


@dataclass(frozen=True)
class ScheduleRule:
    """One parsed ``HH:MM-HH:MM=<profile>`` schedule rule."""

    # Interval start, in minutes since midnight (inclusive).
    start_minutes: int
    # Interval end, in minutes since midnight (exclusive). May be smaller than
    # start_minutes, in which case the interval wraps past midnight.
    end_minutes: int
    # Normalized (stripped, upper-cased) profile name.
    profile: str
    # The rule text as written in the schedule, with outer whitespace removed.
    raw_rule: str


def normalize_profile_name(value: str | None) -> str:
    """Return *value* stripped and upper-cased, or ``DEFAULT_PROFILE`` if empty."""
    normalized = str(value or "").strip().upper()
    return normalized or DEFAULT_PROFILE


def load_simple_env_file(path: str | Path) -> dict[str, str]:
    """Parse a minimal ``KEY=VALUE`` env file into a dict.

    Blank lines, lines starting with ``#`` and lines without ``=`` are skipped.
    A leading ``export `` is dropped, and one pair of matching single or double
    quotes around the value is removed. A later assignment to the same key
    overrides an earlier one.

    Returns:
        The parsed key/value pairs, or an empty dict when *path* does not exist.
    """
    env_path = Path(path)
    if not env_path.exists():
        return {}

    values: dict[str, str] = {}
    for line in env_path.read_text(encoding="utf-8").splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        if stripped.startswith("export "):
            stripped = stripped[len("export "):].strip()
        if "=" not in stripped:
            continue

        # Split on the first "=" only, so values may themselves contain "=".
        key, raw_value = stripped.split("=", 1)
        key = key.strip()
        raw_value = raw_value.strip()
        if (
            len(raw_value) >= 2
            and raw_value[0] == raw_value[-1]
            and raw_value[0] in {"'", '"'}
        ):
            raw_value = raw_value[1:-1]
        values[key] = raw_value
    return values


def parse_schedule(schedule_raw: str | None) -> list[ScheduleRule]:
    """Parse an ``NN_SCHEDULE`` string into a list of rules, in written order.

    Args:
        schedule_raw: ``;``-separated rules of the form ``HH:MM-HH:MM=<profile>``.
            ``None`` or a blank string means "no schedule".

    Returns:
        The parsed rules; an empty list when no schedule is given.

    Raises:
        ValueError: If a rule is malformed, has an out-of-range time, has equal
            start and end times, or the text contains only empty segments.
    """
    schedule_text = str(schedule_raw or "").strip()
    if not schedule_text:
        return []

    rules: list[ScheduleRule] = []
    for raw_rule in schedule_text.split(";"):
        candidate = raw_rule.strip()
        if not candidate:
            # Tolerate stray or trailing separators.
            continue

        match = _SCHEDULE_RULE_RE.fullmatch(candidate)
        if match is None:
            raise ValueError(f"invalid NN_SCHEDULE rule: {candidate!r}")

        start_minutes = _parse_hhmm(match.group("start"))
        end_minutes = _parse_hhmm(match.group("end"))
        if start_minutes == end_minutes:
            raise ValueError(f"invalid NN_SCHEDULE rule with zero-length interval: {candidate!r}")

        rules.append(
            ScheduleRule(
                start_minutes=start_minutes,
                end_minutes=end_minutes,
                profile=normalize_profile_name(match.group("profile")),
                raw_rule=candidate,
            )
        )

    if not rules:
        # Non-blank text that held nothing but separators, e.g. ";;".
        raise ValueError("NN_SCHEDULE does not contain any usable rules")
    return rules


def get_requested_active_profile(
    config: Mapping[str, str | None], now: datetime | None = None
) -> str:
    """Return the profile requested by the schedule, or the configured default.

    The first rule in ``NN_SCHEDULE`` whose interval contains the current
    time of day wins. If there is no schedule or no rule matches, the
    normalized ``NN_ACTIVE_PROFILE`` value is returned.

    Args:
        config: Configuration mapping.
        now: Moment to evaluate the schedule at; defaults to ``datetime.now()``.

    Raises:
        ValueError: If ``NN_SCHEDULE`` is present but invalid.
    """
    default_profile = normalize_profile_name(config.get("NN_ACTIVE_PROFILE"))
    rules = parse_schedule(config.get("NN_SCHEDULE"))
    if not rules:
        return default_profile

    current = now or datetime.now()
    current_minutes = current.hour * 60 + current.minute
    for rule in rules:
        if _rule_matches(rule, current_minutes):
            return rule.profile
    return default_profile


def resolve_active_profile(config: Mapping[str, str | None], now: datetime | None = None) -> str:
    """Return the profile to use, falling back to ``DEFAULT`` when necessary.

    The requested profile (see ``get_requested_active_profile``) is used if it
    has at least one model entry; otherwise ``DEFAULT`` is used if it has one.

    Raises:
        ValueError: If no model entries exist at all, if neither the requested
            profile nor ``DEFAULT`` is configured, or if the schedule is invalid.
    """
    available_profiles = get_available_profiles(config)
    if not available_profiles:
        raise ValueError("no NN_* model entries configured")

    requested_profile = get_requested_active_profile(config, now=now)
    if requested_profile in available_profiles:
        return requested_profile
    if DEFAULT_PROFILE in available_profiles:
        return DEFAULT_PROFILE
    raise ValueError(
        f"requested NN profile {requested_profile!r} is not configured and DEFAULT profile is unavailable"
    )


def get_available_profiles(config: Mapping[str, str | None]) -> set[str]:
    """Return the normalized names of all profiles with at least one model entry."""
    profiles: set[str] = set()
    for key, value in config.items():
        if not _is_model_value(value):
            continue
        if _DEFAULT_MODEL_KEY_RE.fullmatch(key):
            profiles.add(DEFAULT_PROFILE)
            continue
        match = _PROFILE_MODEL_KEY_RE.fullmatch(key)
        if match is not None:
            profiles.add(normalize_profile_name(match.group("profile")))
    return profiles


def get_profile_model_entries(
    config: Mapping[str, str | None], profile: str
) -> list[tuple[str, str]]:
    """Return the ``(key, value)`` model entries of *profile*, ordered by numeric id.

    For a named profile the returned key is rewritten from
    ``NN_PROFILE_<name>_<id>`` to ``NN_<id>``, so consumers see the same key
    shape regardless of which profile is active. For ``DEFAULT`` the original
    ``NN_<id>`` key is returned unchanged.
    """
    selected_profile = normalize_profile_name(profile)
    # (numeric id, logical key, value); the id is kept only for sorting.
    entries: list[tuple[int, str, str]] = []
    for key, value in config.items():
        if not _is_model_value(value):
            continue

        if selected_profile == DEFAULT_PROFILE:
            match = _DEFAULT_MODEL_KEY_RE.fullmatch(key)
            if match is None:
                continue
            entries.append((int(match.group("id")), key, value))
            continue

        match = _PROFILE_MODEL_KEY_RE.fullmatch(key)
        if match is None:
            continue
        if normalize_profile_name(match.group("profile")) != selected_profile:
            continue
        logical_key = f"NN_{match.group('id')}"
        entries.append((int(match.group("id")), logical_key, value))

    # Sort numerically so that NN_10 comes after NN_2.
    entries.sort(key=lambda item: item[0])
    return [(logical_key, value) for _, logical_key, value in entries]


def format_active_profile_env(profile: str) -> str:
    """Return the ``NN_ACTIVE_PROFILE=<PROFILE>`` env line, newline-terminated."""
    return f"NN_ACTIVE_PROFILE={normalize_profile_name(profile)}\n"


def _parse_hhmm(token: str) -> int:
    """Convert an ``HH:MM`` token to minutes since midnight.

    Raises:
        ValueError: If the hour is not in 0-23 or the minute is not in 0-59.
    """
    hours_str, minutes_str = token.split(":", 1)
    hours = int(hours_str)
    minutes = int(minutes_str)
    if not (0 <= hours <= 23 and 0 <= minutes <= 59):
        raise ValueError(f"invalid time value {token!r}")
    return hours * 60 + minutes


def _rule_matches(rule: ScheduleRule, current_minutes: int) -> bool:
    """Return whether *current_minutes* lies in the rule's half-open interval."""
    if rule.start_minutes < rule.end_minutes:
        return rule.start_minutes <= current_minutes < rule.end_minutes
    # start > end: the interval wraps past midnight (e.g. 22:00-06:00).
    return current_minutes >= rule.start_minutes or current_minutes < rule.end_minutes


def _is_model_value(value: str | None) -> bool:
    """Return whether *value* is a string containing ``MODEL_VALUE_SEPARATOR``."""
    return isinstance(value, str) and MODEL_VALUE_SEPARATOR in value