--- /dev/null
+#!/usr/bin/env -S python3 -B
+"""
+Planner.py — plan builder for staged configuration (UNPRIVILEGED).
+
+The Planner accumulates Command objects into a Journal.
+
+Journal building is orchestrated by the outer runner (e.g., stage_show_plan, stage_cp)
+which constructs a Planner per config file and invokes Planner command methods.
+
+Defaults and provenance come from a PlannerContext instance. You can replace the
+context at any time via set_context(ctx).
+
+The Journal can be exported as CBOR via Journal.to_CBOR_bytes(), and reconstructed
+on the privileged side via Journal.from_CBOR_bytes().
+
+On-wire field names are snake_case and use explicit suffixes (_str,_bytes,_int, etc.)
+to avoid ambiguity.
+"""
+
+from __future__ import annotations
+
+# no bytecode anywhere (works under sudo/root shells too)
+import sys ,os
+sys.dont_write_bytecode = True
+os.environ.setdefault("PYTHONDONTWRITEBYTECODE","1")
+
+from dataclasses import dataclass ,field
+from pathlib import Path
+from typing import Any
+
+# ===== Utilities =====
+
+def _norm_perm(value: int|str)-> tuple[int,str]|None:
+ "Given int or 4-char octal string. Does validate/normalize. Returns (int,'%04o') or None."
+ if isinstance(value ,int):
+ if 0 <= value <= 0o7777:
+ return value ,f"{value:04o}"
+ return None
+ if isinstance(value ,str):
+ s = value.strip()
+ if len(s)==4 and all(ch in "01234567" for ch in s):
+ try:
+ v = int(s ,8)
+ return v ,s
+ except Exception:
+ return None
+ return None
+
+def _is_abs_dpath(dpath_str: str)-> bool:
+ "Given path string. Does quick abs dir check. Returns bool."
+ return bool(dpath_str) and dpath_str.startswith("/")
+
+def _join_write_file(dpath_str: str ,fname_str: str)-> str:
+ "Given dir path string and filename string. Does join. Returns POSIX path string or ''."
+ if not _is_abs_dpath(dpath_str): return ""
+ if not fname_str or "/" in fname_str: return ""
+ return (Path(dpath_str)/fname_str).as_posix()
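+
+# Quick reference for the helpers above (illustrative, not executed):
+#   _norm_perm(0o644)   -> (420 ,"0644")
+#   _norm_perm("0644")  -> (420 ,"0644")
+#   _norm_perm("644")   -> None   # not exactly four octal digits
+#   _join_write_file("/etc/unbound" ,"unbound.conf") -> "/etc/unbound/unbound.conf"
+#   _join_write_file("etc" ,"x") -> ""   # dpath must be absolute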
+
+# ===== Core data types =====
+
+@dataclass(slots=True)
+class Command:
+ """
+ Command — a single planned operation.
+
+ Given a command name and an argument map (native values).
+ Does hold the op name, owns a distinct args map, accumulates errors for this op.
+ Returns serializable mapping via to_map().
+ """
+ name_str: str
+ args_map: dict[str,Any] = field(default_factory=dict)
+ errors_list: list[str] = field(default_factory=list)
+
+ def add_error(self ,msg_str: str)-> None:
+ "Given message. Does append to errors_list. Returns None."
+ self.errors_list.append(msg_str)
+
+ def to_map(self)-> dict[str,Any]:
+ "Given self. Does convert to a plain dict. Returns {'op','args_map','errors_list'}."
+ return {
+ "op": self.name_str
+ ,"args_map": dict(self.args_map)
+ ,"errors_list": list(self.errors_list)
+ }
+
+@dataclass(slots=True)
+class PlannerContext:
+ """
+ PlannerContext — per-config provenance and defaults.
+
+ Given: stage_root_dpath, read_file_rel_fpath, default write_file location/name,
+ default owner name, default permission (int or '0644'), optional default content.
+ Does: provide ambient defaults and provenance to Planner methods.
+ Returns: n/a (data holder).
+ """
+ stage_root_dpath: Path
+ read_file_rel_fpath: Path
+ default_write_file_dpath_str: str
+ default_write_file_fname_str: str
+ default_owner_name_str: str
+ default_mode_int: int|None = None
+ default_mode_octal_str: str|None = None
+ default_content_bytes: bytes|None = None
+
+ @staticmethod
+ def from_values(stage_root_dpath: Path
+ ,read_file_rel_fpath: Path
+ ,write_file_dpath_str: str
+ ,write_file_fname_str: str
+ ,owner_name_str: str
+ ,perm: int|str
+ ,content: bytes|str|None
+ )-> PlannerContext:
+ "Given raw values. Does normalize perm and content. Returns PlannerContext."
+ if isinstance(content ,str):
+ content_b = content.encode("utf-8")
+ else:
+ content_b = content
+ perm_norm = _norm_perm(perm)
+ if perm_norm is None:
+ m_int ,m_oct = None ,None
+ else:
+ m_int ,m_oct = perm_norm
+ return PlannerContext(
+ stage_root_dpath=stage_root_dpath
+ ,read_file_rel_fpath=read_file_rel_fpath
+ ,default_write_file_dpath_str=write_file_dpath_str
+ ,default_write_file_fname_str=write_file_fname_str
+ ,default_owner_name_str=owner_name_str
+ ,default_mode_int=m_int
+ ,default_mode_octal_str=m_oct
+ ,default_content_bytes=content_b
+ )
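+    # e.g. (illustrative values):
+    #   ctx = PlannerContext.from_values(Path("/srv/stage") ,Path("etc/dns.stage.py")
+    #                                   ,"/etc/unbound" ,"." ,"root" ,"0644" ,None)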
+
+@dataclass(slots=True)
+class Journal:
+ """
+ Journal — ordered list of Commands plus provenance metadata.
+
+ Given optional meta map.
+ Does append commands, expose entries, produce plain or CBOR encodings, and rebuild from CBOR.
+ Returns plain dict via to_map(), bytes via to_CBOR_bytes(), Journal via from_CBOR_bytes().
+ """
+ meta_map: dict[str,Any] = field(default_factory=dict)
+ commands_list: list[Command] = field(default_factory=list)
+
+ def set_meta(self ,**kv)-> None:
+ "Given keyword meta. Does merge into meta_map. Returns None."
+ self.meta_map.update(kv)
+
+ def append(self ,cmd: Command)-> None:
+ "Given Command. Does append to commands_list. Returns None."
+ self.commands_list.append(cmd)
+
+ def entries_list(self)-> list[dict[str,Any]]:
+ "Given n/a. Does return list of entry dicts (copy). Returns list[dict]."
+ return [c.to_map() for c in self.commands_list]
+
+ def to_map(self)-> dict[str,Any]:
+ "Given n/a. Does package a plan map (ready for CBOR). Returns dict."
+ return {
+ "version_int": 1
+ ,"meta_map": dict(self.meta_map)
+ ,"entries_list": self.entries_list()
+ }
+
+ def to_CBOR_bytes(self ,canonical_bool: bool=True)-> bytes:
+ "Given n/a. Does CBOR-encode to bytes (requires cbor2). Returns bytes."
+ try:
+ import cbor2
+ except Exception as e:
+ raise RuntimeError(f"package cbor2 required for to_CBOR_bytes: {e}")
+ return cbor2.dumps(self.to_map() ,canonical=canonical_bool)
+
+ @staticmethod
+ def from_CBOR_bytes(data_bytes: bytes)-> Journal:
+ "Given CBOR bytes. Does decode and rebuild a Journal (Commands + meta). Returns Journal."
+ try:
+ import cbor2
+ except Exception as e:
+ raise RuntimeError(f"package cbor2 required for from_CBOR_bytes: {e}")
+ obj = cbor2.loads(data_bytes)
+ if not isinstance(obj ,dict): raise ValueError("CBOR root must be a map")
+ meta = dict(obj.get("meta_map") or {})
+ entries = obj.get("entries_list") or []
+ j = Journal(meta_map=meta)
+ for e in entries:
+ if not isinstance(e ,dict): continue
+ op = e.get("op") or "?"
+ args = e.get("args_map") or {}
+ errs = e.get("errors_list") or []
+ j.append(Command(name_str=op ,args_map=dict(args) ,errors_list=list(errs)))
+ return j
+
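+# A minimal round-trip sketch (illustrative; requires the cbor2 package):
+#
+#   j = Journal()
+#   j.set_meta(stage_root_dpath_str="/srv/stage")            # example value
+#   j.append(Command("delete" ,{"write_file_dpath_str": "/etc/unbound"
+#                              ,"write_file_fname_str": "old.conf"}))
+#   wire_bytes = j.to_CBOR_bytes()               # bytes shipped to the privileged side
+#   j2 = Journal.from_CBOR_bytes(wire_bytes)     # reconstructed on the other side
+#   assert j2.entries_list() == j.entries_list()
+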
+# ===== Planner =====
+
+class Planner:
+ """
+ Planner — constructs a Journal of Commands from config scripts.
+
+ Given: PlannerContext (provenance + defaults).
+ Does: maintains a Journal; command methods (copy/displace/delete) create Command objects,
+ fill missing args from context defaults, preflight minimal shape checks, then append.
+ Returns: accessors for Journal and meta; no I/O or privilege here.
+ """
+ def __init__(self ,ctx: PlannerContext)-> None:
+ self._ctx = ctx
+ self._journal = Journal()
+ # seed provenance; outer tools can add more later
+ self._journal.set_meta(
+ source_read_file_rel_fpath_str=ctx.read_file_rel_fpath.as_posix()
+ ,stage_root_dpath_str=str(ctx.stage_root_dpath)
+ )
+
+ # --- Context management ---
+
+ def set_context(self ,ctx: PlannerContext)-> None:
+ "Given PlannerContext. Does replace current context. Returns None."
+ self._ctx = ctx
+
+ def context(self)-> PlannerContext:
+ "Given n/a. Does return current context. Returns PlannerContext."
+ return self._ctx
+
+ # --- Journal access ---
+
+ def journal(self)-> Journal:
+ "Given n/a. Does return the Journal (live). Returns Journal."
+ return self._journal
+
+ # --- Helpers ---
+
+ def _resolve_write_file(self ,write_file_dpath_str: str|None ,write_file_fname_str: str|None)-> tuple[str,str]:
+ "Given optional write_file dpath/fname. Does fill from context; '.' fname → read_file basename. Returns (dpath,fname)."
+ dpath_str = write_file_dpath_str if write_file_dpath_str is not None else self._ctx.default_write_file_dpath_str
+ fname_str = write_file_fname_str if write_file_fname_str is not None else self._ctx.default_write_file_fname_str
+ if fname_str == ".":
+ fname_str = self._ctx.read_file_rel_fpath.name
+ return dpath_str ,fname_str
+
+ def _resolve_owner(self ,owner_name_str: str|None)-> str:
+ "Given optional owner. Does fill from context. Returns owner string."
+ return owner_name_str if owner_name_str is not None else self._ctx.default_owner_name_str
+
+ def _resolve_mode(self ,perm: int|str|None)-> tuple[int|None,str|None]:
+ "Given optional perm. Does normalize or fall back to context. Returns (mode_int,mode_octal_str)."
+ if perm is None:
+ return self._ctx.default_mode_int ,self._ctx.default_mode_octal_str
+ norm = _norm_perm(perm)
+ return (norm if norm is not None else (None ,None))
+
+ def _resolve_content(self ,content: bytes|str|None)-> bytes|None:
+ "Given optional content (bytes or str). Does normalize or fall back to context. Returns bytes|None."
+ if content is None:
+ return self._ctx.default_content_bytes
+ if isinstance(content ,str):
+ return content.encode("utf-8")
+ return content
+
+ # --- Command builders ---
+
+ def copy(self
+ ,*
+ ,write_file_dpath_str: str|None=None
+ ,write_file_fname_str: str|None=None
+ ,owner_name_str: str|None=None
+ ,perm: int|str|None=None
+ ,content: bytes|str|None=None
+ ,read_file_rel_fpath: Path|None=None
+ )-> Command:
+ """
+ Given: optional overrides for write_file (dpath,fname,owner,perm), content, and read_file_rel_fpath.
+ Does: build a 'copy' command entry (content is embedded; read_file path kept as provenance).
+ Returns: Command (also appended to Journal).
+ """
+ cmd = Command("copy")
+ # resolve basics
+ wf_dpath_str ,wf_fname_str = self._resolve_write_file(write_file_dpath_str ,write_file_fname_str)
+ owner_str = self._resolve_owner(owner_name_str)
+ mode_int ,mode_oct = self._resolve_mode(perm)
+ content_b = self._resolve_content(content)
+ read_rel = (read_file_rel_fpath if read_file_rel_fpath is not None else self._ctx.read_file_rel_fpath)
+
+ # minimal shape checks (well-formedness, not policy)
+ if not _is_abs_dpath(wf_dpath_str):
+ cmd.add_error("write_file_dpath_str must be absolute and non-empty")
+ if not wf_fname_str or "/" in wf_fname_str:
+ cmd.add_error("write_file_fname_str must be a simple filename (no '/')")
+ if not owner_str:
+ cmd.add_error("owner_name_str must be non-empty")
+ if (mode_int ,mode_oct) == (None ,None):
+ cmd.add_error("perm must be an int <= 0o7777 or a 4-digit octal string")
+ if content_b is None:
+ cmd.add_error("content is required for copy() (bytes or str)")
+
+ cmd.args_map.update({
+ "write_file_dpath_str": wf_dpath_str
+ ,"write_file_fname_str": wf_fname_str
+ ,"owner_name_str": owner_str
+ ,"mode_int": mode_int
+ ,"mode_octal_str": mode_oct
+ ,"content_bytes": content_b
+ ,"read_file_rel_fpath_str": read_rel.as_posix()
+ })
+ self._journal.append(cmd)
+ return cmd
+
+ def displace(self
+ ,*
+ ,write_file_dpath_str: str|None=None
+ ,write_file_fname_str: str|None=None
+ )-> Command:
+ """
+ Given: optional write_file dpath/fname overrides.
+ Does: build a 'displace' command (rename existing write_file in-place with UTC suffix).
+ Returns: Command (appended).
+ """
+ cmd = Command("displace")
+ wf_dpath_str ,wf_fname_str = self._resolve_write_file(write_file_dpath_str ,write_file_fname_str)
+
+ if not _is_abs_dpath(wf_dpath_str):
+ cmd.add_error("write_file_dpath_str must be absolute and non-empty")
+ if not wf_fname_str or "/" in wf_fname_str:
+ cmd.add_error("write_file_fname_str must be a simple filename (no '/')")
+
+ cmd.args_map.update({
+ "write_file_dpath_str": wf_dpath_str
+ ,"write_file_fname_str": wf_fname_str
+ })
+ self._journal.append(cmd)
+ return cmd
+
+ def delete(self
+ ,*
+ ,write_file_dpath_str: str|None=None
+ ,write_file_fname_str: str|None=None
+ )-> Command:
+ """
+ Given: optional write_file dpath/fname overrides.
+ Does: build a 'delete' command (unlink if present).
+ Returns: Command (appended).
+ """
+ cmd = Command("delete")
+ wf_dpath_str ,wf_fname_str = self._resolve_write_file(write_file_dpath_str ,write_file_fname_str)
+
+ if not _is_abs_dpath(wf_dpath_str):
+ cmd.add_error("write_file_dpath_str must be absolute and non-empty")
+ if not wf_fname_str or "/" in wf_fname_str:
+ cmd.add_error("write_file_fname_str must be a simple filename (no '/')")
+
+ cmd.args_map.update({
+ "write_file_dpath_str": wf_dpath_str
+ ,"write_file_fname_str": wf_fname_str
+ })
+ self._journal.append(cmd)
+ return cmd
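+
+# Example config script (illustrative). An outer runner such as plan_show.py
+# binds a ready Planner instance to the name 'P' before executing the script:
+#
+#   P.displace()                                    # rename any existing write_file out of the way
+#   P.copy(owner_name_str="root" ,perm="0644"
+#         ,content="server:\n    verbosity: 1\n")   # write_file location falls back to context defaults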
--- /dev/null
+#!/usr/bin/env -S python3 -B
+"""
+ls_stage.py — list staged files and their header-declared install metadata.
+
+Header line format (first line of each file):
+ <owner> <permissions> <write_file_name> <target_directory_path>
+
+- owner: username string (need not exist until install time)
+- permissions: four octal digits, e.g. 0644
+- write_file_name: '.' means use the read file's basename, else use the given POSIX filename
+- target_directory_path: POSIX directory path (usually absolute, e.g. /etc/unbound)
+
+Output formats:
+- list (default): "read_file_path: owner permissions write_file_name target_directory_path"
+- table: columns aligned for readability
+"""
+
+from __future__ import annotations
+
+# never write bytecode (root/sudo friendly)
+import sys ,os
+sys.dont_write_bytecode = True
+os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
+
+from dataclasses import dataclass
+from pathlib import Path
+import argparse
+import re
+
+# === Stage utilities (importable) ===
+
+def stage_read_file_paths(stage_root: Path)-> list[Path]:
+ """Given: stage_root directory.
+ Does: recursively enumerate regular files (follows symlinks to files), keep paths relative to stage_root.
+ Returns: list[Path] of POSIX-order sorted relative paths (no leading slash).
+ """
+ rels: list[Path] = []
+ for p in stage_root.rglob("*"):
+ try:
+ if p.is_file(): # follows symlink-to-file
+ rels.append(p.relative_to(stage_root))
+ except (FileNotFoundError ,RuntimeError):
+ # broken link or race; skip conservatively
+ continue
+ return sorted(rels ,key=lambda x: x.as_posix())
+
+@dataclass
+class StageRow:
+ read_rel: Path # e.g. Path("etc/unbound/unbound.conf.staged")
+ owner: str # token[0]
+ perm_octal_str: str # token[1], exactly as in header (validated ####)
+ perm_int: int # token[1] parsed as base-8
+ write_name: str # token[2] ('.' resolved to read_rel.name)
+ target_dir: Path # token[3] (Path)
+ header_raw: str # original header line (sans newline)
+
+ # convenience
+ def write_abs(self ,root: Path)-> Path:
+        # parenthesize the whole base-dir choice so write_name is appended in both branches
+        base = root / (self.target_dir.relative_to("/") if self.target_dir.is_absolute() else self.target_dir)
+        return base / self.write_name
+
+# header parsing rules
+_PERM_RE = re.compile(r"^[0-7]{4}$")
+
+def parse_stage_header_line(header: str ,read_rel: Path)-> tuple[StageRow|None ,str|None]:
+ """Given: raw first line of a staged file and its stage-relative path.
+ Does: parse '<owner> <perm> <write_name> <target_dir>' with max 4 tokens (target_dir may contain spaces if quoted not required).
+ Returns: (StageRow, None) on success, or (None, error_message) on failure. Does NOT touch filesystem.
+ """
+ # strip BOM and trailing newline/spaces
+ h = header.lstrip("\ufeff").strip()
+ if not h:
+ return None ,f"empty header line in {read_rel}"
+ parts = h.split(maxsplit=3)
+ if len(parts) != 4:
+ return None ,f"malformed header in {read_rel}: expected 4 fields, got {len(parts)}"
+ owner ,perm_s ,write_name ,target_dir_s = parts
+
+ if not _PERM_RE.fullmatch(perm_s):
+ return None ,f"invalid permissions '{perm_s}' in {read_rel}: must be four octal digits"
+
+ # resolve '.' → basename
+ resolved_write_name = read_rel.name if write_name == "." else write_name
+
+ # MVP guard: write_name should be a single filename (no '/')
+ if "/" in resolved_write_name:
+ return None ,f"write_file_name must not contain '/': got '{resolved_write_name}' in {read_rel}"
+
+ # target dir may be absolute (recommended) or relative (we treat relative as under the install root)
+ target_dir = Path(target_dir_s)
+
+ try:
+ row = StageRow(
+ read_rel = read_rel
+ ,owner = owner
+ ,perm_octal_str = perm_s
+ ,perm_int = int(perm_s ,8)
+ ,write_name = resolved_write_name
+ ,target_dir = target_dir
+ ,header_raw = h
+ )
+ return row ,None
+ except Exception as e:
+ return None ,f"internal parse error in {read_rel}: {e}"
+
+def read_first_line(p: Path)-> str:
+ """Return the first line (sans newline). UTF-8 with BOM tolerant."""
+ with open(p ,"r" ,encoding="utf-8" ,errors="replace") as fh:
+ line = fh.readline()
+ return line.rstrip("\n\r")
+
+def scan_stage(stage_root: Path)-> tuple[list[StageRow] ,list[str]]:
+ """Given: stage_root.
+ Does: enumerate files, parse each header line, collect rows and errors.
+ Returns: (rows, errors)
+ """
+ rows: list[StageRow] = []
+ errs: list[str] = []
+ for rel in stage_read_file_paths(stage_root):
+ abs_path = stage_root / rel
+ try:
+ header = read_first_line(abs_path)
+ except Exception as e:
+ errs.append(f"read error in {rel}: {e}")
+ continue
+ row ,err = parse_stage_header_line(header ,rel)
+ if err:
+ errs.append(err)
+ else:
+ rows.append(row) # type: ignore[arg-type]
+ return rows ,errs
+
+# === Printers ===
+
+def print_list(rows: list[StageRow])-> None:
+ """Print: 'read_file_path: owner permissions write_file_name target_directory_path' per line."""
+ for r in rows:
+ print(f"{r.read_rel.as_posix()}: {r.owner} {r.perm_octal_str} {r.write_name} {r.target_dir}")
+
+def print_table(rows: list[StageRow])-> None:
+ """Aligned table printer (no headers, just data in columns)."""
+ if not rows:
+ return
+ a = [r.read_rel.as_posix() for r in rows]
+ b = [r.owner for r in rows]
+ c = [r.perm_octal_str for r in rows]
+ d = [r.write_name for r in rows]
+ e = [str(r.target_dir) for r in rows]
+ wa = max(len(s) for s in a)
+ wb = max(len(s) for s in b)
+ wc = max(len(s) for s in c)
+ wd = max(len(s) for s in d)
+ # e (target_dir) left ragged
+ for sa ,sb ,sc ,sd ,se in zip(a ,b ,c ,d ,e):
+ print(f"{sa:<{wa}} {sb:<{wb}} {sc:<{wc}} {sd:<{wd}} {se}")
+
+# === Orchestrator ===
+
+def ls_stage(stage_root: Path ,fmt: str="list")-> int:
+ """Given: stage_root and output format ('list'|'table').
+ Does: scan and parse staged files, print in the requested format; report syntax errors to stderr.
+ Returns: 0 on success; 1 if any syntax errors were encountered.
+ """
+ rows ,errs = scan_stage(stage_root)
+ if fmt == "table":
+ print_table(rows)
+ else:
+ print_list(rows)
+ if errs:
+ print("\nerror(s):" ,file=sys.stderr)
+ for e in errs:
+ print(f" - {e}" ,file=sys.stderr)
+ return 1
+ return 0
+
+# === CLI ===
+
+def main(argv: list[str] | None=None)-> int:
+ ap = argparse.ArgumentParser(
+ prog="ls_stage.py"
+ ,description="List staged files and their header-declared install metadata."
+ )
+ ap.add_argument("--stage" ,default="stage",help="stage directory (default: ./stage)")
+ ap.add_argument("--format" ,choices=["list" ,"table"] ,default="list"
+ ,help="output format (default: list)")
+ args = ap.parse_args(argv)
+ stage_root = Path(args.stage)
+ if not stage_root.exists() or not stage_root.is_dir():
+ print(f"error: stage directory not found or not a directory: {stage_root}" ,file=sys.stderr)
+ return 2
+ return ls_stage(stage_root ,fmt=args.format)
+
+if __name__ == "__main__":
+ sys.exit(main())
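+
+# Typical invocation (illustrative):
+#   ./ls_stage.py --stage ./stage --format table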
--- /dev/null
+#!/usr/bin/env -S python3 -B
+"""
+plan_show.py — build and display a staged plan (UNPRIVILEGED).
+
+Given: a stage directory of config scripts (*.stage.py by default).
+Does: executes each script with a pre-created Planner (P) and PlannerContext,
+ aggregates Commands into a single Journal, by default prints from the CBOR
+ round-trip (encode→decode) so the human view matches what will be shipped
+    to stage_cp; runs well-formedness (WF) invariant checks. Can emit CBOR if requested.
+Returns: exit status 0 on success; 2 on WF errors or usage errors.
+"""
+
+from __future__ import annotations
+
+# no bytecode anywhere
+import sys ,os
+sys.dont_write_bytecode = True
+os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
+
+from pathlib import Path
+import argparse
+import datetime as _dt
+import getpass
+import runpy
+
+# local module (same dir): Planner
+from Planner import Planner ,PlannerContext ,Journal ,Command
+
+# ===== Utilities (general / reusable) =====
+
+def iso_utc_now_str()-> str:
+ "Given n/a. Does return compact UTC timestamp. Returns YYYYMMDDTHHMMSSZ."
+    return _dt.datetime.now(_dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")  # utcnow() is deprecated
+
+def find_configs(stage_root_dpath: Path ,glob_pat_str: str)-> list[Path]:
+ "Given stage root and glob. Does find matching files under stage. Returns list of absolute Paths."
+ root = stage_root_dpath.resolve()
+ return sorted((p for p in root.glob(glob_pat_str) if p.is_file()) ,key=lambda p: p.as_posix())
+
+def human_size(n: int)-> str:
+ "Given byte count. Does format human size. Returns string."
+ units = ["B","KB","MB","GB","TB"]
+ i = 0
+ x = float(max(0 ,n))
+ while x >= 1024 and i < len(units)-1:
+ x /= 1024.0
+ i += 1
+ return f"{x:.1f} {units[i]}"
+
+def _dst_path_str(args_map: dict)-> str:
+ "Given args map. Does join write_file path. Returns POSIX path or '?'."
+ d = args_map.get("write_file_dpath_str") or ""
+ f = args_map.get("write_file_fname_str") or ""
+ try:
+ if d and f and "/" not in f:
+ return (Path(d)/f).as_posix()
+ except Exception:
+ pass
+ return "?"
+
+# ===== WF invariants (MVP) =====
+# These are “well-formedness” rules (shape/encoding/domain), not policy or privilege checks.
+
+def wf_check(journal: Journal)-> list[str]:
+ """
+ Given Journal. Does run invariant checks on meta and each Command entry. Returns list of error strings.
+ Invariants (MVP):
+ - meta_map: must include generator identity and stage_root_dpath_str.
+ - entry.op ∈ {'copy','displace','delete'}
+ - all ops: write_file_dpath_str absolute; write_file_fname_str is bare filename.
+ - copy: owner_name_str non-empty; mode_int ∈ [0..0o7777] and no suid/sgid; content_bytes present (bytes).
+ """
+ errs: list[str] = []
+ meta = journal.meta_map or {}
+
+ # meta presence (light placeholder)
+ if not isinstance(meta ,dict):
+ errs.append("WF_META: meta_map must be a map")
+ else:
+ if not meta.get("stage_root_dpath_str"):
+ errs.append("WF_META: missing stage_root_dpath_str")
+ if not meta.get("generator_prog_str"):
+ errs.append("WF_META: missing generator_prog_str")
+
+ # entries
+ for idx ,cmd in enumerate(journal.commands_list ,1):
+ prefix = f"WF[{idx:02d}]"
+ if not isinstance(cmd ,Command):
+ errs.append(f"{prefix}: entry is not Command")
+ continue
+ op = cmd.name_str
+ if op not in {"copy","displace","delete"}:
+ errs.append(f"{prefix}: unknown op '{op}'")
+ continue
+ am = cmd.args_map or {}
+ dpath = am.get("write_file_dpath_str")
+ fname = am.get("write_file_fname_str")
+ if not isinstance(dpath ,str) or not dpath.startswith("/"):
+ errs.append(f"{prefix}: write_file_dpath_str must be absolute")
+ if not isinstance(fname ,str) or not fname or "/" in fname:
+ errs.append(f"{prefix}: write_file_fname_str must be a bare filename")
+
+ if op == "copy":
+ owner = am.get("owner_name_str")
+ mode = am.get("mode_int")
+ data = am.get("content_bytes")
+ if not isinstance(owner ,str) or not owner.strip():
+ errs.append(f"{prefix}: owner_name_str must be non-empty")
+ if not isinstance(mode ,int) or not (0 <= mode <= 0o7777):
+ errs.append(f"{prefix}: mode_int must be int in [0..0o7777]")
+ elif (mode & 0o6000):
+ errs.append(f"{prefix}: mode_int suid/sgid not allowed in MVP")
+ if not isinstance(data ,(bytes,bytearray)):
+ errs.append(f"{prefix}: content_bytes must be bytes")
+ return errs
+
+# ===== Planner execution =====
+
+def _run_one_config(config_abs_fpath: Path ,stage_root_dpath: Path)-> Planner:
+ """
+ Given abs path to a config script and stage root. Does construct a PlannerContext and Planner,
+ then executes the script with 'P' (Planner instance) bound in globals. Returns Planner with Journal.
+ Notes:
+ - Defaults are intentionally spartan; config should refine them via P.set_context(...).
+ - This is UNPRIVILEGED; no filesystem changes are performed here.
+ """
+ read_rel = config_abs_fpath.resolve().relative_to(stage_root_dpath.resolve())
+ ctx = PlannerContext.from_values(
+ stage_root_dpath=stage_root_dpath
+ ,read_file_rel_fpath=read_rel
+ ,write_file_dpath_str="/"
+ ,write_file_fname_str="."
+ ,owner_name_str=getpass.getuser()
+ ,perm=0o644
+ ,content=None
+ )
+ P = Planner(ctx)
+ g = {"Planner": Planner ,"PlannerContext": PlannerContext ,"P": P}
+ runpy.run_path(str(config_abs_fpath) ,init_globals=g)
+ return P
+
+def _aggregate_journal(planners_list: list[Planner] ,stage_root_dpath: Path)-> Journal:
+ "Given planners and stage root. Does aggregate Commands into a single Journal with meta. Returns Journal."
+ J = Journal()
+ J.set_meta(
+ version_int=1
+ ,generator_prog_str="plan_show.py"
+ ,generated_at_utc_str=iso_utc_now_str()
+ ,user_name_str=getpass.getuser()
+ ,host_name_str=os.uname().nodename if hasattr(os ,"uname") else "unknown"
+ ,stage_root_dpath_str=str(stage_root_dpath.resolve())
+ ,configs_list=[p.context().read_file_rel_fpath.as_posix() for p in planners_list]
+ )
+ for p in planners_list:
+ for cmd in p.journal().commands_list:
+ J.append(cmd)
+ return J
+
+def _print_plan(journal: Journal)-> None:
+ "Given Journal. Does print a readable summary. Returns None."
+ meta = journal.meta_map or {}
+ print(f"Stage: {meta.get('stage_root_dpath_str','?')}")
+ print(f"Generated: {meta.get('generated_at_utc_str','?')} by {meta.get('user_name_str','?')}@{meta.get('host_name_str','?')}\n")
+
+ entries = journal.commands_list
+ if not entries:
+ print("(plan is empty)")
+ return
+
+ n_copy = sum(1 for c in entries if c.name_str=="copy")
+ n_disp = sum(1 for c in entries if c.name_str=="displace")
+ n_del = sum(1 for c in entries if c.name_str=="delete")
+ print(f"Entries: {len(entries)} copy:{n_copy} displace:{n_disp} delete:{n_del}\n")
+
+ for i ,cmd in enumerate(entries ,1):
+ am = cmd.args_map
+ dst = _dst_path_str(am)
+ if cmd.name_str == "copy":
+ size = len(am.get("content_bytes") or b"")
+ mode = am.get("mode_int")
+ owner = am.get("owner_name_str")
+ print(f"{i:02d}. copy -> {dst} mode {mode:04o} owner {owner} bytes {size} ({human_size(size)})")
+ elif cmd.name_str == "displace":
+ print(f"{i:02d}. displace -> {dst}")
+ elif cmd.name_str == "delete":
+ print(f"{i:02d}. delete -> {dst}")
+ else:
+ print(f"{i:02d}. ?op? -> {dst}")
+
+def _maybe_emit_CBOR(journal: Journal ,emit_CBOR_fpath: Path|None)-> None:
+ "Given Journal and optional path. Does write CBOR if requested. Returns None."
+ if not emit_CBOR_fpath:
+ return
+ try:
+ data = journal.to_CBOR_bytes(canonical_bool=True)
+ except Exception as e:
+ print(f"error: CBOR encode failed: {e}" ,file=sys.stderr)
+ raise
+ emit_CBOR_fpath.parent.mkdir(parents=True ,exist_ok=True)
+ with open(emit_CBOR_fpath ,"wb") as fh:
+ fh.write(data)
+ print(f"\nWrote CBOR plan: {emit_CBOR_fpath} ({len(data)} bytes)")
+
+# ===== CLI =====
+
+def main(argv: list[str]|None=None)-> int:
+ "Given CLI. Does discover configs, build plan, (optionally) CBOR round-trip before printing, run WF, optionally emit CBOR. Returns exit code."
+ ap = argparse.ArgumentParser(prog="plan_show.py"
+ ,description="Build and show a staged plan (no privilege, no apply).")
+ ap.add_argument("--stage",default="stage",help="stage directory root (default: ./stage)")
+ ap.add_argument("--glob",default="**/*.stage.py",help="glob for config scripts under --stage")
+ ap.add_argument("--emit-CBOR",default=None,help="write CBOR plan to this path (optional)")
+ ap.add_argument("--print-from-journal",action="store_true"
+ ,help="print directly from in-memory Journal (skip CBOR round-trip)")
+ args = ap.parse_args(argv)
+
+ stage_root_dpath = Path(args.stage)
+ if not stage_root_dpath.is_dir():
+ print(f"error: --stage not a directory: {stage_root_dpath}" ,file=sys.stderr)
+ return 2
+
+ configs = find_configs(stage_root_dpath ,args.glob)
+ if not configs:
+ print("No config scripts found.")
+ return 0
+
+ planners: list[Planner] = []
+ for cfg in configs:
+ try:
+ planners.append(_run_one_config(cfg ,stage_root_dpath))
+ except SystemExit:
+ raise
+ except Exception as e:
+ print(f"error: executing {cfg}: {e}" ,file=sys.stderr)
+ return 2
+
+ journal_src = _aggregate_journal(planners ,stage_root_dpath)
+
+ if not args.print_from_journal:
+ try:
+ cbor_bytes = journal_src.to_CBOR_bytes(canonical_bool=True)
+ journal = Journal.from_CBOR_bytes(cbor_bytes)
+ except Exception as e:
+ print(f"error: CBOR round-trip failed: {e}" ,file=sys.stderr)
+ return 2
+ else:
+ journal = journal_src
+
+ _print_plan(journal)
+
+ errs = wf_check(journal)
+ if errs:
+ print("\nerror(s):" ,file=sys.stderr)
+ for e in errs:
+ print(f" - {e}" ,file=sys.stderr)
+ return 2
+
+    try:
+        _maybe_emit_CBOR(journal_src ,Path(args.emit_CBOR) if args.emit_CBOR else None)
+    except Exception as e:
+        print(f"error: failed to write CBOR: {e}" ,file=sys.stderr)
+        return 2
+
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
--- /dev/null
+#!/usr/bin/env -S python3 -B
+"""
+stage_cp.py — build a CBOR plan from staged configs; show, validate, and apply with privilege.
+
+Given: a stage root directory.
+Does: (user) run configs → build native plan → WF checks → summarize → encode plan → sudo re-exec
+ (root) decode plan → VALID + SANITY → apply ops (displace/copy/delete) safely.
+Returns: exit code.
+
+Requires: pip install cbor2
+"""
+from __future__ import annotations
+import sys ,os
+sys.dont_write_bytecode = True
+os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
+
+from pathlib import Path
+import argparse ,importlib.util ,runpy ,socket ,getpass ,time ,tempfile ,subprocess ,pwd ,hashlib
+from typing import Any
+import cbor2
+
+# ---------- small utils ----------
+
+def _load_stage_module(stage_root_dpath: Path):
+ "Given: stage root path. Does: load Stage.py as module 'Stage'. Returns: module."
+ mod_fpath = stage_root_dpath/"Stage.py"
+ if not mod_fpath.exists():
+ raise FileNotFoundError(f"Stage.py not found at {mod_fpath}")
+ spec = importlib.util.spec_from_file_location("Stage" ,str(mod_fpath))
+ mod = importlib.util.module_from_spec(spec)
+ sys.modules["Stage"] = mod
+ assert spec and spec.loader
+ spec.loader.exec_module(mod) # type: ignore
+ return mod
+
+def _config_rel_fpaths(stage_root_dpath: Path)-> list[Path]:
+ "Given: stage root. Does: collect *.py (excluding Stage.py) as relative file paths. Returns: list[Path]."
+ rel_fpath_list: list[Path] = []
+ for p in stage_root_dpath.rglob("*.py"):
+ if p.name == "Stage.py": continue
+ if p.is_file():
+ rel_fpath_list.append(p.relative_to(stage_root_dpath))
+ return sorted(rel_fpath_list ,key=lambda x: x.as_posix())
+
+def _sha256_bytes(b: bytes)-> bytes:
+ "Given: bytes. Does: sha256. Returns: 32-byte digest."
+ return hashlib.sha256(b).digest()
+
+def _dst_fpath_str(dst_dpath_str: str ,dst_fname_str: str)-> str:
+ "Given: a directory path string and a filename string. Does: join. Returns: combined POSIX path string."
+ if "/" in dst_fname_str:
+ return "" # invalid; WF will flag
+ return str((Path(dst_dpath_str)/dst_fname_str))
+
+# ---------- WF / VALID / SANITY ----------
+
+_ALLOWLIST_PREFIXES_LIST = ["/etc" ,"/usr/local" ,"/etc/systemd/system"]
+
+def wf_check(plan_map: dict[str,Any])-> list[str]:
+ "Given: plan map. Does: shape/lexical checks only. Returns: list of error strings."
+ errs_list: list[str] = []
+ if plan_map.get("version_int") != 1:
+ errs_list.append("WF_VERSION: unsupported plan version")
+ entries_list = plan_map.get("entries_list")
+ if not isinstance(entries_list ,list):
+ errs_list.append("WF_ENTRIES: 'entries_list' missing or not a list")
+ return errs_list
+ for i ,e_map in enumerate(entries_list ,1):
+ op = e_map.get("op")
+ dst_dpath_str = e_map.get("dst_dpath")
+ dst_fname_str = e_map.get("dst_fname")
+ where = f"entry {i}"
+ if op not in ("copy","displace","delete"):
+ errs_list.append(f"WF_OP:{where}: invalid op {op!r}")
+ continue
+ if not isinstance(dst_dpath_str ,str) or not dst_dpath_str:
+ errs_list.append(f"WF_DST_DPATH:{where}: dst_dpath missing or not str")
+ if not isinstance(dst_fname_str ,str) or not dst_fname_str:
+ errs_list.append(f"WF_DST_FNAME:{where}: dst_fname missing or not str")
+ if isinstance(dst_fname_str ,str) and "/" in dst_fname_str:
+ errs_list.append(f"WF_DST_FNAME:{where}: dst_fname must not contain '/'")
+ if isinstance(dst_dpath_str ,str) and not dst_dpath_str.startswith("/"):
+ errs_list.append(f"WF_DST_DPATH:{where}: dst_dpath must be absolute")
+ full_fpath_str = _dst_fpath_str(dst_dpath_str or "" ,dst_fname_str or "")
+ if not full_fpath_str or not full_fpath_str.startswith("/"):
+ errs_list.append(f"WF_PATH:{where}: failed to construct absolute path from dst_dpath/fname")
+ if op == "copy":
+ mode_int = e_map.get("mode_int")
+ if not isinstance(mode_int ,int) or not (0 <= mode_int <= 0o7777):
+ errs_list.append(f"WF_MODE:{where}: mode_int must be int in [0..0o7777]")
+ if isinstance(mode_int ,int) and (mode_int & 0o6000):
+ errs_list.append(f"WF_MODE:{where}: suid/sgid bits not allowed in MVP")
+ owner_name = e_map.get("owner_name")
+ if not isinstance(owner_name ,str) or not owner_name:
+ errs_list.append(f"WF_OWNER:{where}: owner_name must be non-empty username string")
+ content_bytes = e_map.get("content_bytes")
+            if not isinstance(content_bytes ,(bytes,bytearray)):
+ errs_list.append(f"WF_CONTENT:{where}: content_bytes must be bytes (may be empty)")
+ sha = e_map.get("sha256_bytes")
+ if sha is not None:
+ if not isinstance(sha ,(bytes,bytearray)) or len(sha)!=32:
+ errs_list.append(f"WF_SHA256:{where}: sha256_bytes must be 32-byte digest if present")
+ elif isinstance(content_bytes ,(bytes,bytearray)) and sha != _sha256_bytes(content_bytes):
+ errs_list.append(f"WF_SHA256_MISMATCH:{where}: sha256_bytes does not match content_bytes")
+ return errs_list
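+
+# An entry shape that passes wf_check (illustrative values):
+#   {"op": "copy"
+#   ,"dst_dpath": "/etc/unbound" ,"dst_fname": "unbound.conf"
+#   ,"owner_name": "root" ,"mode_int": 0o644
+#   ,"content_bytes": b"server:\n"
+#   ,"sha256_bytes": _sha256_bytes(b"server:\n")}   # optional, verified when present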
+
+def valid_check(plan_map: dict[str,Any])-> list[str]:
+ "Given: plan map. Does: environment (read-only) checks. Returns: list of error strings."
+ errs_list: list[str] = []
+ for i ,e_map in enumerate(plan_map.get("entries_list") or [] ,1):
+ op = e_map.get("op")
+ dst_fpath_str = _dst_fpath_str(e_map.get("dst_dpath","/") ,e_map.get("dst_fname",""))
+ where = f"entry {i}"
+ try:
+ parent_dpath = Path(dst_fpath_str).parent
+ if not parent_dpath.exists():
+ errs_list.append(f"VAL_PARENT_MISSING:{where}: parent dir does not exist: {parent_dpath}")
+ elif not parent_dpath.is_dir():
+ errs_list.append(f"VAL_PARENT_NOT_DIR:{where}: parent is not a directory: {parent_dpath}")
+ if Path(dst_fpath_str).is_dir():
+ errs_list.append(f"VAL_DST_IS_DIR:{where}: destination exists as a directory: {dst_fpath_str}")
+ if op == "copy":
+ owner_name = e_map.get("owner_name")
+ try:
+ pw = pwd.getpwnam(owner_name) # may raise KeyError
+ e_map["_resolved_uid_int"] = pw.pw_uid
+ e_map["_resolved_gid_int"] = pw.pw_gid
+ except Exception:
+ errs_list.append(f"VAL_OWNER_UNKNOWN:{where}: user not found: {owner_name!r}")
+ except Exception as x:
+ errs_list.append(f"VAL_EXCEPTION:{where}: {x}")
+ return errs_list
+
+def sanity_check(plan_map: dict[str,Any])-> list[str]:
+ "Given: plan map. Does: policy checks (allowlist, denials). Returns: list of error strings."
+ errs_list: list[str] = []
+ for i ,e_map in enumerate(plan_map.get("entries_list",[]) ,1):
+ dst_fpath_str = _dst_fpath_str(e_map.get("dst_dpath","/") ,e_map.get("dst_fname",""))
+ where = f"entry {i}"
+ if not any(dst_fpath_str.startswith(pref + "/") or dst_fpath_str==pref for pref in _ALLOWLIST_PREFIXES_LIST):
+ errs_list.append(f"POL_PATH_DENY:{where}: destination outside allowlist: {dst_fpath_str}")
+ return errs_list
+
+# ---------- APPLY (root) ----------
+
+def _utc_str()-> str:
+ "Given: n/a. Does: current UTC compact. Returns: string."
+ import datetime as _dt
+    return _dt.datetime.now(_dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")  # utcnow() is deprecated
+
+def _ensure_parent_dirs(dst_fpath: Path)-> None:
+ "Given: destination file path. Does: create parents. Returns: None."
+ dst_fpath.parent.mkdir(parents=True ,exist_ok=True)
+
+def _displace_in_place(dst_fpath: Path)-> None:
+ "Given: destination file path. Does: rename existing file/symlink to add UTC suffix. Returns: None."
+ try:
+ if dst_fpath.exists() or dst_fpath.is_symlink():
+ suffix = "_" + _utc_str()
+ dst_fpath.rename(dst_fpath.with_name(dst_fpath.name + suffix))
+ except FileNotFoundError:
+ pass
+
+def _apply_copy(dst_fpath: Path ,content_bytes: bytes ,mode_int: int ,uid_int: int ,gid_int: int)-> None:
+ "Given: target, bytes, mode, uid, gid. Does: write temp, fsync, chmod/chown, atomic replace. Returns: None."
+ _ensure_parent_dirs(dst_fpath)
+ _displace_in_place(dst_fpath)
+ tmp_fpath = dst_fpath.with_name("." + dst_fpath.name + ".stage_tmp")
+ with open(tmp_fpath ,"wb") as fh:
+ fh.write(content_bytes)
+ fh.flush()
+ os.fsync(fh.fileno())
+ try:
+ os.chmod(tmp_fpath ,mode_int & 0o777)
+ except Exception:
+ pass
+ try:
+ os.chown(tmp_fpath ,uid_int ,gid_int)
+ except Exception:
+ pass
+ os.replace(tmp_fpath ,dst_fpath) # atomic within same dir/device
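+
+# Design note: _apply_copy writes to a temp file in the destination directory so
+# os.replace() never crosses filesystems and the final swap is atomic; if the run
+# is interrupted, the destination is either fully written or absent with the
+# original preserved under its displaced (UTC-suffixed) name.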
+
+def _apply_delete(dst_fpath: Path)-> None:
+ "Given: target file path. Does: unlink file/symlink if present. Returns: None."
+ try:
+ if dst_fpath.is_symlink() or dst_fpath.is_file():
+ dst_fpath.unlink()
+ except FileNotFoundError:
+ pass
+
+def apply_plan(plan_map: dict[str,Any] ,dry_run_bool: bool=False)-> int:
+ "Given: plan map and dry flag. Does: execute ops sequentially. Returns: exit code."
+ for i ,e_map in enumerate(plan_map.get("entries_list") or [] ,1):
+ op = e_map.get("op")
+ dst_fpath = Path(_dst_fpath_str(e_map.get("dst_dpath","/") ,e_map.get("dst_fname","")))
+ if op == "displace":
+ print(f"+ displace {dst_fpath}")
+ if not dry_run_bool:
+ _displace_in_place(dst_fpath)
+ elif op == "delete":
+ print(f"+ delete {dst_fpath}")
+ if not dry_run_bool:
+ _apply_delete(dst_fpath)
+ elif op == "copy":
+ mode_int = e_map.get("mode_int") or 0o644
+ uid_int = e_map.get("_resolved_uid_int" ,0)
+ gid_int = e_map.get("_resolved_gid_int" ,0)
+ content_bytes = e_map.get("content_bytes") or b""
+ print(f"+ copy {dst_fpath} mode {mode_int:04o} uid {uid_int} gid {gid_int} bytes {len(content_bytes)}")
+ if not dry_run_bool:
+ _apply_copy(dst_fpath ,content_bytes ,mode_int ,uid_int ,gid_int)
+ else:
+ print(f"! unknown op {op} (skipping)")
+ return 2
+ return 0
+
+# ---------- orchestration ----------
+
+def _build_plan_unpriv(stage_root_dpath: Path)-> dict[str,Any]:
+ "Given: stage root. Does: execute configs, accumulate entries, add sha256. Returns: plan map."
+ StageMod = _load_stage_module(stage_root_dpath)
+ Stage = StageMod.Stage
+ Stage._reset()
+ Stage.set_meta(
+ planner_user_name=getpass.getuser()
+ ,planner_uid_int=os.getuid()
+ ,planner_gid_int=os.getgid()
+ ,host_name=socket.gethostname()
+ ,created_utc_str=time.strftime("%Y-%m-%dT%H:%M:%SZ",time.gmtime())
+ )
+ for rel_fpath in _config_rel_fpaths(stage_root_dpath):
+ Stage._begin(read_rel_fpath=rel_fpath ,stage_root_dpath=stage_root_dpath)
+ runpy.run_path(str(stage_root_dpath/rel_fpath) ,run_name="__main__")
+ Stage._end()
+ for e_map in Stage.plan_entries():
+ if e_map.get("op") == "copy" and isinstance(e_map.get("content_bytes") ,(bytes,bytearray)):
+ e_map["sha256_bytes"] = _sha256_bytes(e_map["content_bytes"])
+ return Stage.plan_object()
+
+def _sudo_apply_self(plan_fpath: Path ,dry_run_bool: bool)-> int:
+ "Given: plan file path and dry flag. Does: sudo re-exec current script with --apply. Returns: exit code."
+ cmd_list = ["sudo",sys.executable,os.path.abspath(__file__),"--apply"
+ ,"--plan",str(plan_fpath)]
+ if dry_run_bool:
+ cmd_list.append("--dry-run")
+ return subprocess.call(cmd_list)
+
+def main(argv: list[str]|None=None)-> int:
+ "Given: CLI. Does: plan, WF (user) then VALID+SANITY+APPLY (root). Returns: exit code."
+ ap = argparse.ArgumentParser(prog="stage_cp.py"
+ ,description="Plan staged config application and apply with sudo.")
+ ap.add_argument("--stage",default="stage",help="stage directory (default: ./stage)")
+ ap.add_argument("--dry-run",action="store_true",help="validate and show actions, do not change files")
+ ap.add_argument("--apply",action="store_true",help=argparse.SUPPRESS) # internal (root path)
+ ap.add_argument("--plan",default=None,help=argparse.SUPPRESS) # internal (root path)
+ args = ap.parse_args(argv)
+
+ # Root path (apply)
+ if args.apply:
+ if os.geteuid() != 0:
+ print("error: --apply requires root" ,file=sys.stderr)
+ return 2
+ if not args.plan:
+ print("error: --plan path required for --apply" ,file=sys.stderr)
+ return 2
+ with open(args.plan ,"rb") as fh:
+ plan_map = cbor2.load(fh)
+ val_errs = valid_check(plan_map)
+ pol_errs = sanity_check(plan_map)
+ if val_errs or pol_errs:
+ print("error(s) during validation/sanity:" ,file=sys.stderr)
+ for e in val_errs: print(f" - {e}" ,file=sys.stderr)
+ for e in pol_errs: print(f" - {e}" ,file=sys.stderr)
+ return 2
+ rc = apply_plan(plan_map ,dry_run_bool=args.dry_run)
+ return rc
+
+ # User path (plan + summarize + escalate)
+ stage_root_dpath = Path(args.stage)
+    plan_map = _build_plan_unpriv(stage_root_dpath)
+
+    # WF checks on the unprivileged side, as promised in the module docstring
+    wf_errs_list = wf_check(plan_map)
+    if wf_errs_list:
+        print("error(s) during WF check:" ,file=sys.stderr)
+        for e in wf_errs_list:
+            print(f" - {e}" ,file=sys.stderr)
+        return 2
+
+ entries_list = plan_map.get("entries_list" ,[])
+ print(f"Built plan with {len(entries_list)} entr{'y' if len(entries_list)==1 else 'ies'}")
+
+ total_bytes_int = sum(len(e_map.get("content_bytes") or b"")
+ for e_map in entries_list if e_map.get("op")=="copy")
+ print(f"Total bytes to write: {total_bytes_int}")
+ if args.dry_run:
+ print("\n--dry-run: would perform the following:")
+
+ for i ,e_map in enumerate(entries_list ,1):
+ op = e_map.get("op")
+ dst_fpath_str = _dst_fpath_str(e_map.get("dst_dpath") ,e_map.get("dst_fname"))
+ if op=="copy":
+ mode_int = e_map.get("mode_int") or 0o644
+ owner_name = e_map.get("owner_name") or "?"
+ size = len(e_map.get("content_bytes") or b"")
+ print(f"{i:02d}. copy -> {dst_fpath_str} mode {mode_int:04o} owner {owner_name} bytes {size}")
+ elif op=="displace":
+ print(f"{i:02d}. displace -> {dst_fpath_str}")
+ elif op=="delete":
+ print(f"{i:02d}. delete -> {dst_fpath_str}")
+ else:
+ print(f"{i:02d}. ?op? -> {dst_fpath_str}")
+
+ with tempfile.NamedTemporaryFile(prefix="plan_" ,suffix=".cbor" ,delete=False) as tf:
+ cbor2.dump(plan_map ,tf)
+ plan_fpath = Path(tf.name)
+ try:
+ if args.dry_run:
+ return _sudo_apply_self(plan_fpath ,dry_run_bool=True)
+ ans = input("\nProceed with apply under sudo? [y/N] ").strip().lower()
+ if ans not in ("y","yes"):
+ print("Aborted.")
+ return 0
+ return _sudo_apply_self(plan_fpath ,dry_run_bool=False)
+ finally:
+ try: os.unlink(plan_fpath)
+ except Exception: pass
+
+if __name__ == "__main__":
+ sys.exit(main())
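+
+# Typical invocations (illustrative):
+#   ./stage_cp.py --stage ./stage --dry-run   # plan + validate; no files change
+#   ./stage_cp.py --stage ./stage             # plan, confirm, then apply under sudo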
--- /dev/null
+#!/usr/bin/env -S python3 -B
+"""
+stage_show_plan.py — run staged configs (UNPRIVILEGED) and print the plan.
+
+Given: a stage root directory.
+Does: loads Stage.py, executes each config, builds a native plan map, summarizes it.
+Returns: exit code 0 on success, non-zero on error.
+"""
+from __future__ import annotations
+import sys ,os
+sys.dont_write_bytecode = True
+os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
+
+from pathlib import Path
+import argparse ,importlib.util ,runpy ,socket ,getpass ,time ,hashlib
+
+# ---------- helpers ----------
+
+def _load_stage_module(stage_root_dpath: Path):
+ "Given: stage root path. Does: load Stage.py as module 'Stage'. Returns: module."
+ mod_fpath = stage_root_dpath/"Stage.py"
+ if not mod_fpath.exists():
+ raise FileNotFoundError(f"Stage.py not found at {mod_fpath}")
+ spec = importlib.util.spec_from_file_location("Stage" ,str(mod_fpath))
+ mod = importlib.util.module_from_spec(spec)
+ sys.modules["Stage"] = mod
+ assert spec and spec.loader
+ spec.loader.exec_module(mod) # type: ignore
+ return mod
+
+def _config_rel_fpaths(stage_root_dpath: Path)-> list[Path]:
+ "Given: stage root. Does: collect *.py (excluding Stage.py) as relative file paths. Returns: list[Path]."
+ rel_fpath_list: list[Path] = []
+ for p in stage_root_dpath.rglob("*.py"):
+ if p.name == "Stage.py": continue
+ if p.is_file():
+ rel_fpath_list.append(p.relative_to(stage_root_dpath))
+ return sorted(rel_fpath_list ,key=lambda x: x.as_posix())
+
+def _sha256_hex(b: bytes)-> str:
+ "Given: bytes. Does: sha256. Returns: hex string."
+ return hashlib.sha256(b).hexdigest()
+
+# ---------- main ----------
+
+def main(argv: list[str]|None=None)-> int:
+ "Given: CLI. Does: show plan. Returns: exit code."
+ ap = argparse.ArgumentParser(prog="stage_show_plan.py"
+ ,description="Run staged config scripts and print the resulting plan.")
+ ap.add_argument("--stage",default="stage",help="stage directory (default: ./stage)")
+ args = ap.parse_args(argv)
+
+ stage_root_dpath = Path(args.stage)
+ StageMod = _load_stage_module(stage_root_dpath)
+ Stage = StageMod.Stage
+ Stage._reset()
+ Stage.set_meta(
+ planner_user_name=getpass.getuser()
+ ,planner_uid_int=os.getuid()
+ ,planner_gid_int=os.getgid()
+ ,host_name=socket.gethostname()
+ ,created_utc_str=time.strftime("%Y-%m-%dT%H:%M:%SZ",time.gmtime())
+ )
+
+ for rel_fpath in _config_rel_fpaths(stage_root_dpath):
+ Stage._begin(read_rel_fpath=rel_fpath ,stage_root_dpath=stage_root_dpath)
+ runpy.run_path(str(stage_root_dpath/rel_fpath) ,run_name="__main__")
+ Stage._end()
+
+ plan_map = Stage.plan_object()
+ entries_list = plan_map["entries_list"]
+ print(f"Plan version: {plan_map['version_int']}")
+ print(f"Planner: {plan_map['meta_map'].get('planner_user_name')}@{plan_map['meta_map'].get('host_name')} "
+ f"UID:{plan_map['meta_map'].get('planner_uid_int')} GID:{plan_map['meta_map'].get('planner_gid_int')}")
+ print(f"Created: {plan_map['meta_map'].get('created_utc_str')}")
+ print(f"Entries: {len(entries_list)}\n")
+
+ for i ,e_map in enumerate(entries_list ,1):
+ op = e_map.get("op")
+ dst_fpath_str = f"{e_map.get('dst_dpath')}/{e_map.get('dst_fname')}"
+ if op == "copy":
+ content = e_map.get("content_bytes") or b""
+ sz = len(content)
+ mode = e_map.get("mode_octal_str") or "????"
+ owner = e_map.get("owner_name") or "?"
+ h = _sha256_hex(content)
+ print(f"{i:02d}. copy -> {dst_fpath_str} mode {mode} owner {owner} bytes {sz} sha256 {h[:16]}…")
+ elif op == "displace":
+ print(f"{i:02d}. displace -> {dst_fpath_str}")
+ elif op == "delete":
+ print(f"{i:02d}. delete -> {dst_fpath_str}")
+ else:
+ print(f"{i:02d}. ?op? -> {dst_fpath_str} ({op})")
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
--- /dev/null
+#!/usr/bin/env -S python3 -B
+import Stage
+
+# Values may be computed with arbitrary Python; the ones below are illustrative
+# only (the Stage.init call underneath uses '.' and a literal directory instead).
+svc = "unbound"
+zone = "US"
+fname = f"unbound-{zone}.conf"
+
+Stage.init(
+ write_file_name="." # '.' → use basename of this file -> 'example_dns.py'
+, write_file_directory_path="/etc/unbound"
+, write_file_owner="root"
+, write_file_permissions=0o644 # or "0644"
+, read_file_contents="""\
+# generated config (example)
+server:
+ verbosity: 1
+ interface: 127.0.0.1
+"""
+)
+
+# declare the desired operations (recorded in the plan; nothing is written in 'noop'/'dry' modes)
+Stage.displace()
+Stage.copy()
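+
+# Under stage_show_plan.py this declares two plan entries (illustrative; assumes
+# this file is saved as example_dns.py, so '.' resolves to that basename):
+#   01. displace -> /etc/unbound/example_dns.py
+#   02. copy     -> /etc/unbound/example_dns.py mode 0644 owner root ...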
--- /dev/null
+#!/usr/bin/env -S python3 -B
+"""
+stage_ls.py — execute staged Python programs with Stage in 'noop' mode and list metadata.
+
+For each *.py under --stage (recursively, excluding Stage.py), this tool:
+ 1) loads Stage.py from the stage root,
+ 2) switches mode to 'noop' (no side effects, no printing),
+ 3) executes the program via runpy.run_path(...) with the proper __file__,
+ 4) collects the resolved write_file_* metadata and declared ops,
+ 5) prints either list or aligned table,
+ 6) reports any collected errors.
+
+This lets admins compute metadata with arbitrary Python while guaranteeing no writes.
+"""
+
+from __future__ import annotations
+
+import sys ,os
+sys.dont_write_bytecode = True
+os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
+
+from dataclasses import dataclass
+from pathlib import Path
+import argparse
+import importlib.util ,runpy
+import traceback
+
+# --- utility dataclass (for printing) ---
+
+@dataclass
+class Row:
+ read_rel: Path
+ owner: str|None
+ perm: str|None
+ write_name: str|None
+ target_dir: Path|None
+ ops: list[str]
+ errors: list[str]
+
+# --- helpers ---
+
+def _load_stage_module(stage_root: Path):
+ """Load Stage.py from stage_root into sys.modules['Stage'] (overwriting if present). Returns the Stage module."""
+ stage_py = stage_root/"Stage.py"
+ if not stage_py.exists():
+ raise FileNotFoundError(f"Stage.py not found at {stage_py} — place Stage.py in the stage root.")
+ spec = importlib.util.spec_from_file_location("Stage" ,str(stage_py))
+ if spec is None or spec.loader is None:
+ raise RuntimeError(f"cannot load Stage module from {stage_py}")
+ mod = importlib.util.module_from_spec(spec)
+ sys.modules["Stage"] = mod
+ spec.loader.exec_module(mod) # type: ignore[union-attr]
+ return mod
+
+def _stage_program_paths(stage_root: Path)-> list[Path]:
+ rels: list[Path] = []
+ for p in stage_root.rglob("*.py"):
+ if p.name == "Stage.py":
+ continue
+ try:
+ if p.is_file():
+ rels.append(p.relative_to(stage_root))
+ except Exception:
+ continue
+ return sorted(rels ,key=lambda x: x.as_posix())
+
+def print_list(rows: list[Row])-> None:
+ for r in rows:
+ owner = r.owner or "?"
+ perm = r.perm or "????"
+ name = r.write_name or "?"
+ tdir = str(r.target_dir) if r.target_dir is not None else "?"
+ print(f"{r.read_rel.as_posix()}: {owner} {perm} {name} {tdir}")
+
+def print_table(rows: list[Row])-> None:
+ if not rows:
+ return
+ a = [r.read_rel.as_posix() for r in rows]
+ b = [(r.owner or "?") for r in rows]
+ c = [(r.perm or "????") for r in rows]
+ d = [(r.write_name or "?") for r in rows]
+ e = [str(r.target_dir) if r.target_dir is not None else "?" for r in rows]
+ wa = max(len(s) for s in a)
+ wb = max(len(s) for s in b)
+ wc = max(len(s) for s in c)
+ wd = max(len(s) for s in d)
+ for sa ,sb ,sc ,sd ,se in zip(a ,b ,c ,d ,e):
+ print(f"{sa:<{wa}} {sb:<{wb}} {sc:<{wc}} {sd:<{wd}} {se}")
+
+# --- core ---
+
+def ls_stage(stage_root: Path ,fmt: str="list")-> int:
+ Stage = _load_stage_module(stage_root)
+ Stage.Stage.set_mode("noop") # hard safety for this tool
+
+ rows: list[Row] = []
+ errs: list[str] = []
+
+ for rel in _stage_program_paths(stage_root):
+ abs_path = stage_root/rel
+ try:
+ # isolate per-run state
+ Stage.Stage._current = None
+ Stage.Stage._all_records.clear()
+ Stage.Stage._begin(read_rel=rel ,stage_root=stage_root)
+
+ # execute the staged program under its real path
+ runpy.run_path(str(abs_path) ,run_name="__main__")
+
+ rec = Stage.Stage._end()
+ if rec is None:
+ errs.append(f"{rel}: program executed but Stage.init(...) was never called")
+ continue
+
+ rows.append(
+ Row(
+ read_rel=rel
+ , owner=rec.owner
+ , perm=rec.perm_octal_str
+ , write_name=rec.write_name
+ , target_dir=rec.target_dir
+ , ops=list(rec.ops)
+ , errors=list(rec.errors)
+ )
+ )
+
+ except SystemExit as e:
+ errs.append(f"{rel}: program called sys.exit({e.code}) during listing")
+ except Exception:
+ tb = traceback.format_exc(limit=2)
+ errs.append(f"{rel}: exception during execution:\n{tb}")
+
+ # print data
+ if fmt == "table":
+ print_table(rows)
+ else:
+ print_list(rows)
+
+ # print per-row Stage errors
+ row_errs = [f"{r.read_rel}: {msg}" for r in rows for msg in r.errors]
+ all_errs = row_errs + errs
+ if all_errs:
+ print("\nerror(s):" ,file=sys.stderr)
+ for e in all_errs:
+ print(f" - {e}" ,file=sys.stderr)
+ return 1
+ return 0
+
+# --- CLI ---
+
+def main(argv: list[str] | None=None)-> int:
+ ap = argparse.ArgumentParser(
+ prog="stage_ls.py"
+ , description="Execute staged Python configs with Stage in 'noop' mode and list resolved metadata."
+ )
+ ap.add_argument("--stage" ,default="stage" ,help="stage directory (default: ./stage)")
+ ap.add_argument("--format" ,choices=["list","table"] ,default="list" ,help="output format")
+ args = ap.parse_args(argv)
+
+ stage_root = Path(args.stage)
+ if not stage_root.exists() or not stage_root.is_dir():
+ print(f"error: stage directory not found or not a directory: {stage_root}" ,file=sys.stderr)
+ return 2
+
+ return ls_stage(stage_root ,fmt=args.format)
+
+if __name__ == "__main__":
+ sys.exit(main())
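+
+# Typical invocation (illustrative):
+#   ./stage_ls.py --stage ./stage --format table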