From: Thomas Walker Lynch Date: Fri, 19 Sep 2025 10:44:01 +0000 (-0700) Subject: initial commit X-Git-Url: https://git.reasoningtechnology.com/usr/lib/python2.7/opcode.py?a=commitdiff_plain;h=fae51bafc0358f78d26ab5e7553135dbab607f06;p=Man-In-Grey initial commit --- diff --git "a/developer/document\360\237\226\211/.githolder" "b/developer/document\360\237\226\211/.githolder" new file mode 100644 index 0000000..e69de29 diff --git a/developer/python/test_env.py b/developer/python/test_env.py deleted file mode 100644 index d47e8a1..0000000 --- a/developer/python/test_env.py +++ /dev/null @@ -1,25 +0,0 @@ -#!/usr/bin/env python3 - -import os -import sys - -def print_env_var(name): - value = os.getenv(name) - print(f"{name:<16}: {value if value else ''}") - -def main(): - print("=== Python Environment Test ===") - print(f"Python executable : {sys.executable}") - print(f"Python version : {sys.version}") - print() - - print("=== Harmony Environment Variables ===") - for var in ["ROLE", "REPO_HOME", "PYTHON_HOME", "VIRTUAL_ENV", "ENV"]: - print_env_var(var) - - print() - print("=== Current Working Directory ===") - print(os.getcwd()) - -if __name__ == "__main__": - main() diff --git a/developer/source/Planner.py b/developer/source/Planner.py new file mode 100644 index 0000000..94d8226 --- /dev/null +++ b/developer/source/Planner.py @@ -0,0 +1,515 @@ +#!/usr/bin/env -S python3 -B +""" +Planner.py — plan builder for staged configuration (UNPRIVILEGED). + +Given: runner-side provenance (PlanProvenance) and optional defaults (WriteFileMeta). +Does: expose Planner whose command methods (copy/displace/delete) build Command entries, + resolving arguments with precedence: kwarg > per-call WriteFileMeta > planner default + (and for filename, fallback to provenance-derived basename). On any argument error, + the Command is returned with errors and NOT appended to the Journal. +Returns: Journal (model only; dict in/out) via planner.journal(). 
+""" + +from __future__ import annotations + +# no bytecode anywhere (works under sudo/root shells too) +import sys ,os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") + +from pathlib import Path +import getpass + +# ===== Utilities ===== + +def norm_perm(value: int|str)-> tuple[int ,str]|None: + "Given int or 3/4-char octal string (optionally 0o-prefixed). Does validate/normalize. Returns (int ,'%04o') or None." + if isinstance(value ,int): + if 0 <= value <= 0o7777: + return value ,f"{value:04o}" + return None + if isinstance(value ,str): + s = value.strip().lower() + if s.startswith("0o"): + try: + v = int(s ,8) + return v ,f"{v:04o}" + except Exception: + return None + if len(s) in (3 ,4) and all(ch in "01234567" for ch in s): + try: + v = int(s ,8) + return v ,f"{v:04o}" + except Exception: + return None + return None + +def is_abs_dpath(dpath_str: str|None)-> bool: + "Given path string. Does quick abs dir check. Returns bool." + return isinstance(dpath_str ,str) and dpath_str.startswith("/") and "\x00" not in dpath_str + +def norm_abs_dpath_str(value: str|Path|None)-> str|None: + "Given str/Path/None. Does normalize absolute dir path string. Returns str or None." + if value is None: return None + s = value.as_posix() if isinstance(value ,Path) else str(value) + return s if is_abs_dpath(s) else None + +def norm_dpath_str(value: str|Path|None)-> str|None: + "Given str/Path/None. Does minimal sanitize; allows relative. Returns str or None." + if value is None: return None + s = value.as_posix() if isinstance(value ,Path) else str(value) + if not s or "\x00" in s: return None + return s + +def norm_fname_or_none(value: str|None)-> str|None: + "Given candidate filename or None. Does validate bare filename. Returns str or None." + if value is None: return None + s = str(value) + if not s: return None + if "/" in s or s in ("." 
,"..") or "\x00" in s: return None + return s + +def norm_nonempty_owner(value: str|None)-> str|None: + "Given owner string or None. Does minimally validate (non-empty). Returns str or None." + if value is None: return None + s = str(value).strip() + return s if s else None + +def parse_mode(value: int|str|None)-> tuple[int|None ,str|None]: + "Given int/str/None. Does normalize via norm_perm. Returns (int ,'%04o') or (None ,None)." + if value is None: return None ,None + r = norm_perm(value) + return r if r is not None else (None ,None) + +def norm_content_bytes(value: bytes|str|None)-> bytes|None: + "Given bytes/str/None. Does normalize to UTF-8 bytes or None. Returns bytes|None." + if value is None: return None + if isinstance(value ,bytes): return value + return value.encode("utf-8") + +# ===== Wire-ready model types (no CBOR here) ===== + +class Command: + """ + Command — a single planned operation. + + Given name_str ('copy'|'displace'|'delete'), optional arg_dict, optional errors_list. + Does hold op name, own a fresh arg_dict, collect per-entry errors. + Returns dictionary via as_dictionary(). + """ + __slots__ = ( + "name_str" + ,"arg_dict" + ,"errors_list" + ) + + def __init__(self ,name_str: str ,arg_dict: dict|None=None ,errors_list: list[str]|None=None)-> None: + self.name_str = name_str + self.arg_dict = dict(arg_dict) if arg_dict is not None else {} + self.errors_list = list(errors_list) if errors_list is not None else [] + + def add_error(self ,msg_str: str)-> None: + self.errors_list.append(msg_str) + + def as_dictionary(self)-> dict: + return { + "op": self.name_str + ,"arg_dict": dict(self.arg_dict) + ,"errors_list": list(self.errors_list) + } + + def print(self ,* ,index: int|None=None ,file=None)-> None: + """ + Given: optional index for numbering and optional file-like (defaults to stdout). + Does: print a compact, human-readable one-line summary of this command; prints any errors indented below. + Returns: None. 
+ """ + if file is None: + import sys as _sys + file = _sys.stdout + + op = self.name_str + ad = self.arg_dict or {} + + # Compose destination path for display (normalize to collapse '..') + d = ad.get("write_file_dpath_str") or "" + f = ad.get("write_file_fname") or "" + try: + from pathlib import Path as _Path + if d and f and "/" not in f: + dst = (_Path(d)/f).resolve().as_posix() + else: + dst = "?" + except Exception: + dst = "?" + + prefix = f"{index:02d}. " if index is not None else "" + + if op == "copy": + mode = ad.get("mode_int") + owner = ad.get("owner_name") + size = len(ad.get("content_bytes") or b"") + line = f"{prefix}copy -> {dst} mode {mode:04o} owner {owner} bytes {size}" + elif op == "displace": + line = f"{prefix}displace -> {dst}" + elif op == "delete": + line = f"{prefix}delete -> {dst}" + else: + line = f"{prefix}?op? -> {dst}" + + print(line ,file=file) + + for err in self.errors_list: + print(f" ! {err}" ,file=file) + +class Journal: + """ + Journal — ordered list of Command plus provenance metadata (model only; no CBOR). + + Given optional plan_dict in wire shape (for reconstruction). + Does manage meta, append commands, expose entries, and pack to dict. + Returns dict via as_dictionary(). + """ + __slots__ = ( + "meta_dict" + ,"command_list" + ) + + def __init__(self ,plan_dict: dict|None=None)-> None: + self.meta_dict = {} + self.command_list = [] + if plan_dict is not None: + self._init_from_dict(plan_dict) + + def _init_from_dict(self ,plan_dict: dict)-> None: + if not isinstance(plan_dict ,dict): + raise ValueError("plan_dict must be a dict") + meta = dict(plan_dict.get("meta_dict") or {}) + entries = plan_dict.get("entries_list") or [] + self.meta_dict.update(meta) + for e in entries: + if not isinstance(e ,dict): + continue + op = e.get("op") or "?" 
+ args = e.get("arg_dict") or {} + errs = e.get("errors_list") or [] + self.command_list.append(Command(name_str=op ,arg_dict=dict(args) ,errors_list=list(errs))) + + def set_meta(self ,**kv)-> None: + self.meta_dict.update(kv) + + def append(self ,cmd: Command)-> None: + self.command_list.append(cmd) + + def entries_list(self)-> list[dict]: + return [c.as_dictionary() for c in self.command_list] + + def as_dictionary(self)-> dict: + return { + "version_int": 1 + ,"meta_dict": dict(self.meta_dict) + ,"entries_list": self.entries_list() + } + + def print(self ,* ,index_start: int=1 ,file=None)-> None: + """ + Given: optional starting index and optional file-like (defaults to stdout). + Does: print each Command on a single line via Command.print(), numbered. + Returns: None. + """ + if file is None: + import sys as _sys + file = _sys.stdout + + if not self.command_list: + print("(plan is empty)" ,file=file) + return + + for i ,cmd in enumerate(self.command_list ,start=index_start): + cmd.print(index=i ,file=file) + +# ===== Runner-provided provenance ===== + +class PlanProvenance: + """ + Runner-provided, read-only provenance for a single config script. 
+ """ + __slots__ = ( + "stage_root_dpath" + ,"config_abs_fpath" + ,"config_rel_fpath" + ,"read_dir_dpath" + ,"read_fname" + ,"process_user" + ,"cwd_dpath" + ) + + def __init__(self ,* ,stage_root: Path ,config_path: Path): + self.stage_root_dpath = stage_root.resolve() + self.config_abs_fpath = config_path.resolve() + try: + self.config_rel_fpath = self.config_abs_fpath.relative_to(self.stage_root_dpath) + except Exception: + self.config_rel_fpath = Path(self.config_abs_fpath.name) + + self.read_dir_dpath = self.config_abs_fpath.parent + + name = self.config_abs_fpath.name + if name.endswith(".stage.py"): + self.read_fname = name[:-len(".stage.py")] + elif name.endswith(".py"): + self.read_fname = name[:-3] + else: + self.read_fname = name + + self.process_user = getpass.getuser() + self.cwd_dpath = Path.cwd().resolve() + + def print(self ,* ,file=None)-> None: + if file is None: + import sys as _sys + file = _sys.stdout + print(f"Stage root: {self.stage_root_dpath}" ,file=file) + print(f"Config (rel): {self.config_rel_fpath.as_posix()}" ,file=file) + print(f"Config (abs): {self.config_abs_fpath}" ,file=file) + print(f"Read dir: {self.read_dir_dpath}" ,file=file) + print(f"Read fname: {self.read_fname}" ,file=file) + print(f"Process user: {self.process_user}" ,file=file) + +# ===== Admin-facing defaults carrier ===== + +class WriteFileMeta: + """ + WriteFileMeta — per-call or planner-default write-file attributes. + + Given dpath (str/Path, may be relative) ,fname (bare name or None) ,owner (str) + ,mode (int|'0644') ,content (bytes|str|None). + Does normalize into fields (may remain None if absent/invalid). + Returns object suitable for providing defaults to Planner methods. 
+ """ + __slots__ = ( + "dpath_str" + ,"fname" + ,"owner_name_str" + ,"mode_int" + ,"mode_octal_str" + ,"content_bytes" + ) + + def __init__(self + ,* + ,dpath="/" + ,fname=None + ,owner="root" + ,mode=0o444 + ,content=None + ): + self.dpath_str = norm_dpath_str(dpath) + self.fname = norm_fname_or_none(fname) + self.owner_name_str = norm_nonempty_owner(owner) + self.mode_int ,self.mode_octal_str = parse_mode(mode) + self.content_bytes = norm_content_bytes(content) + + def print(self ,* ,label: str|None=None ,file=None)-> None: + if file is None: + import sys as _sys + file = _sys.stdout + dpath = self.dpath_str or "?" + fname = self.fname or "?" + owner = self.owner_name_str or "?" + mode_str = f"{self.mode_int:04o}" if isinstance(self.mode_int ,int) else (self.mode_octal_str or "?") + size = len(self.content_bytes) if isinstance(self.content_bytes ,(bytes ,bytearray)) else 0 + prefix = (label + ": ") if label else "" + print(f"{prefix}dpath={dpath} fname={fname} owner={owner} mode={mode_str} bytes={size}" ,file=file) + +# ===== Planner ===== + +class Planner: + """ + Planner — constructs a Journal of Commands from config scripts. + + Given provenance (PlanProvenance) and optional default WriteFileMeta. + Does resolve command parameters by precedence: kwarg > per-call WriteFileMeta > planner default, + with a final filename fallback to provenance basename if still missing. + On any argument error, returns the Command with errors and DOES NOT append it to Journal. + Returns live Journal via journal(). 
+ """ + __slots__ = ( + "_prov" + ,"_defaults" + ,"_journal" + ) + + def __init__(self ,provenance: PlanProvenance ,defaults: WriteFileMeta|None=None)-> None: + self._prov = provenance + self._defaults = defaults if defaults is not None else WriteFileMeta( + dpath="/" + ,fname=provenance.read_fname + ,owner="root" + ,mode=0o444 + ,content=None + ) + self._journal = Journal() + self._journal.set_meta( + stage_root_dpath_str=str(self._prov.stage_root_dpath) + ,config_rel_fpath_str=self._prov.config_rel_fpath.as_posix() + ) + + # --- provenance/defaults/journal access --- + + def set_provenance(self ,prov: PlanProvenance)-> None: + self._prov = prov + + def set_defaults(self ,defaults: WriteFileMeta)-> None: + self._defaults = defaults + + def defaults(self)-> WriteFileMeta: + return self._defaults + + def journal(self)-> Journal: + return self._journal + + # --- resolution helpers --- + + def _pick(self ,kw ,meta_attr ,default_attr): + "Pick first non-None among kw ,meta_attr ,default_attr." 
+ return kw if kw is not None else (meta_attr if meta_attr is not None else default_attr) + + def _resolve_write_file(self ,wfm ,dpath ,fname)-> tuple[str|None ,str|None]: + # normalize explicit kwargs + dpath_str = norm_dpath_str(dpath) if dpath is not None else None + fname_str = norm_fname_or_none(fname) if fname is not None else None + + # precedence: kwarg > per-call meta > planner default + dpath_val = self._pick(dpath_str ,(wfm.dpath_str if wfm else None) ,self._defaults.dpath_str) + fname_val = self._pick(fname_str ,(wfm.fname if wfm else None) ,self._defaults.fname) + + # final fallback for filename: derive from config name + if fname_val is None: + fname_val = self._prov.read_fname + + # anchor/normalize dpath + if dpath_val is not None: + p = Path(dpath_val) + if not p.is_absolute(): + p = (self._prov.cwd_dpath/p) + dpath_val = p.resolve().as_posix() + + return dpath_val ,fname_val + + def _resolve_owner_mode_content(self + ,wfm: WriteFileMeta|None + ,owner: str|None + ,mode: int|str|None + ,content: bytes|str|None + )-> tuple[str|None ,tuple[int|None ,str|None] ,bytes|None]: + owner_norm = norm_nonempty_owner(owner) if owner is not None else None + mode_norm = parse_mode(mode) if mode is not None else (None ,None) + content_b = norm_content_bytes(content) if content is not None else None + + owner_v = self._pick(owner_norm ,(wfm.owner_name_str if wfm else None) ,self._defaults.owner_name_str) + mode_v = (mode_norm if mode_norm != (None ,None) else + ((wfm.mode_int ,wfm.mode_octal_str) if wfm else (self._defaults.mode_int ,self._defaults.mode_octal_str))) + content_v = self._pick(content_b ,(wfm.content_bytes if wfm else None) ,self._defaults.content_bytes) + return owner_v ,mode_v ,content_v + + # --- printing --- + + def print(self ,* ,show_journal: bool=True ,file=None)-> None: + if file is None: + import sys as _sys + file = _sys.stdout + + print("== Provenance ==" ,file=file) + self._prov.print(file=file) + + print("\n== Defaults ==" ,file=file) + 
self._defaults.print(label="defaults" ,file=file) + + if show_journal: + entries = getattr(self._journal ,"command_list" ,[]) + n_total = len(entries) + n_copy = sum(1 for c in entries if getattr(c ,"name_str" ,None) == "copy") + n_disp = sum(1 for c in entries if getattr(c ,"name_str" ,None) == "displace") + n_del = sum(1 for c in entries if getattr(c ,"name_str" ,None) == "delete") + + print("\n== Journal ==" ,file=file) + print(f"entries: {n_total} copy:{n_copy} displace:{n_disp} delete:{n_del}" ,file=file) + if n_total: + self._journal.print(index_start=1 ,file=file) + else: + print("(plan is empty)" ,file=file) + + # --- Command builders (first arg may be WriteFileMeta) --- + + def copy(self + ,wfm: WriteFileMeta|None=None + ,* + ,write_file_dpath: str|Path|None=None + ,write_file_fname: str|None=None + ,owner: str|None=None + ,mode: int|str|None=None + ,content: bytes|str|None=None + )-> Command: + cmd = Command("copy") + dpath ,fname = self._resolve_write_file(wfm ,write_file_dpath ,write_file_fname) + owner_v ,(mode_int ,mode_oct) ,content_b = self._resolve_owner_mode_content(wfm ,owner ,mode ,content) + + # well-formed checks + if not is_abs_dpath(dpath): cmd.add_error("write_file_dpath must be absolute") + if norm_fname_or_none(fname) is None: cmd.add_error("write_file_fname must be a bare filename") + if not owner_v: cmd.add_error("owner must be non-empty") + if (mode_int ,mode_oct) == (None ,None): + cmd.add_error("mode must be int <= 0o7777 or 3/4-digit octal string") + if content_b is None: + cmd.add_error("content is required for copy() (bytes or str)") + + cmd.arg_dict.update({ + "write_file_dpath_str": dpath + ,"write_file_fname": fname + ,"owner_name": owner_v + ,"mode_int": mode_int + ,"mode_octal_str": mode_oct + ,"content_bytes": content_b + ,"provenance_config_rel_fpath_str": self._prov.config_rel_fpath.as_posix() + }) + + if not cmd.errors_list: + self._journal.append(cmd) + return cmd + + def displace(self + ,wfm: WriteFileMeta|None=None + 
,* + ,write_file_dpath: str|Path|None=None + ,write_file_fname: str|None=None + )-> Command: + cmd = Command("displace") + dpath ,fname = self._resolve_write_file(wfm ,write_file_dpath ,write_file_fname) + if not is_abs_dpath(dpath): cmd.add_error("write_file_dpath must be absolute") + if norm_fname_or_none(fname) is None: cmd.add_error("write_file_fname must be a bare filename") + cmd.arg_dict.update({ + "write_file_dpath_str": dpath + ,"write_file_fname": fname + }) + if not cmd.errors_list: + self._journal.append(cmd) + return cmd + + def delete(self + ,wfm: WriteFileMeta|None=None + ,* + ,write_file_dpath: str|Path|None=None + ,write_file_fname: str|None=None + )-> Command: + cmd = Command("delete") + dpath ,fname = self._resolve_write_file(wfm ,write_file_dpath ,write_file_fname) + if not is_abs_dpath(dpath): cmd.add_error("write_file_dpath must be absolute") + if norm_fname_or_none(fname) is None: cmd.add_error("write_file_fname must be a bare filename") + cmd.arg_dict.update({ + "write_file_dpath_str": dpath + ,"write_file_fname": fname + }) + if not cmd.errors_list: + self._journal.append(cmd) + return cmd diff --git a/developer/source/deprecated/.githolder b/developer/source/deprecated/.githolder new file mode 100644 index 0000000..e69de29 diff --git a/developer/source/deprecated/Planner.py b/developer/source/deprecated/Planner.py new file mode 100644 index 0000000..b1cf34f --- /dev/null +++ b/developer/source/deprecated/Planner.py @@ -0,0 +1,514 @@ +#!/usr/bin/env -S python3 -B +""" +Planner.py — plan builder for staged configuration (UNPRIVILEGED). + +Given: runner-side provenance (PlanProvenance) and optional defaults (WriteFileMeta). +Does: expose Planner whose command methods (copy/displace/delete) build Command entries, + resolving arguments with precedence: kwarg > per-call WriteFileMeta > planner default + (and for filename, fallback to provenance-derived basename). 
On any argument error, + the Command is returned with errors and NOT appended to the Journal. +Returns: Journal (model only; dict in/out) via planner.journal(). +""" + +from __future__ import annotations + +# no bytecode anywhere (works under sudo/root shells too) +import sys ,os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") + +from pathlib import Path +import getpass + + +# ===== Utilities ===== + +def norm_perm(value: int|str)-> tuple[int,str]|None: + "Given int or 3/4-char octal string (optionally 0o-prefixed). Does validate/normalize. Returns (int,'%04o') or None." + if isinstance(value ,int): + if 0 <= value <= 0o7777: + return value ,f"{value:04o}" + return None + if isinstance(value ,str): + s = value.strip().lower() + if s.startswith("0o"): + try: + v = int(s ,8) + return v ,f"{v:04o}" + except Exception: + return None + if len(s) in (3 ,4) and all(ch in "01234567" for ch in s): + try: + v = int(s ,8) + return v ,f"{v:04o}" + except Exception: + return None + return None + +def is_abs_dpath(dpath_str: str|None)-> bool: + "Given path string. Does quick abs dir check. Returns bool." + return isinstance(dpath_str ,str) and dpath_str.startswith("/") and "\x00" not in dpath_str + +def norm_abs_dpath_str(value: str|Path|None)-> str|None: + "Given str/Path/None. Does normalize absolute dir path string. Returns str or None." + if value is None: return None + s = value.as_posix() if isinstance(value ,Path) else str(value) + return s if is_abs_dpath(s) else None + +def norm_fname_or_none(value: str|None)-> str|None: + "Given candidate filename or None. Does validate bare filename. Returns str or None." + if value is None: return None + s = str(value) + if not s: return None + if "/" in s or s in ("." ,"..") or "\x00" in s: return None + return s + +def norm_nonempty_owner(value: str|None)-> str|None: + "Given owner string or None. Does minimally validate (non-empty). Returns str or None." 
+ if value is None: return None + s = str(value).strip() + return s if s else None + +def parse_mode(value: int|str|None)-> tuple[int|None ,str|None]: + "Given int/str/None. Does normalize via norm_perm. Returns (int,'%04o') or (None,None)." + if value is None: return None ,None + r = norm_perm(value) + return r if r is not None else (None ,None) + +def norm_content_bytes(value: bytes|str|None)-> bytes|None: + "Given bytes/str/None. Does normalize to UTF-8 bytes or None. Returns bytes|None." + if value is None: return None + if isinstance(value ,bytes): return value + return value.encode("utf-8") + +def norm_dpath_str(value: str|Path|None)-> str|None: + "Given str/Path/None. Does minimal sanitize; allows relative. Returns str or None." + if value is None: return None + s = value.as_posix() if isinstance(value ,Path) else str(value) + if not s or "\x00" in s: return None + return s + + +# ===== Wire-ready model types (no CBOR here) ===== + +class Command: + """ + Command — a single planned operation. + + Given name_str ('copy'|'displace'|'delete'), optional arg_dict, optional errors_list. + Does hold op name, own a fresh arg_dict, collect per-entry errors. + Returns dictionary via as_dictionary(). + """ + __slots__ = ("name_str" ,"arg_dict" ,"errors_list") + + def __init__(self ,name_str: str ,arg_dict: dict|None=None ,errors_list: list[str]|None=None)-> None: + self.name_str = name_str + self.arg_dict = dict(arg_dict) if arg_dict is not None else {} + self.errors_list = list(errors_list) if errors_list is not None else [] + + def add_error(self ,msg_str: str)-> None: + self.errors_list.append(msg_str) + + def as_dictionary(self)-> dict: + return { + "op": self.name_str + ,"arg_dict": dict(self.arg_dict) + ,"errors_list": list(self.errors_list) + } + + def print(self, *, index: int|None=None, file=None)-> None: + """ + Given: optional index for numbering and optional file-like (defaults to stdout). 
+ Does: print a compact, human-readable one-line summary of this command; prints any errors indented below. + Returns: None. + """ + if file is None: + import sys as _sys + file = _sys.stdout + + op = self.name_str + ad = self.arg_dict or {} + + # Compose destination path for display + d = ad.get("write_file_dpath_str") or "" + f = ad.get("write_file_fname") or "" + try: + from pathlib import Path as _Path + dst = (_Path(d)/f).as_posix() if d and f and "/" not in f else "?" + except Exception: + dst = "?" + + # Numbering prefix + prefix = f"{index:02d}. " if index is not None else "" + + if op == "copy": + mode = ad.get("mode_int") + owner = ad.get("owner_name") + size = len(ad.get("content_bytes") or b"") + line = f"{prefix}copy -> {dst} mode {mode:04o} owner {owner} bytes {size}" + elif op == "displace": + line = f"{prefix}displace -> {dst}" + elif op == "delete": + line = f"{prefix}delete -> {dst}" + else: + line = f"{prefix}?op? -> {dst}" + + print(line, file=file) + + # Print any per-entry errors underneath + for err in self.errors_list: + print(f" ! {err}", file=file) + + +class Journal: + """ + Journal — ordered list of Command plus provenance metadata (model only; no CBOR). + + Given optional plan_dict in wire shape (for reconstruction). + Does manage meta, append commands, expose entries, and pack to dict. + Returns dict via as_dictionary(). + """ + __slots__ = ("meta_dict" ,"command_list") + + def __init__(self ,plan_dict: dict|None=None)-> None: + self.meta_dict = {} + self.command_list = [] + if plan_dict is not None: + self._init_from_dict(plan_dict) + + def _init_from_dict(self ,plan_dict: dict)-> None: + if not isinstance(plan_dict ,dict): + raise ValueError("plan_dict must be a dict") + meta = dict(plan_dict.get("meta_dict") or {}) + entries = plan_dict.get("entries_list") or [] + self.meta_dict.update(meta) + for e in entries: + if not isinstance(e ,dict): + continue + op = e.get("op") or "?" 
+ args = e.get("arg_dict") or {} + errs = e.get("errors_list") or [] + self.command_list.append(Command(name_str=op ,arg_dict=dict(args) ,errors_list=list(errs))) + + def set_meta(self ,**kv)-> None: + self.meta_dict.update(kv) + + def append(self ,cmd: Command)-> None: + self.command_list.append(cmd) + + def entries_list(self)-> list[dict]: + return [c.as_dictionary() for c in self.command_list] + + def as_dictionary(self)-> dict: + return { + "version_int": 1 + ,"meta_dict": dict(self.meta_dict) + ,"entries_list": self.entries_list() + } + + def print(self, *, index_start: int = 1, file=None) -> None: + """ + Given: optional starting index and optional file-like (defaults to stdout). + Does: print each Command on a single line via Command.print(), numbered. + Returns: None. + """ + if file is None: + import sys as _sys + file = _sys.stdout + + if not self.command_list: + print("(plan is empty)", file=file) + return + + for i, cmd in enumerate(self.command_list, start=index_start): + cmd.print(index=i, file=file) + +# ===== Runner-provided provenance ===== + +# Planner.py +class PlanProvenance: + """ + Runner-provided, read-only provenance for a single config script. 
+ """ + __slots__ = ("stage_root_dpath","config_abs_fpath","config_rel_fpath", + "read_dir_dpath","read_fname","process_user") + + def __init__(self, *, stage_root: Path, config_path: Path): + import getpass + self.stage_root_dpath = stage_root.resolve() + self.config_abs_fpath = config_path.resolve() + try: + self.config_rel_fpath = self.config_abs_fpath.relative_to(self.stage_root_dpath) + except Exception: + self.config_rel_fpath = Path(self.config_abs_fpath.name) + + self.read_dir_dpath = self.config_abs_fpath.parent + + name = self.config_abs_fpath.name + if name.endswith(".stage.py"): + self.read_fname = name[:-len(".stage.py")] + elif name.endswith(".py"): + self.read_fname = name[:-3] + else: + self.read_fname = name + + # NEW: owner of the StageHand process + self.process_user = getpass.getuser() + + def print(self, *, file=None) -> None: + if file is None: + import sys as _sys + file = _sys.stdout + print(f"Stage root: {self.stage_root_dpath}", file=file) + print(f"Config (rel): {self.config_rel_fpath.as_posix()}", file=file) + print(f"Config (abs): {self.config_abs_fpath}", file=file) + print(f"Read dir: {self.read_dir_dpath}", file=file) + print(f"Read fname: {self.read_fname}", file=file) + print(f"Process user: {self.process_user}", file=file) # NEW + +# ===== Admin-facing defaults carrier ===== + +class WriteFileMeta: + """ + WriteFileMeta — per-call or planner-default write-file attributes. + + Given dpath (abs str/Path) ,fname (bare name or None) ,owner (str) + ,mode (int|'0644') ,content (bytes|str|None). + Does normalize into fields (may remain None if absent/invalid). + Returns object suitable for providing defaults to Planner methods. 
+ """ + __slots__ = ("dpath_str" ,"fname" ,"owner_name_str" ,"mode_int" ,"mode_octal_str" ,"content_bytes") + + def __init__(self + ,* + ,dpath="/" + ,fname=None # None → let Planner/provenance choose + ,owner="root" + ,mode=0o444 + ,content=None + ): + self.dpath_str = norm_dpath_str(dpath) + self.fname = norm_fname_or_none(fname) # '.' no longer special → None + self.owner_name_str = norm_nonempty_owner(owner) # '.' rejected → None + self.mode_int, self.mode_octal_str = parse_mode(mode) + self.content_bytes = norm_content_bytes(content) + + def print(self, *, label: str | None = None, file=None) -> None: + """ + Given: optional label and optional file-like (defaults to stdout). + Does: print a single-line summary of defaults/overrides. + Returns: None. + """ + if file is None: + import sys as _sys + file = _sys.stdout + + dpath = self.dpath_str or "?" + fname = self.fname or "?" + owner = self.owner_name_str or "?" + mode_str = f"{self.mode_int:04o}" if isinstance(self.mode_int, int) else (self.mode_octal_str or "?") + size = len(self.content_bytes) if isinstance(self.content_bytes, (bytes, bytearray)) else 0 + prefix = (label + ": ") if label else "" + print(f"{prefix}dpath={dpath} fname={fname} owner={owner} mode={mode_str} bytes={size}", file=file) + + +# ===== Planner ===== + +class Planner: + """ + Planner — constructs a Journal of Commands from config scripts. + + Given provenance (PlanProvenance) and optional default WriteFileMeta. + Does resolve command parameters by precedence: kwarg > per-call WriteFileMeta > planner default, + with a final filename fallback to provenance basename if still missing. + On any argument error, returns the Command with errors and DOES NOT append it to Journal. + Returns live Journal via journal(). 
+ """ + __slots__ = ("_prov" ,"_defaults" ,"_journal") + + def __init__(self ,provenance: PlanProvenance ,defaults: WriteFileMeta|None=None)-> None: + self._prov = provenance + self._defaults = defaults if defaults is not None else WriteFileMeta( + dpath="/" + ,fname=provenance.read_fname + ,owner="root" + ,mode=0o444 + ,content=None + ) + self._journal = Journal() + self._journal.set_meta( + stage_root_dpath_str=str(self._prov.stage_root_dpath) + ,config_rel_fpath_str=self._prov.config_rel_fpath.as_posix() + ) + + # --- defaults management / access --- + + # in Planner.py, inside class Planner + def set_provenance(self, prov: PlanProvenance) -> None: + """Switch the current provenance used for fallbacks & per-command provenance tagging.""" + self._prov = prov + + def set_defaults(self ,defaults: WriteFileMeta)-> None: + "Given WriteFileMeta. Does replace planner defaults. Returns None." + self._defaults = defaults + + def defaults(self)-> WriteFileMeta: + "Given n/a. Does return current WriteFileMeta defaults. Returns WriteFileMeta." + return self._defaults + + def journal(self)-> Journal: + "Given n/a. Returns Journal reference (live, still being modified here)." + return self._journal + + # --- resolution helpers --- + + def _pick(self ,kw ,meta_attr ,default_attr): + "Given three sources. Does pick first non-None. Returns value or None." 
+ return kw if kw is not None else (meta_attr if meta_attr is not None else default_attr) + + def _resolve_write_file(self, wfm, dpath, fname) -> tuple[str|None, str|None]: + dpath_str = norm_dpath_str(dpath) if dpath is not None else None + fname = norm_fname_or_none(fname) if fname is not None else None + + dpath_val = self._pick(dpath_str, (wfm.dpath_str if wfm else None), self._defaults.dpath_str) + fname_val = self._pick(fname, (wfm.fname if wfm else None), self._defaults.fname) + + # final fallback for filename: derive from config name + if fname_val is None: + fname_val = self._prov.read_fname + + # anchor relative dpaths against the config’s directory + if dpath_val is not None and not is_abs_dpath(dpath_val): + dpath_val = (self._prov.read_dir_dpath / dpath_val).as_posix() + + return dpath_val, fname_val + + def _resolve_owner_mode_content(self + ,wfm: WriteFileMeta|None + ,owner: str|None + ,mode: int|str|None + ,content: bytes|str|None + )-> tuple[str|None ,tuple[int|None ,str|None] ,bytes|None]: + owner_norm = norm_nonempty_owner(owner) if owner is not None else None + mode_norm = parse_mode(mode) if mode is not None else (None ,None) + content_b = norm_content_bytes(content) if content is not None else None + + owner_v = self._pick(owner_norm, (wfm.owner_name_str if wfm else None), self._defaults.owner_name_str) + mode_v = (mode_norm if mode_norm != (None ,None) else + ((wfm.mode_int ,wfm.mode_octal_str) if wfm else (self._defaults.mode_int ,self._defaults.mode_octal_str))) + content_v = self._pick(content_b ,(wfm.content_bytes if wfm else None) ,self._defaults.content_bytes) + return owner_v ,mode_v ,content_v + + def print(self, *, show_journal: bool = True, file=None) -> None: + """ + Given: flags (show_journal) and optional file-like (defaults to stdout). + Does: print provenance, defaults, and optionally the journal via delegation. + Returns: None. 
+ """ + if file is None: + import sys as _sys + file = _sys.stdout + + print("== Provenance ==", file=file) + self._prov.print(file=file) + + print("\n== Defaults ==", file=file) + self._defaults.print(label="defaults", file=file) + + if show_journal: + entries = getattr(self._journal, "command_list", []) + n_total = len(entries) + n_copy = sum(1 for c in entries if getattr(c, "name_str", None) == "copy") + n_disp = sum(1 for c in entries if getattr(c, "name_str", None) == "displace") + n_del = sum(1 for c in entries if getattr(c, "name_str", None) == "delete") + + print("\n== Journal ==", file=file) + print(f"entries: {n_total} copy:{n_copy} displace:{n_disp} delete:{n_del}", file=file) + if n_total: + self._journal.print(index_start=1, file=file) + else: + print("(plan is empty)", file=file) + + # --- Command builders (first arg may be WriteFileMeta) --- + + def copy(self + ,wfm: WriteFileMeta|None=None + ,* + ,write_file_dpath: str|Path|None=None + ,write_file_fname: str|None=None + ,owner: str|None=None + ,mode: int|str|None=None + ,content: bytes|str|None=None + )-> Command: + """ + Given optional WriteFileMeta plus keyword overrides. + Does build a 'copy' command; on any argument error the command is returned with errors and NOT appended. + Returns Command. 
+ """ + cmd = Command("copy") + dpath ,fname = self._resolve_write_file(wfm ,write_file_dpath ,write_file_fname) + owner_v ,(mode_int ,mode_oct) ,content_b = self._resolve_owner_mode_content(wfm ,owner ,mode ,content) + + # well-formed checks + if not is_abs_dpath(dpath): cmd.add_error("write_file_dpath must be absolute") + if norm_fname_or_none(fname) is None: cmd.add_error("write_file_fname must be a bare filename") + if not owner_v: cmd.add_error("owner must be non-empty") + if (mode_int ,mode_oct) == (None ,None): + cmd.add_error("mode must be int <= 0o7777 or 3/4-digit octal string") + if content_b is None: + cmd.add_error("content is required for copy() (bytes or str)") + + cmd.arg_dict.update({ + "write_file_dpath_str": dpath, + "write_file_fname": fname, # was write_file_fname + "owner_name": owner_v, # was owner_name_str + "mode_int": mode_int, + "mode_octal_str": mode_oct, + "content_bytes": content_b, + "provenance_config_rel_fpath_str": self._prov.config_rel_fpath.as_posix(), + }) + + if not cmd.errors_list: + self._journal.append(cmd) + return cmd + + def displace(self + ,wfm: WriteFileMeta|None=None + ,* + ,write_file_dpath: str|Path|None=None + ,write_file_fname: str|None=None + )-> Command: + "Given optional WriteFileMeta plus overrides. Does build 'displace' entry or return errors. Returns Command." + cmd = Command("displace") + dpath ,fname = self._resolve_write_file(wfm ,write_file_dpath ,write_file_fname) + if not is_abs_dpath(dpath): cmd.add_error("write_file_dpath must be absolute") + if norm_fname_or_none(fname) is None: cmd.add_error("write_file_fname must be a bare filename") + cmd.arg_dict.update({ + "write_file_dpath_str": dpath, + "write_file_fname": fname, + }) + if not cmd.errors_list: + self._journal.append(cmd) + return cmd + + def delete(self + ,wfm: WriteFileMeta|None=None + ,* + ,write_file_dpath: str|Path|None=None + ,write_file_fname: str|None=None + )-> Command: + "Given optional WriteFileMeta plus overrides. 
Does build 'delete' entry or return errors. Returns Command." + cmd = Command("delete") + dpath ,fname = self._resolve_write_file(wfm ,write_file_dpath ,write_file_fname) + if not is_abs_dpath(dpath): cmd.add_error("write_file_dpath must be absolute") + if norm_fname_or_none(fname) is None: cmd.add_error("write_file_fname must be a bare filename") + cmd.arg_dict.update({ + "write_file_dpath_str": dpath, + "write_file_fname": fname, + }) + if not cmd.errors_list: + self._journal.append(cmd) + return cmd + + + diff --git a/developer/source/deprecated/Stage.py b/developer/source/deprecated/Stage.py new file mode 100644 index 0000000..5cb0ba2 --- /dev/null +++ b/developer/source/deprecated/Stage.py @@ -0,0 +1,175 @@ +#!/usr/bin/env -S python3 -B +""" +Stage.py — planner runtime for staged config programs (UNPRIVILEGED). + +Config usage: + import Stage + + Stage.init( + write_file_name="." + , write_dpath="/etc/unbound" + , write_file_owner_name="root" + , write_file_permissions=0o644 # or "0644" + , read_file_contents=b"...bytes..."# bytes preferred; str is utf-8 encoded + ) + Stage.displace() + Stage.copy() + # Stage.delete() + +Notes: + - This module only RECORDS plan steps using native Python values (ints/bytes/str). + - The outer tool CBOR-encodes the accumulated plan AFTER all configs run. +""" + +from __future__ import annotations +import sys ,os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") + +from dataclasses import dataclass ,field +from pathlib import Path +from typing import Any + +# ---------- helpers ---------- + +def _norm_perm(value: int|str)-> tuple[int,str]|None: + "Given: an int or a 4-char octal string. Does: validate/normalize to (int,'%04o'). Returns: tuple or None." 
+    if isinstance(value ,int):
+        if 0 <= value <= 0o7777:
+            return value ,f"{value:04o}"
+        return None
+    if isinstance(value ,str):
+        s = value.strip()
+        if len(s)==4 and all(ch in "01234567" for ch in s):
+            try:
+                v = int(s ,8)
+                return v ,s
+            except Exception:
+                return None
+    return None
+
+@dataclass
+class _Ctx:
+    "Information used by many entries in the plan, plan specific command defaults, i.e. the plan context."
+    read_rel_fpath: Path
+    stage_root_dpath: Path
+    defaults_map: dict[str,Any] = field(default_factory=dict) # this syntax gives each context instance a distinct dictionary.
+
+# ---------- planner singleton ----------
+
+class _Planner:
+    "Given: staged config executions. Does: accumulate plan entries. Returns: plan map."
+    def __init__(self)-> None:
+        self._ctx: _Ctx|None = None
+        self._entries_list: list[dict[str,Any]] = []
+        self._meta_map: dict[str,Any] = {}
+
+    # ---- framework (called by outer tools) ----
+    def _begin(self ,read_rel_fpath: Path ,stage_root_dpath: Path)-> None:
+        "Given: a config’s relative file path and stage root. Does: start context. Returns: None."
+        self._ctx = _Ctx(read_rel_fpath=read_rel_fpath ,stage_root_dpath=stage_root_dpath)
+
+    def _end(self)-> None:
+        "Given: active context. Does: end it. Returns: None."
+        self._ctx = None
+
+    def _reset(self)-> None:
+        "Given: n/a. Does: clear meta and entries. Returns: None."
+        self._entries_list.clear()
+        self._meta_map.clear()
+        self._ctx = None
+
+    # ---- exported for outer tools ----
+    def plan_entries(self)-> list[dict[str,Any]]:
+        "Given: n/a. Does: return a shallow copy of current entries. Returns: list[dict]."
+        return list(self._entries_list)
+
+    def set_meta(self ,**kv)-> None:
+        "Given: keyword meta. Does: merge into meta_map. Returns: None."
+        self._meta_map.update(kv)
+
+    def plan_object(self)-> dict[str,Any]:
+        # FIX: docstring must be triple-quoted — a single-quoted string literal
+        # cannot span physical lines, so the original was a SyntaxError that made
+        # the whole module unimportable.
+        """Packages a self-contained plan map ready for CBOR encoding.
+        Given: accumulated meta/entries. Does: freeze a copy and stamp a version. Returns: dict.
+        """
+        return {
+            "version_int": 1
+            ,"meta_map": dict(self._meta_map)
+            ,"entries_list": list(self._entries_list)
+        }
+
+    # ---- config API ----
+    def init(
+        self
+        ,write_file_name: str
+        ,write_dpath: str
+        ,write_file_owner_name: str
+        ,write_file_permissions: int|str
+        ,read_file_contents: bytes|str|None=None
+    )-> None:
+        """
+        Given: write filename ('.' → basename of config), destination dir path, owner name,
+               permissions (int or '0644'), and optional read content (bytes or str).
+        Does: store per-config defaults used by subsequent Stage.* calls.
+        Returns: None.
+        """
+        if self._ctx is None:
+            raise RuntimeError("Stage.init used without active context")
+        fname = self._ctx.read_rel_fpath.name if write_file_name == "." else write_file_name
+        if isinstance(read_file_contents ,str):
+            content_bytes = read_file_contents.encode("utf-8")
+        else:
+            content_bytes = read_file_contents
+        perm_norm = _norm_perm(write_file_permissions)
+        if perm_norm is None:
+            mode_int ,mode_octal_str = None ,None
+        else:
+            mode_int ,mode_octal_str = perm_norm
+        self._ctx.defaults_map = {
+            "dst_fname": fname
+            ,"dst_dpath": write_dpath
+            ,"owner_name": write_file_owner_name
+            ,"mode_int": mode_int
+            ,"mode_octal_str": mode_octal_str
+            ,"content_bytes": content_bytes
+        }
+
+    def _require_defaults(self)-> dict[str,Any]:
+        "Given: current ctx. Does: ensure Stage.init ran. Returns: defaults_map."
+        if self._ctx is None or not self._ctx.defaults_map:
+            raise RuntimeError("Stage.* called before Stage.init in this config")
+        return self._ctx.defaults_map
+
+    def displace(self)-> None:
+        "Given: defaults. Does: append a displace op. Returns: None."
+        d = self._require_defaults()
+        self._entries_list.append({
+            "op":"displace"
+            ,"dst_dpath": d["dst_dpath"]
+            ,"dst_fname": d["dst_fname"]
+        })
+
+    def copy(self)-> None:
+        "Given: defaults. Does: append a copy op. Returns: None."
+ d = self._require_defaults() + self._entries_list.append({ + "op":"copy" + ,"dst_dpath": d["dst_dpath"] + ,"dst_fname": d["dst_fname"] + ,"owner_name": d["owner_name"] + ,"mode_int": d["mode_int"] + ,"mode_octal_str": d["mode_octal_str"] + ,"content_bytes": d["content_bytes"] + }) + + def delete(self)-> None: + "Given: defaults. Does: append a delete op. Returns: None." + d = self._require_defaults() + self._entries_list.append({ + "op":"delete" + ,"dst_dpath": d["dst_dpath"] + ,"dst_fname": d["dst_fname"] + }) + +# exported singleton +Stage = _Planner() diff --git a/developer/source/deprecated/executor.py b/developer/source/deprecated/executor.py new file mode 100755 index 0000000..d8e3248 --- /dev/null +++ b/developer/source/deprecated/executor.py @@ -0,0 +1,359 @@ +#!/usr/bin/env -S python3 -B +""" +executor.py — StageHand outer/inner executor (MVP; UNPRIVILEGED for now) + +Phase 0 (bootstrap): + - Ensure filter program exists (create default in CWD if --filter omitted) + - Validate --stage exists + - If --phase-0-then-stop: exit here (no scan ,no execution) + +Phase 1 (outer): + - Discover every file under --stage; acceptance filter decides which to include + - Execute each config’s configure(prov ,planner ,WriteFileMeta) into ONE Planner + - Optionally print the planner; optionally stop + +Phase 2 (inner shim in same program for now; no privilege yet): + - Encode plan to CBOR and hand to inner path + - Inner decodes to a Journal and can print it +""" + +from __future__ import annotations + +# no bytecode anywhere +import sys ,os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") + +from pathlib import Path +import argparse +import getpass +import tempfile +import runpy +import subprocess +import datetime as _dt +import stat + +# Local module: Planner.py (same directory) +from Planner import ( + Planner ,PlanProvenance ,WriteFileMeta ,Journal ,Command, +) + +# -------- default filter template (written to CWD when --filter 
not provided) -------- + +DEFAULT_FILTER_FILENAME = "stagehand_filter.py" + +DEFAULT_FILTER_SOURCE = """# StageHand acceptance filter (default template) +# Return True to include a config file ,False to skip it. +# You receive a PlanProvenance object named `prov`. +# +# prov fields commonly used here: +# prov.stage_root_dpath : Path → absolute path to the stage root +# prov.config_abs_fpath : Path → absolute path to the candidate file +# prov.config_rel_fpath : Path → path relative to the stage root +# prov.read_dir_dpath : Path → directory of the candidate file +# prov.read_fname : str → filename with trailing '.py' stripped (if present) +# +# Examples: +# +# 1) Accept everything (default behavior): +# def accept(prov): +# return True +# +# 2) Only accept configs in a 'dns/' namespace under the stage: +# def accept(prov): +# return prov.config_rel_fpath.as_posix().startswith("dns/") +# +# 3) Exclude editor backup files: +# def accept(prov): +# rel = prov.config_rel_fpath.as_posix() +# return not (rel.endswith("~") or rel.endswith(".swp")) +# +# 4) Only accept Python files + a few non-Python names: +# def accept(prov): +# name = prov.config_abs_fpath.name +# return name.endswith(".py") or name in {"hosts" ,"resolv.conf"} +# +# Choose ONE 'accept' definition. Below is the default: + +def accept(prov): + return True +""" + +# -------- utilities -------- + +def iso_utc_now_str() -> str: + return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") + +def _ensure_filter_file(filter_arg: str|None) -> Path: + """ + If --filter is provided ,return that path (must exist). + Otherwise ,create ./stagehand_filter.py in the CWD if missing (writing a helpful template), + and return its path. 
+ """ + if filter_arg: + p = Path(filter_arg) + if not p.is_file(): + raise RuntimeError(f"--filter file not found: {p}") + return p + + p = Path.cwd() / DEFAULT_FILTER_FILENAME + if not p.exists(): + try: + p.write_text(DEFAULT_FILTER_SOURCE ,encoding="utf-8") + print(f"(created default filter at {p})") + except Exception as e: + raise RuntimeError(f"failed to create default filter {p}: {e}") + return p + +def _load_accept_func(filter_path: Path): + env = runpy.run_path(str(filter_path)) + fn = env.get("accept") + if not callable(fn): + raise RuntimeError(f"{filter_path}: missing callable 'accept(prov)'") + return fn + +def _walk_all_files(stage_root: Path): + """ + Yield every file (regular or symlink) under stage_root recursively. + We do not follow symlinked directories to avoid cycles. + """ + root = stage_root.resolve() + for dirpath ,dirnames ,filenames in os.walk(root ,followlinks=False): + # prune symlinked dirs (files can still be symlinks) + dirnames[:] = [d for d in dirnames if not os.path.islink(os.path.join(dirpath ,d))] + for fname in filenames: + p = Path(dirpath ,fname) + try: + st = p.lstat() + if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode): + yield p.resolve() + except Exception: + # unreadable/broken entries skipped + continue + +def find_config_paths(stage_root: Path ,accept_func) -> list[Path]: + out: list[tuple[int ,str ,Path]] = [] + root = stage_root.resolve() + for p in _walk_all_files(stage_root): + prov = PlanProvenance(stage_root=stage_root ,config_path=p) + try: + if accept_func(prov): + rel = p.resolve().relative_to(root) + out.append((len(rel.parts) ,rel.as_posix() ,p.resolve())) + except Exception as e: + raise RuntimeError(f"accept() failed on {prov.config_rel_fpath.as_posix()}: {e}") + out.sort(key=lambda t: (t[0] ,t[1])) # (depth ,name) + return [t[2] for t in out] + +# --- run all configs into ONE planner --- + +def _run_all_configs_into_single_planner(stage_root: Path ,cfgs: list[Path]) -> Planner: + """ + Create a 
single Planner and execute each config's configure(prov ,planner ,WriteFileMeta) + against it. Returns that single Planner containing the entire plan. + """ + # seed with synthetic provenance; we overwrite per config before execution + aggregate_prov = PlanProvenance(stage_root=stage_root ,config_path=stage_root / "(aggregate).py") + planner = Planner(provenance=aggregate_prov) + + for cfg in cfgs: + prov = PlanProvenance(stage_root=stage_root ,config_path=cfg) + planner.set_provenance(prov) + + env = runpy.run_path(str(cfg)) + fn = env.get("configure") + if not callable(fn): + raise RuntimeError(f"{cfg}: missing callable configure(prov ,planner ,WriteFileMeta)") + + fn(prov ,planner ,WriteFileMeta) + + # annotate meta once ,on the single planner's journal + j = planner.journal() + j.set_meta( + generator_prog_str="executor.py", + generated_at_utc_str=iso_utc_now_str(), + user_name_str=getpass.getuser(), + host_name_str=os.uname().nodename if hasattr(os ,"uname") else "unknown", + stage_root_dpath_str=str(stage_root.resolve()), + configs_list=[str(p.resolve().relative_to(stage_root.resolve())) for p in cfgs], + ) + return planner + +# ----- CBOR “matchbox” (simple wrapper kept local to executor) ----- + +def _plan_to_cbor_bytes(planner: Planner) -> bytes: + """Serialize a Planner's Journal to CBOR bytes.""" + try: + import cbor2 + except Exception as e: + raise RuntimeError(f"cbor2 is required: {e}") + plan_dict = planner.journal().as_dictionary() + return cbor2.dumps(plan_dict ,canonical=True) + +def _journal_from_cbor_bytes(data: bytes) -> Journal: + """Rebuild a Journal from CBOR bytes.""" + try: + import cbor2 + except Exception as e: + raise RuntimeError(f"cbor2 is required: {e}") + obj = cbor2.loads(data) + if not isinstance(obj ,dict): + raise ValueError("CBOR root must be a dict") + return Journal(plan_dict=obj) + +# -------- inner executor (phase 2) -------- + +def _inner_main(plan_path: Path ,phase2_print: bool ,phase2_then_stop: bool) -> int: + """Inner 
executor path: decode CBOR → Journal; optionally print; (apply TBD).""" + try: + data = Path(plan_path).read_bytes() + except Exception as e: + print(f"error: failed to read plan file: {e}" ,file=sys.stderr) + return 2 + + try: + journal = _journal_from_cbor_bytes(data) + except Exception as e: + print(f"error: failed to decode CBOR: {e}" ,file=sys.stderr) + return 2 + + if phase2_print: + journal.print() + + if phase2_then_stop: + return 0 + + # (Stage 3 apply would go here; omitted in MVP) + return 0 + +# -------- outer executor (phase 1 & handoff) -------- + +def _outer_main(stage_root: Path ,accept_func ,args) -> int: + if not stage_root.is_dir(): + print(f"error: --stage not a directory: {stage_root}" ,file=sys.stderr) + return 2 + + cfgs = find_config_paths(stage_root ,accept_func) + if not cfgs: + print("No configuration files found.") + return 0 + + try: + master = _run_all_configs_into_single_planner(stage_root ,cfgs) + except SystemExit: + raise + except Exception as e: + print(f"error: executing configs: {e}" ,file=sys.stderr) + return 2 + + if args.phase_1_print: + master.print() + + if args.phase_1_then_stop: + return 0 + + # Phase 2: encode CBOR and invoke inner path (same script ,--inner) + try: + cbor_bytes = _plan_to_cbor_bytes(master) + except Exception as e: + print(f"error: CBOR encode failed: {e}" ,file=sys.stderr) + return 2 + + with tempfile.NamedTemporaryFile(prefix="stagehand_plan_" ,suffix=".cbor" ,delete=False) as tf: + tf.write(cbor_bytes) + plan_path = tf.name + + try: + cmd = [ + sys.executable, + str(Path(__file__).resolve()), + "--inner", + "--plan" ,plan_path, + ] + if args.phase_2_print: + cmd.append("--phase-2-print") + if args.phase_2_then_stop: + cmd.append("--phase-2-then-stop") + + proc = subprocess.run(cmd) + return proc.returncode + finally: + try: + os.unlink(plan_path) + except Exception: + pass + +# -------- CLI -------- + +def main(argv: list[str] | None = None) -> int: + ap = argparse.ArgumentParser( + 
prog="executor.py", + description="StageHand outer/inner executor (plan → CBOR → decode).", + ) + ap.add_argument("--stage" ,default="stage", + help="stage root directory (default: ./stage)") + ap.add_argument( + "--filter", + default="", + help=f"path to acceptance filter program exporting accept(prov) " + f"(default: ./{DEFAULT_FILTER_FILENAME}; created if missing)" + ) + ap.add_argument( + "--phase-0-then-stop", + action="store_true", + help="stop after arg checks & filter bootstrap (no stage scan)" + ) + + # Phase-1 (outer) controls + ap.add_argument("--phase-1-print" ,action="store_true" ,help="print master planner (phase 1)") + ap.add_argument("--phase-1-then-stop" ,action="store_true" ,help="stop after phase 1") + + # Phase-2 (inner) controls (outer forwards these to inner) + ap.add_argument("--phase-2-print" ,action="store_true" ,help="print decoded journal (phase 2)") + ap.add_argument("--phase-2-then-stop" ,action="store_true" ,help="stop after phase 2 decode") + + # Inner-only flags (not for users) + ap.add_argument("--inner" ,action="store_true" ,help=argparse.SUPPRESS) + ap.add_argument("--plan" ,default=None ,help=argparse.SUPPRESS) + + args = ap.parse_args(argv) + + # Inner path + if args.inner: + if not args.plan: + print("error: --inner requires --plan " ,file=sys.stderr) + return 2 + return _inner_main(Path(args.plan), + phase2_print=args.phase_2_print, + phase2_then_stop=args.phase_2_then_stop) + + # Phase 0: bootstrap & stop (no scan) + stage_root = Path(args.stage) + try: + filter_path = _ensure_filter_file(args.filter or None) + except Exception as e: + print(f"error: {e}" ,file=sys.stderr) + return 2 + + if not stage_root.exists(): + print(f"error: --stage not found: {stage_root}" ,file=sys.stderr) + return 2 + if not stage_root.is_dir(): + print(f"error: --stage is not a directory: {stage_root}" ,file=sys.stderr) + return 2 + + if args.phase_0_then_stop: + print(f"phase-0 OK: stage at {stage_root.resolve()} and filter at {filter_path}") + 
return 0 + + # Load acceptance function and proceed with outer + try: + accept_func = _load_accept_func(filter_path) + except Exception as e: + print(f"error: {e}" ,file=sys.stderr) + return 2 + + return _outer_main(stage_root ,accept_func ,args) + +if __name__ == "__main__": + sys.exit(main()) diff --git a/developer/source/deprecated/executor_2.py b/developer/source/deprecated/executor_2.py new file mode 100644 index 0000000..ee13bdd --- /dev/null +++ b/developer/source/deprecated/executor_2.py @@ -0,0 +1,360 @@ + +#!/usr/bin/env -S python3 -B +""" +executor.py — StageHand outer/inner executor (MVP; UNPRIVILEGED for now) + +Phase 0 (bootstrap): + - Ensure filter program exists (create default in CWD if --filter omitted) + - Validate --stage exists + - If --phase-0-then-stop: exit here (no scan, no execution) + +Phase 1 (outer): + - Discover every file under --stage; acceptance filter decides which to include + - Execute each config’s configure(prov, planner, WriteFileMeta) into ONE Planner + - Optionally print the planner; optionally stop + +Phase 2 (inner shim in same program for now; no privilege yet): + - Encode plan to CBOR and hand to inner path + - Inner decodes to a Journal and can print it +""" + +from __future__ import annotations + +# no bytecode anywhere +import sys, os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE", "1") + +from pathlib import Path +import argparse +import getpass +import tempfile +import runpy +import subprocess +import datetime as _dt +import stat + +# Local module: Planner.py (same directory) +from Planner import ( + Planner, PlanProvenance, WriteFileMeta, Journal, Command, +) + +# -------- default filter template (written to CWD when --filter not provided) -------- + +DEFAULT_FILTER_FILENAME = "stagehand_filter.py" + +DEFAULT_FILTER_SOURCE = """# StageHand acceptance filter (default template) +# Return True to include a config file, False to skip it. 
+# You receive a PlanProvenance object named `prov`. +# +# prov fields commonly used here: +# prov.stage_root_dpath : Path → absolute path to the stage root +# prov.config_abs_fpath : Path → absolute path to the candidate file +# prov.config_rel_fpath : Path → path relative to the stage root +# prov.read_dir_dpath : Path → directory of the candidate file +# prov.read_fname : str → filename with trailing '.py' stripped (if present) +# +# Examples: +# +# 1) Accept everything (default behavior): +# def accept(prov): +# return True +# +# 2) Only accept configs in a 'dns/' namespace under the stage: +# def accept(prov): +# return prov.config_rel_fpath.as_posix().startswith("dns/") +# +# 3) Exclude editor backup files: +# def accept(prov): +# rel = prov.config_rel_fpath.as_posix() +# return not (rel.endswith("~") or rel.endswith(".swp")) +# +# 4) Only accept Python files + a few non-Python names: +# def accept(prov): +# name = prov.config_abs_fpath.name +# return name.endswith(".py") or name in {"hosts", "resolv.conf"} +# +# Choose ONE 'accept' definition. Below is the default: + +def accept(prov): + return True +""" + +# -------- utilities -------- + +def iso_utc_now_str() -> str: + return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") + +def _ensure_filter_file(filter_arg: str|None) -> Path: + """ + If --filter is provided, return that path (must exist). + Otherwise, create ./stagehand_filter.py in the CWD if missing (writing a helpful template), + and return its path. 
+ """ + if filter_arg: + p = Path(filter_arg) + if not p.is_file(): + raise RuntimeError(f"--filter file not found: {p}") + return p + + p = Path.cwd() / DEFAULT_FILTER_FILENAME + if not p.exists(): + try: + p.write_text(DEFAULT_FILTER_SOURCE, encoding="utf-8") + print(f"(created default filter at {p})") + except Exception as e: + raise RuntimeError(f"failed to create default filter {p}: {e}") + return p + +def _load_accept_func(filter_path: Path): + env = runpy.run_path(str(filter_path)) + fn = env.get("accept") + if not callable(fn): + raise RuntimeError(f"{filter_path}: missing callable 'accept(prov)'") + return fn + +def _walk_all_files(stage_root: Path): + """ + Yield every file (regular or symlink) under stage_root recursively. + We do not follow symlinked directories to avoid cycles. + """ + root = stage_root.resolve() + for dirpath, dirnames, filenames in os.walk(root, followlinks=False): + # prune symlinked dirs (files can still be symlinks) + dirnames[:] = [d for d in dirnames if not os.path.islink(os.path.join(dirpath, d))] + for fname in filenames: + p = Path(dirpath, fname) + try: + st = p.lstat() + if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode): + yield p.resolve() + except Exception: + # unreadable/broken entries skipped + continue + +def find_config_paths(stage_root: Path, accept_func) -> list[Path]: + """ + Return files accepted by the Python acceptance function: accept(prov) → True/False. 
+ """ + out: list[Path] = [] + for p in _walk_all_files(stage_root): + prov = PlanProvenance(stage_root=stage_root, config_path=p) + try: + if accept_func(prov): + out.append(p) + except Exception as e: + raise RuntimeError(f"accept() failed on {prov.config_rel_fpath.as_posix()}: {e}") + return sorted(out, key=lambda q: q.as_posix()) + +# --- run all configs into ONE planner --- + +def _run_all_configs_into_single_planner(stage_root: Path, cfgs: list[Path]) -> Planner: + """ + Create a single Planner and execute each config's configure(prov, planner, WriteFileMeta) + against it. Returns that single Planner containing the entire plan. + """ + # seed with synthetic provenance; we overwrite per config before execution + aggregate_prov = PlanProvenance(stage_root=stage_root, config_path=stage_root / "(aggregate).py") + planner = Planner(provenance=aggregate_prov) + + for cfg in cfgs: + prov = PlanProvenance(stage_root=stage_root, config_path=cfg) + planner.set_provenance(prov) + + env = runpy.run_path(str(cfg)) + fn = env.get("configure") + if not callable(fn): + raise RuntimeError(f"{cfg}: missing callable configure(prov, planner, WriteFileMeta)") + + fn(prov, planner, WriteFileMeta) + + # annotate meta once, on the single planner's journal + j = planner.journal() + j.set_meta( + generator_prog_str="executor.py", + generated_at_utc_str=iso_utc_now_str(), + user_name_str=getpass.getuser(), + host_name_str=os.uname().nodename if hasattr(os, "uname") else "unknown", + stage_root_dpath_str=str(stage_root.resolve()), + configs_list=[str(p.resolve().relative_to(stage_root.resolve())) for p in cfgs], + ) + return planner + +# ----- CBOR “matchbox” (simple wrapper kept local to executor) ----- + +def _plan_to_cbor_bytes(planner: Planner) -> bytes: + """Serialize a Planner's Journal to CBOR bytes.""" + try: + import cbor2 + except Exception as e: + raise RuntimeError(f"cbor2 is required: {e}") + plan_dict = planner.journal().as_dictionary() + return cbor2.dumps(plan_dict, 
canonical=True) + +def _journal_from_cbor_bytes(data: bytes) -> Journal: + """Rebuild a Journal from CBOR bytes.""" + try: + import cbor2 + except Exception as e: + raise RuntimeError(f"cbor2 is required: {e}") + obj = cbor2.loads(data) + if not isinstance(obj, dict): + raise ValueError("CBOR root must be a dict") + return Journal(plan_dict=obj) + +# -------- inner executor (phase 2) -------- + +def _inner_main(plan_path: Path, phase2_print: bool, phase2_then_stop: bool) -> int: + """Inner executor path: decode CBOR → Journal; optionally print; (apply TBD).""" + try: + data = Path(plan_path).read_bytes() + except Exception as e: + print(f"error: failed to read plan file: {e}", file=sys.stderr) + return 2 + + try: + journal = _journal_from_cbor_bytes(data) + except Exception as e: + print(f"error: failed to decode CBOR: {e}", file=sys.stderr) + return 2 + + if phase2_print: + journal.print() + + if phase2_then_stop: + return 0 + + # (Stage 3 apply would go here; omitted in MVP) + return 0 + +# -------- outer executor (phase 1 & handoff) -------- + +def _outer_main(stage_root: Path, accept_func, args) -> int: + if not stage_root.is_dir(): + print(f"error: --stage not a directory: {stage_root}", file=sys.stderr) + return 2 + + cfgs = find_config_paths(stage_root, accept_func) + if not cfgs: + print("No configuration files found.") + return 0 + + try: + master = _run_all_configs_into_single_planner(stage_root, cfgs) + except SystemExit: + raise + except Exception as e: + print(f"error: executing configs: {e}", file=sys.stderr) + return 2 + + if args.phase_1_print: + master.print() + + if args.phase_1_then_stop: + return 0 + + # Phase 2: encode CBOR and invoke inner path (same script, --inner) + try: + cbor_bytes = _plan_to_cbor_bytes(master) + except Exception as e: + print(f"error: CBOR encode failed: {e}", file=sys.stderr) + return 2 + + with tempfile.NamedTemporaryFile(prefix="stagehand_plan_", suffix=".cbor", delete=False) as tf: + tf.write(cbor_bytes) + plan_path 
= tf.name + + try: + cmd = [ + sys.executable, + str(Path(__file__).resolve()), + "--inner", + "--plan", plan_path, + ] + if args.phase_2_print: + cmd.append("--phase-2-print") + if args.phase_2_then_stop: + cmd.append("--phase-2-then-stop") + + proc = subprocess.run(cmd) + return proc.returncode + finally: + try: + os.unlink(plan_path) + except Exception: + pass + +# -------- CLI -------- + +def main(argv: list[str] | None = None) -> int: + ap = argparse.ArgumentParser( + prog="executor.py", + description="StageHand outer/inner executor (plan → CBOR → decode).", + ) + ap.add_argument("--stage", default="stage", + help="stage root directory (default: ./stage)") + ap.add_argument( + "--filter", + default="", + help=f"path to acceptance filter program exporting accept(prov) " + f"(default: ./{DEFAULT_FILTER_FILENAME}; created if missing)" + ) + ap.add_argument( + "--phase-0-then-stop", + action="store_true", + help="stop after arg checks & filter bootstrap (no stage scan)" + ) + + # Phase-1 (outer) controls + ap.add_argument("--phase-1-print", action="store_true", help="print master planner (phase 1)") + ap.add_argument("--phase-1-then-stop", action="store_true", help="stop after phase 1") + + # Phase-2 (inner) controls (outer forwards these to inner) + ap.add_argument("--phase-2-print", action="store_true", help="print decoded journal (phase 2)") + ap.add_argument("--phase-2-then-stop", action="store_true", help="stop after phase 2 decode") + + # Inner-only flags (not for users) + ap.add_argument("--inner", action="store_true", help=argparse.SUPPRESS) + ap.add_argument("--plan", default=None, help=argparse.SUPPRESS) + + args = ap.parse_args(argv) + + # Inner path + if args.inner: + if not args.plan: + print("error: --inner requires --plan ", file=sys.stderr) + return 2 + return _inner_main(Path(args.plan), + phase2_print=args.phase_2_print, + phase2_then_stop=args.phase_2_then_stop) + + # Phase 0: bootstrap & stop (no scan) + stage_root = Path(args.stage) + try: + 
filter_path = _ensure_filter_file(args.filter or None) + except Exception as e: + print(f"error: {e}", file=sys.stderr) + return 2 + + if not stage_root.exists(): + print(f"error: --stage not found: {stage_root}", file=sys.stderr) + return 2 + if not stage_root.is_dir(): + print(f"error: --stage is not a directory: {stage_root}", file=sys.stderr) + return 2 + + if args.phase_0_then_stop: + print(f"phase-0 OK: stage at {stage_root.resolve()} and filter at {filter_path}") + return 0 + + # Load acceptance function and proceed with outer + try: + accept_func = _load_accept_func(filter_path) + except Exception as e: + print(f"error: {e}", file=sys.stderr) + return 2 + + return _outer_main(stage_root, accept_func, args) + +if __name__ == "__main__": + sys.exit(main()) diff --git a/developer/source/deprecated/stage_ls.py b/developer/source/deprecated/stage_ls.py new file mode 100755 index 0000000..93dd3d2 --- /dev/null +++ b/developer/source/deprecated/stage_ls.py @@ -0,0 +1,193 @@ +#!/usr/bin/env -S python3 -B +""" +ls_stage.py — list staged files and their header-declared install metadata. + +Header line format (first line of each file): + + +- owner: username string (need not exist until install time) +- permissions: four octal digits, e.g. 0644 +- write_file_name: '.' means use the read file's basename, else use the given POSIX filename +- target_directory_path: POSIX directory path (usually absolute, e.g. 
def parse_stage_header_line(header: str, read_rel: Path) -> tuple[StageRow | None, str | None]:
    """Parse a staged file's 4-field header: '<owner> <perm> <write_name> <target_dir>'.

    Pure function — never touches the filesystem. The final field is taken with
    maxsplit so a target directory containing spaces survives intact.
    Returns (StageRow, None) on success, or (None, error_message) on failure.
    """
    # tolerate a UTF-8 BOM and surrounding whitespace
    cleaned = header.lstrip("\ufeff").strip()
    if not cleaned:
        return None, f"empty header line in {read_rel}"

    fields = cleaned.split(maxsplit=3)
    if len(fields) != 4:
        return None, f"malformed header in {read_rel}: expected 4 fields, got {len(fields)}"
    owner, perm_text, raw_write_name, target_dir_text = fields

    if not _PERM_RE.fullmatch(perm_text):
        return None, f"invalid permissions '{perm_text}' in {read_rel}: must be four octal digits"

    # '.' means "reuse the staged file's own basename"
    write_name = read_rel.name if raw_write_name == "." else raw_write_name
    if "/" in write_name:
        return None, f"write_file_name must not contain '/': got '{write_name}' in {read_rel}"

    try:
        row = StageRow(
            read_rel=read_rel
            ,owner=owner
            ,perm_octal_str=perm_text
            ,perm_int=int(perm_text, 8)
            ,write_name=write_name
            ,target_dir=Path(target_dir_text)
            ,header_raw=cleaned
        )
    except Exception as e:
        return None, f"internal parse error in {read_rel}: {e}"
    return row, None
def print_table(rows: list[StageRow]) -> None:
    """Print rows as whitespace-aligned columns (no header row); the final
    target-directory column is left unpadded."""
    if not rows:
        return
    # materialize the four fixed-width columns plus the ragged tail
    fixed_columns = [
        [r.read_rel.as_posix() for r in rows]
        ,[r.owner for r in rows]
        ,[r.perm_octal_str for r in rows]
        ,[r.write_name for r in rows]
    ]
    tails = [str(r.target_dir) for r in rows]
    widths = [max(len(cell) for cell in col) for col in fixed_columns]
    for row_idx, tail in enumerate(tails):
        padded = [f"{fixed_columns[k][row_idx]:<{widths[k]}}" for k in range(len(fixed_columns))]
        # two-space gutters, matching the original layout exactly
        print("  ".join(padded) + "  " + tail)
+ """ + rows ,errs = scan_stage(stage_root) + if fmt == "table": + print_table(rows) + else: + print_list(rows) + if errs: + print("\nerror(s):" ,file=sys.stderr) + for e in errs: + print(f" - {e}" ,file=sys.stderr) + return 1 + return 0 + +# === CLI === + +def main(argv: list[str] | None=None)-> int: + ap = argparse.ArgumentParser( + prog="ls_stage.py" + ,description="List staged files and their header-declared install metadata." + ) + ap.add_argument("--stage" ,default="stage",help="stage directory (default: ./stage)") + ap.add_argument("--format" ,choices=["list" ,"table"] ,default="list" + ,help="output format (default: list)") + args = ap.parse_args(argv) + stage_root = Path(args.stage) + if not stage_root.exists() or not stage_root.is_dir(): + print(f"error: stage directory not found or not a directory: {stage_root}" ,file=sys.stderr) + return 2 + return ls_stage(stage_root ,fmt=args.format) + +if __name__ == "__main__": + sys.exit(main()) diff --git a/developer/source/deprecated/stage_orig/etc/nftables.d/10-block-IPv6.nft b/developer/source/deprecated/stage_orig/etc/nftables.d/10-block-IPv6.nft new file mode 100644 index 0000000..eaee5be --- /dev/null +++ b/developer/source/deprecated/stage_orig/etc/nftables.d/10-block-IPv6.nft @@ -0,0 +1,16 @@ +table inet NO-IPV6 { + chain input { + type filter hook input priority raw; policy accept; + meta nfproto ipv6 counter comment "drop all IPv6 inbound" drop + } + + chain output { + type filter hook output priority raw; policy accept; + meta nfproto ipv6 counter comment "drop all IPv6 outbound" drop + } + + chain forward { + type filter hook forward priority raw; policy accept; + meta nfproto ipv6 counter comment "drop all IPv6 forward" drop + } +} diff --git a/developer/source/deprecated/stage_orig/etc/nftables.d/20-SUBU-ports.nft b/developer/source/deprecated/stage_orig/etc/nftables.d/20-SUBU-ports.nft new file mode 100644 index 0000000..6c31446 --- /dev/null +++ 
b/developer/source/deprecated/stage_orig/etc/nftables.d/20-SUBU-ports.nft @@ -0,0 +1,47 @@ +table inet SUBU-DNS-REDIRECT { + chain output { + type nat hook output priority -100; policy accept; + + # Redirect DNS for the subu UIDs to local Unbound listeners + meta skuid 2017 udp dport 53 redirect to :5301 + meta skuid 2018 udp dport 53 redirect to :5302 + meta skuid 2017 tcp dport 53 redirect to :5301 + meta skuid 2018 tcp dport 53 redirect to :5302 + } +} + +table inet SUBU-PORT-EGRESS { + chain output { + type filter hook output priority 0; policy accept; + + # Always allow loopback on egress + oifname "lo" accept + + # No IPv6 for subu (until you reintroduce v6) + meta skuid {2017,2018} meta nfproto ipv6 counter comment "no IPv6 for subu" drop + + ##### x6 (UID 2018) + # Block some exfil channels regardless of iface + meta skuid 2018 tcp dport {25,465,587} counter comment "block SMTP/Submission" drop + meta skuid 2018 udp dport {3478,5349,19302-19309} counter comment "block STUN/TURN" drop + meta skuid 2018 tcp dport 853 counter comment "block DoT (TCP/853)" drop + + # (Optional) allow ICMP echo out via x6 + meta skuid 2018 oifname "x6" ip protocol icmp icmp type echo-request accept + + # Enforce interface binding + meta skuid 2018 oifname "x6" accept + meta skuid 2018 oifname != "x6" counter comment "x6 must use wg x6" drop + + ##### US (UID 2017) + meta skuid 2017 tcp dport {25,465,587} counter drop comment "block SMTP/Submission" + meta skuid 2017 udp dport {3478,5349,19302-19309} counter drop comment "block STUN/TURN" + meta skuid 2017 tcp dport 853 counter drop comment "block DoT (TCP/853)" + + # (Optional) ICMP via US + meta skuid 2017 oifname "US" ip protocol icmp icmp type echo-request accept + + meta skuid 2017 oifname "US" accept + meta skuid 2017 oifname != "US" counter comment "US must use wg US" drop + } +} diff --git a/developer/source/deprecated/stage_orig/etc/systemd/system/unbound@.service 
b/developer/source/deprecated/stage_orig/etc/systemd/system/unbound@.service new file mode 100644 index 0000000..ba2919b --- /dev/null +++ b/developer/source/deprecated/stage_orig/etc/systemd/system/unbound@.service @@ -0,0 +1,19 @@ +[Unit] +Description=Unbound DNS instance for %i (per-subu tunnel egress) +After=network-online.target wg-quick@%i.service +Requires=wg-quick@%i.service +Wants=network-online.target + +[Service] +Type=simple +ExecStart=/usr/sbin/unbound -d -p -c /etc/unbound/unbound-%i.conf +User=unbound +Group=unbound +Restart=on-failure +RestartSec=2s +AmbientCapabilities=CAP_NET_BIND_SERVICE +CapabilityBoundingSet=CAP_NET_BIND_SERVICE +NoNewPrivileges=true + +[Install] +WantedBy=multi-user.target diff --git a/developer/source/deprecated/stage_orig/etc/unbound/unbound-US.conf b/developer/source/deprecated/stage_orig/etc/unbound/unbound-US.conf new file mode 100644 index 0000000..1995438 --- /dev/null +++ b/developer/source/deprecated/stage_orig/etc/unbound/unbound-US.conf @@ -0,0 +1,18 @@ +server: + username: "unbound" + chroot: "" + directory: "/etc/unbound" + do-daemonize: no + interface: 127.0.0.1@5301 + hide-identity: yes + hide-version: yes + harden-glue: yes + harden-dnssec-stripped: yes + qname-minimisation: yes + prefetch: yes + outgoing-interface: 10.0.0.1 + +forward-zone: + name: "." + forward-addr: 1.1.1.1 + forward-addr: 1.0.0.1 diff --git a/developer/source/deprecated/stage_orig/etc/unbound/unbound-x6.conf b/developer/source/deprecated/stage_orig/etc/unbound/unbound-x6.conf new file mode 100644 index 0000000..ed49241 --- /dev/null +++ b/developer/source/deprecated/stage_orig/etc/unbound/unbound-x6.conf @@ -0,0 +1,18 @@ +server: + username: "unbound" + chroot: "" + directory: "/etc/unbound" + do-daemonize: no + interface: 127.0.0.1@5302 + hide-identity: yes + hide-version: yes + harden-glue: yes + harden-dnssec-stripped: yes + qname-minimisation: yes + prefetch: yes + outgoing-interface: 10.8.0.2 + +forward-zone: + name: "." 
+ forward-addr: 1.1.1.1 + forward-addr: 1.0.0.1 diff --git a/developer/source/deprecated/stage_orig/usr/local/sbin/DNS_status.sh b/developer/source/deprecated/stage_orig/usr/local/sbin/DNS_status.sh new file mode 100755 index 0000000..d4db58e --- /dev/null +++ b/developer/source/deprecated/stage_orig/usr/local/sbin/DNS_status.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash +set -euo pipefail +echo "== DNS status ==" +systemctl --no-pager --full status DNS-redirect unbound@US unbound@x6 || true +echo +echo "== nftables ==" +nft list table inet NAT-DNS-REDIRECT || true +echo +echo "== Unbound logs (last 50 lines each) ==" +journalctl -u unbound@US -n 50 --no-pager || true +echo +journalctl -u unbound@x6 -n 50 --no-pager || true diff --git a/developer/source/deprecated/stage_show_plan.py b/developer/source/deprecated/stage_show_plan.py new file mode 100644 index 0000000..075e65b --- /dev/null +++ b/developer/source/deprecated/stage_show_plan.py @@ -0,0 +1,97 @@ +#!/usr/bin/env -S python3 -B +""" +stage_show_plan.py — run staged configs (UNPRIVILEGED) and print the plan. + +Given: a stage root directory. +Does: loads Stage.py, executes each config, builds a native plan map, summarizes it. +Returns: exit code 0 on success, non-zero on error. +""" +from __future__ import annotations +import sys ,os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") + +from pathlib import Path +import argparse ,importlib.util ,runpy ,socket ,getpass ,time ,hashlib + +# ---------- helpers ---------- + +def _load_stage_module(stage_root_dpath: Path): + "Given: stage root path. Does: load Stage.py as module 'Stage'. Returns: module." 
+ mod_fpath = stage_root_dpath/"Stage.py" + if not mod_fpath.exists(): + raise FileNotFoundError(f"Stage.py not found at {mod_fpath}") + spec = importlib.util.spec_from_file_location("Stage" ,str(mod_fpath)) + mod = importlib.util.module_from_spec(spec) + sys.modules["Stage"] = mod + assert spec and spec.loader + spec.loader.exec_module(mod) # type: ignore + return mod + +def _config_rel_fpaths(stage_root_dpath: Path)-> list[Path]: + "Given: stage root. Does: collect *.py (excluding Stage.py) as relative file paths. Returns: list[Path]." + rel_fpath_list: list[Path] = [] + for p in stage_root_dpath.rglob("*.py"): + if p.name == "Stage.py": continue + if p.is_file(): + rel_fpath_list.append(p.relative_to(stage_root_dpath)) + return sorted(rel_fpath_list ,key=lambda x: x.as_posix()) + +def _sha256_hex(b: bytes)-> str: + "Given: bytes. Does: sha256. Returns: hex string." + return hashlib.sha256(b).hexdigest() + +# ---------- main ---------- + +def main(argv: list[str]|None=None)-> int: + "Given: CLI. Does: show plan. Returns: exit code." 
+ ap = argparse.ArgumentParser(prog="stage_show_plan.py" + ,description="Run staged config scripts and print the resulting plan.") + ap.add_argument("--stage",default="stage",help="stage directory (default: ./stage)") + args = ap.parse_args(argv) + + stage_root_dpath = Path(args.stage) + StageMod = _load_stage_module(stage_root_dpath) + Stage = StageMod.Stage + Stage._reset() + Stage.set_meta( + planner_user_name=getpass.getuser() + ,planner_uid_int=os.getuid() + ,planner_gid_int=os.getgid() + ,host_name=socket.gethostname() + ,created_utc_str=time.strftime("%Y-%m-%dT%H:%M:%SZ",time.gmtime()) + ) + + for rel_fpath in _config_rel_fpaths(stage_root_dpath): + Stage._begin(read_rel_fpath=rel_fpath ,stage_root_dpath=stage_root_dpath) + runpy.run_path(str(stage_root_dpath/rel_fpath) ,run_name="__main__") + Stage._end() + + plan_map = Stage.plan_object() + entries_list = plan_map["entries_list"] + print(f"Plan version: {plan_map['version_int']}") + print(f"Planner: {plan_map['meta_map'].get('planner_user_name')}@{plan_map['meta_map'].get('host_name')} " + f"UID:{plan_map['meta_map'].get('planner_uid_int')} GID:{plan_map['meta_map'].get('planner_gid_int')}") + print(f"Created: {plan_map['meta_map'].get('created_utc_str')}") + print(f"Entries: {len(entries_list)}\n") + + for i ,e_map in enumerate(entries_list ,1): + op = e_map.get("op") + dst_fpath_str = f"{e_map.get('dst_dpath')}/{e_map.get('dst_fname')}" + if op == "copy": + content = e_map.get("content_bytes") or b"" + sz = len(content) + mode = e_map.get("mode_octal_str") or "????" + owner = e_map.get("owner_name") or "?" + h = _sha256_hex(content) + print(f"{i:02d}. copy -> {dst_fpath_str} mode {mode} owner {owner} bytes {sz} sha256 {h[:16]}…") + elif op == "displace": + print(f"{i:02d}. displace -> {dst_fpath_str}") + elif op == "delete": + print(f"{i:02d}. delete -> {dst_fpath_str}") + else: + print(f"{i:02d}. ?op? 
-> {dst_fpath_str} ({op})") + return 0 + +if __name__ == "__main__": + sys.exit(main()) diff --git a/developer/source/executor_inner.py b/developer/source/executor_inner.py new file mode 100644 index 0000000..32999e2 --- /dev/null +++ b/developer/source/executor_inner.py @@ -0,0 +1,379 @@ +#!/usr/bin/env -S python3 -B +""" +executor_inner.py — Man_In_Gray phase-2 inner executor + +- Reads a CBOR plan file (--plan) +- Decodes to Journal (via Planner.py model) +- Optional checkpoints: + wellformed → sanity-1 → validity → sanity-2 → execute +- Default behavior (no stop flags): apply the journal +""" + +from __future__ import annotations + +# no bytecode anywhere +import sys ,os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") + +from pathlib import Path +import argparse +import pwd +import stat as _stat + +# Journal model comes from the same directory's Planner.py +from Planner import ( + Journal, +) + +# -- helpers -- + +def _realpath(p: str|Path)-> Path: + "Resolve as much as possible without requiring target leaf to exist." + return Path(os.path.realpath(str(p))) + +def _is_under(child: Path ,root: Path)-> bool: + "True if child is the same as or within root (after realpath)." + try: + child_r = _realpath(child) + root_r = _realpath(root) + # Python <3.9 compat for is_relative_to: + child_parts = child_r.as_posix().rstrip("/") + "/" + root_parts = root_r.as_posix().rstrip("/") + "/" + return child_parts.startswith(root_parts) + except Exception: + return False + +# --- CBOR load --- + +def _journal_from_cbor_bytes(data: bytes)-> Journal: + try: + import cbor2 + except Exception as e: + raise RuntimeError(f"cbor2 is required: {e}") + obj = cbor2.loads(data) + if not isinstance(obj ,dict): + raise ValueError("CBOR root must be a dict") + return Journal(plan_dict=obj) + +# --- pretty helpers --- + +def _dst_from(ad: dict)-> str: + d = ad.get("write_file_dpath_str") or "?" + f = ad.get("write_file_fname") or "?" 
+ try: + from pathlib import Path as _P + if isinstance(d ,str) and isinstance(f ,str) and "/" not in f: + return (_P(d)/f).as_posix() + except Exception: + pass + return f"{d}/{f}" + +def _mode_from_entry(ad: dict)-> int: + m = ad.get("mode_int") + if isinstance(m ,int): return m + s = ad.get("mode_octal_str") + if isinstance(s ,str): + try: + return int(s ,8) + except Exception: + pass + raise ValueError("invalid mode") + +# --- Phase: wellformed (schema/shape) --- + +def check_wellformed(journal: Journal)-> list[str]: + errs: list[str] = [] + for i ,cmd in enumerate(journal.command_list ,start=1): + op = getattr(cmd ,"name_str" ,None) + ad = getattr(cmd ,"arg_dict" ,None) + if op not in {"copy" ,"displace" ,"delete"}: + errs.append(f"[{i}] unknown op: {op!r}") + continue + if not isinstance(ad ,dict): + errs.append(f"[{i}] arg_dict missing") + continue + d = ad.get("write_file_dpath_str") + f = ad.get("write_file_fname") + if not (isinstance(d ,str) and d.startswith("/")): + errs.append(f"[{i}] write_file_dpath_str must be absolute: {d!r}") + if not (isinstance(f ,str) and "/" not in f and f not in {"." ,""}): + errs.append(f"[{i}] write_file_fname must be a bare filename: {f!r}") + if op == "copy": + if "owner_name" not in ad: + errs.append(f"[{i}] copy: owner_name missing") + if "content_bytes" not in ad: + errs.append(f"[{i}] copy: content_bytes missing") + if "mode_int" not in ad and "mode_octal_str" not in ad: + errs.append(f"[{i}] copy: mode missing") + return errs + +# --- Phase: sanity-1 (cheap static sanity) --- + +def check_sanity_1(journal: Journal ,allowed_roots: list[Path])-> list[str]: + """ + Scope fence: every destination directory must be under at least one allowed root. + Default allowed roots = [/etc, cwd_of_inner]. 
+ """ + errs: list[str] = [] + allowed_str = ", ".join(r.as_posix() for r in allowed_roots) + for i ,cmd in enumerate(journal.command_list ,start=1): + ad = cmd.arg_dict + d = ad.get("write_file_dpath_str") + if not isinstance(d ,str): + # wellformed will report it; skip here + continue + d_real = _realpath(d) + if not any(_is_under(d_real ,root) for root in allowed_roots): + errs.append(f"[{i}] dst dir outside allowed roots: {d_real.as_posix()} (allowed: {allowed_str})") + return errs + +# --- Phase: validity (system lookups) --- + +def check_validity(journal: Journal)-> list[str]: + errs: list[str] = [] + for i ,cmd in enumerate(journal.command_list ,start=1): + ad = cmd.arg_dict + if cmd.name_str == "copy": + owner = ad.get("owner_name") + try: + pwd.getpwnam(owner) + except Exception: + errs.append(f"[{i}] unknown owner_name: {owner!r} (dst={_dst_from(ad)})") + try: + _ = _mode_from_entry(ad) + except Exception as e: + errs.append(f"[{i}] bad mode: {e} (dst={_dst_from(ad)})") + cb = ad.get("content_bytes") + if not isinstance(cb ,(bytes ,bytearray)): + errs.append(f"[{i}] content_bytes not bytes-like (dst={_dst_from(ad)})") + return errs + +# --- Phase: sanity-2 (filesystem checks, no mutation) --- + +def _safe_open_dir(dpath: str)-> int: + fd = os.open(dpath ,os.O_RDONLY | os.O_DIRECTORY | os.O_NOFOLLOW) + st = os.fstat(fd) + if not _stat.S_ISDIR(st.st_mode): + os.close(fd) ; raise OSError("not a directory") + return fd + +def check_sanity_2(journal: Journal)-> list[str]: + errs: list[str] = [] + opened: dict[str ,int] = {} + try: + # ensure destination directories are openable (and not symlinked dirs) + for i ,cmd in enumerate(journal.command_list ,start=1): + d = cmd.arg_dict.get("write_file_dpath_str") + if not isinstance(d ,str): # already flagged in wellformed + continue + if d in opened: + continue + try: + opened[d] = _safe_open_dir(d) + except Exception as e: + errs.append(f"[{i}] cannot open destination dir: {d} ({e})") + + # also warn on multiple 
writes to same (d,f) without displacement/delete + seen: set[tuple[str ,str]] = set() + for i ,cmd in enumerate(journal.command_list ,start=1): + ad = cmd.arg_dict + key = (ad.get("write_file_dpath_str") ,ad.get("write_file_fname")) + if key in seen and cmd.name_str == "copy": + errs.append(f"[{i}] multiple writes to same target without prior displace/delete: {_dst_from(ad)}") + seen.add(key) + + finally: + for fd in opened.values(): + try: + os.close(fd) + except Exception: + pass + return errs + +# --- Execute (mutation) --- + +def _fsync_dirfd(dirfd: int)-> None: + try: + os.fsync(dirfd) + except Exception: + pass + +def _exists_regular_nosymlink_at(dirfd: int ,fname: str)-> bool: + try: + st = os.lstat(fname ,dir_fd=dirfd) + except FileNotFoundError: + return False + if _stat.S_ISLNK(st.st_mode): raise OSError("target is a symlink") + if not _stat.S_ISREG(st.st_mode): raise OSError("target not a regular file") + return True + +def _apply_displace(d: str ,f: str)-> None: + dirfd = _safe_open_dir(d) + try: + if not _exists_regular_nosymlink_at(dirfd ,f): + return + import time as _time + ts = _time.strftime("%Y%m%dT%H%M%SZ" ,_time.gmtime()) + bak = f"{f}.{ts}" + os.rename(f ,bak ,src_dir_fd=dirfd ,dst_dir_fd=dirfd) + _fsync_dirfd(dirfd) + finally: + os.close(dirfd) + +def _apply_copy(d: str ,f: str ,owner: str ,mode_int: int ,content: bytes)-> None: + pw = pwd.getpwnam(owner) + uid ,gid = pw.pw_uid ,pw.pw_gid + dirfd = _safe_open_dir(d) + try: + tmp = f".{f}.mig.tmp.{os.getpid()}" + tfd = os.open(tmp ,os.O_WRONLY | os.O_CREAT | os.O_EXCL | os.O_NOFOLLOW ,0o600 ,dir_fd=dirfd) + try: + mv = memoryview(content) + off = 0 + while off < len(mv): + n = os.write(tfd ,mv[off:]) + if n <= 0: raise OSError("short write") + off += n + os.fsync(tfd) + os.fchown(tfd ,uid ,gid) + os.fchmod(tfd ,mode_int) + os.fsync(tfd) + finally: + os.close(tfd) + os.rename(tmp ,f ,src_dir_fd=dirfd ,dst_dir_fd=dirfd) + _fsync_dirfd(dirfd) + finally: + os.close(dirfd) + +def _apply_delete(d: 
str ,f: str)-> None: + dirfd = _safe_open_dir(d) + try: + if not _exists_regular_nosymlink_at(dirfd ,f): + return + os.unlink(f ,dir_fd=dirfd) + _fsync_dirfd(dirfd) + finally: + os.close(dirfd) + +def apply_journal(journal: Journal)-> int: + errs = 0 + for idx ,entry in enumerate(journal.command_list ,start=1): + op = getattr(entry ,"name_str" ,"?") + ad = getattr(entry ,"arg_dict" ,{}) or {} + try: + d = ad["write_file_dpath_str"] + f = ad["write_file_fname"] + if not (isinstance(d ,str) and d.startswith("/") and isinstance(f ,str) and "/" not in f): + raise ValueError("bad path or filename") + if op == "displace": + _apply_displace(d ,f) + elif op == "copy": + owner = ad["owner_name"] + mode = _mode_from_entry(ad) + content = ad["content_bytes"] + if not isinstance(content ,(bytes ,bytearray)): raise ValueError("content_bytes missing") + _apply_copy(d ,f ,owner ,mode ,bytes(content)) + elif op == "delete": + _apply_delete(d ,f) + else: + raise ValueError(f"unknown op: {op}") + except Exception as e: + errs += 1 + print(f"apply error [{idx} {op}] {_dst_from(ad)}: {e}" ,file=sys.stderr) + return 0 if errs == 0 else 1 + +# --- Orchestration --- + +def _phase_gate(name: str ,errors: list[str] ,then_stop: bool)-> bool: + if errors: + print(f"{name}: {len(errors)} issue(s)") + for e in errors: + print(f" ! {e}") + return True + if then_stop: + print(f"{name}: OK") + return True + return False + +def executor_inner( + journal: Journal + ,* + ,phase_2_print: bool=False + ,phase_2_then_stop: bool=False + ,phase_2_wellformed_then_stop: bool=False + ,phase_2_sanity1_then_stop: bool=False + ,phase_2_validity_then_stop: bool=False + ,phase_2_sanity2_then_stop: bool=False + ,allowed_roots: list[Path]|None=None +)-> int: + """ + Core pipeline for the inner executor. Returns a process-style exit code. 
+ """ + if phase_2_print: + journal.print() + if phase_2_then_stop: + return 0 + + roots = allowed_roots or [Path("/etc").resolve() ,Path.cwd().resolve()] + + wf = check_wellformed(journal) + if _phase_gate("wellformed" ,wf ,phase_2_wellformed_then_stop): + return 1 if wf else 0 if phase_2_wellformed_then_stop else 0 + + s1 = check_sanity_1(journal ,roots) + if _phase_gate("sanity-1" ,s1 ,phase_2_sanity1_then_stop): + return 1 if s1 else 0 if phase_2_sanity1_then_stop else 0 + + v = check_validity(journal) + if _phase_gate("validity" ,v ,phase_2_validity_then_stop): + return 1 if v else 0 if phase_2_validity_then_stop else 0 + + s2 = check_sanity_2(journal) + if _phase_gate("sanity-2" ,s2 ,phase_2_sanity2_then_stop): + return 1 if s2 else 0 if phase_2_sanity2_then_stop else 0 + + return apply_journal(journal) + +# --- CLI wrapper --- + +def main(argv: list[str]|None=None)-> int: + ap = argparse.ArgumentParser( + prog="executor_inner.py" + ,description="Man_In_Gray inner executor (decode → validate → apply)" + ) + ap.add_argument("--plan" ,required=True ,help="path to CBOR plan file") + ap.add_argument("--phase-2-print" ,action="store_true" ,help="print decoded journal") + ap.add_argument("--phase-2-then-stop" ,action="store_true" ,help="stop after print (no apply)") + ap.add_argument("--phase-2-wellformed-then-stop" ,action="store_true" ,help="stop after wellformed checks") + ap.add_argument("--phase-2-sanity1-then-stop" ,action="store_true" ,help="stop after sanity-1 checks") + ap.add_argument("--phase-2-validity-then-stop" ,action="store_true" ,help="stop after validity checks") + ap.add_argument("--phase-2-sanity2-then-stop" ,action="store_true" ,help="stop after sanity-2 checks") + + args = ap.parse_args(argv) + + # load plan + try: + data = Path(args.plan).read_bytes() + except Exception as e: + print(f"error: failed to read plan file: {e}" ,file=sys.stderr) + return 2 + + try: + journal = _journal_from_cbor_bytes(data) + except Exception as e: + 
print(f"error: failed to decode CBOR: {e}" ,file=sys.stderr) + return 2 + + return executor_inner( + journal + ,phase_2_print=args.phase_2_print + ,phase_2_then_stop=args.phase_2_then_stop + ,phase_2_wellformed_then_stop=args.phase_2_wellformed_then_stop + ,phase_2_sanity1_then_stop=args.phase_2_sanity1_then_stop + ,phase_2_validity_then_stop=args.phase_2_validity_then_stop + ,phase_2_sanity2_then_stop=args.phase_2_sanity2_then_stop + ) + +if __name__ == "__main__": + sys.exit(main()) diff --git a/developer/source/executor_outer.py b/developer/source/executor_outer.py new file mode 100755 index 0000000..1779cb7 --- /dev/null +++ b/developer/source/executor_outer.py @@ -0,0 +1,487 @@ +#!/usr/bin/env -S python3 -B +""" +executor.py — StageHand outer/inner executor (MVP; UNPRIVILEGED for now) + +Phase 0 (bootstrap): + - Ensure filter program exists (create default in CWD if --filter omitted) + - Validate --stage exists + - If --phase-0-then-stop: exit here (no scan, no execution) + +Phase 1 (outer): + - Discover every file under --stage; acceptance filter decides which to include + - Execute each config’s configure(prov ,planner ,WriteFileMeta) into ONE Planner + - Optionally print the planner; optionally stop + +Phase 2 (inner shim in same program for now; no privilege yet): + - Encode plan to CBOR and hand to inner path + - Inner decodes to a Journal and can print it +""" + +from __future__ import annotations + +# no bytecode anywhere +import sys ,os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") + +from pathlib import Path +import argparse +import getpass +import tempfile +import runpy +import subprocess +import datetime as _dt +import stat + +# Local module: Planner.py (same directory) +from Planner import ( + Planner + ,PlanProvenance + ,WriteFileMeta + ,Journal + ,Command +) + +# -------- default filter template (written to CWD when --input_acceptance not provided) -------- + +DEFAULT_FILTER_FILENAME = 
"Man_In_Gray_input_acceptance.py" + +DEFAULT_FILTER_SOURCE = """# Man_In_Gray_input_acceptance (default template) +# Return True to include a config file, False to skip it. +# You receive a PlanProvenance object named `prov`. +# +# prov fields commonly used here: +# prov.stage_root_dpath : Path → absolute path to the stage root +# prov.config_abs_fpath : Path → absolute path to the candidate file +# prov.config_rel_fpath : Path → path relative to the stage root +# prov.read_dir_dpath : Path → directory of the candidate file +# prov.read_fname : str → filename with trailing '.py' stripped (if present) +# +# Examples: +# +# 1) Accept everything (default behavior): +# def accept(prov): +# return True +# +# 2) Only accept configs in a 'dns/' namespace under the stage: +# def accept(prov): +# return prov.config_rel_fpath.as_posix().startswith("dns/") +# +# 3) Exclude editor backup files: +# def accept(prov): +# rel = prov.config_rel_fpath.as_posix() +# return not (rel.endswith("~") or rel.endswith(".swp")) +# +# 4) Only accept Python files + a few non-Python names: +# def accept(prov): +# name = prov.config_abs_fpath.name +# return name.endswith(".py") or name in {"hosts" ,"resolv.conf"} +# +# Choose ONE 'accept' definition. Below is the default: + +def accept(prov): + return True +""" + +# -------- utilities -------- + +def iso_utc_now_str()-> str: + return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") + +def _ensure_filter_file(filter_arg: str|None)-> Path: + """ + If --input_acceptance is provided, return that path (must exist). + Otherwise, create ./stagehand_filter.py in the CWD if missing (writing a helpful template), + and return its path. 
+ """ + if filter_arg: + p = Path(filter_arg) + if not p.is_file(): + raise RuntimeError(f"--input_acceptance file not found: {p}") + return p + + p = Path.cwd()/DEFAULT_FILTER_FILENAME + if not p.exists(): + try: + p.write_text(DEFAULT_FILTER_SOURCE ,encoding="utf-8") + print(f"(created default filter at {p})") + except Exception as e: + raise RuntimeError(f"failed to create default filter {p}: {e}") + return p + +def _load_accept_func(filter_path: Path): + env = runpy.run_path(str(filter_path)) + fn = env.get("accept") + if not callable(fn): + raise RuntimeError(f"{filter_path}: missing callable 'accept(prov)'") + return fn + +def _walk_all_files(stage_root: Path): + """ + Yield every file (regular or symlink) under stage_root recursively. + We do not follow symlinked directories to avoid cycles. + """ + root = stage_root.resolve() + for dirpath ,dirnames ,filenames in os.walk(root ,followlinks=False): + # prune symlinked dirs (files can still be symlinks) + dirnames[:] = [d for d in dirnames if not os.path.islink(os.path.join(dirpath ,d))] + for fname in filenames: + p = Path(dirpath ,fname) + try: + st = p.lstat() + if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode): + yield p.resolve() + except Exception: + # unreadable/broken entries skipped + continue + +def find_config_paths(stage_root: Path ,accept_func)-> list[Path]: + """ + Return files accepted by the Python acceptance function: accept(prov) → True/False. + Ordered breadth-first by depth, then lexicographically by relative path. 
+ """ + out: list[tuple[int ,str ,Path]] = [] + root = stage_root.resolve() + for p in _walk_all_files(stage_root): + prov = PlanProvenance(stage_root=stage_root ,config_path=p) + try: + if accept_func(prov): + rel = p.resolve().relative_to(root) + out.append((len(rel.parts) ,rel.as_posix() ,p.resolve())) + except Exception as e: + raise RuntimeError(f"accept() failed on {prov.config_rel_fpath.as_posix()}: {e}") + out.sort(key=lambda t: (t[0] ,t[1])) # (depth ,name) + return [t[2] for t in out] + +# --- run all configs into ONE planner --- + +def _run_all_configs_into_single_planner(stage_root: Path ,cfgs: list[Path])-> Planner: + """ + Create a single Planner and execute each config's configure(prov, planner, WriteFileMeta) + against it. Returns that single Planner containing the entire plan. + """ + # seed with synthetic provenance; we overwrite per config before execution + aggregate_prov = PlanProvenance(stage_root=stage_root ,config_path=stage_root/"(aggregate).py") + planner = Planner(provenance=aggregate_prov) + + for cfg in cfgs: + prov = PlanProvenance(stage_root=stage_root ,config_path=cfg) + planner.set_provenance(prov) + + env = runpy.run_path(str(cfg)) + fn = env.get("configure") + if not callable(fn): + raise RuntimeError(f"{cfg}: missing callable configure(prov ,planner ,WriteFileMeta)") + + fn(prov ,planner ,WriteFileMeta) + + # annotate meta once, on the single planner's journal + j = planner.journal() + j.set_meta( + generator_prog_str="executor.py" + ,generated_at_utc_str=iso_utc_now_str() + ,user_name_str=getpass.getuser() + ,host_name_str=os.uname().nodename if hasattr(os ,"uname") else "unknown" + ,stage_root_dpath_str=str(stage_root.resolve()) + ,configs_list=[str(p.resolve().relative_to(stage_root.resolve())) for p in cfgs] + ) + return planner + +# ----- CBOR “matchbox” (simple wrapper kept local to executor) ----- + +def _plan_to_cbor_bytes(planner: Planner)-> bytes: + "Serialize a Planner's Journal to CBOR bytes." 
+ try: + import cbor2 + except Exception as e: + raise RuntimeError(f"cbor2 is required: {e}") + plan_dict = planner.journal().as_dictionary() + return cbor2.dumps(plan_dict ,canonical=True) + +def _journal_from_cbor_bytes(data: bytes)-> Journal: + "Rebuild a Journal from CBOR bytes." + try: + import cbor2 + except Exception as e: + raise RuntimeError(f"cbor2 is required: {e}") + obj = cbor2.loads(data) + if not isinstance(obj ,dict): + raise ValueError("CBOR root must be a dict") + return Journal(plan_dict=obj) + +# -------- inner executor (phase 2) -------- + +def _inner_main(plan_path: Path ,phase2_print: bool ,phase2_then_stop: bool)-> int: + "Inner executor path: decode CBOR → Journal; optionally print; (apply TBD)." + try: + data = Path(plan_path).read_bytes() + except Exception as e: + print(f"error: failed to read plan file: {e}" ,file=sys.stderr) + return 2 + + try: + journal = _journal_from_cbor_bytes(data) + except Exception as e: + print(f"error: failed to decode CBOR: {e}" ,file=sys.stderr) + return 2 + + if phase2_print: + journal.print() + + if phase2_then_stop: + return 0 + + # (Stage 3 apply would go here; omitted in MVP) + return 0 + +# -------- outer executor (phase 1 & handoff) -------- + +def _outer_main(stage_root: Path ,accept_func ,args)-> int: + if not stage_root.is_dir(): + print(f"error: --stage not a directory: {stage_root}" ,file=sys.stderr) + return 2 + + cfgs = find_config_paths(stage_root ,accept_func) + if not cfgs: + print("No configuration files found.") + return 0 + + try: + master = _run_all_configs_into_single_planner(stage_root ,cfgs) + except SystemExit: + raise + except Exception as e: + print(f"error: executing configs: {e}" ,file=sys.stderr) + return 2 + + if args.phase_1_print: + master.print() + + if args.phase_1_then_stop: + return 0 + + # Phase 2: encode CBOR and invoke inner path (same script, --inner) + try: + cbor_bytes = _plan_to_cbor_bytes(master) + except Exception as e: + print(f"error: CBOR encode failed: {e}" 
,file=sys.stderr) + return 2 + + with tempfile.NamedTemporaryFile(prefix="stagehand_plan_" ,suffix=".cbor" ,delete=False) as tf: + tf.write(cbor_bytes) + plan_path = tf.name + + try: + cmd = [ + sys.executable + ,str(Path(__file__).resolve()) + ,"--inner" + ,"--plan" ,plan_path + ] + if args.phase_2_print: + cmd.append("--phase-2-print") + if args.phase_2_then_stop: + cmd.append("--phase-2-then-stop") + + proc = subprocess.run(cmd) + return proc.returncode + finally: + try: + os.unlink(plan_path) + except Exception: + pass + +# -------- CLI -------- + +def main(argv: list[str]|None=None)-> int: + ap = argparse.ArgumentParser( + prog="executor.py" + ,description="StageHand outer/inner executor (plan → CBOR → decode)." + ) + ap.add_argument("--stage" ,default="stage" + ,help="stage root directory (default: ./stage)") + ap.add_argument( + "--input_acceptance" + ,default="" + ,help=f"path to acceptance filter program exporting accept(prov) " + f"(default: ./{DEFAULT_FILTER_FILENAME}; created if missing)" + ) + ap.add_argument( + "--phase-0-then-stop" + ,action="store_true" + ,help="stop after arg checks & filter bootstrap (no stage scan)" + ) + + # Phase-1 (outer) controls + ap.add_argument("--phase-1-print" ,action="store_true" ,help="print master planner (phase 1)") + ap.add_argument("--phase-1-then-stop" ,action="store_true" ,help="stop after phase 1") + + # Phase-2 (inner) controls (outer forwards these to inner) + ap.add_argument("--phase-2-print" ,action="store_true" ,help="print decoded journal (phase 2)") + ap.add_argument("--phase-2-then-stop" ,action="store_true" ,help="stop after phase 2 decode") + + # Inner-only flags (not for users) + ap.add_argument("--inner" ,action="store_true" ,help=argparse.SUPPRESS) + ap.add_argument("--plan" ,default=None ,help=argparse.SUPPRESS) + + args = ap.parse_args(argv) + + # Inner path + if args.inner: + if not args.plan: + print("error: --inner requires --plan " ,file=sys.stderr) + return 2 + return 
_inner_main(Path(args.plan) + ,phase2_print=args.phase_2_print + ,phase2_then_stop=args.phase_2_then_stop) + + # Phase 0: bootstrap & stop (no scan) + stage_root = Path(args.stage) + try: + filter_path = _ensure_filter_file(args.filter or None) + except Exception as e: + print(f"error: {e}" ,file=sys.stderr) + return 2 + + if not stage_root.exists(): + print(f"error: --stage not found: {stage_root}" ,file=sys.stderr) + return 2 + if not stage_root.is_dir(): + print(f"error: --stage is not a directory: {stage_root}" ,file=sys.stderr) + return 2 + + if args.phase_0_then_stop: + print(f"phase-0 OK: stage at {stage_root.resolve()} and filter at {filter_path}") + return 0 + + # Load acceptance function and proceed with outer + try: + accept_func = _load_accept_func(filter_path) + except Exception as e: + print(f"error: {e}" ,file=sys.stderr) + return 2 + + return _outer_main(stage_root ,accept_func ,args) + +# inner executor +# --- secure apply helpers (inner path) --- + +import pwd ,errno ,stat as _stat + +def _safe_open_dir(dpath: str)-> int: + "Open directory without following symlinks; return dirfd." + fd = os.open(dpath ,os.O_RDONLY | os.O_DIRECTORY | os.O_NOFOLLOW) + st = os.fstat(fd) + if not _stat.S_ISDIR(st.st_mode): + os.close(fd) ; raise OSError("not a directory") + return fd + +def _exists_regular_nosymlink_at(dirfd: int ,fname: str)-> bool: + "True if a regular ,non-symlink file exists at dirfd/fname." 
+ try: + st = os.lstat(fname ,dir_fd=dirfd) + except FileNotFoundError: + return False + if _stat.S_ISLNK(st.st_mode): raise OSError("target is a symlink") + if not _stat.S_ISREG(st.st_mode): raise OSError("target not a regular file") + return True + +def _fsync_dirfd(dirfd: int)-> None: + try: + os.fsync(dirfd) + except Exception: + pass # some FS may not support; best effort + +def _apply_displace(d: str ,f: str)-> None: + dirfd = _safe_open_dir(d) + try: + if not _exists_regular_nosymlink_at(dirfd ,f): + return + import time as _time + ts = _time.strftime("%Y%m%dT%H%M%SZ" ,_time.gmtime()) + bak = f"{f}.{ts}" + os.rename(f ,bak ,src_dir_fd=dirfd ,dst_dir_fd=dirfd) + _fsync_dirfd(dirfd) + finally: + os.close(dirfd) + +def _apply_copy(d: str ,f: str ,owner: str ,mode_int: int ,content: bytes)-> None: + pw = pwd.getpwnam(owner) + uid ,gid = pw.pw_uid ,pw.pw_gid + dirfd = _safe_open_dir(d) + try: + tmp = f".{f}.mig.tmp.{os.getpid()}" + tfd = os.open(tmp ,os.O_WRONLY | os.O_CREAT | os.O_EXCL | os.O_NOFOLLOW ,0o600 ,dir_fd=dirfd) + try: + # write all bytes + mv = memoryview(content) + off = 0 + while off < len(mv): + n = os.write(tfd ,mv[off:]) + if n <= 0: raise OSError("short write") + off += n + os.fsync(tfd) + os.fchown(tfd ,uid ,gid) + os.fchmod(tfd ,mode_int) + os.fsync(tfd) + finally: + os.close(tfd) + os.rename(tmp ,f ,src_dir_fd=dirfd ,dst_dir_fd=dirfd) + _fsync_dirfd(dirfd) + finally: + os.close(dirfd) + +def _apply_delete(d: str ,f: str)-> None: + dirfd = _safe_open_dir(d) + try: + if not _exists_regular_nosymlink_at(dirfd ,f): + return + os.unlink(f ,dir_fd=dirfd) + _fsync_dirfd(dirfd) + finally: + os.close(dirfd) + +def _mode_from_entry(ad: dict)-> int: + m = ad.get("mode_int") + if isinstance(m ,int): return m + s = ad.get("mode_octal_str") + if isinstance(s ,str): + try: + return int(s ,8) + except Exception: + pass + raise ValueError("invalid mode") + +def apply_journal(journal: Journal)-> int: + """ + Apply the decoded journal. 
Returns 0 on success ,1 if any hard errors occurred. + """ + errs = 0 + for idx ,entry in enumerate(journal.command_list ,start=1): + op = getattr(entry ,"name_str" ,"?") + ad = getattr(entry ,"arg_dict" ,{}) or {} + try: + d = ad["write_file_dpath_str"] + f = ad["write_file_fname"] + if not (isinstance(d ,str) and d.startswith("/") and isinstance(f ,str) and "/" not in f): + raise ValueError("bad path or filename") + if op == "displace": + _apply_displace(d ,f) + elif op == "copy": + owner = ad["owner_name"] + mode = _mode_from_entry(ad) + content = ad["content_bytes"] + if not isinstance(content ,(bytes ,bytearray)): raise ValueError("content_bytes missing") + _apply_copy(d ,f ,owner ,mode ,bytes(content)) + elif op == "delete": + _apply_delete(d ,f) + else: + raise ValueError(f"unknown op: {op}") + except Exception as e: + errs += 1 + print(f"apply error [{idx} {op}]: {e}" ,file=sys.stderr) + return 0 if errs == 0 else 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/developer/source/scratchpad/.githolder b/developer/source/scratchpad/.githolder new file mode 100644 index 0000000..e69de29 diff --git a/developer/source/test_env.py b/developer/source/test_env.py new file mode 100644 index 0000000..d47e8a1 --- /dev/null +++ b/developer/source/test_env.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 + +import os +import sys + +def print_env_var(name): + value = os.getenv(name) + print(f"{name:<16}: {value if value else ''}") + +def main(): + print("=== Python Environment Test ===") + print(f"Python executable : {sys.executable}") + print(f"Python version : {sys.version}") + print() + + print("=== Harmony Environment Variables ===") + for var in ["ROLE", "REPO_HOME", "PYTHON_HOME", "VIRTUAL_ENV", "ENV"]: + print_env_var(var) + + print() + print("=== Current Working Directory ===") + print(os.getcwd()) + +if __name__ == "__main__": + main() diff --git a/developer/source/todo.txt b/developer/source/todo.txt new file mode 100644 index 0000000..3ebefe1 --- 
/dev/null
+++ b/developer/source/todo.txt
@@ -0,0 +1,5 @@
+
+2025-09-19T08:48:29Z
+
+Port privileged code (phase 3 code) to C.
+
diff --git a/tester/stage_test_0/DNS/unbound.conf.py b/tester/stage_test_0/DNS/unbound.conf.py
new file mode 100644
index 0000000..9a8fd8a
--- /dev/null
+++ b/tester/stage_test_0/DNS/unbound.conf.py
@@ -0,0 +1,13 @@
+# stage_test_0/DNS/unbound.conf.py
+
+def configure(prov, planner, WriteFileMeta):
+    # dpath is relative; it will be anchored to prov.read_dir_dpath,
+    # so this presumably lands in .../stage_test_0/DNS/stage_test_0_out/net — TODO confirm anchoring
+    wfm = WriteFileMeta(
+        dpath="stage_test_0_out/net",
+        fname=prov.read_fname,  # "unbound.conf" (trailing '.py' stripped per prov docs)
+        owner=prov.process_user,  # current process user
+        mode=0o444
+    )
+    planner.delete(wfm)
+    planner.copy(wfm, content="server:\n verbosity: 1\n")
diff --git a/tester/stage_test_0/unbound_conf.py b/tester/stage_test_0/unbound_conf.py
new file mode 100644
index 0000000..ff275b9
--- /dev/null
+++ b/tester/stage_test_0/unbound_conf.py
@@ -0,0 +1,10 @@
+# unbound.conf (example)
+
+def configure(prov, planner, WriteFileMeta):
+    wfm = WriteFileMeta(
+        dpath="stage_test_0_out"
+        ,fname=prov.read_fname  # write file name same as read file name
+        ,owner=prov.process_user
+    )
+    planner.displace(wfm)
+    planner.copy(wfm, content="server:\n do-ip6: no\n")
diff --git a/tester/stage_test_0/web/site_conf.py b/tester/stage_test_0/web/site_conf.py
new file mode 100644
index 0000000..21397c4
--- /dev/null
+++ b/tester/stage_test_0/web/site_conf.py
@@ -0,0 +1,12 @@
+# stage_test_0/web/site_conf.py
+
+def configure(prov, planner, WriteFileMeta):
+    # This writes a faux web config to .../stage_test_0/stage_test_0_out/web/nginx.conf
+    wfm = WriteFileMeta(
+        dpath="stage_test_0_out/web",
+        fname="nginx.conf",  # explicit override (not from prov)
+        owner=prov.process_user,
+        mode="0644"
+    )
+    planner.displace(wfm)
+    planner.copy(wfm, content="events {}\nhttp { server { listen 8080; } }\n")