From: Thomas Walker Lynch Date: Thu, 18 Sep 2025 15:27:58 +0000 (-0700) Subject: check point working on staging X-Git-Url: https://git.reasoningtechnology.com/style/rt_dark_doc.css?a=commitdiff_plain;h=7110b2f05f5f3a4ed3ed358c4af1b2de6759b7ec;p=subu check point working on staging --- diff --git a/developer/source/DNS/Planner.py b/developer/source/DNS/Planner.py index 0da1384..0782707 100644 --- a/developer/source/DNS/Planner.py +++ b/developer/source/DNS/Planner.py @@ -2,19 +2,12 @@ """ Planner.py — plan builder for staged configuration (UNPRIVILEGED). -The Planner accumulates Command objects into a Journal. - -Journal building is orchestrated by the outer runner (e.g., stage_show_plan, stage_cp) -which constructs a Planner per config file and invokes Planner command methods. - -Defaults and provenance come from a PlannerContext instance. You can replace the -context at any time via set_context(ctx). - -The Journal can be exported as CBOR via Journal.to_CBOR_bytes(), and reconstructed -on the privileged side via Journal.from_CBOR_bytes(). - -On-wire field names are snake_case and use explicit suffixes (_str,_bytes,_int, etc.) -to avoid ambiguity. +Given: runner-side provenance (PlanProvenance) and optional defaults (WriteFileMeta). +Does: expose Planner whose command methods (copy/displace/delete) build Command entries, + resolving arguments with precedence: kwarg > per-call WriteFileMeta > planner default + (and for filename, fallback to provenance-derived basename). On any argument error, + the Command is returned with errors and NOT appended to the Journal. +Returns: Journal (model only; dict in/out) via planner.journal(). """ from __future__ import annotations @@ -22,175 +15,318 @@ from __future__ import annotations # no bytecode anywhere (works under sudo/root shells too) import sys ,os sys.dont_write_bytecode = True -os.environ.setdefault("PYTHONDONTWRITEBYTECODE","1") +os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") -from dataclasses import dataclass ,field from pathlib import Path -from typing import Any +import getpass + # ===== Utilities ===== -def _norm_perm(value: int|str)-> tuple[int,str]|None: - "Given int or 4-char octal string. Does validate/normalize. Returns (int,'%04o') or None." +def norm_perm(value: int|str)-> tuple[int,str]|None: + "Given int or 3/4-char octal string (optionally 0o-prefixed). Does validate/normalize. Returns (int,'%04o') or None." if isinstance(value ,int): if 0 <= value <= 0o7777: return value ,f"{value:04o}" return None if isinstance(value ,str): - s = value.strip() - if len(s)==4 and all(ch in "01234567" for ch in s): + s = value.strip().lower() + if s.startswith("0o"): + try: + v = int(s ,8) + return v ,f"{v:04o}" + except Exception: + return None + if len(s) in (3 ,4) and all(ch in "01234567" for ch in s): try: v = int(s ,8) - return v ,s + return v ,f"{v:04o}" except Exception: return None return None -def _is_abs_dpath(dpath_str: str)-> bool: +def is_abs_dpath(dpath_str: str|None)-> bool: "Given path string. Does quick abs dir check. Returns bool." - return bool(dpath_str) and dpath_str.startswith("/") + return isinstance(dpath_str ,str) and dpath_str.startswith("/") and "\x00" not in dpath_str + +def norm_abs_dpath_str(value: str|Path|None)-> str|None: + "Given str/Path/None. Does normalize absolute dir path string. Returns str or None." + if value is None: return None + s = value.as_posix() if isinstance(value ,Path) else str(value) + return s if is_abs_dpath(s) else None + +def norm_fname_or_none(value: str|None)-> str|None: + "Given candidate filename or None. Does validate bare filename. Returns str or None." + if value is None: return None + s = str(value) + if not s: return None + if "/" in s or s in ("." ,"..") or "\x00" in s: return None + return s + +def norm_nonempty_owner(value: str|None)-> str|None: + "Given owner string or None. Does minimally validate (non-empty). Returns str or None." + if value is None: return None + s = str(value).strip() + return s if s else None + +def parse_mode(value: int|str|None)-> tuple[int|None ,str|None]: + "Given int/str/None. Does normalize via norm_perm. Returns (int,'%04o') or (None,None)." + if value is None: return None ,None + r = norm_perm(value) + return r if r is not None else (None ,None) + +def norm_content_bytes(value: bytes|str|None)-> bytes|None: + "Given bytes/str/None. Does normalize to UTF-8 bytes or None. Returns bytes|None." + if value is None: return None + if isinstance(value ,bytes): return value + return value.encode("utf-8") + +def norm_dpath_str(value: str|Path|None)-> str|None: + "Given str/Path/None. Does minimal sanitize; allows relative. Returns str or None." + if value is None: return None + s = value.as_posix() if isinstance(value ,Path) else str(value) + if not s or "\x00" in s: return None + return s + + +# ===== Wire-ready model types (no CBOR here) ===== -def _join_write_file(dpath_str: str ,fname_str: str)-> str: - "Given dir path string and filename string. Does join. Returns POSIX path string or ''." - if not _is_abs_dpath(dpath_str): return "" - if not fname_str or "/" in fname_str: return "" - return (Path(dpath_str)/fname_str).as_posix() - -# ===== Core data types ===== - -@dataclass(slots=True) class Command: """ Command — a single planned operation. - Given a command name and an argument map (native values). - Does hold the op name, owns a distinct args map, accumulates errors for this op. - Returns serializable mapping via to_map(). + Given name_str ('copy'|'displace'|'delete'), optional arg_dict, optional errors_list. + Does hold op name, own a fresh arg_dict, collect per-entry errors. + Returns dictionary via as_dictionary(). """ - name_str: str - args_map: dict[str,Any] = field(default_factory=dict) - errors_list: list[str] = field(default_factory=list) + __slots__ = ("name_str" ,"arg_dict" ,"errors_list") + + def __init__(self ,name_str: str ,arg_dict: dict|None=None ,errors_list: list[str]|None=None)-> None: + self.name_str = name_str + self.arg_dict = dict(arg_dict) if arg_dict is not None else {} + self.errors_list = list(errors_list) if errors_list is not None else [] def add_error(self ,msg_str: str)-> None: - "Given message. Does append to errors_list. Returns None." self.errors_list.append(msg_str) - def to_map(self)-> dict[str,Any]: - "Given self. Does convert to a plain dict. Returns {'op','args_map','errors_list'}." + def as_dictionary(self)-> dict: return { "op": self.name_str - ,"args_map": dict(self.args_map) + ,"arg_dict": dict(self.arg_dict) ,"errors_list": list(self.errors_list) } -@dataclass(slots=True) -class PlannerContext: - """ - PlannerContext — per-config provenance and defaults. + def print(self, *, index: int|None=None, file=None)-> None: + """ + Given: optional index for numbering and optional file-like (defaults to stdout). + Does: print a compact, human-readable one-line summary of this command; prints any errors indented below. + Returns: None. + """ + if file is None: + import sys as _sys + file = _sys.stdout - Given: stage_root_dpath, read_file_rel_fpath, default write_file location/name, - default owner name, default permission (int or '0644'), optional default content. - Does: provide ambient defaults and provenance to Planner methods. - Returns: n/a (data holder). - """ - stage_root_dpath: Path - read_file_rel_fpath: Path - default_write_file_dpath_str: str - default_write_file_fname_str: str - default_owner_name_str: str - default_mode_int: int|None = None - default_mode_octal_str: str|None = None - default_content_bytes: bytes|None = None - - @staticmethod - def from_values(stage_root_dpath: Path - ,read_file_rel_fpath: Path - ,write_file_dpath_str: str - ,write_file_fname_str: str - ,owner_name_str: str - ,perm: int|str - ,content: bytes|str|None - )-> PlannerContext: - "Given raw values. Does normalize perm and content. Returns PlannerContext." - if isinstance(content ,str): - content_b = content.encode("utf-8") - else: - content_b = content - perm_norm = _norm_perm(perm) - if perm_norm is None: - m_int ,m_oct = None ,None + op = self.name_str + ad = self.arg_dict or {} + + # Compose destination path for display + d = ad.get("write_file_dpath_str") or "" + f = ad.get("write_file_fname") or "" + try: + from pathlib import Path as _Path + dst = (_Path(d)/f).as_posix() if d and f and "/" not in f else "?" + except Exception: + dst = "?" + + # Numbering prefix + prefix = f"{index:02d}. " if index is not None else "" + + if op == "copy": + mode = ad.get("mode_int") + owner = ad.get("owner_name") + size = len(ad.get("content_bytes") or b"") + line = f"{prefix}copy -> {dst} mode {mode:04o} owner {owner} bytes {size}" + elif op == "displace": + line = f"{prefix}displace -> {dst}" + elif op == "delete": + line = f"{prefix}delete -> {dst}" else: - m_int ,m_oct = perm_norm - return PlannerContext( - stage_root_dpath=stage_root_dpath - ,read_file_rel_fpath=read_file_rel_fpath - ,default_write_file_dpath_str=write_file_dpath_str - ,default_write_file_fname_str=write_file_fname_str - ,default_owner_name_str=owner_name_str - ,default_mode_int=m_int - ,default_mode_octal_str=m_oct - ,default_content_bytes=content_b - ) + line = f"{prefix}?op? -> {dst}" + + print(line, file=file) + + # Print any per-entry errors underneath + for err in self.errors_list: + print(f" ! {err}", file=file) + -@dataclass(slots=True) class Journal: """ - Journal — ordered list of Commands plus provenance metadata. + Journal — ordered list of Command plus provenance metadata (model only; no CBOR). - Given optional meta map. - Does append commands, expose entries, produce plain or CBOR encodings, and rebuild from CBOR. - Returns plain dict via to_map(), bytes via to_CBOR_bytes(), Journal via from_CBOR_bytes(). + Given optional plan_dict in wire shape (for reconstruction). + Does manage meta, append commands, expose entries, and pack to dict. + Returns dict via as_dictionary(). """ - meta_map: dict[str,Any] = field(default_factory=dict) - commands_list: list[Command] = field(default_factory=list) + __slots__ = ("meta_dict" ,"command_list") + + def __init__(self ,plan_dict: dict|None=None)-> None: + self.meta_dict = {} + self.command_list = [] + if plan_dict is not None: + self._init_from_dict(plan_dict) + + def _init_from_dict(self ,plan_dict: dict)-> None: + if not isinstance(plan_dict ,dict): + raise ValueError("plan_dict must be a dict") + meta = dict(plan_dict.get("meta_dict") or {}) + entries = plan_dict.get("entries_list") or [] + self.meta_dict.update(meta) + for e in entries: + if not isinstance(e ,dict): + continue + op = e.get("op") or "?" + args = e.get("arg_dict") or {} + errs = e.get("errors_list") or [] + self.command_list.append(Command(name_str=op ,arg_dict=dict(args) ,errors_list=list(errs))) def set_meta(self ,**kv)-> None: - "Given keyword meta. Does merge into meta_map. Returns None." - self.meta_map.update(kv) + self.meta_dict.update(kv) def append(self ,cmd: Command)-> None: - "Given Command. Does append to commands_list. Returns None." - self.commands_list.append(cmd) + self.command_list.append(cmd) - def entries_list(self)-> list[dict[str,Any]]: - "Given n/a. Does return list of entry dicts (copy). Returns list[dict]." - return [c.to_map() for c in self.commands_list] + def entries_list(self)-> list[dict]: + return [c.as_dictionary() for c in self.command_list] - def to_map(self)-> dict[str,Any]: - "Given n/a. Does package a plan map (ready for CBOR). Returns dict." + def as_dictionary(self)-> dict: return { "version_int": 1 - ,"meta_map": dict(self.meta_map) + ,"meta_dict": dict(self.meta_dict) ,"entries_list": self.entries_list() } - def to_CBOR_bytes(self ,canonical_bool: bool=True)-> bytes: - "Given n/a. Does CBOR-encode to bytes (requires cbor2). Returns bytes." - try: - import cbor2 - except Exception as e: - raise RuntimeError(f"package cbor2 required for to_CBOR_bytes: {e}") - return cbor2.dumps(self.to_map() ,canonical=canonical_bool) - - @staticmethod - def from_CBOR_bytes(data_bytes: bytes)-> Journal: - "Given CBOR bytes. Does decode and rebuild a Journal (Commands + meta). Returns Journal." + def print(self, *, index_start: int = 1, file=None) -> None: + """ + Given: optional starting index and optional file-like (defaults to stdout). + Does: print each Command on a single line via Command.print(), numbered. + Returns: None. + """ + if file is None: + import sys as _sys + file = _sys.stdout + + if not self.command_list: + print("(plan is empty)", file=file) + return + + for i, cmd in enumerate(self.command_list, start=index_start): + cmd.print(index=i, file=file) + +# ===== Runner-provided provenance ===== + +# Planner.py +class PlanProvenance: + """ + Runner-provided, read-only provenance for a single config script. + """ + __slots__ = ("stage_root_dpath","config_abs_fpath","config_rel_fpath", + "read_dir_dpath","read_fname") + + def __init__(self, *, stage_root: Path, config_path: Path): + self.stage_root_dpath = stage_root.resolve() + self.config_abs_fpath = config_path.resolve() try: - import cbor2 - except Exception as e: - raise RuntimeError(f"package cbor2 required for from_CBOR_bytes: {e}") - obj = cbor2.loads(data_bytes) - if not isinstance(obj ,dict): raise ValueError("CBOR root must be a map") - meta = dict(obj.get("meta_map") or {}) - entries = obj.get("entries_list") or [] - j = Journal(meta_map=meta) - for e in entries: - if not isinstance(e ,dict): continue - op = e.get("op") or "?" - args = e.get("args_map") or {} - errs = e.get("errors_list") or [] - j.append(Command(name_str=op ,args_map=dict(args) ,errors_list=list(errs))) - return j + self.config_rel_fpath = self.config_abs_fpath.relative_to(self.stage_root_dpath) + except Exception: + self.config_rel_fpath = Path(self.config_abs_fpath.name) + + # Where the config file lives (used to anchor relative write dirs) + self.read_dir_dpath = self.config_abs_fpath.parent + + # “py-less” filename: strip .stage.py, else .py, else keep name + name = self.config_abs_fpath.name + if name.endswith(".stage.py"): + self.read_fname = name[:-len(".stage.py")] + elif name.endswith(".py"): + self.read_fname = name[:-3] + else: + self.read_fname = name + + def print(self, *, file=None) -> None: + """ + Given: optional file-like (defaults to stdout). + Does: print a readable, multi-line summary of provenance. + Returns: None. + """ + if file is None: + import sys as _sys + file = _sys.stdout + + print(f"Stage root: {self.stage_root_dpath}", file=file) + print(f"Config (rel): {self.config_rel_fpath.as_posix()}", file=file) + print(f"Config (abs): {self.config_abs_fpath}", file=file) + print(f"Read dir: {self.read_dir_dpath}", file=file) + print(f"Read fname: {self.read_fname}", file=file) + + + +# ===== Admin-facing defaults carrier ===== + +class WriteFileMeta: + """ + WriteFileMeta — per-call or planner-default write-file attributes. + + Given dpath (abs str/Path) ,fname (bare name or None) ,owner (str) + ,mode (int|'0644') ,content (bytes|str|None). + Does normalize into fields (may remain None if absent/invalid). + Returns object suitable for providing defaults to Planner methods. + """ + __slots__ = ("dpath_str" ,"fname" ,"owner_name_str" ,"mode_int" ,"mode_octal_str" ,"content_bytes") + + def __init__(self + ,* + ,dpath="/" + ,fname=None # None or "." → let Planner resolve (provenance fallback) + ,owner="root" # "." → current process user (resolved by Planner) + ,mode=0o444 + ,content=None + ): + self.dpath_str = norm_dpath_str(dpath) + # keep "." as a sentinel; otherwise validate the filename + if fname == ".": + self.fname = "." + else: + self.fname = norm_fname_or_none(fname) + + # keep "." as a sentinel; otherwise normalize owner + if owner == ".": + self.owner_name_str = "." + else: + self.owner_name_str = norm_nonempty_owner(owner) + + self.mode_int, self.mode_octal_str = parse_mode(mode) + # content_'bytes' due to UTF8 encoding + self.content_bytes = norm_content_bytes(content) + + def print(self, *, label: str | None = None, file=None) -> None: + """ + Given: optional label and optional file-like (defaults to stdout). + Does: print a single-line summary of defaults/overrides. + Returns: None. + """ + if file is None: + import sys as _sys + file = _sys.stdout + + dpath = self.dpath_str or "?" + fname = self.fname or "?" + owner = self.owner_name_str or "?" + mode_str = f"{self.mode_int:04o}" if isinstance(self.mode_int, int) else (self.mode_octal_str or "?") + size = len(self.content_bytes) if isinstance(self.content_bytes, (bytes, bytearray)) else 0 + prefix = (label + ": ") if label else "" + print(f"{prefix}dpath={dpath} fname={fname} owner={owner} mode={mode_str} bytes={size}", file=file) + # ===== Planner ===== @@ -198,159 +334,200 @@ class Planner: """ Planner — constructs a Journal of Commands from config scripts. - Given: PlannerContext (provenance + defaults). - Does: maintains a Journal; command methods (copy/displace/delete) create Command objects, - fill missing args from context defaults, preflight minimal shape checks, then append. - Returns: accessors for Journal and meta; no I/O or privilege here. + Given provenance (PlanProvenance) and optional default WriteFileMeta. + Does resolve command parameters by precedence: kwarg > per-call WriteFileMeta > planner default, + with a final filename fallback to provenance basename if still missing. + On any argument error, returns the Command with errors and DOES NOT append it to Journal. + Returns live Journal via journal(). """ - def __init__(self ,ctx: PlannerContext)-> None: - self._ctx = ctx + __slots__ = ("_prov" ,"_defaults" ,"_journal") + + def __init__(self ,provenance: PlanProvenance ,defaults: WriteFileMeta|None=None)-> None: + self._prov = provenance + self._defaults = defaults if defaults is not None else WriteFileMeta( + dpath="/" + ,fname=provenance.read_fname + ,owner="root" + ,mode=0o444 + ,content=None + ) self._journal = Journal() - # seed provenance; outer tools can add more later self._journal.set_meta( - source_read_file_rel_fpath_str=ctx.read_file_rel_fpath.as_posix() - ,stage_root_dpath_str=str(ctx.stage_root_dpath) + stage_root_dpath_str=str(self._prov.stage_root_dpath) + ,config_rel_fpath_str=self._prov.config_rel_fpath.as_posix() ) - # --- Context management --- - - def set_context(self ,ctx: PlannerContext)-> None: - "Given PlannerContext. Does replace current context. Returns None." - self._ctx = ctx + # --- defaults management / access --- - def context(self)-> PlannerContext: - "Given n/a. Does return current context. Returns PlannerContext." - return self._ctx + def set_defaults(self ,defaults: WriteFileMeta)-> None: + "Given WriteFileMeta. Does replace planner defaults. Returns None." + self._defaults = defaults - # --- Journal access --- + def defaults(self)-> WriteFileMeta: + "Given n/a. Does return current WriteFileMeta defaults. Returns WriteFileMeta." + return self._defaults def journal(self)-> Journal: - "Given n/a. Does return the Journal (live). Returns Journal." + "Given n/a. Returns Journal reference (live, still being modified here)." return self._journal - # --- Helpers --- - - def _resolve_write_file(self ,write_file_dpath_str: str|None ,write_file_fname_str: str|None)-> tuple[str,str]: - "Given optional write_file dpath/fname. Does fill from context; '.' fname → read_file basename. Returns (dpath,fname)." - dpath_str = write_file_dpath_str if write_file_dpath_str is not None else self._ctx.default_write_file_dpath_str - fname_str = write_file_fname_str if write_file_fname_str is not None else self._ctx.default_write_file_fname_str - if fname_str == ".": - fname_str = self._ctx.read_file_rel_fpath.name - return dpath_str ,fname_str - - def _resolve_owner(self ,owner_name_str: str|None)-> str: - "Given optional owner. Does fill from context. Returns owner string." - return owner_name_str if owner_name_str is not None else self._ctx.default_owner_name_str - - def _resolve_mode(self ,perm: int|str|None)-> tuple[int|None,str|None]: - "Given optional perm. Does normalize or fall back to context. Returns (mode_int,mode_octal_str)." - if perm is None: - return self._ctx.default_mode_int ,self._ctx.default_mode_octal_str - norm = _norm_perm(perm) - return (norm if norm is not None else (None ,None)) - - def _resolve_content(self ,content: bytes|str|None)-> bytes|None: - "Given optional content (bytes or str). Does normalize or fall back to context. Returns bytes|None." - if content is None: - return self._ctx.default_content_bytes - if isinstance(content ,str): - return content.encode("utf-8") - return content - - # --- Command builders --- + # --- resolution helpers --- + + def _pick(self ,kw ,meta_attr ,default_attr): + "Given three sources. Does pick first non-None. Returns value or None." + return kw if kw is not None else (meta_attr if meta_attr is not None else default_attr) + + # Planner.py (inside Planner) + def _resolve_write_file(self, wfm, dpath, fname) -> tuple[str|None, str|None]: + # normalize explicit kwargs (allow "." to pass through untouched) + dpath_str = norm_dpath_str(dpath) if dpath is not None else None + if fname is not None and fname != ".": + fname = norm_fname_or_none(fname) + + dpath_val = self._pick(dpath_str, (wfm.dpath_str if wfm else None), self._defaults.dpath_str) + fname_val = self._pick(fname, (wfm.fname if wfm else None), self._defaults.fname) + + # final fallback for filename: "." or None → derive from config name + if fname_val == "." or fname_val is None: + fname_val = self._prov.read_fname + + # anchor relative dpaths against the config’s directory + if dpath_val is not None and not is_abs_dpath(dpath_val): + dpath_val = (self._prov.read_dir_dpath / dpath_val).as_posix() + + return dpath_val, fname_val + + def _resolve_owner_mode_content(self + ,wfm: WriteFileMeta|None + ,owner: str|None + ,mode: int|str|None + ,content: bytes|str|None + )-> tuple[str|None ,tuple[int|None ,str|None] ,bytes|None]: + owner_norm = norm_nonempty_owner(owner) if (owner is not None and owner != ".") else owner + mode_norm = parse_mode(mode) if mode is not None else (None, None) + content_b = norm_content_bytes(content) if content is not None else None + + owner_v = self._pick(owner_norm, (wfm.owner_name_str if wfm else None), self._defaults.owner_name_str) + + # resolve "." → current process user + if owner_v == ".": + owner_v = getpass.getuser() + + mode_v = (mode_norm if mode_norm != (None, None) else + ((wfm.mode_int, wfm.mode_octal_str) if wfm else (self._defaults.mode_int, self._defaults.mode_octal_str))) + content_v = self._pick(content_b, (wfm.content_bytes if wfm else None), self._defaults.content_bytes) + return owner_v, mode_v, content_v + + def print(self, *, show_journal: bool = True, file=None) -> None: + """ + Given: flags (show_journal) and optional file-like (defaults to stdout). + Does: print provenance, defaults, and optionally the journal via delegation. + Returns: None. + """ + if file is None: + import sys as _sys + file = _sys.stdout + + print("== Provenance ==", file=file) + self._prov.print(file=file) + + print("\n== Defaults ==", file=file) + self._defaults.print(label="defaults", file=file) + + if show_journal: + entries = getattr(self._journal, "command_list", []) + n_total = len(entries) + n_copy = sum(1 for c in entries if getattr(c, "name_str", None) == "copy") + n_disp = sum(1 for c in entries if getattr(c, "name_str", None) == "displace") + n_del = sum(1 for c in entries if getattr(c, "name_str", None) == "delete") + + print("\n== Journal ==", file=file) + print(f"entries: {n_total} copy:{n_copy} displace:{n_disp} delete:{n_del}", file=file) + if n_total: + self._journal.print(index_start=1, file=file) + else: + print("(plan is empty)", file=file) + + # --- Command builders (first arg may be WriteFileMeta) --- def copy(self - ,* - ,write_file_dpath_str: str|None=None - ,write_file_fname_str: str|None=None - ,owner_name_str: str|None=None - ,perm: int|str|None=None - ,content: bytes|str|None=None - ,read_file_rel_fpath: Path|None=None + ,wfm: WriteFileMeta|None=None + ,* + ,write_file_dpath: str|Path|None=None + ,write_file_fname: str|None=None + ,owner: str|None=None + ,mode: int|str|None=None + ,content: bytes|str|None=None )-> Command: """ - Given: optional overrides for write_file (dpath,fname,owner,perm), content, and read_file_rel_fpath. - Does: build a 'copy' command entry (content is embedded; read_file path kept as provenance). - Returns: Command (also appended to Journal). + Given optional WriteFileMeta plus keyword overrides. + Does build a 'copy' command; on any argument error the command is returned with errors and NOT appended. + Returns Command. """ cmd = Command("copy") - # resolve basics - wf_dpath_str ,wf_fname_str = self._resolve_write_file(write_file_dpath_str ,write_file_fname_str) - owner_str = self._resolve_owner(owner_name_str) - mode_int ,mode_oct = self._resolve_mode(perm) - content_b = self._resolve_content(content) - read_rel = (read_file_rel_fpath if read_file_rel_fpath is not None else self._ctx.read_file_rel_fpath) - - # minimal shape checks (well-formedness, not policy) - if not _is_abs_dpath(wf_dpath_str): - cmd.add_error("write_file_dpath_str must be absolute and non-empty") - if not wf_fname_str or "/" in wf_fname_str: - cmd.add_error("write_file_fname_str must be a simple filename (no '/')") - if not owner_str: - cmd.add_error("owner_name_str must be non-empty") + dpath ,fname = self._resolve_write_file(wfm ,write_file_dpath ,write_file_fname) + owner_v ,(mode_int ,mode_oct) ,content_b = self._resolve_owner_mode_content(wfm ,owner ,mode ,content) + + # well-formed checks + if not is_abs_dpath(dpath): cmd.add_error("write_file_dpath must be absolute") + if norm_fname_or_none(fname) is None: cmd.add_error("write_file_fname must be a bare filename") + if not owner_v: cmd.add_error("owner must be non-empty") if (mode_int ,mode_oct) == (None ,None): - cmd.add_error("perm must be an int <= 0o7777 or a 4-digit octal string") + cmd.add_error("mode must be int <= 0o7777 or 3/4-digit octal string") if content_b is None: cmd.add_error("content is required for copy() (bytes or str)") - cmd.args_map.update({ - "write_file_dpath_str": wf_dpath_str - ,"write_file_fname_str": wf_fname_str - ,"owner_name_str": owner_str - ,"mode_int": mode_int - ,"mode_octal_str": mode_oct - ,"content_bytes": content_b - ,"read_file_rel_fpath_str": read_rel.as_posix() + cmd.arg_dict.update({ + "write_file_dpath_str": dpath, + "write_file_fname": fname, # was write_file_fname + "owner_name": owner_v, # was owner_name_str + "mode_int": mode_int, + "mode_octal_str": mode_oct, + "content_bytes": content_b, + "provenance_config_rel_fpath_str": self._prov.config_rel_fpath.as_posix(), }) - self._journal.append(cmd) + + if not cmd.errors_list: + self._journal.append(cmd) return cmd def displace(self - ,* - ,write_file_dpath_str: str|None=None - ,write_file_fname_str: str|None=None + ,wfm: WriteFileMeta|None=None + ,* + ,write_file_dpath: str|Path|None=None + ,write_file_fname: str|None=None )-> Command: - """ - Given: optional write_file dpath/fname overrides. - Does: build a 'displace' command (rename existing write_file in-place with UTC suffix). - Returns: Command (appended). - """ + "Given optional WriteFileMeta plus overrides. Does build 'displace' entry or return errors. Returns Command." cmd = Command("displace") - wf_dpath_str ,wf_fname_str = self._resolve_write_file(write_file_dpath_str ,write_file_fname_str) - - if not _is_abs_dpath(wf_dpath_str): - cmd.add_error("write_file_dpath_str must be absolute and non-empty") - if not wf_fname_str or "/" in wf_fname_str: - cmd.add_error("write_file_fname_str must be a simple filename (no '/')") - - cmd.args_map.update({ - "write_file_dpath_str": wf_dpath_str - ,"write_file_fname_str": wf_fname_str + dpath ,fname = self._resolve_write_file(wfm ,write_file_dpath ,write_file_fname) + if not is_abs_dpath(dpath): cmd.add_error("write_file_dpath must be absolute") + if norm_fname_or_none(fname) is None: cmd.add_error("write_file_fname must be a bare filename") + cmd.arg_dict.update({ + "write_file_dpath_str": dpath, + "write_file_fname": fname, }) - self._journal.append(cmd) + if not cmd.errors_list: + self._journal.append(cmd) return cmd def delete(self - ,* - ,write_file_dpath_str: str|None=None - ,write_file_fname_str: str|None=None + ,wfm: WriteFileMeta|None=None + ,* + ,write_file_dpath: str|Path|None=None + ,write_file_fname: str|None=None )-> Command: - """ - Given: optional write_file dpath/fname overrides. - Does: build a 'delete' command (unlink if present). - Returns: Command (appended). - """ + "Given optional WriteFileMeta plus overrides. Does build 'delete' entry or return errors. Returns Command." cmd = Command("delete") - wf_dpath_str ,wf_fname_str = self._resolve_write_file(write_file_dpath_str ,write_file_fname_str) - - if not _is_abs_dpath(wf_dpath_str): - cmd.add_error("write_file_dpath_str must be absolute and non-empty") - if not wf_fname_str or "/" in wf_fname_str: - cmd.add_error("write_file_fname_str must be a simple filename (no '/')") - - cmd.args_map.update({ - "write_file_dpath_str": wf_dpath_str - ,"write_file_fname_str": wf_fname_str + dpath ,fname = self._resolve_write_file(wfm ,write_file_dpath ,write_file_fname) + if not is_abs_dpath(dpath): cmd.add_error("write_file_dpath must be absolute") + if norm_fname_or_none(fname) is None: cmd.add_error("write_file_fname must be a bare filename") + cmd.arg_dict.update({ + "write_file_dpath_str": dpath, + "write_file_fname": fname, }) - self._journal.append(cmd) + if not cmd.errors_list: + self._journal.append(cmd) return cmd + + + diff --git a/developer/source/DNS/deploy.py b/developer/source/DNS/deploy.py deleted file mode 100755 index 5f12397..0000000 --- a/developer/source/DNS/deploy.py +++ /dev/null @@ -1,148 +0,0 @@ -#!/usr/bin/env python3 -""" -deploy.py — Deploy staged DNS bundle (Unbound per-subu + nft redirect) -RT-v2025.09.15.4 - -What it does - - Installs the staged tree under ./stage into / - - systemctl daemon-reload - - nft -f /etc/nftables.conf (relies on: include "/etc/nftables.d/*.nft") - - enable + restart unbound@ for each instance (default: US x6) - -Assumptions - - This file lives next to install_staged_tree.py - - Stage contains: - stage/etc/nftables.d/10-block-IPv6.nft - stage/etc/nftables.d/20-SUBU-ports.nft - stage/etc/systemd/system/unbound@.service - stage/etc/unbound/unbound-US.conf (127.0.0.1@5301) - stage/etc/unbound/unbound-x6.conf (127.0.0.1@5302) - - /etc/nftables.conf has: include "/etc/nftables.d/*.nft" - -Exit codes - 0 = success, 2 = preflight/deploy error -""" -from __future__ import annotations -from pathlib import Path -import argparse -import importlib -import os -import subprocess -import sys - -ROOT = Path(__file__).resolve().parent -STAGE = ROOT / "stage" - -def _run(cmd: list[str]) -> tuple[int, str, str]: - cp = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - return (cp.returncode, cp.stdout.strip(), cp.stderr.strip()) - -def _preflight_errors() -> list[str]: - errs = [] - if os.geteuid() != 0: - errs.append("must be run as root (sudo)") - if not STAGE.exists(): - errs.append(f"stage dir missing: {STAGE}") - return errs - -def _install_stage(stage_root: Path) -> list[str]: - """ - Call install_staged_tree.install_staged_tree(stage_root=..., dest_root=/, create_dirs=True) - and return its log lines. - """ - sys.path.insert(0, str(ROOT)) - try: - ist = importlib.import_module("install_staged_tree") - except Exception as e: - raise RuntimeError(f"failed to import install_staged_tree: {e}") - - # Expect signature: (stage_root, dest_root, create_dirs=False, skip_identical=True) -> (logs, ifaces) - try: - logs, _ifaces = ist.install_staged_tree( - stage_root=stage_root, - dest_root=Path("/"), - create_dirs=True, - skip_identical=True, - ) - except TypeError as te: - # Fallback for older two-arg signature: install_staged_tree(stage_root, dest_root) - try: - logs, _ifaces = ist.install_staged_tree(stage_root, Path("/")) - except Exception as e2: - raise RuntimeError(f"install_staged_tree() call failed: {e2}") from te - return logs - -def deploy(instances: list[str]) -> list[str]: - logs: list[str] = [] - - # Plan - logs.append("Deploy DNS plan:") - logs.append(f" instances: {', '.join(instances)}") - logs.append(f" stage: {STAGE}") - logs.append(f" root: /") - logs.append("") - logs.append("Installing staged artifacts…") - - # Install staged files - install_logs = _install_stage(STAGE) - logs.extend(install_logs) - - # Reload systemd units (for unbound@.service changes) - _run(["systemctl", "daemon-reload"]) - - # Apply nftables from the main config (which includes drop-ins) - rc, out, err = _run(["/usr/sbin/nft", "-f", "/etc/nftables.conf"]) - if rc != 0: - raise RuntimeError(f"nftables apply failed:\n{err or out}") - - # Sanity: verify our tables are present - rc2, out2, err2 = _run(["/usr/sbin/nft", "list", "tables"]) - if rc2 != 0: - raise RuntimeError(f"nftables list tables failed:\n{err2 or out2}") - - required = {"inet NO-IPV6", "inet SUBU-DNS-REDIRECT", "inet SUBU-PORT-EGRESS"} - present = set() - for line in out2.splitlines(): - parts = line.strip().split() - # lines look like: "table inet FOO" - if len(parts) == 3 and parts[0] == "table": - present.add(f"{parts[1]} {parts[2]}") - missing = required - present - if missing: - raise RuntimeError(f"nftables missing tables: {', '.join(sorted(missing))}") - - # Enable + restart unbound instances - for inst in instances: - unit = f"unbound@{inst}.service" - _run(["systemctl", "enable", unit]) - _run(["systemctl", "restart", unit]) - rcA, _, _ = _run(["systemctl", "is-active", unit]) - logs.append(f"{unit}: {'active' if rcA == 0 else 'inactive'}") - - logs.append("") - logs.append("✓ DNS deploy complete.") - return logs - -def main(argv=None) -> int: - ap = argparse.ArgumentParser(description="Deploy staged DNS (Unbound per-subu + nft redirect).") - ap.add_argument("--instances", nargs="+", default=["US", "x6"], - help="Unbound instances to enable (default: US x6)") - args = ap.parse_args(argv) - - errs = _preflight_errors() - if errs: - print("❌ deploy preflight found issue(s):", file=sys.stderr) - for e in errs: - print(f" - {e}", file=sys.stderr) - return 2 - - try: - logs = deploy(args.instances) - print("\n".join(logs)) - return 0 - except Exception as e: - print(f"❌ deploy failed: {e}", file=sys.stderr) - return 2 - -if __name__ == "__main__": - sys.exit(main()) diff --git a/developer/source/DNS/deprecated/stage_orig/etc/nftables.d/10-block-IPv6.nft b/developer/source/DNS/deprecated/stage_orig/etc/nftables.d/10-block-IPv6.nft new file mode 100644 index 0000000..eaee5be --- /dev/null +++ b/developer/source/DNS/deprecated/stage_orig/etc/nftables.d/10-block-IPv6.nft @@ -0,0 +1,16 @@ +table inet NO-IPV6 { + chain input { + type filter hook input priority raw; policy accept; + meta nfproto ipv6 counter comment "drop all IPv6 inbound" drop + } + + chain output { + type filter hook output priority raw; policy accept; + meta nfproto ipv6 counter comment "drop all IPv6 outbound" drop + } + + chain forward { + type filter hook forward priority raw; policy accept; + meta nfproto ipv6 counter comment "drop all IPv6 forward" drop + } +} diff --git a/developer/source/DNS/deprecated/stage_orig/etc/nftables.d/20-SUBU-ports.nft b/developer/source/DNS/deprecated/stage_orig/etc/nftables.d/20-SUBU-ports.nft new file mode 100644 index 0000000..6c31446 --- /dev/null +++ b/developer/source/DNS/deprecated/stage_orig/etc/nftables.d/20-SUBU-ports.nft @@ -0,0 +1,47 @@ +table inet SUBU-DNS-REDIRECT { + chain output { + type nat hook output priority -100; policy accept; + + # Redirect DNS for the subu UIDs to local Unbound listeners + meta skuid 2017 udp dport 53 redirect to :5301 + meta skuid 2018 udp dport 53 redirect to :5302 + meta skuid 2017 tcp dport 53 redirect to :5301 + meta skuid 2018 tcp dport 53 redirect to :5302 + } +} + +table inet SUBU-PORT-EGRESS { + chain output { + type filter hook output priority 0; policy accept; + + # Always allow loopback on egress + oifname "lo" accept + + # No IPv6 for subu (until you reintroduce v6) + meta skuid {2017,2018} meta nfproto ipv6 counter comment "no IPv6 for subu" drop + + ##### x6 (UID 2018) + # Block some exfil channels regardless of iface + meta skuid 2018 tcp dport {25,465,587} counter comment "block SMTP/Submission" drop + meta skuid 2018 udp dport {3478,5349,19302-19309} counter comment "block STUN/TURN" drop + meta skuid 2018 tcp dport 853 counter comment "block DoT (TCP/853)" drop + + # (Optional) allow ICMP echo out via x6 + meta skuid 2018 oifname "x6" ip protocol icmp icmp type echo-request accept + + # Enforce interface binding + meta skuid 2018 oifname "x6" accept + meta skuid 2018 oifname != "x6" counter comment "x6 must use wg x6" drop + + ##### US (UID 2017) + meta skuid 2017 tcp dport {25,465,587} counter drop comment "block SMTP/Submission" + meta skuid 2017 udp dport {3478,5349,19302-19309} counter drop comment "block STUN/TURN" + meta skuid 2017 tcp dport 853 counter drop comment "block DoT (TCP/853)" + + # (Optional) ICMP via US + meta skuid 2017 oifname "US" ip protocol icmp icmp type echo-request accept + + meta skuid 2017 oifname "US" accept + meta skuid 2017 oifname != "US" counter comment "US must use wg US" drop + } +} diff --git a/developer/source/DNS/deprecated/stage_orig/etc/systemd/system/unbound@.service b/developer/source/DNS/deprecated/stage_orig/etc/systemd/system/unbound@.service new file mode 100644 index 0000000..ba2919b --- /dev/null +++ b/developer/source/DNS/deprecated/stage_orig/etc/systemd/system/unbound@.service @@ -0,0 +1,19 @@ +[Unit] +Description=Unbound DNS instance for %i (per-subu tunnel egress) +After=network-online.target wg-quick@%i.service +Requires=wg-quick@%i.service +Wants=network-online.target + +[Service] +Type=simple +ExecStart=/usr/sbin/unbound -d -p -c /etc/unbound/unbound-%i.conf +User=unbound +Group=unbound +Restart=on-failure +RestartSec=2s +AmbientCapabilities=CAP_NET_BIND_SERVICE +CapabilityBoundingSet=CAP_NET_BIND_SERVICE +NoNewPrivileges=true + +[Install] +WantedBy=multi-user.target diff --git a/developer/source/DNS/deprecated/stage_orig/etc/unbound/unbound-US.conf b/developer/source/DNS/deprecated/stage_orig/etc/unbound/unbound-US.conf new file mode 100644 index 0000000..1995438 --- /dev/null +++ b/developer/source/DNS/deprecated/stage_orig/etc/unbound/unbound-US.conf @@ -0,0 +1,18 @@ +server: + username: "unbound" + chroot: "" + directory: "/etc/unbound" + do-daemonize: no + interface: 127.0.0.1@5301 + hide-identity: yes + hide-version: yes + harden-glue: yes + harden-dnssec-stripped: yes + qname-minimisation: yes + prefetch: yes + outgoing-interface: 10.0.0.1 + +forward-zone: + name: "." + forward-addr: 1.1.1.1 + forward-addr: 1.0.0.1 diff --git a/developer/source/DNS/deprecated/stage_orig/etc/unbound/unbound-x6.conf b/developer/source/DNS/deprecated/stage_orig/etc/unbound/unbound-x6.conf new file mode 100644 index 0000000..ed49241 --- /dev/null +++ b/developer/source/DNS/deprecated/stage_orig/etc/unbound/unbound-x6.conf @@ -0,0 +1,18 @@ +server: + username: "unbound" + chroot: "" + directory: "/etc/unbound" + do-daemonize: no + interface: 127.0.0.1@5302 + hide-identity: yes + hide-version: yes + harden-glue: yes + harden-dnssec-stripped: yes + qname-minimisation: yes + prefetch: yes + outgoing-interface: 10.8.0.2 + +forward-zone: + name: "." + forward-addr: 1.1.1.1 + forward-addr: 1.0.0.1 diff --git a/developer/source/DNS/deprecated/stage_orig/usr/local/sbin/DNS_status.sh b/developer/source/DNS/deprecated/stage_orig/usr/local/sbin/DNS_status.sh new file mode 100755 index 0000000..d4db58e --- /dev/null +++ b/developer/source/DNS/deprecated/stage_orig/usr/local/sbin/DNS_status.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash +set -euo pipefail +echo "== DNS status ==" +systemctl --no-pager --full status DNS-redirect unbound@US unbound@x6 || true +echo +echo "== nftables ==" +nft list table inet NAT-DNS-REDIRECT || true +echo +echo "== Unbound logs (last 50 lines each) ==" +journalctl -u unbound@US -n 50 --no-pager || true +echo +journalctl -u unbound@x6 -n 50 --no-pager || true diff --git a/developer/source/DNS/deprecated/stage_show_plan.py b/developer/source/DNS/deprecated/stage_show_plan.py new file mode 100644 index 0000000..075e65b --- /dev/null +++ b/developer/source/DNS/deprecated/stage_show_plan.py @@ -0,0 +1,97 @@ +#!/usr/bin/env -S python3 -B +""" +stage_show_plan.py — run staged configs (UNPRIVILEGED) and print the plan. + +Given: a stage root directory. +Does: loads Stage.py, executes each config, builds a native plan map, summarizes it. +Returns: exit code 0 on success, non-zero on error. +""" +from __future__ import annotations +import sys ,os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") + +from pathlib import Path +import argparse ,importlib.util ,runpy ,socket ,getpass ,time ,hashlib + +# ---------- helpers ---------- + +def _load_stage_module(stage_root_dpath: Path): + "Given: stage root path. Does: load Stage.py as module 'Stage'. Returns: module." + mod_fpath = stage_root_dpath/"Stage.py" + if not mod_fpath.exists(): + raise FileNotFoundError(f"Stage.py not found at {mod_fpath}") + spec = importlib.util.spec_from_file_location("Stage" ,str(mod_fpath)) + mod = importlib.util.module_from_spec(spec) + sys.modules["Stage"] = mod + assert spec and spec.loader + spec.loader.exec_module(mod) # type: ignore + return mod + +def _config_rel_fpaths(stage_root_dpath: Path)-> list[Path]: + "Given: stage root. Does: collect *.py (excluding Stage.py) as relative file paths. Returns: list[Path]." + rel_fpath_list: list[Path] = [] + for p in stage_root_dpath.rglob("*.py"): + if p.name == "Stage.py": continue + if p.is_file(): + rel_fpath_list.append(p.relative_to(stage_root_dpath)) + return sorted(rel_fpath_list ,key=lambda x: x.as_posix()) + +def _sha256_hex(b: bytes)-> str: + "Given: bytes. Does: sha256. Returns: hex string." + return hashlib.sha256(b).hexdigest() + +# ---------- main ---------- + +def main(argv: list[str]|None=None)-> int: + "Given: CLI. Does: show plan. Returns: exit code." + ap = argparse.ArgumentParser(prog="stage_show_plan.py" + ,description="Run staged config scripts and print the resulting plan.") + ap.add_argument("--stage",default="stage",help="stage directory (default: ./stage)") + args = ap.parse_args(argv) + + stage_root_dpath = Path(args.stage) + StageMod = _load_stage_module(stage_root_dpath) + Stage = StageMod.Stage + Stage._reset() + Stage.set_meta( + planner_user_name=getpass.getuser() + ,planner_uid_int=os.getuid() + ,planner_gid_int=os.getgid() + ,host_name=socket.gethostname() + ,created_utc_str=time.strftime("%Y-%m-%dT%H:%M:%SZ",time.gmtime()) + ) + + for rel_fpath in _config_rel_fpaths(stage_root_dpath): + Stage._begin(read_rel_fpath=rel_fpath ,stage_root_dpath=stage_root_dpath) + runpy.run_path(str(stage_root_dpath/rel_fpath) ,run_name="__main__") + Stage._end() + + plan_map = Stage.plan_object() + entries_list = plan_map["entries_list"] + print(f"Plan version: {plan_map['version_int']}") + print(f"Planner: {plan_map['meta_map'].get('planner_user_name')}@{plan_map['meta_map'].get('host_name')} " + f"UID:{plan_map['meta_map'].get('planner_uid_int')} GID:{plan_map['meta_map'].get('planner_gid_int')}") + print(f"Created: {plan_map['meta_map'].get('created_utc_str')}") + print(f"Entries: {len(entries_list)}\n") + + for i ,e_map in enumerate(entries_list ,1): + op = e_map.get("op") + dst_fpath_str = f"{e_map.get('dst_dpath')}/{e_map.get('dst_fname')}" + if op == "copy": + content = e_map.get("content_bytes") or b"" + sz = len(content) + mode = e_map.get("mode_octal_str") or "????" + owner = e_map.get("owner_name") or "?" + h = _sha256_hex(content) + print(f"{i:02d}. copy -> {dst_fpath_str} mode {mode} owner {owner} bytes {sz} sha256 {h[:16]}…") + elif op == "displace": + print(f"{i:02d}. displace -> {dst_fpath_str}") + elif op == "delete": + print(f"{i:02d}. delete -> {dst_fpath_str}") + else: + print(f"{i:02d}. ?op? -> {dst_fpath_str} ({op})") + return 0 + +if __name__ == "__main__": + sys.exit(main()) diff --git a/developer/source/DNS/executor.py b/developer/source/DNS/executor.py new file mode 100755 index 0000000..fffa490 --- /dev/null +++ b/developer/source/DNS/executor.py @@ -0,0 +1,271 @@ +#!/usr/bin/env -S python3 -B +""" +executor.py — StageHand outer/inner executor (MVP; UNPRIVILEGED for now) + +Phase 1 (outer): + - Build a combined plan by executing each config's `configure(prov, planner, WriteFileMeta)`. + - Optionally print the plan via Planner.print(). + - Optionally stop. + +Phase 2 (inner shim in same program for now; no privilege yet): + - Encode combined plan to CBOR and pass to inner path. + - Inner decodes back to a Journal and optionally prints it. + - Optionally stop. + +Discovery: + - --stage (default: ./stage) points at the stage directory root. + - By default, *every file* under --stage (recursively) is executed as a config, + regardless of extension. Editors can still use .py for highlighting; we strip + only a trailing ".py" to derive prov.read_fname. + +""" + +from __future__ import annotations + +# no bytecode anywhere +import sys, os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE", "1") + +from pathlib import Path +import argparse +import getpass +import tempfile +import runpy +import subprocess +import datetime as _dt +import os, fnmatch, stat + + +# Local module: Planner.py (same directory) +from Planner import ( + Planner, PlanProvenance, WriteFileMeta, Journal, Command, +) + +# -------- utilities -------- + +def iso_utc_now_str() -> str: + return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") + +def _split_globs(glob_arg: str) -> list[str]: + parts = [g.strip() for g in (glob_arg or "").split(",") if g.strip()] + # Default includes both deep and top-level files + return parts or ["**/*", "*"] + +def find_config_paths(stage_root: Path, glob_arg: str) -> list[Path]: + """ + Given stage root and comma-glob string, return sorted list of files (regular or symlink). + Defaults to match ALL files under stage, including top-level ones. + """ + root = stage_root.resolve() + patterns = _split_globs(glob_arg) + out: set[Path] = set() + + for dirpath, dirnames, filenames in os.walk(root, followlinks=False): + # (optional) prune symlinked dirs to avoid cycles; files can still be symlinks + dirnames[:] = [d for d in dirnames if not os.path.islink(os.path.join(dirpath, d))] + + for fname in filenames: + f_abs = Path(dirpath, fname) + rel = f_abs.relative_to(root).as_posix() + if any(fnmatch.fnmatch(rel, pat) for pat in patterns): + try: + st = f_abs.lstat() + if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode): + out.add(f_abs) + except Exception: + # unreadable/broken entries are skipped + pass + + return sorted(out, key=lambda p: p.as_posix()) + + + +def _run_one_config(config_path: Path, stage_root: Path) -> Planner: + """Execute a single config's `configure(prov, planner, WriteFileMeta)` and return that config's Planner.""" + prov = PlanProvenance(stage_root=stage_root, config_path=config_path) + per_planner = Planner(provenance=prov) # defaults derive from this file's provenance + env = runpy.run_path(str(config_path)) + fn = env.get("configure") + if not callable(fn): + raise RuntimeError(f"{config_path}: missing callable configure(prov, planner, WriteFileMeta)") + fn(prov, per_planner, WriteFileMeta) + return per_planner + +def _aggregate_into_master(stage_root: Path, planners: list[Planner]) -> Planner: + """Create a master Planner and copy all Commands from per-config planners into it.""" + # Synthetic provenance for the master planner (used only for display/meta) + fake_config = stage_root / "(aggregate).py" + master = Planner(PlanProvenance(stage_root=stage_root, config_path=fake_config)) + + # annotate meta + master.journal().set_meta( + generator_prog_str="executor.py", + generated_at_utc_str=iso_utc_now_str(), + user_name_str=getpass.getuser(), + host_name_str=os.uname().nodename if hasattr(os, "uname") else "unknown", + stage_root_dpath_str=str(stage_root.resolve()), + configs_list=[p._prov.config_rel_fpath.as_posix() for p in planners], + ) + + # copy commands + out_j = master.journal() + for p in planners: + for cmd in p.journal().command_list: + out_j.append(cmd) # keep Command objects as-is + return master + +# ----- CBOR “matchbox” (simple wrapper kept local to executor) ----- + +def _plan_to_cbor_bytes(planner: Planner) -> bytes: + """Serialize a Planner's Journal to CBOR bytes.""" + try: + import cbor2 + except Exception as e: + raise RuntimeError(f"cbor2 is required: {e}") + plan_dict = planner.journal().as_dictionary() + return cbor2.dumps(plan_dict, canonical=True) + +def _journal_from_cbor_bytes(data: bytes) -> Journal: + """Rebuild a Journal from CBOR bytes.""" + try: + import cbor2 + except Exception as e: + raise RuntimeError(f"cbor2 is required: {e}") + obj = cbor2.loads(data) + if not isinstance(obj, dict): + raise ValueError("CBOR root must be a dict") + return Journal(plan_dict=obj) + +# -------- inner executor (phase 2) -------- + +def _inner_main(plan_path: Path, phase2_print: bool, phase2_then_stop: bool) -> int: + """Inner executor path: decode CBOR → Journal; optionally print; (apply TBD).""" + try: + data = Path(plan_path).read_bytes() + except Exception as e: + print(f"error: failed to read plan file: {e}", file=sys.stderr) + return 2 + + try: + journal = _journal_from_cbor_bytes(data) + except Exception as e: + print(f"error: failed to decode CBOR: {e}", file=sys.stderr) + return 2 + + if phase2_print: + journal.print() + + if phase2_then_stop: + return 0 + + # (Stage 3 apply would go here; omitted in MVP) + return 0 + +# -------- outer executor (phase 1 & handoff) -------- + +def _outer_main(args) -> int: + stage_root = Path(args.stage) + if not stage_root.is_dir(): + print(f"error: --stage not a directory: {stage_root}", file=sys.stderr) + return 2 + + cfgs = find_config_paths(stage_root, args.glob) + if not cfgs: + print("No configuration files found.") + return 0 + + # Execute each config into its own planner + per_planners: list[Planner] = [] + for cfg in cfgs: + try: + per_planners.append(_run_one_config(cfg, stage_root)) + except SystemExit: + raise + except Exception as e: + print(f"error: executing {cfg}: {e}", file=sys.stderr) + return 2 + + # Aggregate into a single master planner for printing/CBOR + master = _aggregate_into_master(stage_root, per_planners) + + if args.phase_1_print: + master.print() + + if args.phase_1_then_stop: + return 0 + + # Phase 2: encode CBOR and invoke inner path (same script, --inner) + try: + cbor_bytes = _plan_to_cbor_bytes(master) + except Exception as e: + print(f"error: CBOR encode failed: {e}", file=sys.stderr) + return 2 + + with tempfile.NamedTemporaryFile(prefix="stagehand_plan_", suffix=".cbor", delete=False) as tf: + tf.write(cbor_bytes) + plan_path = tf.name + + try: + cmd = [ + sys.executable, + str(Path(__file__).resolve()), + "--inner", + "--plan", plan_path, + ] + if args.phase_2_print: + cmd.append("--phase-2-print") + if args.phase_2_then_stop: + cmd.append("--phase-2-then-stop") + + proc = subprocess.run(cmd) + return proc.returncode + finally: + try: + os.unlink(plan_path) + except Exception: + pass + +# -------- CLI -------- + +def main(argv: list[str] | None = None) -> int: + ap = argparse.ArgumentParser( + prog="executor.py", + description="StageHand outer/inner executor (plan → CBOR → decode).", + ) + ap.add_argument("--stage", default="stage", help="stage root directory (default: ./stage)") + + ap.add_argument("--glob", default="**/*", + help="glob for config scripts under --stage (default: '**/*' = all files)") + + # ap.add_argument("--glob", + # default="**/*", + # help="comma-separated globs under --stage (default: **/*; every file is a config)") + + # Phase-1 (outer) controls + ap.add_argument("--phase-1-print", action="store_true", help="print master planner (phase 1)") + ap.add_argument("--phase-1-then-stop", action="store_true", help="stop after phase 1") + + # Phase-2 (inner) controls (outer forwards these to inner) + ap.add_argument("--phase-2-print", action="store_true", help="print decoded journal (phase 2)") + ap.add_argument("--phase-2-then-stop", action="store_true", help="stop after phase 2 decode") + + # Inner-only flags (not for users) + ap.add_argument("--inner", action="store_true", help=argparse.SUPPRESS) + ap.add_argument("--plan", default=None, help=argparse.SUPPRESS) + + args = ap.parse_args(argv) + + if args.inner: + if not args.plan: + print("error: --inner requires --plan ", file=sys.stderr) + return 2 + return _inner_main(Path(args.plan), + phase2_print=args.phase_2_print, + phase2_then_stop=args.phase_2_then_stop) + + return _outer_main(args) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/developer/source/DNS/install_staged_tree.py b/developer/source/DNS/install_staged_tree.py deleted file mode 100755 index 7c1786f..0000000 --- a/developer/source/DNS/install_staged_tree.py +++ /dev/null @@ -1,262 +0,0 @@ -#!/usr/bin/env python3 -""" -install_staged_tree.py -RT-v2025.09.15.2 - -A dumb installer: copy staged files into the target root with backups and -deterministic permissions. No systemd stop/start, no daemon-reload. - -- Extended whitelist to include DNS bundle assets: - * /etc/unbound/*.conf -> 0644 - * /etc/nftables.d/*.nft -> 0644 - * /usr/local/sbin/* -> 0500 - * /etc/systemd/system/*.service -> 0644 -- Keeps existing WireGuard/iproute2 handling. -- API unchanged: - install_staged_tree(stage_root: Path, dest_root: Path, - create_dirs=False, skip_identical=True) - -> returns (logs: list[str], detected_ifaces: list[str]) -""" - -from __future__ import annotations -from pathlib import Path -from typing import List, Optional, Sequence, Tuple -import argparse -import datetime as dt -import hashlib -import os -import shutil -import sys - -ROOT = Path(__file__).resolve().parent -DEFAULT_STAGE = ROOT / "stage" - -def _sha256(path: Path) -> str: - h = hashlib.sha256() - with path.open("rb") as f: - for chunk in iter(lambda: f.read(1<<20), b""): - h.update(chunk) - return h.hexdigest() - -def _ensure_parents(dest_root: Path, rel: Path, create: bool) -> None: - parent = (dest_root / rel).parent - if parent.exists(): - return - if not create: - raise RuntimeError(f"missing parent directory: {parent}") - parent.mkdir(parents=True, exist_ok=True) - -def _backup_existing_to_stage(stage_root: Path, dest_root: Path, rel: Path) -> Optional[Path]: - """If target exists, copy it back into stage/_backups// and return backup path.""" - target = dest_root / rel - if not target.exists(): - return None - ts = dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") - backup = stage_root / "_backups" / ts / rel - backup.parent.mkdir(parents=True, exist_ok=True) - shutil.copy2(target, backup) - return backup - -def _atomic_install(src: Path, dst: Path, mode: int) -> None: - tmp = dst.with_suffix(dst.suffix + ".tmp") - shutil.copyfile(src, tmp) - os.chmod(tmp, mode) - try: - os.chown(tmp, 0, 0) # best-effort - except PermissionError: - pass - os.replace(tmp, dst) - -def _mode_for_rel(rel: Path) -> Optional[int]: - """Choose a mode based on the relative path bucket.""" - s = str(rel) - - # Existing buckets - if s.startswith("usr/local/bin/"): - return 0o500 - if s.startswith("etc/wireguard/") and rel.suffix == ".conf": - return 0o600 - if s == "etc/iproute2/rt_tables": - return 0o644 - if s.startswith("etc/systemd/system/") and s.endswith(".conf"): - return 0o644 - - # NEW: DNS bundle buckets - if s.startswith("usr/local/sbin/"): - return 0o500 - if s.startswith("etc/unbound/") and s.endswith(".conf"): - return 0o644 - if s.startswith("etc/nftables.d/") and s.endswith(".nft"): - return 0o644 - if s.startswith("etc/systemd/system/") and s.endswith(".service"): - return 0o644 - - return None - -def _iter_stage_targets(stage_root: Path) -> List[Path]: - """Return a list of *relative* paths under stage that match our whitelist.""" - rels: List[Path] = [] - - # /usr/local/bin/* - bin_dir = stage_root / "usr" / "local" / "bin" - if bin_dir.is_dir(): - for p in sorted(bin_dir.glob("*")): - if p.is_file(): - rels.append(p.relative_to(stage_root)) - - # NEW: /usr/local/sbin/* - sbin_dir = stage_root / "usr" / "local" / "sbin" - if sbin_dir.is_dir(): - for p in sorted(sbin_dir.glob("*")): - if p.is_file(): - rels.append(p.relative_to(stage_root)) - - # /etc/wireguard/*.conf - wg_dir = stage_root / "etc" / "wireguard" - if wg_dir.is_dir(): - for p in sorted(wg_dir.glob("*.conf")): - rels.append(p.relative_to(stage_root)) - - # /etc/systemd/system/wg-quick@*.service.d/*.conf - sysd_dir = stage_root / "etc" / "systemd" / "system" - if sysd_dir.is_dir(): - for p in sorted(sysd_dir.rglob("wg-quick@*.service.d/*.conf")): - rels.append(p.relative_to(stage_root)) - - # NEW: /etc/systemd/system/*.service - if sysd_dir.is_dir(): - for p in sorted(sysd_dir.glob("*.service")): - if p.is_file(): - rels.append(p.relative_to(stage_root)) - - # /etc/iproute2/rt_tables - rt = stage_root / "etc" / "iproute2" / "rt_tables" - if rt.is_file(): - rels.append(rt.relative_to(stage_root)) - - # NEW: /etc/unbound/*.conf - ub_dir = stage_root / "etc" / "unbound" - if ub_dir.is_dir(): - for p in sorted(ub_dir.glob("*.conf")): - rels.append(p.relative_to(stage_root)) - - # NEW: /etc/nftables.d/*.nft - nft_dir = stage_root / "etc" / "nftables.d" - if nft_dir.is_dir(): - for p in sorted(nft_dir.glob("*.nft")): - rels.append(p.relative_to(stage_root)) - - return rels - -def _discover_ifaces_from_stage(stage_root: Path) -> List[str]: - """Peek into staged artifacts to guess iface names (for friendly next-steps).""" - names = set() - - # from /etc/wireguard/.conf - wg_dir = stage_root / "etc" / "wireguard" - if wg_dir.is_dir(): - for p in wg_dir.glob("*.conf"): - names.add(p.stem) - - # from /etc/systemd/system/wg-quick@.service.d/ - sysd = stage_root / "etc" / "systemd" / "system" - if sysd.is_dir(): - for d in sysd.glob("wg-quick@*.service.d"): - name = d.name - at = name.find("@") - dot = name.find(".service.d") - if at != -1 and dot != -1 and dot > at: - names.add(name[at+1:dot]) - - return sorted(names) - -def install_staged_tree( - stage_root: Path, - dest_root: Path, - create_dirs: bool = False, - skip_identical: bool = True, -) -> Tuple[List[str], List[str]]: - """ - Copy files from stage_root to dest_root. - Returns (logs, detected_ifaces). - """ - old_umask = os.umask(0o077) - logs: List[str] = [] - try: - staged = _iter_stage_targets(stage_root) - if not staged: - raise RuntimeError("nothing to install (stage is empty or whitelist didn’t match)") - - for rel in staged: - src = stage_root / rel - dst = dest_root / rel - - mode = _mode_for_rel(rel) - if mode is None: - logs.append(f"skip (not whitelisted): {rel}") - continue - - _ensure_parents(dest_root, rel, create_dirs) - - backup = _backup_existing_to_stage(stage_root, dest_root, rel) - if backup: - logs.append(f"backup: {dst} -> {backup}") - - if skip_identical and dst.exists(): - try: - if _sha256(src) == _sha256(dst): - logs.append(f"identical: skip {rel}") - continue - except Exception: - pass - - _atomic_install(src, dst, mode) - logs.append(f"install: {rel} (mode {oct(mode)})") - - ifaces = _discover_ifaces_from_stage(stage_root) - return (logs, ifaces) - finally: - os.umask(old_umask) - -def _require_root(allow_nonroot: bool) -> None: - if not allow_nonroot and os.geteuid() != 0: - raise RuntimeError("must run as root (use --force-nonroot to override)") - -def main(argv: Optional[Sequence[str]] = None) -> int: - ap = argparse.ArgumentParser(description="Install staged artifacts into a target root. No service control.") - ap.add_argument("--stage", default=str(DEFAULT_STAGE)) - ap.add_argument("--root", default="/") - ap.add_argument("--create-dirs", action="store_true", help="create missing parent directories") - ap.add_argument("--no-skip-identical", action="store_true", help="always replace even if content identical") - ap.add_argument("--force-nonroot", action="store_true", help="allow non-root install (ownership may be wrong)") - args = ap.parse_args(argv) - - try: - _require_root(allow_nonroot=args.force_nonroot) - logs, ifaces = install_staged_tree( - stage_root=Path(args.stage), - dest_root=Path(args.root), - create_dirs=args.create_dirs, - skip_identical=(not args.no_skip_identical), - ) - for line in logs: - print(line) - - print("\n=== Summary ===") - print(f"Installed {sum(1 for l in logs if l.startswith('install:'))} file(s).") - if ifaces: - lst = " ".join(ifaces) - print(f"Detected interfaces from stage: {lst}") - print("\nNext steps:") - print(f" sudo ./start_iface.py {lst}") - else: - print("No interfaces detected in staged artifacts.") - print("\nNext steps:") - print(" sudo ./start_iface.py [more ifaces]") - return 0 - except Exception as e: - print(f"❌ install failed: {e}", file=sys.stderr) - return 2 - -if __name__ == "__main__": - sys.exit(main()) diff --git a/developer/source/DNS/plan_show.py b/developer/source/DNS/plan_show.py deleted file mode 100755 index 780d34a..0000000 --- a/developer/source/DNS/plan_show.py +++ /dev/null @@ -1,273 +0,0 @@ -#!/usr/bin/env -S python3 -B -""" -plan_show.py — build and display a staged plan (UNPRIVILEGED). - -Given: a stage directory of config scripts (*.stage.py by default). -Does: executes each script with a pre-created Planner (P) and PlannerContext, - aggregates Commands into a single Journal, by default prints from the CBOR - round-trip (encode→decode) so the human view matches what will be shipped - to stage_cp; runs well-formed (WF) invariant checks. Can emit CBOR if requested. -Returns: exit status 0 on success; 2 on WF errors or usage errors. -""" - -from __future__ import annotations - -# no bytecode anywhere -import sys ,os -sys.dont_write_bytecode = True -os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") - -from pathlib import Path -import argparse -import datetime as _dt -import getpass -import runpy - -# local module (same dir): Planner -from Planner import Planner ,PlannerContext ,Journal ,Command - -# ===== Utilities (general / reusable) ===== - -def iso_utc_now_str()-> str: - "Given n/a. Does return compact UTC timestamp. Returns YYYYMMDDTHHMMSSZ." - return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") - -def find_configs(stage_root_dpath: Path ,glob_pat_str: str)-> list[Path]: - "Given stage root and glob. Does find matching files under stage. Returns list of absolute Paths." - root = stage_root_dpath.resolve() - return sorted((p for p in root.glob(glob_pat_str) if p.is_file()) ,key=lambda p: p.as_posix()) - -def human_size(n: int)-> str: - "Given byte count. Does format human size. Returns string." - units = ["B","KB","MB","GB","TB"] - i = 0 - x = float(max(0 ,n)) - while x >= 1024 and i < len(units)-1: - x /= 1024.0 - i += 1 - return f"{x:.1f} {units[i]}" - -def _dst_path_str(args_map: dict)-> str: - "Given args map. Does join write_file path. Returns POSIX path or '?'." - d = args_map.get("write_file_dpath_str") or "" - f = args_map.get("write_file_fname_str") or "" - try: - if d and f and "/" not in f: - return (Path(d)/f).as_posix() - except Exception: - pass - return "?" - -# ===== WF invariants (MVP) ===== -# These are “well-formedness” rules (shape/encoding/domain), not policy or privilege checks. - -def wf_check(journal: Journal)-> list[str]: - """ - Given Journal. Does run invariant checks on meta and each Command entry. Returns list of error strings. - Invariants (MVP): - - meta_map: must include generator identity and stage_root_dpath_str. - - entry.op ∈ {'copy','displace','delete'} - - all ops: write_file_dpath_str absolute; write_file_fname_str is bare filename. - - copy: owner_name_str non-empty; mode_int ∈ [0..0o7777] and no suid/sgid; content_bytes present (bytes). - """ - errs: list[str] = [] - meta = journal.meta_map or {} - - # meta presence (light placeholder) - if not isinstance(meta ,dict): - errs.append("WF_META: meta_map must be a map") - else: - if not meta.get("stage_root_dpath_str"): - errs.append("WF_META: missing stage_root_dpath_str") - if not meta.get("generator_prog_str"): - errs.append("WF_META: missing generator_prog_str") - - # entries - for idx ,cmd in enumerate(journal.commands_list ,1): - prefix = f"WF[{idx:02d}]" - if not isinstance(cmd ,Command): - errs.append(f"{prefix}: entry is not Command") - continue - op = cmd.name_str - if op not in {"copy","displace","delete"}: - errs.append(f"{prefix}: unknown op '{op}'") - continue - am = cmd.args_map or {} - dpath = am.get("write_file_dpath_str") - fname = am.get("write_file_fname_str") - if not isinstance(dpath ,str) or not dpath.startswith("/"): - errs.append(f"{prefix}: write_file_dpath_str must be absolute") - if not isinstance(fname ,str) or not fname or "/" in fname: - errs.append(f"{prefix}: write_file_fname_str must be a bare filename") - - if op == "copy": - owner = am.get("owner_name_str") - mode = am.get("mode_int") - data = am.get("content_bytes") - if not isinstance(owner ,str) or not owner.strip(): - errs.append(f"{prefix}: owner_name_str must be non-empty") - if not isinstance(mode ,int) or not (0 <= mode <= 0o7777): - errs.append(f"{prefix}: mode_int must be int in [0..0o7777]") - elif (mode & 0o6000): - errs.append(f"{prefix}: mode_int suid/sgid not allowed in MVP") - if not isinstance(data ,(bytes,bytearray)): - errs.append(f"{prefix}: content_bytes must be bytes") - return errs - -# ===== Planner execution ===== - -def _run_one_config(config_abs_fpath: Path ,stage_root_dpath: Path)-> Planner: - """ - Given abs path to a config script and stage root. Does construct a PlannerContext and Planner, - then executes the script with 'P' (Planner instance) bound in globals. Returns Planner with Journal. - Notes: - - Defaults are intentionally spartan; config should refine them via P.set_context(...). - - This is UNPRIVILEGED; no filesystem changes are performed here. - """ - read_rel = config_abs_fpath.resolve().relative_to(stage_root_dpath.resolve()) - ctx = PlannerContext.from_values( - stage_root_dpath=stage_root_dpath - ,read_file_rel_fpath=read_rel - ,write_file_dpath_str="/" - ,write_file_fname_str="." - ,owner_name_str=getpass.getuser() - ,perm=0o644 - ,content=None - ) - P = Planner(ctx) - g = {"Planner": Planner ,"PlannerContext": PlannerContext ,"P": P} - runpy.run_path(str(config_abs_fpath) ,init_globals=g) - return P - -def _aggregate_journal(planners_list: list[Planner] ,stage_root_dpath: Path)-> Journal: - "Given planners and stage root. Does aggregate Commands into a single Journal with meta. Returns Journal." - J = Journal() - J.set_meta( - version_int=1 - ,generator_prog_str="plan_show.py" - ,generated_at_utc_str=iso_utc_now_str() - ,user_name_str=getpass.getuser() - ,host_name_str=os.uname().nodename if hasattr(os ,"uname") else "unknown" - ,stage_root_dpath_str=str(stage_root_dpath.resolve()) - ,configs_list=[p.context().read_file_rel_fpath.as_posix() for p in planners_list] - ) - for p in planners_list: - for cmd in p.journal().commands_list: - J.append(cmd) - return J - -def _print_plan(journal: Journal)-> None: - "Given Journal. Does print a readable summary. Returns None." - meta = journal.meta_map or {} - print(f"Stage: {meta.get('stage_root_dpath_str','?')}") - print(f"Generated: {meta.get('generated_at_utc_str','?')} by {meta.get('user_name_str','?')}@{meta.get('host_name_str','?')}\n") - - entries = journal.commands_list - if not entries: - print("(plan is empty)") - return - - n_copy = sum(1 for c in entries if c.name_str=="copy") - n_disp = sum(1 for c in entries if c.name_str=="displace") - n_del = sum(1 for c in entries if c.name_str=="delete") - print(f"Entries: {len(entries)} copy:{n_copy} displace:{n_disp} delete:{n_del}\n") - - for i ,cmd in enumerate(entries ,1): - am = cmd.args_map - dst = _dst_path_str(am) - if cmd.name_str == "copy": - size = len(am.get("content_bytes") or b"") - mode = am.get("mode_int") - owner = am.get("owner_name_str") - print(f"{i:02d}. copy -> {dst} mode {mode:04o} owner {owner} bytes {size} ({human_size(size)})") - elif cmd.name_str == "displace": - print(f"{i:02d}. displace -> {dst}") - elif cmd.name_str == "delete": - print(f"{i:02d}. delete -> {dst}") - else: - print(f"{i:02d}. ?op? -> {dst}") - -def _maybe_emit_CBOR(journal: Journal ,emit_CBOR_fpath: Path|None)-> None: - "Given Journal and optional path. Does write CBOR if requested. Returns None." - if not emit_CBOR_fpath: - return - try: - data = journal.to_CBOR_bytes(canonical_bool=True) - except Exception as e: - print(f"error: CBOR encode failed: {e}" ,file=sys.stderr) - raise - emit_CBOR_fpath.parent.mkdir(parents=True ,exist_ok=True) - with open(emit_CBOR_fpath ,"wb") as fh: - fh.write(data) - print(f"\nWrote CBOR plan: {emit_CBOR_fpath} ({len(data)} bytes)") - -# ===== CLI ===== - -def main(argv: list[str]|None=None)-> int: - "Given CLI. Does discover configs, build plan, (optionally) CBOR round-trip before printing, run WF, optionally emit CBOR. Returns exit code." - ap = argparse.ArgumentParser(prog="plan_show.py" - ,description="Build and show a staged plan (no privilege, no apply).") - ap.add_argument("--stage",default="stage",help="stage directory root (default: ./stage)") - ap.add_argument("--glob",default="**/*.stage.py",help="glob for config scripts under --stage") - ap.add_argument("--emit-CBOR",default=None,help="write CBOR plan to this path (optional)") - ap.add_argument("--print-from-journal",action="store_true" - ,help="print directly from in-memory Journal (skip CBOR round-trip)") - args = ap.parse_args(argv) - - stage_root_dpath = Path(args.stage) - if not stage_root_dpath.is_dir(): - print(f"error: --stage not a directory: {stage_root_dpath}" ,file=sys.stderr) - return 2 - - configs = find_configs(stage_root_dpath ,args.glob) - if not configs: - print("No config scripts found.") - return 0 - - planners: list[Planner] = [] - for cfg in configs: - try: - planners.append(_run_one_config(cfg ,stage_root_dpath)) - except SystemExit: - raise - except Exception as e: - print(f"error: executing {cfg}: {e}" ,file=sys.stderr) - return 2 - - journal_src = _aggregate_journal(planners ,stage_root_dpath) - - if not args.print_from_journal: - try: - cbor_bytes = journal_src.to_CBOR_bytes(canonical_bool=True) - journal = Journal.from_CBOR_bytes(cbor_bytes) - except Exception as e: - print(f"error: CBOR round-trip failed: {e}" ,file=sys.stderr) - return 2 - else: - journal = journal_src - - _print_plan(journal) - - errs = wf_check(journal) - if errs: - print("\nerror(s):" ,file=sys.stderr) - for e in errs: - print(f" - {e}" ,file=sys.stderr) - return 2 - - emit = Path(args.emit_CBOR) if args.emit_CBOR else None - if emit: - try: - data = (cbor_bytes if not args.print_from_journal else journal_src.to_CBOR_bytes(canonical_bool=True)) - emit.parent.mkdir(parents=True ,exist_ok=True) - with open(emit ,"wb") as fh: - fh.write(data) - print(f"\nWrote CBOR plan: {emit} ({len(data)} bytes)") - except Exception as e: - print(f"error: failed to write CBOR: {e}" ,file=sys.stderr) - return 2 - - return 0 - -if __name__ == "__main__": - sys.exit(main()) diff --git a/developer/source/DNS/stage b/developer/source/DNS/stage deleted file mode 120000 index 6b42c53..0000000 --- a/developer/source/DNS/stage +++ /dev/null @@ -1 +0,0 @@ -stage_test_0/ \ No newline at end of file diff --git a/developer/source/DNS/stage_cp.py b/developer/source/DNS/stage_cp.py deleted file mode 100644 index 0114853..0000000 --- a/developer/source/DNS/stage_cp.py +++ /dev/null @@ -1,322 +0,0 @@ -#!/usr/bin/env -S python3 -B -""" -stage_cp.py — build a CBOR plan from staged configs; show, validate, and apply with privilege. - -Given: a stage root directory. -Does: (user) run configs → build native plan → WF checks → summarize → encode plan → sudo re-exec - (root) decode plan → VALID + SANITY → apply ops (displace/copy/delete) safely. -Returns: exit code. - -Requires: pip install cbor2 -""" -from __future__ import annotations -import sys ,os -sys.dont_write_bytecode = True -os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") - -from pathlib import Path -import argparse ,importlib.util ,runpy ,socket ,getpass ,time ,tempfile ,subprocess ,pwd -from typing import Any -import cbor2 - -# ---------- small utils ---------- - -def _load_stage_module(stage_root_dpath: Path): - "Given: stage root path. Does: load Stage.py as module 'Stage'. Returns: module." - mod_fpath = stage_root_dpath/"Stage.py" - if not mod_fpath.exists(): - raise FileNotFoundError(f"Stage.py not found at {mod_fpath}") - spec = importlib.util.spec_from_file_location("Stage" ,str(mod_fpath)) - mod = importlib.util.module_from_spec(spec) - sys.modules["Stage"] = mod - assert spec and spec.loader - spec.loader.exec_module(mod) # type: ignore - return mod - -def _config_rel_fpaths(stage_root_dpath: Path)-> list[Path]: - "Given: stage root. Does: collect *.py (excluding Stage.py) as relative file paths. Returns: list[Path]." - rel_fpath_list: list[Path] = [] - for p in stage_root_dpath.rglob("*.py"): - if p.name == "Stage.py": continue - if p.is_file(): - rel_fpath_list.append(p.relative_to(stage_root_dpath)) - return sorted(rel_fpath_list ,key=lambda x: x.as_posix()) - -def _sha256_bytes(b: bytes)-> bytes: - "Given: bytes. Does: sha256. Returns: 32-byte digest." - return hashlib.sha256(b).digest() - -def _dst_fpath_str(dst_dpath_str: str ,dst_fname_str: str)-> str: - "Given: a directory path string and a filename string. Does: join. Returns: combined POSIX path string." - if "/" in dst_fname_str: - return "" # invalid; WF will flag - return str((Path(dst_dpath_str)/dst_fname_str)) - -# ---------- WF / VALID / SANITY ---------- - -_ALLOWLIST_PREFIXES_LIST = ["/etc" ,"/usr/local" ,"/etc/systemd/system"] - -def wf_check(plan_map: dict[str,Any])-> list[str]: - "Given: plan map. Does: shape/lexical checks only. Returns: list of error strings." - errs_list: list[str] = [] - if plan_map.get("version_int") != 1: - errs_list.append("WF_VERSION: unsupported plan version") - entries_list = plan_map.get("entries_list") - if not isinstance(entries_list ,list): - errs_list.append("WF_ENTRIES: 'entries_list' missing or not a list") - return errs_list - for i ,e_map in enumerate(entries_list ,1): - op = e_map.get("op") - dst_dpath_str = e_map.get("dst_dpath") - dst_fname_str = e_map.get("dst_fname") - where = f"entry {i}" - if op not in ("copy","displace","delete"): - errs_list.append(f"WF_OP:{where}: invalid op {op!r}") - continue - if not isinstance(dst_dpath_str ,str) or not dst_dpath_str: - errs_list.append(f"WF_DST_DPATH:{where}: dst_dpath missing or not str") - if not isinstance(dst_fname_str ,str) or not dst_fname_str: - errs_list.append(f"WF_DST_FNAME:{where}: dst_fname missing or not str") - if isinstance(dst_fname_str ,str) and "/" in dst_fname_str: - errs_list.append(f"WF_DST_FNAME:{where}: dst_fname must not contain '/'") - if isinstance(dst_dpath_str ,str) and not dst_dpath_str.startswith("/"): - errs_list.append(f"WF_DST_DPATH:{where}: dst_dpath must be absolute") - full_fpath_str = _dst_fpath_str(dst_dpath_str or "" ,dst_fname_str or "") - if not full_fpath_str or not full_fpath_str.startswith("/"): - errs_list.append(f"WF_PATH:{where}: failed to construct absolute path from dst_dpath/fname") - if op == "copy": - mode_int = e_map.get("mode_int") - if not isinstance(mode_int ,int) or not (0 <= mode_int <= 0o7777): - errs_list.append(f"WF_MODE:{where}: mode_int must be int in [0..0o7777]") - if isinstance(mode_int ,int) and (mode_int & 0o6000): - errs_list.append(f"WF_MODE:{where}: suid/sgid bits not allowed in MVP") - owner_name = e_map.get("owner_name") - if not isinstance(owner_name ,str) or not owner_name: - errs_list.append(f"WF_OWNER:{where}: owner_name must be non-empty username string") - content_bytes = e_map.get("content_bytes") - if not (isinstance(content_bytes ,(bytes,bytearray)) and len(content_bytes) >= 0): - errs_list.append(f"WF_CONTENT:{where}: content_bytes must be bytes (may be empty)") - sha = e_map.get("sha256_bytes") - if sha is not None: - if not isinstance(sha ,(bytes,bytearray)) or len(sha)!=32: - errs_list.append(f"WF_SHA256:{where}: sha256_bytes must be 32-byte digest if present") - elif isinstance(content_bytes ,(bytes,bytearray)) and sha != _sha256_bytes(content_bytes): - errs_list.append(f"WF_SHA256_MISMATCH:{where}: sha256_bytes does not match content_bytes") - return errs_list - -def valid_check(plan_map: dict[str,Any])-> list[str]: - "Given: plan map. Does: environment (read-only) checks. Returns: list of error strings." - errs_list: list[str] = [] - for i ,e_map in enumerate(plan_map.get("entries_list") or [] ,1): - op = e_map.get("op") - dst_fpath_str = _dst_fpath_str(e_map.get("dst_dpath","/") ,e_map.get("dst_fname","")) - where = f"entry {i}" - try: - parent_dpath = Path(dst_fpath_str).parent - if not parent_dpath.exists(): - errs_list.append(f"VAL_PARENT_MISSING:{where}: parent dir does not exist: {parent_dpath}") - elif not parent_dpath.is_dir(): - errs_list.append(f"VAL_PARENT_NOT_DIR:{where}: parent is not a directory: {parent_dpath}") - if Path(dst_fpath_str).is_dir(): - errs_list.append(f"VAL_DST_IS_DIR:{where}: destination exists as a directory: {dst_fpath_str}") - if op == "copy": - owner_name = e_map.get("owner_name") - try: - pw = pwd.getpwnam(owner_name) # may raise KeyError - e_map["_resolved_uid_int"] = pw.pw_uid - e_map["_resolved_gid_int"] = pw.pw_gid - except Exception: - errs_list.append(f"VAL_OWNER_UNKNOWN:{where}: user not found: {owner_name!r}") - except Exception as x: - errs_list.append(f"VAL_EXCEPTION:{where}: {x}") - return errs_list - -def sanity_check(plan_map: dict[str,Any])-> list[str]: - "Given: plan map. Does: policy checks (allowlist, denials). Returns: list of error strings." - errs_list: list[str] = [] - for i ,e_map in enumerate(plan_map.get("entries_list",[]) ,1): - dst_fpath_str = _dst_fpath_str(e_map.get("dst_dpath","/") ,e_map.get("dst_fname","")) - where = f"entry {i}" - if not any(dst_fpath_str.startswith(pref + "/") or dst_fpath_str==pref for pref in _ALLOWLIST_PREFIXES_LIST): - errs_list.append(f"POL_PATH_DENY:{where}: destination outside allowlist: {dst_fpath_str}") - return errs_list - -# ---------- APPLY (root) ---------- - -def _utc_str()-> str: - "Given: n/a. Does: current UTC compact. Returns: string." - import datetime as _dt - return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") - -def _ensure_parent_dirs(dst_fpath: Path)-> None: - "Given: destination file path. Does: create parents. Returns: None." - dst_fpath.parent.mkdir(parents=True ,exist_ok=True) - -def _displace_in_place(dst_fpath: Path)-> None: - "Given: destination file path. Does: rename existing file/symlink to add UTC suffix. Returns: None." - try: - if dst_fpath.exists() or dst_fpath.is_symlink(): - suffix = "_" + _utc_str() - dst_fpath.rename(dst_fpath.with_name(dst_fpath.name + suffix)) - except FileNotFoundError: - pass - -def _apply_copy(dst_fpath: Path ,content_bytes: bytes ,mode_int: int ,uid_int: int ,gid_int: int)-> None: - "Given: target, bytes, mode, uid, gid. Does: write temp, fsync, chmod/chown, atomic replace. Returns: None." - _ensure_parent_dirs(dst_fpath) - _displace_in_place(dst_fpath) - tmp_fpath = dst_fpath.with_name("." + dst_fpath.name + ".stage_tmp") - with open(tmp_fpath ,"wb") as fh: - fh.write(content_bytes) - fh.flush() - os.fsync(fh.fileno()) - try: - os.chmod(tmp_fpath ,mode_int & 0o777) - except Exception: - pass - try: - os.chown(tmp_fpath ,uid_int ,gid_int) - except Exception: - pass - os.replace(tmp_fpath ,dst_fpath) # atomic within same dir/device - -def _apply_delete(dst_fpath: Path)-> None: - "Given: target file path. Does: unlink file/symlink if present. Returns: None." - try: - if dst_fpath.is_symlink() or dst_fpath.is_file(): - dst_fpath.unlink() - except FileNotFoundError: - pass - -def apply_plan(plan_map: dict[str,Any] ,dry_run_bool: bool=False)-> int: - "Given: plan map and dry flag. Does: execute ops sequentially. Returns: exit code." - for i ,e_map in enumerate(plan_map.get("entries_list") or [] ,1): - op = e_map.get("op") - dst_fpath = Path(_dst_fpath_str(e_map.get("dst_dpath","/") ,e_map.get("dst_fname",""))) - if op == "displace": - print(f"+ displace {dst_fpath}") - if not dry_run_bool: - _displace_in_place(dst_fpath) - elif op == "delete": - print(f"+ delete {dst_fpath}") - if not dry_run_bool: - _apply_delete(dst_fpath) - elif op == "copy": - mode_int = e_map.get("mode_int") or 0o644 - uid_int = e_map.get("_resolved_uid_int" ,0) - gid_int = e_map.get("_resolved_gid_int" ,0) - content_bytes = e_map.get("content_bytes") or b"" - print(f"+ copy {dst_fpath} mode {mode_int:04o} uid {uid_int} gid {gid_int} bytes {len(content_bytes)}") - if not dry_run_bool: - _apply_copy(dst_fpath ,content_bytes ,mode_int ,uid_int ,gid_int) - else: - print(f"! unknown op {op} (skipping)") - return 2 - return 0 - -# ---------- orchestration ---------- - -def _build_plan_unpriv(stage_root_dpath: Path)-> dict[str,Any]: - "Given: stage root. Does: execute configs, accumulate entries, add sha256. Returns: plan map." - StageMod = _load_stage_module(stage_root_dpath) - Stage = StageMod.Stage - Stage._reset() - Stage.set_meta( - planner_user_name=getpass.getuser() - ,planner_uid_int=os.getuid() - ,planner_gid_int=os.getgid() - ,host_name=socket.gethostname() - ,created_utc_str=time.strftime("%Y-%m-%dT%H:%M:%SZ",time.gmtime()) - ) - for rel_fpath in _config_rel_fpaths(stage_root_dpath): - Stage._begin(read_rel_fpath=rel_fpath ,stage_root_dpath=stage_root_dpath) - runpy.run_path(str(stage_root_dpath/rel_fpath) ,run_name="__main__") - Stage._end() - for e_map in Stage.plan_entries(): - if e_map.get("op") == "copy" and isinstance(e_map.get("content_bytes") ,(bytes,bytearray)): - e_map["sha256_bytes"] = _sha256_bytes(e_map["content_bytes"]) - return Stage.plan_object() - -def _sudo_apply_self(plan_fpath: Path ,dry_run_bool: bool)-> int: - "Given: plan file path and dry flag. Does: sudo re-exec current script with --apply. Returns: exit code." - cmd_list = ["sudo",sys.executable,os.path.abspath(__file__),"--apply" - ,"--plan",str(plan_fpath)] - if dry_run_bool: - cmd_list.append("--dry-run") - return subprocess.call(cmd_list) - -def main(argv: list[str]|None=None)-> int: - "Given: CLI. Does: plan, WF (user) then VALID+SANITY+APPLY (root). Returns: exit code." - ap = argparse.ArgumentParser(prog="stage_cp.py" - ,description="Plan staged config application and apply with sudo.") - ap.add_argument("--stage",default="stage",help="stage directory (default: ./stage)") - ap.add_argument("--dry-run",action="store_true",help="validate and show actions, do not change files") - ap.add_argument("--apply",action="store_true",help=argparse.SUPPRESS) # internal (root path) - ap.add_argument("--plan",default=None,help=argparse.SUPPRESS) # internal (root path) - args = ap.parse_args(argv) - - # Root path (apply) - if args.apply: - if os.geteuid() != 0: - print("error: --apply requires root" ,file=sys.stderr) - return 2 - if not args.plan: - print("error: --plan path required for --apply" ,file=sys.stderr) - return 2 - with open(args.plan ,"rb") as fh: - plan_map = cbor2.load(fh) - val_errs = valid_check(plan_map) - pol_errs = sanity_check(plan_map) - if val_errs or pol_errs: - print("error(s) during validation/sanity:" ,file=sys.stderr) - for e in val_errs: print(f" - {e}" ,file=sys.stderr) - for e in pol_errs: print(f" - {e}" ,file=sys.stderr) - return 2 - rc = apply_plan(plan_map ,dry_run_bool=args.dry_run) - return rc - - # User path (plan + summarize + escalate) - stage_root_dpath = Path(args.stage) - plan_map = _build_plan_unpriv(stage_root_dpath) - - entries_list = plan_map.get("entries_list" ,[]) - print(f"Built plan with {len(entries_list)} entr{'y' if len(entries_list)==1 else 'ies'}") - - total_bytes_int = sum(len(e_map.get("content_bytes") or b"") - for e_map in entries_list if e_map.get("op")=="copy") - print(f"Total bytes to write: {total_bytes_int}") - if args.dry_run: - print("\n--dry-run: would perform the following:") - - for i ,e_map in enumerate(entries_list ,1): - op = e_map.get("op") - dst_fpath_str = _dst_fpath_str(e_map.get("dst_dpath") ,e_map.get("dst_fname")) - if op=="copy": - mode_int = e_map.get("mode_int") or 0o644 - owner_name = e_map.get("owner_name") or "?" - size = len(e_map.get("content_bytes") or b"") - print(f"{i:02d}. copy -> {dst_fpath_str} mode {mode_int:04o} owner {owner_name} bytes {size}") - elif op=="displace": - print(f"{i:02d}. displace -> {dst_fpath_str}") - elif op=="delete": - print(f"{i:02d}. delete -> {dst_fpath_str}") - else: - print(f"{i:02d}. ?op? -> {dst_fpath_str}") - - with tempfile.NamedTemporaryFile(prefix="plan_" ,suffix=".cbor" ,delete=False) as tf: - cbor2.dump(plan_map ,tf) - plan_fpath = Path(tf.name) - try: - if args.dry_run: - return _sudo_apply_self(plan_fpath ,dry_run_bool=True) - ans = input("\nProceed with apply under sudo? [y/N] ").strip().lower() - if ans not in ("y","yes"): - print("Aborted.") - return 0 - return _sudo_apply_self(plan_fpath ,dry_run_bool=False) - finally: - try: os.unlink(plan_fpath) - except Exception: pass - -if __name__ == "__main__": - sys.exit(main()) diff --git a/developer/source/DNS/stage_orig/etc/nftables.d/10-block-IPv6.nft b/developer/source/DNS/stage_orig/etc/nftables.d/10-block-IPv6.nft deleted file mode 100644 index eaee5be..0000000 --- a/developer/source/DNS/stage_orig/etc/nftables.d/10-block-IPv6.nft +++ /dev/null @@ -1,16 +0,0 @@ -table inet NO-IPV6 { - chain input { - type filter hook input priority raw; policy accept; - meta nfproto ipv6 counter comment "drop all IPv6 inbound" drop - } - - chain output { - type filter hook output priority raw; policy accept; - meta nfproto ipv6 counter comment "drop all IPv6 outbound" drop - } - - chain forward { - type filter hook forward priority raw; policy accept; - meta nfproto ipv6 counter comment "drop all IPv6 forward" drop - } -} diff --git a/developer/source/DNS/stage_orig/etc/nftables.d/20-SUBU-ports.nft b/developer/source/DNS/stage_orig/etc/nftables.d/20-SUBU-ports.nft deleted file mode 100644 index 6c31446..0000000 --- a/developer/source/DNS/stage_orig/etc/nftables.d/20-SUBU-ports.nft +++ /dev/null @@ -1,47 +0,0 @@ -table inet SUBU-DNS-REDIRECT { - chain output { - type nat hook output priority -100; policy accept; - - # Redirect DNS for the subu UIDs to local Unbound listeners - meta skuid 2017 udp dport 53 redirect to :5301 - meta skuid 2018 udp dport 53 redirect to :5302 - meta skuid 2017 tcp dport 53 redirect to :5301 - meta skuid 2018 tcp dport 53 redirect to :5302 - } -} - -table inet SUBU-PORT-EGRESS { - chain output { - type filter hook output priority 0; policy accept; - - # Always allow loopback on egress - oifname "lo" accept - - # No IPv6 for subu (until you reintroduce v6) - meta skuid {2017,2018} meta nfproto ipv6 counter comment "no IPv6 for subu" drop - - ##### x6 (UID 2018) - # Block some exfil channels regardless of iface - meta skuid 2018 tcp dport {25,465,587} counter comment "block SMTP/Submission" drop - meta skuid 2018 udp dport {3478,5349,19302-19309} counter comment "block STUN/TURN" drop - meta skuid 2018 tcp dport 853 counter comment "block DoT (TCP/853)" drop - - # (Optional) allow ICMP echo out via x6 - meta skuid 2018 oifname "x6" ip protocol icmp icmp type echo-request accept - - # Enforce interface binding - meta skuid 2018 oifname "x6" accept - meta skuid 2018 oifname != "x6" counter comment "x6 must use wg x6" drop - - ##### US (UID 2017) - meta skuid 2017 tcp dport {25,465,587} counter drop comment "block SMTP/Submission" - meta skuid 2017 udp dport {3478,5349,19302-19309} counter drop comment "block STUN/TURN" - meta skuid 2017 tcp dport 853 counter drop comment "block DoT (TCP/853)" - - # (Optional) ICMP via US - meta skuid 2017 oifname "US" ip protocol icmp icmp type echo-request accept - - meta skuid 2017 oifname "US" accept - meta skuid 2017 oifname != "US" counter comment "US must use wg US" drop - } -} diff --git a/developer/source/DNS/stage_orig/etc/systemd/system/unbound@.service b/developer/source/DNS/stage_orig/etc/systemd/system/unbound@.service deleted file mode 100644 index ba2919b..0000000 --- a/developer/source/DNS/stage_orig/etc/systemd/system/unbound@.service +++ /dev/null @@ -1,19 +0,0 @@ -[Unit] -Description=Unbound DNS instance for %i (per-subu tunnel egress) -After=network-online.target wg-quick@%i.service -Requires=wg-quick@%i.service -Wants=network-online.target - -[Service] -Type=simple -ExecStart=/usr/sbin/unbound -d -p -c /etc/unbound/unbound-%i.conf -User=unbound -Group=unbound -Restart=on-failure -RestartSec=2s -AmbientCapabilities=CAP_NET_BIND_SERVICE -CapabilityBoundingSet=CAP_NET_BIND_SERVICE -NoNewPrivileges=true - -[Install] -WantedBy=multi-user.target diff --git a/developer/source/DNS/stage_orig/etc/unbound/unbound-US.conf b/developer/source/DNS/stage_orig/etc/unbound/unbound-US.conf deleted file mode 100644 index 1995438..0000000 --- a/developer/source/DNS/stage_orig/etc/unbound/unbound-US.conf +++ /dev/null @@ -1,18 +0,0 @@ -server: - username: "unbound" - chroot: "" - directory: "/etc/unbound" - do-daemonize: no - interface: 127.0.0.1@5301 - hide-identity: yes - hide-version: yes - harden-glue: yes - harden-dnssec-stripped: yes - qname-minimisation: yes - prefetch: yes - outgoing-interface: 10.0.0.1 - -forward-zone: - name: "." - forward-addr: 1.1.1.1 - forward-addr: 1.0.0.1 diff --git a/developer/source/DNS/stage_orig/etc/unbound/unbound-x6.conf b/developer/source/DNS/stage_orig/etc/unbound/unbound-x6.conf deleted file mode 100644 index ed49241..0000000 --- a/developer/source/DNS/stage_orig/etc/unbound/unbound-x6.conf +++ /dev/null @@ -1,18 +0,0 @@ -server: - username: "unbound" - chroot: "" - directory: "/etc/unbound" - do-daemonize: no - interface: 127.0.0.1@5302 - hide-identity: yes - hide-version: yes - harden-glue: yes - harden-dnssec-stripped: yes - qname-minimisation: yes - prefetch: yes - outgoing-interface: 10.8.0.2 - -forward-zone: - name: "." - forward-addr: 1.1.1.1 - forward-addr: 1.0.0.1 diff --git a/developer/source/DNS/stage_orig/usr/local/sbin/DNS_status.sh b/developer/source/DNS/stage_orig/usr/local/sbin/DNS_status.sh deleted file mode 100755 index d4db58e..0000000 --- a/developer/source/DNS/stage_orig/usr/local/sbin/DNS_status.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail -echo "== DNS status ==" -systemctl --no-pager --full status DNS-redirect unbound@US unbound@x6 || true -echo -echo "== nftables ==" -nft list table inet NAT-DNS-REDIRECT || true -echo -echo "== Unbound logs (last 50 lines each) ==" -journalctl -u unbound@US -n 50 --no-pager || true -echo -journalctl -u unbound@x6 -n 50 --no-pager || true diff --git a/developer/source/DNS/stage_show_plan.py b/developer/source/DNS/stage_show_plan.py deleted file mode 100644 index 075e65b..0000000 --- a/developer/source/DNS/stage_show_plan.py +++ /dev/null @@ -1,97 +0,0 @@ -#!/usr/bin/env -S python3 -B -""" -stage_show_plan.py — run staged configs (UNPRIVILEGED) and print the plan. - -Given: a stage root directory. -Does: loads Stage.py, executes each config, builds a native plan map, summarizes it. -Returns: exit code 0 on success, non-zero on error. -""" -from __future__ import annotations -import sys ,os -sys.dont_write_bytecode = True -os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") - -from pathlib import Path -import argparse ,importlib.util ,runpy ,socket ,getpass ,time ,hashlib - -# ---------- helpers ---------- - -def _load_stage_module(stage_root_dpath: Path): - "Given: stage root path. Does: load Stage.py as module 'Stage'. Returns: module." - mod_fpath = stage_root_dpath/"Stage.py" - if not mod_fpath.exists(): - raise FileNotFoundError(f"Stage.py not found at {mod_fpath}") - spec = importlib.util.spec_from_file_location("Stage" ,str(mod_fpath)) - mod = importlib.util.module_from_spec(spec) - sys.modules["Stage"] = mod - assert spec and spec.loader - spec.loader.exec_module(mod) # type: ignore - return mod - -def _config_rel_fpaths(stage_root_dpath: Path)-> list[Path]: - "Given: stage root. Does: collect *.py (excluding Stage.py) as relative file paths. Returns: list[Path]." - rel_fpath_list: list[Path] = [] - for p in stage_root_dpath.rglob("*.py"): - if p.name == "Stage.py": continue - if p.is_file(): - rel_fpath_list.append(p.relative_to(stage_root_dpath)) - return sorted(rel_fpath_list ,key=lambda x: x.as_posix()) - -def _sha256_hex(b: bytes)-> str: - "Given: bytes. Does: sha256. Returns: hex string." - return hashlib.sha256(b).hexdigest() - -# ---------- main ---------- - -def main(argv: list[str]|None=None)-> int: - "Given: CLI. Does: show plan. Returns: exit code." - ap = argparse.ArgumentParser(prog="stage_show_plan.py" - ,description="Run staged config scripts and print the resulting plan.") - ap.add_argument("--stage",default="stage",help="stage directory (default: ./stage)") - args = ap.parse_args(argv) - - stage_root_dpath = Path(args.stage) - StageMod = _load_stage_module(stage_root_dpath) - Stage = StageMod.Stage - Stage._reset() - Stage.set_meta( - planner_user_name=getpass.getuser() - ,planner_uid_int=os.getuid() - ,planner_gid_int=os.getgid() - ,host_name=socket.gethostname() - ,created_utc_str=time.strftime("%Y-%m-%dT%H:%M:%SZ",time.gmtime()) - ) - - for rel_fpath in _config_rel_fpaths(stage_root_dpath): - Stage._begin(read_rel_fpath=rel_fpath ,stage_root_dpath=stage_root_dpath) - runpy.run_path(str(stage_root_dpath/rel_fpath) ,run_name="__main__") - Stage._end() - - plan_map = Stage.plan_object() - entries_list = plan_map["entries_list"] - print(f"Plan version: {plan_map['version_int']}") - print(f"Planner: {plan_map['meta_map'].get('planner_user_name')}@{plan_map['meta_map'].get('host_name')} " - f"UID:{plan_map['meta_map'].get('planner_uid_int')} GID:{plan_map['meta_map'].get('planner_gid_int')}") - print(f"Created: {plan_map['meta_map'].get('created_utc_str')}") - print(f"Entries: {len(entries_list)}\n") - - for i ,e_map in enumerate(entries_list ,1): - op = e_map.get("op") - dst_fpath_str = f"{e_map.get('dst_dpath')}/{e_map.get('dst_fname')}" - if op == "copy": - content = e_map.get("content_bytes") or b"" - sz = len(content) - mode = e_map.get("mode_octal_str") or "????" - owner = e_map.get("owner_name") or "?" - h = _sha256_hex(content) - print(f"{i:02d}. copy -> {dst_fpath_str} mode {mode} owner {owner} bytes {sz} sha256 {h[:16]}…") - elif op == "displace": - print(f"{i:02d}. displace -> {dst_fpath_str}") - elif op == "delete": - print(f"{i:02d}. delete -> {dst_fpath_str}") - else: - print(f"{i:02d}. ?op? -> {dst_fpath_str} ({op})") - return 0 - -if __name__ == "__main__": - sys.exit(main()) diff --git a/developer/source/DNS/stage_test_0/a.conf b/developer/source/DNS/stage_test_0/a.conf deleted file mode 100644 index a3153de..0000000 --- a/developer/source/DNS/stage_test_0/a.conf +++ /dev/null @@ -1 +0,0 @@ -Thomas-developer 0x444 . stage_test_0_out \ No newline at end of file diff --git a/developer/source/DNS/stage_test_0/b.conf b/developer/source/DNS/stage_test_0/b.conf deleted file mode 100644 index 9e8fc11..0000000 --- a/developer/source/DNS/stage_test_0/b.conf +++ /dev/null @@ -1 +0,0 @@ -Thomas-developer 0640 . stage_test_0_out \ No newline at end of file diff --git a/developer/source/DNS/stage_test_0/c.conf b/developer/source/DNS/stage_test_0/c.conf deleted file mode 100644 index 7d4330b..0000000 --- a/developer/source/DNS/stage_test_0/c.conf +++ /dev/null @@ -1 +0,0 @@ -Thomas-developer 0444 . stage_test_0_out \ No newline at end of file diff --git a/developer/source/DNS/stage_test_0/example.py b/developer/source/DNS/stage_test_0/example.py deleted file mode 100644 index 6f2c257..0000000 --- a/developer/source/DNS/stage_test_0/example.py +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env -S python3 -B -import Stage - -# You can compute these with arbitrary Python if you like -svc = "unbound" -zone = "US" -fname = f"unbound-{zone}.conf" - -Stage.init( - write_file_name="." # '.' → use basename of this file -> 'example_dns.py' -, write_file_directory_path="/etc/unbound" -, write_file_owner="root" -, write_file_permissions=0o644 # or "0644" -, read_file_contents="""\ -# generated config (example) -server: - verbosity: 1 - interface: 127.0.0.1 -""" -) - -# declare the desired operations (no effect in 'noop'/'dry' without a copier) -Stage.displace() -Stage.copy() diff --git a/developer/source/DNS/stage_test_0/name_space/c.conf b/developer/source/DNS/stage_test_0/name_space/c.conf deleted file mode 100644 index 7d4330b..0000000 --- a/developer/source/DNS/stage_test_0/name_space/c.conf +++ /dev/null @@ -1 +0,0 @@ -Thomas-developer 0444 . stage_test_0_out \ No newline at end of file diff --git a/developer/source/DNS/stage_test_0/stage_ls.py b/developer/source/DNS/stage_test_0/stage_ls.py deleted file mode 100644 index 1e27c26..0000000 --- a/developer/source/DNS/stage_test_0/stage_ls.py +++ /dev/null @@ -1,169 +0,0 @@ -#!/usr/bin/env -S python3 -B -""" -stage_ls.py — execute staged Python programs with Stage in 'noop' mode and list metadata. - -For each *.py under --stage (recursively, excluding Stage.py), this tool: - 1) loads Stage.py from the stage root, - 2) switches mode to 'noop' (no side effects, no printing), - 3) executes the program via runpy.run_path(...) with the proper __file__, - 4) collects the resolved write_file_* metadata and declared ops, - 5) prints either list or aligned table, - 6) reports any collected errors. - -This lets admins compute metadata with arbitrary Python while guaranteeing no writes. -""" - -from __future__ import annotations - -import sys ,os -sys.dont_write_bytecode = True -os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") - -from dataclasses import dataclass -from pathlib import Path -import argparse -import importlib.util ,runpy -import traceback - -# --- utility dataclass (for printing) --- - -@dataclass -class Row: - read_rel: Path - owner: str|None - perm: str|None - write_name: str|None - target_dir: Path|None - ops: list[str] - errors: list[str] - -# --- helpers --- - -def _load_stage_module(stage_root: Path): - """Load Stage.py from stage_root into sys.modules['Stage'] (overwriting if present). Returns the Stage module.""" - stage_py = stage_root/"Stage.py" - if not stage_py.exists(): - raise FileNotFoundError(f"Stage.py not found at {stage_py} — place Stage.py in the stage root.") - spec = importlib.util.spec_from_file_location("Stage" ,str(stage_py)) - if spec is None or spec.loader is None: - raise RuntimeError(f"cannot load Stage module from {stage_py}") - mod = importlib.util.module_from_spec(spec) - sys.modules["Stage"] = mod - spec.loader.exec_module(mod) # type: ignore[union-attr] - return mod - -def _stage_program_paths(stage_root: Path)-> list[Path]: - rels: list[Path] = [] - for p in stage_root.rglob("*.py"): - if p.name == "Stage.py": - continue - try: - if p.is_file(): - rels.append(p.relative_to(stage_root)) - except Exception: - continue - return sorted(rels ,key=lambda x: x.as_posix()) - -def print_list(rows: list[Row])-> None: - for r in rows: - owner = r.owner or "?" - perm = r.perm or "????" - name = r.write_name or "?" - tdir = str(r.target_dir) if r.target_dir is not None else "?" - print(f"{r.read_rel.as_posix()}: {owner} {perm} {name} {tdir}") - -def print_table(rows: list[Row])-> None: - if not rows: - return - a = [r.read_rel.as_posix() for r in rows] - b = [(r.owner or "?") for r in rows] - c = [(r.perm or "????") for r in rows] - d = [(r.write_name or "?") for r in rows] - e = [str(r.target_dir) if r.target_dir is not None else "?" for r in rows] - wa = max(len(s) for s in a) - wb = max(len(s) for s in b) - wc = max(len(s) for s in c) - wd = max(len(s) for s in d) - for sa ,sb ,sc ,sd ,se in zip(a ,b ,c ,d ,e): - print(f"{sa:<{wa}} {sb:<{wb}} {sc:<{wc}} {sd:<{wd}} {se}") - -# --- core --- - -def ls_stage(stage_root: Path ,fmt: str="list")-> int: - Stage = _load_stage_module(stage_root) - Stage.Stage.set_mode("noop") # hard safety for this tool - - rows: list[Row] = [] - errs: list[str] = [] - - for rel in _stage_program_paths(stage_root): - abs_path = stage_root/rel - try: - # isolate per-run state - Stage.Stage._current = None - Stage.Stage._all_records.clear() - Stage.Stage._begin(read_rel=rel ,stage_root=stage_root) - - # execute the staged program under its real path - runpy.run_path(str(abs_path) ,run_name="__main__") - - rec = Stage.Stage._end() - if rec is None: - errs.append(f"{rel}: program executed but Stage.init(...) was never called") - continue - - rows.append( - Row( - read_rel=rel - , owner=rec.owner - , perm=rec.perm_octal_str - , write_name=rec.write_name - , target_dir=rec.target_dir - , ops=list(rec.ops) - , errors=list(rec.errors) - ) - ) - - except SystemExit as e: - errs.append(f"{rel}: program called sys.exit({e.code}) during listing") - except Exception: - tb = traceback.format_exc(limit=2) - errs.append(f"{rel}: exception during execution:\n{tb}") - - # print data - if fmt == "table": - print_table(rows) - else: - print_list(rows) - - # print per-row Stage errors - row_errs = [f"{r.read_rel}: {msg}" for r in rows for msg in r.errors] - all_errs = row_errs + errs - if all_errs: - print("\nerror(s):" ,file=sys.stderr) - for e in all_errs: - print(f" - {e}" ,file=sys.stderr) - return 1 - return 0 - -# --- CLI --- - -def main(argv: list[str] | None=None)-> int: - import argparse - ap = argparse.ArgumentParser( - prog="stage_ls.py" - , description="Execute staged Python configs with Stage in 'noop' mode and list resolved metadata." - ) - ap.add_argument("--stage" ,default="stage" ,help="stage directory (default: ./stage)") - ap.add_argument("--format" ,choices=["list","table"] ,default="list" ,help="output format") - args = ap.parse_args(argv) - - stage_root = Path(args.stage) - if not stage_root.exists() or not stage_root.is_dir(): - print(f"error: stage directory not found or not a directory: {stage_root}" ,file=sys.stderr) - return 2 - - return ls_stage(stage_root ,fmt=args.format) - -if __name__ == "__main__": - sys.exit(main()) diff --git a/developer/source/DNS/stage_test_0/unbound_conf.py b/developer/source/DNS/stage_test_0/unbound_conf.py new file mode 100644 index 0000000..4f57794 --- /dev/null +++ b/developer/source/DNS/stage_test_0/unbound_conf.py @@ -0,0 +1,15 @@ +# example unbound.conf +def configure(prov, planner, WriteFileMeta): + # use current user for owner, and use this script’s py-less name for the filename + + # owner defaults to root (this is a configuration file installer) + # owner "." means owner of the process running Stagehane + # owner "." is good for testing + + # fname "." means write file has the same name as read file (without .py if it has .py) + # fname "." is the default, so it is redundant here. "." still works in args, even when wfm changes the fname. + + wfm = WriteFileMeta(dpath="stage_test_0_out", fname=".", owner=".") + planner.displace(wfm) + planner.copy(wfm, content="server:\n do-ip6: no\n") +