From: Thomas Walker Lynch Date: Wed, 17 Sep 2025 14:39:53 +0000 (-0700) Subject: mvoing to python cofigurations and Planner X-Git-Url: https://git.reasoningtechnology.com/style/rt_dark_doc.css?a=commitdiff_plain;h=44f448ced313f5f307d651f481f242f7d2f69c4e;p=subu mvoing to python cofigurations and Planner --- diff --git a/developer/source/DNS/Planner.py b/developer/source/DNS/Planner.py new file mode 100644 index 0000000..0da1384 --- /dev/null +++ b/developer/source/DNS/Planner.py @@ -0,0 +1,356 @@ +#!/usr/bin/env -S python3 -B +""" +Planner.py — plan builder for staged configuration (UNPRIVILEGED). + +The Planner accumulates Command objects into a Journal. + +Journal building is orchestrated by the outer runner (e.g., stage_show_plan, stage_cp) +which constructs a Planner per config file and invokes Planner command methods. + +Defaults and provenance come from a PlannerContext instance. You can replace the +context at any time via set_context(ctx). + +The Journal can be exported as CBOR via Journal.to_CBOR_bytes(), and reconstructed +on the privileged side via Journal.from_CBOR_bytes(). + +On-wire field names are snake_case and use explicit suffixes (_str,_bytes,_int, etc.) +to avoid ambiguity. +""" + +from __future__ import annotations + +# no bytecode anywhere (works under sudo/root shells too) +import sys ,os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE","1") + +from dataclasses import dataclass ,field +from pathlib import Path +from typing import Any + +# ===== Utilities ===== + +def _norm_perm(value: int|str)-> tuple[int,str]|None: + "Given int or 4-char octal string. Does validate/normalize. Returns (int,'%04o') or None." + if isinstance(value ,int): + if 0 <= value <= 0o7777: + return value ,f"{value:04o}" + return None + if isinstance(value ,str): + s = value.strip() + if len(s)==4 and all(ch in "01234567" for ch in s): + try: + v = int(s ,8) + return v ,s + except Exception: + return None + return None + +def _is_abs_dpath(dpath_str: str)-> bool: + "Given path string. Does quick abs dir check. Returns bool." + return bool(dpath_str) and dpath_str.startswith("/") + +def _join_write_file(dpath_str: str ,fname_str: str)-> str: + "Given dir path string and filename string. Does join. Returns POSIX path string or ''." + if not _is_abs_dpath(dpath_str): return "" + if not fname_str or "/" in fname_str: return "" + return (Path(dpath_str)/fname_str).as_posix() + +# ===== Core data types ===== + +@dataclass(slots=True) +class Command: + """ + Command — a single planned operation. + + Given a command name and an argument map (native values). + Does hold the op name, owns a distinct args map, accumulates errors for this op. + Returns serializable mapping via to_map(). + """ + name_str: str + args_map: dict[str,Any] = field(default_factory=dict) + errors_list: list[str] = field(default_factory=list) + + def add_error(self ,msg_str: str)-> None: + "Given message. Does append to errors_list. Returns None." + self.errors_list.append(msg_str) + + def to_map(self)-> dict[str,Any]: + "Given self. Does convert to a plain dict. Returns {'op','args_map','errors_list'}." + return { + "op": self.name_str + ,"args_map": dict(self.args_map) + ,"errors_list": list(self.errors_list) + } + +@dataclass(slots=True) +class PlannerContext: + """ + PlannerContext — per-config provenance and defaults. + + Given: stage_root_dpath, read_file_rel_fpath, default write_file location/name, + default owner name, default permission (int or '0644'), optional default content. + Does: provide ambient defaults and provenance to Planner methods. + Returns: n/a (data holder). + """ + stage_root_dpath: Path + read_file_rel_fpath: Path + default_write_file_dpath_str: str + default_write_file_fname_str: str + default_owner_name_str: str + default_mode_int: int|None = None + default_mode_octal_str: str|None = None + default_content_bytes: bytes|None = None + + @staticmethod + def from_values(stage_root_dpath: Path + ,read_file_rel_fpath: Path + ,write_file_dpath_str: str + ,write_file_fname_str: str + ,owner_name_str: str + ,perm: int|str + ,content: bytes|str|None + )-> PlannerContext: + "Given raw values. Does normalize perm and content. Returns PlannerContext." + if isinstance(content ,str): + content_b = content.encode("utf-8") + else: + content_b = content + perm_norm = _norm_perm(perm) + if perm_norm is None: + m_int ,m_oct = None ,None + else: + m_int ,m_oct = perm_norm + return PlannerContext( + stage_root_dpath=stage_root_dpath + ,read_file_rel_fpath=read_file_rel_fpath + ,default_write_file_dpath_str=write_file_dpath_str + ,default_write_file_fname_str=write_file_fname_str + ,default_owner_name_str=owner_name_str + ,default_mode_int=m_int + ,default_mode_octal_str=m_oct + ,default_content_bytes=content_b + ) + +@dataclass(slots=True) +class Journal: + """ + Journal — ordered list of Commands plus provenance metadata. + + Given optional meta map. + Does append commands, expose entries, produce plain or CBOR encodings, and rebuild from CBOR. + Returns plain dict via to_map(), bytes via to_CBOR_bytes(), Journal via from_CBOR_bytes(). + """ + meta_map: dict[str,Any] = field(default_factory=dict) + commands_list: list[Command] = field(default_factory=list) + + def set_meta(self ,**kv)-> None: + "Given keyword meta. Does merge into meta_map. Returns None." + self.meta_map.update(kv) + + def append(self ,cmd: Command)-> None: + "Given Command. Does append to commands_list. Returns None." + self.commands_list.append(cmd) + + def entries_list(self)-> list[dict[str,Any]]: + "Given n/a. Does return list of entry dicts (copy). Returns list[dict]." + return [c.to_map() for c in self.commands_list] + + def to_map(self)-> dict[str,Any]: + "Given n/a. Does package a plan map (ready for CBOR). Returns dict." + return { + "version_int": 1 + ,"meta_map": dict(self.meta_map) + ,"entries_list": self.entries_list() + } + + def to_CBOR_bytes(self ,canonical_bool: bool=True)-> bytes: + "Given n/a. Does CBOR-encode to bytes (requires cbor2). Returns bytes." + try: + import cbor2 + except Exception as e: + raise RuntimeError(f"package cbor2 required for to_CBOR_bytes: {e}") + return cbor2.dumps(self.to_map() ,canonical=canonical_bool) + + @staticmethod + def from_CBOR_bytes(data_bytes: bytes)-> Journal: + "Given CBOR bytes. Does decode and rebuild a Journal (Commands + meta). Returns Journal." + try: + import cbor2 + except Exception as e: + raise RuntimeError(f"package cbor2 required for from_CBOR_bytes: {e}") + obj = cbor2.loads(data_bytes) + if not isinstance(obj ,dict): raise ValueError("CBOR root must be a map") + meta = dict(obj.get("meta_map") or {}) + entries = obj.get("entries_list") or [] + j = Journal(meta_map=meta) + for e in entries: + if not isinstance(e ,dict): continue + op = e.get("op") or "?" + args = e.get("args_map") or {} + errs = e.get("errors_list") or [] + j.append(Command(name_str=op ,args_map=dict(args) ,errors_list=list(errs))) + return j + +# ===== Planner ===== + +class Planner: + """ + Planner — constructs a Journal of Commands from config scripts. + + Given: PlannerContext (provenance + defaults). + Does: maintains a Journal; command methods (copy/displace/delete) create Command objects, + fill missing args from context defaults, preflight minimal shape checks, then append. + Returns: accessors for Journal and meta; no I/O or privilege here. + """ + def __init__(self ,ctx: PlannerContext)-> None: + self._ctx = ctx + self._journal = Journal() + # seed provenance; outer tools can add more later + self._journal.set_meta( + source_read_file_rel_fpath_str=ctx.read_file_rel_fpath.as_posix() + ,stage_root_dpath_str=str(ctx.stage_root_dpath) + ) + + # --- Context management --- + + def set_context(self ,ctx: PlannerContext)-> None: + "Given PlannerContext. Does replace current context. Returns None." + self._ctx = ctx + + def context(self)-> PlannerContext: + "Given n/a. Does return current context. Returns PlannerContext." + return self._ctx + + # --- Journal access --- + + def journal(self)-> Journal: + "Given n/a. Does return the Journal (live). Returns Journal." + return self._journal + + # --- Helpers --- + + def _resolve_write_file(self ,write_file_dpath_str: str|None ,write_file_fname_str: str|None)-> tuple[str,str]: + "Given optional write_file dpath/fname. Does fill from context; '.' fname → read_file basename. Returns (dpath,fname)." + dpath_str = write_file_dpath_str if write_file_dpath_str is not None else self._ctx.default_write_file_dpath_str + fname_str = write_file_fname_str if write_file_fname_str is not None else self._ctx.default_write_file_fname_str + if fname_str == ".": + fname_str = self._ctx.read_file_rel_fpath.name + return dpath_str ,fname_str + + def _resolve_owner(self ,owner_name_str: str|None)-> str: + "Given optional owner. Does fill from context. Returns owner string." + return owner_name_str if owner_name_str is not None else self._ctx.default_owner_name_str + + def _resolve_mode(self ,perm: int|str|None)-> tuple[int|None,str|None]: + "Given optional perm. Does normalize or fall back to context. Returns (mode_int,mode_octal_str)." + if perm is None: + return self._ctx.default_mode_int ,self._ctx.default_mode_octal_str + norm = _norm_perm(perm) + return (norm if norm is not None else (None ,None)) + + def _resolve_content(self ,content: bytes|str|None)-> bytes|None: + "Given optional content (bytes or str). Does normalize or fall back to context. Returns bytes|None." + if content is None: + return self._ctx.default_content_bytes + if isinstance(content ,str): + return content.encode("utf-8") + return content + + # --- Command builders --- + + def copy(self + ,* + ,write_file_dpath_str: str|None=None + ,write_file_fname_str: str|None=None + ,owner_name_str: str|None=None + ,perm: int|str|None=None + ,content: bytes|str|None=None + ,read_file_rel_fpath: Path|None=None + )-> Command: + """ + Given: optional overrides for write_file (dpath,fname,owner,perm), content, and read_file_rel_fpath. + Does: build a 'copy' command entry (content is embedded; read_file path kept as provenance). + Returns: Command (also appended to Journal). + """ + cmd = Command("copy") + # resolve basics + wf_dpath_str ,wf_fname_str = self._resolve_write_file(write_file_dpath_str ,write_file_fname_str) + owner_str = self._resolve_owner(owner_name_str) + mode_int ,mode_oct = self._resolve_mode(perm) + content_b = self._resolve_content(content) + read_rel = (read_file_rel_fpath if read_file_rel_fpath is not None else self._ctx.read_file_rel_fpath) + + # minimal shape checks (well-formedness, not policy) + if not _is_abs_dpath(wf_dpath_str): + cmd.add_error("write_file_dpath_str must be absolute and non-empty") + if not wf_fname_str or "/" in wf_fname_str: + cmd.add_error("write_file_fname_str must be a simple filename (no '/')") + if not owner_str: + cmd.add_error("owner_name_str must be non-empty") + if (mode_int ,mode_oct) == (None ,None): + cmd.add_error("perm must be an int <= 0o7777 or a 4-digit octal string") + if content_b is None: + cmd.add_error("content is required for copy() (bytes or str)") + + cmd.args_map.update({ + "write_file_dpath_str": wf_dpath_str + ,"write_file_fname_str": wf_fname_str + ,"owner_name_str": owner_str + ,"mode_int": mode_int + ,"mode_octal_str": mode_oct + ,"content_bytes": content_b + ,"read_file_rel_fpath_str": read_rel.as_posix() + }) + self._journal.append(cmd) + return cmd + + def displace(self + ,* + ,write_file_dpath_str: str|None=None + ,write_file_fname_str: str|None=None + )-> Command: + """ + Given: optional write_file dpath/fname overrides. + Does: build a 'displace' command (rename existing write_file in-place with UTC suffix). + Returns: Command (appended). + """ + cmd = Command("displace") + wf_dpath_str ,wf_fname_str = self._resolve_write_file(write_file_dpath_str ,write_file_fname_str) + + if not _is_abs_dpath(wf_dpath_str): + cmd.add_error("write_file_dpath_str must be absolute and non-empty") + if not wf_fname_str or "/" in wf_fname_str: + cmd.add_error("write_file_fname_str must be a simple filename (no '/')") + + cmd.args_map.update({ + "write_file_dpath_str": wf_dpath_str + ,"write_file_fname_str": wf_fname_str + }) + self._journal.append(cmd) + return cmd + + def delete(self + ,* + ,write_file_dpath_str: str|None=None + ,write_file_fname_str: str|None=None + )-> Command: + """ + Given: optional write_file dpath/fname overrides. + Does: build a 'delete' command (unlink if present). + Returns: Command (appended). + """ + cmd = Command("delete") + wf_dpath_str ,wf_fname_str = self._resolve_write_file(write_file_dpath_str ,write_file_fname_str) + + if not _is_abs_dpath(wf_dpath_str): + cmd.add_error("write_file_dpath_str must be absolute and non-empty") + if not wf_fname_str or "/" in wf_fname_str: + cmd.add_error("write_file_fname_str must be a simple filename (no '/')") + + cmd.args_map.update({ + "write_file_dpath_str": wf_dpath_str + ,"write_file_fname_str": wf_fname_str + }) + self._journal.append(cmd) + return cmd diff --git a/developer/source/DNS/deprecated/stage_ls.py b/developer/source/DNS/deprecated/stage_ls.py new file mode 100755 index 0000000..93dd3d2 --- /dev/null +++ b/developer/source/DNS/deprecated/stage_ls.py @@ -0,0 +1,193 @@ +#!/usr/bin/env -S python3 -B +""" +ls_stage.py — list staged files and their header-declared install metadata. + +Header line format (first line of each file): + + +- owner: username string (need not exist until install time) +- permissions: four octal digits, e.g. 0644 +- write_file_name: '.' means use the read file's basename, else use the given POSIX filename +- target_directory_path: POSIX directory path (usually absolute, e.g. /etc/unbound) + +Output formats: +- list (default): "read_file_path: owner permissions write_file_name target_directory_path" +- table: columns aligned for readability +""" + +from __future__ import annotations + +# never write bytecode (root/sudo friendly) +import sys ,os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") + +from dataclasses import dataclass +from pathlib import Path +import argparse +import re + +# === Stage utilities (importable) === + +def stage_read_file_paths(stage_root: Path)-> list[Path]: + """Given: stage_root directory. + Does: recursively enumerate regular files (follows symlinks to files), keep paths relative to stage_root. + Returns: list[Path] of POSIX-order sorted relative paths (no leading slash). + """ + rels: list[Path] = [] + for p in stage_root.rglob("*"): + try: + if p.is_file(): # follows symlink-to-file + rels.append(p.relative_to(stage_root)) + except (FileNotFoundError ,RuntimeError): + # broken link or race; skip conservatively + continue + return sorted(rels ,key=lambda x: x.as_posix()) + +@dataclass +class StageRow: + read_rel: Path # e.g. Path("etc/unbound/unbound.conf.staged") + owner: str # token[0] + perm_octal_str: str # token[1], exactly as in header (validated ####) + perm_int: int # token[1] parsed as base-8 + write_name: str # token[2] ('.' resolved to read_rel.name) + target_dir: Path # token[3] (Path) + header_raw: str # original header line (sans newline) + + # convenience + def write_abs(self ,root: Path)-> Path: + return (root / self.target_dir.relative_to("/")) if self.target_dir.is_absolute() else (root / self.target_dir) / self.write_name + +# header parsing rules +_PERM_RE = re.compile(r"^[0-7]{4}$") + +def parse_stage_header_line(header: str ,read_rel: Path)-> tuple[StageRow|None ,str|None]: + """Given: raw first line of a staged file and its stage-relative path. + Does: parse ' ' with max 4 tokens (target_dir may contain spaces if quoted not required). + Returns: (StageRow, None) on success, or (None, error_message) on failure. Does NOT touch filesystem. + """ + # strip BOM and trailing newline/spaces + h = header.lstrip("\ufeff").strip() + if not h: + return None ,f"empty header line in {read_rel}" + parts = h.split(maxsplit=3) + if len(parts) != 4: + return None ,f"malformed header in {read_rel}: expected 4 fields, got {len(parts)}" + owner ,perm_s ,write_name ,target_dir_s = parts + + if not _PERM_RE.fullmatch(perm_s): + return None ,f"invalid permissions '{perm_s}' in {read_rel}: must be four octal digits" + + # resolve '.' → basename + resolved_write_name = read_rel.name if write_name == "." else write_name + + # MVP guard: write_name should be a single filename (no '/') + if "/" in resolved_write_name: + return None ,f"write_file_name must not contain '/': got '{resolved_write_name}' in {read_rel}" + + # target dir may be absolute (recommended) or relative (we treat relative as under the install root) + target_dir = Path(target_dir_s) + + try: + row = StageRow( + read_rel = read_rel + ,owner = owner + ,perm_octal_str = perm_s + ,perm_int = int(perm_s ,8) + ,write_name = resolved_write_name + ,target_dir = target_dir + ,header_raw = h + ) + return row ,None + except Exception as e: + return None ,f"internal parse error in {read_rel}: {e}" + +def read_first_line(p: Path)-> str: + """Return the first line (sans newline). UTF-8 with BOM tolerant.""" + with open(p ,"r" ,encoding="utf-8" ,errors="replace") as fh: + line = fh.readline() + return line.rstrip("\n\r") + +def scan_stage(stage_root: Path)-> tuple[list[StageRow] ,list[str]]: + """Given: stage_root. + Does: enumerate files, parse each header line, collect rows and errors. + Returns: (rows, errors) + """ + rows: list[StageRow] = [] + errs: list[str] = [] + for rel in stage_read_file_paths(stage_root): + abs_path = stage_root / rel + try: + header = read_first_line(abs_path) + except Exception as e: + errs.append(f"read error in {rel}: {e}") + continue + row ,err = parse_stage_header_line(header ,rel) + if err: + errs.append(err) + else: + rows.append(row) # type: ignore[arg-type] + return rows ,errs + +# === Printers === + +def print_list(rows: list[StageRow])-> None: + """Print: 'read_file_path: owner permissions write_file_name target_directory_path' per line.""" + for r in rows: + print(f"{r.read_rel.as_posix()}: {r.owner} {r.perm_octal_str} {r.write_name} {r.target_dir}") + +def print_table(rows: list[StageRow])-> None: + """Aligned table printer (no headers, just data in columns).""" + if not rows: + return + a = [r.read_rel.as_posix() for r in rows] + b = [r.owner for r in rows] + c = [r.perm_octal_str for r in rows] + d = [r.write_name for r in rows] + e = [str(r.target_dir) for r in rows] + wa = max(len(s) for s in a) + wb = max(len(s) for s in b) + wc = max(len(s) for s in c) + wd = max(len(s) for s in d) + # e (target_dir) left ragged + for sa ,sb ,sc ,sd ,se in zip(a ,b ,c ,d ,e): + print(f"{sa:<{wa}} {sb:<{wb}} {sc:<{wc}} {sd:<{wd}} {se}") + +# === Orchestrator === + +def ls_stage(stage_root: Path ,fmt: str="list")-> int: + """Given: stage_root and output format ('list'|'table'). + Does: scan and parse staged files, print in the requested format; report syntax errors to stderr. + Returns: 0 on success; 1 if any syntax errors were encountered. + """ + rows ,errs = scan_stage(stage_root) + if fmt == "table": + print_table(rows) + else: + print_list(rows) + if errs: + print("\nerror(s):" ,file=sys.stderr) + for e in errs: + print(f" - {e}" ,file=sys.stderr) + return 1 + return 0 + +# === CLI === + +def main(argv: list[str] | None=None)-> int: + ap = argparse.ArgumentParser( + prog="ls_stage.py" + ,description="List staged files and their header-declared install metadata." + ) + ap.add_argument("--stage" ,default="stage",help="stage directory (default: ./stage)") + ap.add_argument("--format" ,choices=["list" ,"table"] ,default="list" + ,help="output format (default: list)") + args = ap.parse_args(argv) + stage_root = Path(args.stage) + if not stage_root.exists() or not stage_root.is_dir(): + print(f"error: stage directory not found or not a directory: {stage_root}" ,file=sys.stderr) + return 2 + return ls_stage(stage_root ,fmt=args.format) + +if __name__ == "__main__": + sys.exit(main()) diff --git a/developer/source/DNS/plan_show.py b/developer/source/DNS/plan_show.py new file mode 100755 index 0000000..780d34a --- /dev/null +++ b/developer/source/DNS/plan_show.py @@ -0,0 +1,273 @@ +#!/usr/bin/env -S python3 -B +""" +plan_show.py — build and display a staged plan (UNPRIVILEGED). + +Given: a stage directory of config scripts (*.stage.py by default). +Does: executes each script with a pre-created Planner (P) and PlannerContext, + aggregates Commands into a single Journal, by default prints from the CBOR + round-trip (encode→decode) so the human view matches what will be shipped + to stage_cp; runs well-formed (WF) invariant checks. Can emit CBOR if requested. +Returns: exit status 0 on success; 2 on WF errors or usage errors. +""" + +from __future__ import annotations + +# no bytecode anywhere +import sys ,os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") + +from pathlib import Path +import argparse +import datetime as _dt +import getpass +import runpy + +# local module (same dir): Planner +from Planner import Planner ,PlannerContext ,Journal ,Command + +# ===== Utilities (general / reusable) ===== + +def iso_utc_now_str()-> str: + "Given n/a. Does return compact UTC timestamp. Returns YYYYMMDDTHHMMSSZ." + return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") + +def find_configs(stage_root_dpath: Path ,glob_pat_str: str)-> list[Path]: + "Given stage root and glob. Does find matching files under stage. Returns list of absolute Paths." + root = stage_root_dpath.resolve() + return sorted((p for p in root.glob(glob_pat_str) if p.is_file()) ,key=lambda p: p.as_posix()) + +def human_size(n: int)-> str: + "Given byte count. Does format human size. Returns string." + units = ["B","KB","MB","GB","TB"] + i = 0 + x = float(max(0 ,n)) + while x >= 1024 and i < len(units)-1: + x /= 1024.0 + i += 1 + return f"{x:.1f} {units[i]}" + +def _dst_path_str(args_map: dict)-> str: + "Given args map. Does join write_file path. Returns POSIX path or '?'." + d = args_map.get("write_file_dpath_str") or "" + f = args_map.get("write_file_fname_str") or "" + try: + if d and f and "/" not in f: + return (Path(d)/f).as_posix() + except Exception: + pass + return "?" + +# ===== WF invariants (MVP) ===== +# These are “well-formedness” rules (shape/encoding/domain), not policy or privilege checks. + +def wf_check(journal: Journal)-> list[str]: + """ + Given Journal. Does run invariant checks on meta and each Command entry. Returns list of error strings. + Invariants (MVP): + - meta_map: must include generator identity and stage_root_dpath_str. + - entry.op ∈ {'copy','displace','delete'} + - all ops: write_file_dpath_str absolute; write_file_fname_str is bare filename. + - copy: owner_name_str non-empty; mode_int ∈ [0..0o7777] and no suid/sgid; content_bytes present (bytes). + """ + errs: list[str] = [] + meta = journal.meta_map or {} + + # meta presence (light placeholder) + if not isinstance(meta ,dict): + errs.append("WF_META: meta_map must be a map") + else: + if not meta.get("stage_root_dpath_str"): + errs.append("WF_META: missing stage_root_dpath_str") + if not meta.get("generator_prog_str"): + errs.append("WF_META: missing generator_prog_str") + + # entries + for idx ,cmd in enumerate(journal.commands_list ,1): + prefix = f"WF[{idx:02d}]" + if not isinstance(cmd ,Command): + errs.append(f"{prefix}: entry is not Command") + continue + op = cmd.name_str + if op not in {"copy","displace","delete"}: + errs.append(f"{prefix}: unknown op '{op}'") + continue + am = cmd.args_map or {} + dpath = am.get("write_file_dpath_str") + fname = am.get("write_file_fname_str") + if not isinstance(dpath ,str) or not dpath.startswith("/"): + errs.append(f"{prefix}: write_file_dpath_str must be absolute") + if not isinstance(fname ,str) or not fname or "/" in fname: + errs.append(f"{prefix}: write_file_fname_str must be a bare filename") + + if op == "copy": + owner = am.get("owner_name_str") + mode = am.get("mode_int") + data = am.get("content_bytes") + if not isinstance(owner ,str) or not owner.strip(): + errs.append(f"{prefix}: owner_name_str must be non-empty") + if not isinstance(mode ,int) or not (0 <= mode <= 0o7777): + errs.append(f"{prefix}: mode_int must be int in [0..0o7777]") + elif (mode & 0o6000): + errs.append(f"{prefix}: mode_int suid/sgid not allowed in MVP") + if not isinstance(data ,(bytes,bytearray)): + errs.append(f"{prefix}: content_bytes must be bytes") + return errs + +# ===== Planner execution ===== + +def _run_one_config(config_abs_fpath: Path ,stage_root_dpath: Path)-> Planner: + """ + Given abs path to a config script and stage root. Does construct a PlannerContext and Planner, + then executes the script with 'P' (Planner instance) bound in globals. Returns Planner with Journal. + Notes: + - Defaults are intentionally spartan; config should refine them via P.set_context(...). + - This is UNPRIVILEGED; no filesystem changes are performed here. + """ + read_rel = config_abs_fpath.resolve().relative_to(stage_root_dpath.resolve()) + ctx = PlannerContext.from_values( + stage_root_dpath=stage_root_dpath + ,read_file_rel_fpath=read_rel + ,write_file_dpath_str="/" + ,write_file_fname_str="." + ,owner_name_str=getpass.getuser() + ,perm=0o644 + ,content=None + ) + P = Planner(ctx) + g = {"Planner": Planner ,"PlannerContext": PlannerContext ,"P": P} + runpy.run_path(str(config_abs_fpath) ,init_globals=g) + return P + +def _aggregate_journal(planners_list: list[Planner] ,stage_root_dpath: Path)-> Journal: + "Given planners and stage root. Does aggregate Commands into a single Journal with meta. Returns Journal." + J = Journal() + J.set_meta( + version_int=1 + ,generator_prog_str="plan_show.py" + ,generated_at_utc_str=iso_utc_now_str() + ,user_name_str=getpass.getuser() + ,host_name_str=os.uname().nodename if hasattr(os ,"uname") else "unknown" + ,stage_root_dpath_str=str(stage_root_dpath.resolve()) + ,configs_list=[p.context().read_file_rel_fpath.as_posix() for p in planners_list] + ) + for p in planners_list: + for cmd in p.journal().commands_list: + J.append(cmd) + return J + +def _print_plan(journal: Journal)-> None: + "Given Journal. Does print a readable summary. Returns None." + meta = journal.meta_map or {} + print(f"Stage: {meta.get('stage_root_dpath_str','?')}") + print(f"Generated: {meta.get('generated_at_utc_str','?')} by {meta.get('user_name_str','?')}@{meta.get('host_name_str','?')}\n") + + entries = journal.commands_list + if not entries: + print("(plan is empty)") + return + + n_copy = sum(1 for c in entries if c.name_str=="copy") + n_disp = sum(1 for c in entries if c.name_str=="displace") + n_del = sum(1 for c in entries if c.name_str=="delete") + print(f"Entries: {len(entries)} copy:{n_copy} displace:{n_disp} delete:{n_del}\n") + + for i ,cmd in enumerate(entries ,1): + am = cmd.args_map + dst = _dst_path_str(am) + if cmd.name_str == "copy": + size = len(am.get("content_bytes") or b"") + mode = am.get("mode_int") + owner = am.get("owner_name_str") + print(f"{i:02d}. copy -> {dst} mode {mode:04o} owner {owner} bytes {size} ({human_size(size)})") + elif cmd.name_str == "displace": + print(f"{i:02d}. displace -> {dst}") + elif cmd.name_str == "delete": + print(f"{i:02d}. delete -> {dst}") + else: + print(f"{i:02d}. ?op? -> {dst}") + +def _maybe_emit_CBOR(journal: Journal ,emit_CBOR_fpath: Path|None)-> None: + "Given Journal and optional path. Does write CBOR if requested. Returns None." + if not emit_CBOR_fpath: + return + try: + data = journal.to_CBOR_bytes(canonical_bool=True) + except Exception as e: + print(f"error: CBOR encode failed: {e}" ,file=sys.stderr) + raise + emit_CBOR_fpath.parent.mkdir(parents=True ,exist_ok=True) + with open(emit_CBOR_fpath ,"wb") as fh: + fh.write(data) + print(f"\nWrote CBOR plan: {emit_CBOR_fpath} ({len(data)} bytes)") + +# ===== CLI ===== + +def main(argv: list[str]|None=None)-> int: + "Given CLI. Does discover configs, build plan, (optionally) CBOR round-trip before printing, run WF, optionally emit CBOR. Returns exit code." + ap = argparse.ArgumentParser(prog="plan_show.py" + ,description="Build and show a staged plan (no privilege, no apply).") + ap.add_argument("--stage",default="stage",help="stage directory root (default: ./stage)") + ap.add_argument("--glob",default="**/*.stage.py",help="glob for config scripts under --stage") + ap.add_argument("--emit-CBOR",default=None,help="write CBOR plan to this path (optional)") + ap.add_argument("--print-from-journal",action="store_true" + ,help="print directly from in-memory Journal (skip CBOR round-trip)") + args = ap.parse_args(argv) + + stage_root_dpath = Path(args.stage) + if not stage_root_dpath.is_dir(): + print(f"error: --stage not a directory: {stage_root_dpath}" ,file=sys.stderr) + return 2 + + configs = find_configs(stage_root_dpath ,args.glob) + if not configs: + print("No config scripts found.") + return 0 + + planners: list[Planner] = [] + for cfg in configs: + try: + planners.append(_run_one_config(cfg ,stage_root_dpath)) + except SystemExit: + raise + except Exception as e: + print(f"error: executing {cfg}: {e}" ,file=sys.stderr) + return 2 + + journal_src = _aggregate_journal(planners ,stage_root_dpath) + + if not args.print_from_journal: + try: + cbor_bytes = journal_src.to_CBOR_bytes(canonical_bool=True) + journal = Journal.from_CBOR_bytes(cbor_bytes) + except Exception as e: + print(f"error: CBOR round-trip failed: {e}" ,file=sys.stderr) + return 2 + else: + journal = journal_src + + _print_plan(journal) + + errs = wf_check(journal) + if errs: + print("\nerror(s):" ,file=sys.stderr) + for e in errs: + print(f" - {e}" ,file=sys.stderr) + return 2 + + emit = Path(args.emit_CBOR) if args.emit_CBOR else None + if emit: + try: + data = (cbor_bytes if not args.print_from_journal else journal_src.to_CBOR_bytes(canonical_bool=True)) + emit.parent.mkdir(parents=True ,exist_ok=True) + with open(emit ,"wb") as fh: + fh.write(data) + print(f"\nWrote CBOR plan: {emit} ({len(data)} bytes)") + except Exception as e: + print(f"error: failed to write CBOR: {e}" ,file=sys.stderr) + return 2 + + return 0 + +if __name__ == "__main__": + sys.exit(main()) diff --git a/developer/source/DNS/stage_cp.py b/developer/source/DNS/stage_cp.py new file mode 100644 index 0000000..0114853 --- /dev/null +++ b/developer/source/DNS/stage_cp.py @@ -0,0 +1,322 @@ +#!/usr/bin/env -S python3 -B +""" +stage_cp.py — build a CBOR plan from staged configs; show, validate, and apply with privilege. + +Given: a stage root directory. +Does: (user) run configs → build native plan → WF checks → summarize → encode plan → sudo re-exec + (root) decode plan → VALID + SANITY → apply ops (displace/copy/delete) safely. +Returns: exit code. + +Requires: pip install cbor2 +""" +from __future__ import annotations +import sys ,os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") + +from pathlib import Path +import argparse ,importlib.util ,runpy ,socket ,getpass ,time ,tempfile ,subprocess ,pwd +from typing import Any +import cbor2 + +# ---------- small utils ---------- + +def _load_stage_module(stage_root_dpath: Path): + "Given: stage root path. Does: load Stage.py as module 'Stage'. Returns: module." + mod_fpath = stage_root_dpath/"Stage.py" + if not mod_fpath.exists(): + raise FileNotFoundError(f"Stage.py not found at {mod_fpath}") + spec = importlib.util.spec_from_file_location("Stage" ,str(mod_fpath)) + mod = importlib.util.module_from_spec(spec) + sys.modules["Stage"] = mod + assert spec and spec.loader + spec.loader.exec_module(mod) # type: ignore + return mod + +def _config_rel_fpaths(stage_root_dpath: Path)-> list[Path]: + "Given: stage root. Does: collect *.py (excluding Stage.py) as relative file paths. Returns: list[Path]." + rel_fpath_list: list[Path] = [] + for p in stage_root_dpath.rglob("*.py"): + if p.name == "Stage.py": continue + if p.is_file(): + rel_fpath_list.append(p.relative_to(stage_root_dpath)) + return sorted(rel_fpath_list ,key=lambda x: x.as_posix()) + +def _sha256_bytes(b: bytes)-> bytes: + "Given: bytes. Does: sha256. Returns: 32-byte digest." + return hashlib.sha256(b).digest() + +def _dst_fpath_str(dst_dpath_str: str ,dst_fname_str: str)-> str: + "Given: a directory path string and a filename string. Does: join. Returns: combined POSIX path string." + if "/" in dst_fname_str: + return "" # invalid; WF will flag + return str((Path(dst_dpath_str)/dst_fname_str)) + +# ---------- WF / VALID / SANITY ---------- + +_ALLOWLIST_PREFIXES_LIST = ["/etc" ,"/usr/local" ,"/etc/systemd/system"] + +def wf_check(plan_map: dict[str,Any])-> list[str]: + "Given: plan map. Does: shape/lexical checks only. Returns: list of error strings." + errs_list: list[str] = [] + if plan_map.get("version_int") != 1: + errs_list.append("WF_VERSION: unsupported plan version") + entries_list = plan_map.get("entries_list") + if not isinstance(entries_list ,list): + errs_list.append("WF_ENTRIES: 'entries_list' missing or not a list") + return errs_list + for i ,e_map in enumerate(entries_list ,1): + op = e_map.get("op") + dst_dpath_str = e_map.get("dst_dpath") + dst_fname_str = e_map.get("dst_fname") + where = f"entry {i}" + if op not in ("copy","displace","delete"): + errs_list.append(f"WF_OP:{where}: invalid op {op!r}") + continue + if not isinstance(dst_dpath_str ,str) or not dst_dpath_str: + errs_list.append(f"WF_DST_DPATH:{where}: dst_dpath missing or not str") + if not isinstance(dst_fname_str ,str) or not dst_fname_str: + errs_list.append(f"WF_DST_FNAME:{where}: dst_fname missing or not str") + if isinstance(dst_fname_str ,str) and "/" in dst_fname_str: + errs_list.append(f"WF_DST_FNAME:{where}: dst_fname must not contain '/'") + if isinstance(dst_dpath_str ,str) and not dst_dpath_str.startswith("/"): + errs_list.append(f"WF_DST_DPATH:{where}: dst_dpath must be absolute") + full_fpath_str = _dst_fpath_str(dst_dpath_str or "" ,dst_fname_str or "") + if not full_fpath_str or not full_fpath_str.startswith("/"): + errs_list.append(f"WF_PATH:{where}: failed to construct absolute path from dst_dpath/fname") + if op == "copy": + mode_int = e_map.get("mode_int") + if not isinstance(mode_int ,int) or not (0 <= mode_int <= 0o7777): + errs_list.append(f"WF_MODE:{where}: mode_int must be int in [0..0o7777]") + if isinstance(mode_int ,int) and (mode_int & 0o6000): + errs_list.append(f"WF_MODE:{where}: suid/sgid bits not allowed in MVP") + owner_name = e_map.get("owner_name") + if not isinstance(owner_name ,str) or not owner_name: + errs_list.append(f"WF_OWNER:{where}: owner_name must be non-empty username string") + content_bytes = e_map.get("content_bytes") + if not (isinstance(content_bytes ,(bytes,bytearray)) and len(content_bytes) >= 0): + errs_list.append(f"WF_CONTENT:{where}: content_bytes must be bytes (may be empty)") + sha = e_map.get("sha256_bytes") + if sha is not None: + if not isinstance(sha ,(bytes,bytearray)) or len(sha)!=32: + errs_list.append(f"WF_SHA256:{where}: sha256_bytes must be 32-byte digest if present") + elif isinstance(content_bytes ,(bytes,bytearray)) and sha != _sha256_bytes(content_bytes): + errs_list.append(f"WF_SHA256_MISMATCH:{where}: sha256_bytes does not match content_bytes") + return errs_list + +def valid_check(plan_map: dict[str,Any])-> list[str]: + "Given: plan map. Does: environment (read-only) checks. Returns: list of error strings." + errs_list: list[str] = [] + for i ,e_map in enumerate(plan_map.get("entries_list") or [] ,1): + op = e_map.get("op") + dst_fpath_str = _dst_fpath_str(e_map.get("dst_dpath","/") ,e_map.get("dst_fname","")) + where = f"entry {i}" + try: + parent_dpath = Path(dst_fpath_str).parent + if not parent_dpath.exists(): + errs_list.append(f"VAL_PARENT_MISSING:{where}: parent dir does not exist: {parent_dpath}") + elif not parent_dpath.is_dir(): + errs_list.append(f"VAL_PARENT_NOT_DIR:{where}: parent is not a directory: {parent_dpath}") + if Path(dst_fpath_str).is_dir(): + errs_list.append(f"VAL_DST_IS_DIR:{where}: destination exists as a directory: {dst_fpath_str}") + if op == "copy": + owner_name = e_map.get("owner_name") + try: + pw = pwd.getpwnam(owner_name) # may raise KeyError + e_map["_resolved_uid_int"] = pw.pw_uid + e_map["_resolved_gid_int"] = pw.pw_gid + except Exception: + errs_list.append(f"VAL_OWNER_UNKNOWN:{where}: user not found: {owner_name!r}") + except Exception as x: + errs_list.append(f"VAL_EXCEPTION:{where}: {x}") + return errs_list + +def sanity_check(plan_map: dict[str,Any])-> list[str]: + "Given: plan map. Does: policy checks (allowlist, denials). Returns: list of error strings." + errs_list: list[str] = [] + for i ,e_map in enumerate(plan_map.get("entries_list",[]) ,1): + dst_fpath_str = _dst_fpath_str(e_map.get("dst_dpath","/") ,e_map.get("dst_fname","")) + where = f"entry {i}" + if not any(dst_fpath_str.startswith(pref + "/") or dst_fpath_str==pref for pref in _ALLOWLIST_PREFIXES_LIST): + errs_list.append(f"POL_PATH_DENY:{where}: destination outside allowlist: {dst_fpath_str}") + return errs_list + +# ---------- APPLY (root) ---------- + +def _utc_str()-> str: + "Given: n/a. Does: current UTC compact. Returns: string." + import datetime as _dt + return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") + +def _ensure_parent_dirs(dst_fpath: Path)-> None: + "Given: destination file path. Does: create parents. Returns: None." + dst_fpath.parent.mkdir(parents=True ,exist_ok=True) + +def _displace_in_place(dst_fpath: Path)-> None: + "Given: destination file path. Does: rename existing file/symlink to add UTC suffix. Returns: None." + try: + if dst_fpath.exists() or dst_fpath.is_symlink(): + suffix = "_" + _utc_str() + dst_fpath.rename(dst_fpath.with_name(dst_fpath.name + suffix)) + except FileNotFoundError: + pass + +def _apply_copy(dst_fpath: Path ,content_bytes: bytes ,mode_int: int ,uid_int: int ,gid_int: int)-> None: + "Given: target, bytes, mode, uid, gid. Does: write temp, fsync, chmod/chown, atomic replace. Returns: None." + _ensure_parent_dirs(dst_fpath) + _displace_in_place(dst_fpath) + tmp_fpath = dst_fpath.with_name("." + dst_fpath.name + ".stage_tmp") + with open(tmp_fpath ,"wb") as fh: + fh.write(content_bytes) + fh.flush() + os.fsync(fh.fileno()) + try: + os.chmod(tmp_fpath ,mode_int & 0o777) + except Exception: + pass + try: + os.chown(tmp_fpath ,uid_int ,gid_int) + except Exception: + pass + os.replace(tmp_fpath ,dst_fpath) # atomic within same dir/device + +def _apply_delete(dst_fpath: Path)-> None: + "Given: target file path. Does: unlink file/symlink if present. Returns: None." + try: + if dst_fpath.is_symlink() or dst_fpath.is_file(): + dst_fpath.unlink() + except FileNotFoundError: + pass + +def apply_plan(plan_map: dict[str,Any] ,dry_run_bool: bool=False)-> int: + "Given: plan map and dry flag. Does: execute ops sequentially. Returns: exit code." + for i ,e_map in enumerate(plan_map.get("entries_list") or [] ,1): + op = e_map.get("op") + dst_fpath = Path(_dst_fpath_str(e_map.get("dst_dpath","/") ,e_map.get("dst_fname",""))) + if op == "displace": + print(f"+ displace {dst_fpath}") + if not dry_run_bool: + _displace_in_place(dst_fpath) + elif op == "delete": + print(f"+ delete {dst_fpath}") + if not dry_run_bool: + _apply_delete(dst_fpath) + elif op == "copy": + mode_int = e_map.get("mode_int") or 0o644 + uid_int = e_map.get("_resolved_uid_int" ,0) + gid_int = e_map.get("_resolved_gid_int" ,0) + content_bytes = e_map.get("content_bytes") or b"" + print(f"+ copy {dst_fpath} mode {mode_int:04o} uid {uid_int} gid {gid_int} bytes {len(content_bytes)}") + if not dry_run_bool: + _apply_copy(dst_fpath ,content_bytes ,mode_int ,uid_int ,gid_int) + else: + print(f"! unknown op {op} (skipping)") + return 2 + return 0 + +# ---------- orchestration ---------- + +def _build_plan_unpriv(stage_root_dpath: Path)-> dict[str,Any]: + "Given: stage root. Does: execute configs, accumulate entries, add sha256. Returns: plan map." + StageMod = _load_stage_module(stage_root_dpath) + Stage = StageMod.Stage + Stage._reset() + Stage.set_meta( + planner_user_name=getpass.getuser() + ,planner_uid_int=os.getuid() + ,planner_gid_int=os.getgid() + ,host_name=socket.gethostname() + ,created_utc_str=time.strftime("%Y-%m-%dT%H:%M:%SZ",time.gmtime()) + ) + for rel_fpath in _config_rel_fpaths(stage_root_dpath): + Stage._begin(read_rel_fpath=rel_fpath ,stage_root_dpath=stage_root_dpath) + runpy.run_path(str(stage_root_dpath/rel_fpath) ,run_name="__main__") + Stage._end() + for e_map in Stage.plan_entries(): + if e_map.get("op") == "copy" and isinstance(e_map.get("content_bytes") ,(bytes,bytearray)): + e_map["sha256_bytes"] = _sha256_bytes(e_map["content_bytes"]) + return Stage.plan_object() + +def _sudo_apply_self(plan_fpath: Path ,dry_run_bool: bool)-> int: + "Given: plan file path and dry flag. Does: sudo re-exec current script with --apply. Returns: exit code." + cmd_list = ["sudo",sys.executable,os.path.abspath(__file__),"--apply" + ,"--plan",str(plan_fpath)] + if dry_run_bool: + cmd_list.append("--dry-run") + return subprocess.call(cmd_list) + +def main(argv: list[str]|None=None)-> int: + "Given: CLI. Does: plan, WF (user) then VALID+SANITY+APPLY (root). Returns: exit code." + ap = argparse.ArgumentParser(prog="stage_cp.py" + ,description="Plan staged config application and apply with sudo.") + ap.add_argument("--stage",default="stage",help="stage directory (default: ./stage)") + ap.add_argument("--dry-run",action="store_true",help="validate and show actions, do not change files") + ap.add_argument("--apply",action="store_true",help=argparse.SUPPRESS) # internal (root path) + ap.add_argument("--plan",default=None,help=argparse.SUPPRESS) # internal (root path) + args = ap.parse_args(argv) + + # Root path (apply) + if args.apply: + if os.geteuid() != 0: + print("error: --apply requires root" ,file=sys.stderr) + return 2 + if not args.plan: + print("error: --plan path required for --apply" ,file=sys.stderr) + return 2 + with open(args.plan ,"rb") as fh: + plan_map = cbor2.load(fh) + val_errs = valid_check(plan_map) + pol_errs = sanity_check(plan_map) + if val_errs or pol_errs: + print("error(s) during validation/sanity:" ,file=sys.stderr) + for e in val_errs: print(f" - {e}" ,file=sys.stderr) + for e in pol_errs: print(f" - {e}" ,file=sys.stderr) + return 2 + rc = apply_plan(plan_map ,dry_run_bool=args.dry_run) + return rc + + # User path (plan + summarize + escalate) + stage_root_dpath = Path(args.stage) + plan_map = _build_plan_unpriv(stage_root_dpath) + + entries_list = plan_map.get("entries_list" ,[]) + print(f"Built plan with {len(entries_list)} entr{'y' if len(entries_list)==1 else 'ies'}") + + total_bytes_int = sum(len(e_map.get("content_bytes") or b"") + for e_map in entries_list if e_map.get("op")=="copy") + print(f"Total bytes to write: {total_bytes_int}") + if args.dry_run: + print("\n--dry-run: would perform the following:") + + for i ,e_map in enumerate(entries_list ,1): + op = e_map.get("op") + dst_fpath_str = _dst_fpath_str(e_map.get("dst_dpath") ,e_map.get("dst_fname")) + if op=="copy": + mode_int = e_map.get("mode_int") or 0o644 + owner_name = e_map.get("owner_name") or "?" + size = len(e_map.get("content_bytes") or b"") + print(f"{i:02d}. copy -> {dst_fpath_str} mode {mode_int:04o} owner {owner_name} bytes {size}") + elif op=="displace": + print(f"{i:02d}. displace -> {dst_fpath_str}") + elif op=="delete": + print(f"{i:02d}. delete -> {dst_fpath_str}") + else: + print(f"{i:02d}. ?op? -> {dst_fpath_str}") + + with tempfile.NamedTemporaryFile(prefix="plan_" ,suffix=".cbor" ,delete=False) as tf: + cbor2.dump(plan_map ,tf) + plan_fpath = Path(tf.name) + try: + if args.dry_run: + return _sudo_apply_self(plan_fpath ,dry_run_bool=True) + ans = input("\nProceed with apply under sudo? [y/N] ").strip().lower() + if ans not in ("y","yes"): + print("Aborted.") + return 0 + return _sudo_apply_self(plan_fpath ,dry_run_bool=False) + finally: + try: os.unlink(plan_fpath) + except Exception: pass + +if __name__ == "__main__": + sys.exit(main()) diff --git a/developer/source/DNS/stage_ls.py b/developer/source/DNS/stage_ls.py deleted file mode 100755 index 93dd3d2..0000000 --- a/developer/source/DNS/stage_ls.py +++ /dev/null @@ -1,193 +0,0 @@ -#!/usr/bin/env -S python3 -B -""" -ls_stage.py — list staged files and their header-declared install metadata. - -Header line format (first line of each file): - - -- owner: username string (need not exist until install time) -- permissions: four octal digits, e.g. 0644 -- write_file_name: '.' means use the read file's basename, else use the given POSIX filename -- target_directory_path: POSIX directory path (usually absolute, e.g. /etc/unbound) - -Output formats: -- list (default): "read_file_path: owner permissions write_file_name target_directory_path" -- table: columns aligned for readability -""" - -from __future__ import annotations - -# never write bytecode (root/sudo friendly) -import sys ,os -sys.dont_write_bytecode = True -os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") - -from dataclasses import dataclass -from pathlib import Path -import argparse -import re - -# === Stage utilities (importable) === - -def stage_read_file_paths(stage_root: Path)-> list[Path]: - """Given: stage_root directory. - Does: recursively enumerate regular files (follows symlinks to files), keep paths relative to stage_root. - Returns: list[Path] of POSIX-order sorted relative paths (no leading slash). - """ - rels: list[Path] = [] - for p in stage_root.rglob("*"): - try: - if p.is_file(): # follows symlink-to-file - rels.append(p.relative_to(stage_root)) - except (FileNotFoundError ,RuntimeError): - # broken link or race; skip conservatively - continue - return sorted(rels ,key=lambda x: x.as_posix()) - -@dataclass -class StageRow: - read_rel: Path # e.g. Path("etc/unbound/unbound.conf.staged") - owner: str # token[0] - perm_octal_str: str # token[1], exactly as in header (validated ####) - perm_int: int # token[1] parsed as base-8 - write_name: str # token[2] ('.' resolved to read_rel.name) - target_dir: Path # token[3] (Path) - header_raw: str # original header line (sans newline) - - # convenience - def write_abs(self ,root: Path)-> Path: - return (root / self.target_dir.relative_to("/")) if self.target_dir.is_absolute() else (root / self.target_dir) / self.write_name - -# header parsing rules -_PERM_RE = re.compile(r"^[0-7]{4}$") - -def parse_stage_header_line(header: str ,read_rel: Path)-> tuple[StageRow|None ,str|None]: - """Given: raw first line of a staged file and its stage-relative path. - Does: parse ' ' with max 4 tokens (target_dir may contain spaces if quoted not required). - Returns: (StageRow, None) on success, or (None, error_message) on failure. Does NOT touch filesystem. - """ - # strip BOM and trailing newline/spaces - h = header.lstrip("\ufeff").strip() - if not h: - return None ,f"empty header line in {read_rel}" - parts = h.split(maxsplit=3) - if len(parts) != 4: - return None ,f"malformed header in {read_rel}: expected 4 fields, got {len(parts)}" - owner ,perm_s ,write_name ,target_dir_s = parts - - if not _PERM_RE.fullmatch(perm_s): - return None ,f"invalid permissions '{perm_s}' in {read_rel}: must be four octal digits" - - # resolve '.' → basename - resolved_write_name = read_rel.name if write_name == "." else write_name - - # MVP guard: write_name should be a single filename (no '/') - if "/" in resolved_write_name: - return None ,f"write_file_name must not contain '/': got '{resolved_write_name}' in {read_rel}" - - # target dir may be absolute (recommended) or relative (we treat relative as under the install root) - target_dir = Path(target_dir_s) - - try: - row = StageRow( - read_rel = read_rel - ,owner = owner - ,perm_octal_str = perm_s - ,perm_int = int(perm_s ,8) - ,write_name = resolved_write_name - ,target_dir = target_dir - ,header_raw = h - ) - return row ,None - except Exception as e: - return None ,f"internal parse error in {read_rel}: {e}" - -def read_first_line(p: Path)-> str: - """Return the first line (sans newline). UTF-8 with BOM tolerant.""" - with open(p ,"r" ,encoding="utf-8" ,errors="replace") as fh: - line = fh.readline() - return line.rstrip("\n\r") - -def scan_stage(stage_root: Path)-> tuple[list[StageRow] ,list[str]]: - """Given: stage_root. - Does: enumerate files, parse each header line, collect rows and errors. - Returns: (rows, errors) - """ - rows: list[StageRow] = [] - errs: list[str] = [] - for rel in stage_read_file_paths(stage_root): - abs_path = stage_root / rel - try: - header = read_first_line(abs_path) - except Exception as e: - errs.append(f"read error in {rel}: {e}") - continue - row ,err = parse_stage_header_line(header ,rel) - if err: - errs.append(err) - else: - rows.append(row) # type: ignore[arg-type] - return rows ,errs - -# === Printers === - -def print_list(rows: list[StageRow])-> None: - """Print: 'read_file_path: owner permissions write_file_name target_directory_path' per line.""" - for r in rows: - print(f"{r.read_rel.as_posix()}: {r.owner} {r.perm_octal_str} {r.write_name} {r.target_dir}") - -def print_table(rows: list[StageRow])-> None: - """Aligned table printer (no headers, just data in columns).""" - if not rows: - return - a = [r.read_rel.as_posix() for r in rows] - b = [r.owner for r in rows] - c = [r.perm_octal_str for r in rows] - d = [r.write_name for r in rows] - e = [str(r.target_dir) for r in rows] - wa = max(len(s) for s in a) - wb = max(len(s) for s in b) - wc = max(len(s) for s in c) - wd = max(len(s) for s in d) - # e (target_dir) left ragged - for sa ,sb ,sc ,sd ,se in zip(a ,b ,c ,d ,e): - print(f"{sa:<{wa}} {sb:<{wb}} {sc:<{wc}} {sd:<{wd}} {se}") - -# === Orchestrator === - -def ls_stage(stage_root: Path ,fmt: str="list")-> int: - """Given: stage_root and output format ('list'|'table'). - Does: scan and parse staged files, print in the requested format; report syntax errors to stderr. - Returns: 0 on success; 1 if any syntax errors were encountered. - """ - rows ,errs = scan_stage(stage_root) - if fmt == "table": - print_table(rows) - else: - print_list(rows) - if errs: - print("\nerror(s):" ,file=sys.stderr) - for e in errs: - print(f" - {e}" ,file=sys.stderr) - return 1 - return 0 - -# === CLI === - -def main(argv: list[str] | None=None)-> int: - ap = argparse.ArgumentParser( - prog="ls_stage.py" - ,description="List staged files and their header-declared install metadata." - ) - ap.add_argument("--stage" ,default="stage",help="stage directory (default: ./stage)") - ap.add_argument("--format" ,choices=["list" ,"table"] ,default="list" - ,help="output format (default: list)") - args = ap.parse_args(argv) - stage_root = Path(args.stage) - if not stage_root.exists() or not stage_root.is_dir(): - print(f"error: stage directory not found or not a directory: {stage_root}" ,file=sys.stderr) - return 2 - return ls_stage(stage_root ,fmt=args.format) - -if __name__ == "__main__": - sys.exit(main()) diff --git a/developer/source/DNS/stage_show_plan.py b/developer/source/DNS/stage_show_plan.py new file mode 100644 index 0000000..075e65b --- /dev/null +++ b/developer/source/DNS/stage_show_plan.py @@ -0,0 +1,97 @@ +#!/usr/bin/env -S python3 -B +""" +stage_show_plan.py — run staged configs (UNPRIVILEGED) and print the plan. + +Given: a stage root directory. +Does: loads Stage.py, executes each config, builds a native plan map, summarizes it. +Returns: exit code 0 on success, non-zero on error. +""" +from __future__ import annotations +import sys ,os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") + +from pathlib import Path +import argparse ,importlib.util ,runpy ,socket ,getpass ,time ,hashlib + +# ---------- helpers ---------- + +def _load_stage_module(stage_root_dpath: Path): + "Given: stage root path. Does: load Stage.py as module 'Stage'. Returns: module." + mod_fpath = stage_root_dpath/"Stage.py" + if not mod_fpath.exists(): + raise FileNotFoundError(f"Stage.py not found at {mod_fpath}") + spec = importlib.util.spec_from_file_location("Stage" ,str(mod_fpath)) + mod = importlib.util.module_from_spec(spec) + sys.modules["Stage"] = mod + assert spec and spec.loader + spec.loader.exec_module(mod) # type: ignore + return mod + +def _config_rel_fpaths(stage_root_dpath: Path)-> list[Path]: + "Given: stage root. Does: collect *.py (excluding Stage.py) as relative file paths. Returns: list[Path]." + rel_fpath_list: list[Path] = [] + for p in stage_root_dpath.rglob("*.py"): + if p.name == "Stage.py": continue + if p.is_file(): + rel_fpath_list.append(p.relative_to(stage_root_dpath)) + return sorted(rel_fpath_list ,key=lambda x: x.as_posix()) + +def _sha256_hex(b: bytes)-> str: + "Given: bytes. Does: sha256. Returns: hex string." + return hashlib.sha256(b).hexdigest() + +# ---------- main ---------- + +def main(argv: list[str]|None=None)-> int: + "Given: CLI. Does: show plan. Returns: exit code." + ap = argparse.ArgumentParser(prog="stage_show_plan.py" + ,description="Run staged config scripts and print the resulting plan.") + ap.add_argument("--stage",default="stage",help="stage directory (default: ./stage)") + args = ap.parse_args(argv) + + stage_root_dpath = Path(args.stage) + StageMod = _load_stage_module(stage_root_dpath) + Stage = StageMod.Stage + Stage._reset() + Stage.set_meta( + planner_user_name=getpass.getuser() + ,planner_uid_int=os.getuid() + ,planner_gid_int=os.getgid() + ,host_name=socket.gethostname() + ,created_utc_str=time.strftime("%Y-%m-%dT%H:%M:%SZ",time.gmtime()) + ) + + for rel_fpath in _config_rel_fpaths(stage_root_dpath): + Stage._begin(read_rel_fpath=rel_fpath ,stage_root_dpath=stage_root_dpath) + runpy.run_path(str(stage_root_dpath/rel_fpath) ,run_name="__main__") + Stage._end() + + plan_map = Stage.plan_object() + entries_list = plan_map["entries_list"] + print(f"Plan version: {plan_map['version_int']}") + print(f"Planner: {plan_map['meta_map'].get('planner_user_name')}@{plan_map['meta_map'].get('host_name')} " + f"UID:{plan_map['meta_map'].get('planner_uid_int')} GID:{plan_map['meta_map'].get('planner_gid_int')}") + print(f"Created: {plan_map['meta_map'].get('created_utc_str')}") + print(f"Entries: {len(entries_list)}\n") + + for i ,e_map in enumerate(entries_list ,1): + op = e_map.get("op") + dst_fpath_str = f"{e_map.get('dst_dpath')}/{e_map.get('dst_fname')}" + if op == "copy": + content = e_map.get("content_bytes") or b"" + sz = len(content) + mode = e_map.get("mode_octal_str") or "????" + owner = e_map.get("owner_name") or "?" + h = _sha256_hex(content) + print(f"{i:02d}. copy -> {dst_fpath_str} mode {mode} owner {owner} bytes {sz} sha256 {h[:16]}…") + elif op == "displace": + print(f"{i:02d}. displace -> {dst_fpath_str}") + elif op == "delete": + print(f"{i:02d}. delete -> {dst_fpath_str}") + else: + print(f"{i:02d}. ?op? -> {dst_fpath_str} ({op})") + return 0 + +if __name__ == "__main__": + sys.exit(main()) diff --git a/developer/source/DNS/stage_test_0/example.py b/developer/source/DNS/stage_test_0/example.py new file mode 100644 index 0000000..6f2c257 --- /dev/null +++ b/developer/source/DNS/stage_test_0/example.py @@ -0,0 +1,24 @@ +#!/usr/bin/env -S python3 -B +import Stage + +# You can compute these with arbitrary Python if you like +svc = "unbound" +zone = "US" +fname = f"unbound-{zone}.conf" + +Stage.init( + write_file_name="." # '.' → use basename of this file -> 'example_dns.py' +, write_file_directory_path="/etc/unbound" +, write_file_owner="root" +, write_file_permissions=0o644 # or "0644" +, read_file_contents="""\ +# generated config (example) +server: + verbosity: 1 + interface: 127.0.0.1 +""" +) + +# declare the desired operations (no effect in 'noop'/'dry' without a copier) +Stage.displace() +Stage.copy() diff --git a/developer/source/DNS/stage_test_0/stage_ls.py b/developer/source/DNS/stage_test_0/stage_ls.py new file mode 100644 index 0000000..1e27c26 --- /dev/null +++ b/developer/source/DNS/stage_test_0/stage_ls.py @@ -0,0 +1,169 @@ +#!/usr/bin/env -S python3 -B +""" +stage_ls.py — execute staged Python programs with Stage in 'noop' mode and list metadata. + +For each *.py under --stage (recursively, excluding Stage.py), this tool: + 1) loads Stage.py from the stage root, + 2) switches mode to 'noop' (no side effects, no printing), + 3) executes the program via runpy.run_path(...) with the proper __file__, + 4) collects the resolved write_file_* metadata and declared ops, + 5) prints either list or aligned table, + 6) reports any collected errors. + +This lets admins compute metadata with arbitrary Python while guaranteeing no writes. +""" + +from __future__ import annotations + +import sys ,os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") + +from dataclasses import dataclass +from pathlib import Path +import argparse +import importlib.util ,runpy +import traceback + +# --- utility dataclass (for printing) --- + +@dataclass +class Row: + read_rel: Path + owner: str|None + perm: str|None + write_name: str|None + target_dir: Path|None + ops: list[str] + errors: list[str] + +# --- helpers --- + +def _load_stage_module(stage_root: Path): + """Load Stage.py from stage_root into sys.modules['Stage'] (overwriting if present). Returns the Stage module.""" + stage_py = stage_root/"Stage.py" + if not stage_py.exists(): + raise FileNotFoundError(f"Stage.py not found at {stage_py} — place Stage.py in the stage root.") + spec = importlib.util.spec_from_file_location("Stage" ,str(stage_py)) + if spec is None or spec.loader is None: + raise RuntimeError(f"cannot load Stage module from {stage_py}") + mod = importlib.util.module_from_spec(spec) + sys.modules["Stage"] = mod + spec.loader.exec_module(mod) # type: ignore[union-attr] + return mod + +def _stage_program_paths(stage_root: Path)-> list[Path]: + rels: list[Path] = [] + for p in stage_root.rglob("*.py"): + if p.name == "Stage.py": + continue + try: + if p.is_file(): + rels.append(p.relative_to(stage_root)) + except Exception: + continue + return sorted(rels ,key=lambda x: x.as_posix()) + +def print_list(rows: list[Row])-> None: + for r in rows: + owner = r.owner or "?" + perm = r.perm or "????" + name = r.write_name or "?" + tdir = str(r.target_dir) if r.target_dir is not None else "?" + print(f"{r.read_rel.as_posix()}: {owner} {perm} {name} {tdir}") + +def print_table(rows: list[Row])-> None: + if not rows: + return + a = [r.read_rel.as_posix() for r in rows] + b = [(r.owner or "?") for r in rows] + c = [(r.perm or "????") for r in rows] + d = [(r.write_name or "?") for r in rows] + e = [str(r.target_dir) if r.target_dir is not None else "?" for r in rows] + wa = max(len(s) for s in a) + wb = max(len(s) for s in b) + wc = max(len(s) for s in c) + wd = max(len(s) for s in d) + for sa ,sb ,sc ,sd ,se in zip(a ,b ,c ,d ,e): + print(f"{sa:<{wa}} {sb:<{wb}} {sc:<{wc}} {sd:<{wd}} {se}") + +# --- core --- + +def ls_stage(stage_root: Path ,fmt: str="list")-> int: + Stage = _load_stage_module(stage_root) + Stage.Stage.set_mode("noop") # hard safety for this tool + + rows: list[Row] = [] + errs: list[str] = [] + + for rel in _stage_program_paths(stage_root): + abs_path = stage_root/rel + try: + # isolate per-run state + Stage.Stage._current = None + Stage.Stage._all_records.clear() + Stage.Stage._begin(read_rel=rel ,stage_root=stage_root) + + # execute the staged program under its real path + runpy.run_path(str(abs_path) ,run_name="__main__") + + rec = Stage.Stage._end() + if rec is None: + errs.append(f"{rel}: program executed but Stage.init(...) was never called") + continue + + rows.append( + Row( + read_rel=rel + , owner=rec.owner + , perm=rec.perm_octal_str + , write_name=rec.write_name + , target_dir=rec.target_dir + , ops=list(rec.ops) + , errors=list(rec.errors) + ) + ) + + except SystemExit as e: + errs.append(f"{rel}: program called sys.exit({e.code}) during listing") + except Exception: + tb = traceback.format_exc(limit=2) + errs.append(f"{rel}: exception during execution:\n{tb}") + + # print data + if fmt == "table": + print_table(rows) + else: + print_list(rows) + + # print per-row Stage errors + row_errs = [f"{r.read_rel}: {msg}" for r in rows for msg in r.errors] + all_errs = row_errs + errs + if all_errs: + print("\nerror(s):" ,file=sys.stderr) + for e in all_errs: + print(f" - {e}" ,file=sys.stderr) + return 1 + return 0 + +# --- CLI --- + +def main(argv: list[str] | None=None)-> int: + import argparse + ap = argparse.ArgumentParser( + prog="stage_ls.py" + , description="Execute staged Python configs with Stage in 'noop' mode and list resolved metadata." + ) + ap.add_argument("--stage" ,default="stage" ,help="stage directory (default: ./stage)") + ap.add_argument("--format" ,choices=["list","table"] ,default="list" ,help="output format") + args = ap.parse_args(argv) + + stage_root = Path(args.stage) + if not stage_root.exists() or not stage_root.is_dir(): + print(f"error: stage directory not found or not a directory: {stage_root}" ,file=sys.stderr) + return 2 + + return ls_stage(stage_root ,fmt=args.format) + +if __name__ == "__main__": + sys.exit(main())