mvoing to python cofigurations and Planner
authorThomas Walker Lynch <eknp9n@reasoningtechnology.com>
Wed, 17 Sep 2025 14:39:53 +0000 (07:39 -0700)
committerThomas Walker Lynch <eknp9n@reasoningtechnology.com>
Wed, 17 Sep 2025 14:39:53 +0000 (07:39 -0700)
developer/source/DNS/Planner.py [new file with mode: 0644]
developer/source/DNS/deprecated/stage_ls.py [new file with mode: 0755]
developer/source/DNS/plan_show.py [new file with mode: 0755]
developer/source/DNS/stage_cp.py [new file with mode: 0644]
developer/source/DNS/stage_ls.py [deleted file]
developer/source/DNS/stage_show_plan.py [new file with mode: 0644]
developer/source/DNS/stage_test_0/example.py [new file with mode: 0644]
developer/source/DNS/stage_test_0/stage_ls.py [new file with mode: 0644]

diff --git a/developer/source/DNS/Planner.py b/developer/source/DNS/Planner.py
new file mode 100644 (file)
index 0000000..0da1384
--- /dev/null
@@ -0,0 +1,356 @@
+#!/usr/bin/env -S python3 -B
+"""
+Planner.py — plan builder for staged configuration (UNPRIVILEGED).
+
+The Planner accumulates Command objects into a Journal.
+
+Journal building is orchestrated by the outer runner (e.g., stage_show_plan, stage_cp)
+which constructs a Planner per config file and invokes Planner command methods.
+
+Defaults and provenance come from a PlannerContext instance. You can replace the
+context at any time via set_context(ctx).
+
+The Journal can be exported as CBOR via Journal.to_CBOR_bytes(), and reconstructed
+on the privileged side via Journal.from_CBOR_bytes().
+
+On-wire field names are snake_case and use explicit suffixes (_str,_bytes,_int, etc.)
+to avoid ambiguity.
+"""
+
+from __future__ import annotations
+
+# no bytecode anywhere (works under sudo/root shells too)
+import sys ,os
+sys.dont_write_bytecode = True
+os.environ.setdefault("PYTHONDONTWRITEBYTECODE","1")
+
+from dataclasses import dataclass ,field
+from pathlib import Path
+from typing import Any
+
+# ===== Utilities =====
+
+def _norm_perm(value: int|str)-> tuple[int,str]|None:
+  "Given int or 4-char octal string. Does validate/normalize. Returns (int,'%04o') or None."
+  if isinstance(value ,int):
+    if 0 <= value <= 0o7777:
+      return value ,f"{value:04o}"
+    return None
+  if isinstance(value ,str):
+    s = value.strip()
+    if len(s)==4 and all(ch in "01234567" for ch in s):
+      try:
+        v = int(s ,8)
+        return v ,s
+      except Exception:
+        return None
+  return None
+
+def _is_abs_dpath(dpath_str: str)-> bool:
+  "Given path string. Does quick abs dir check. Returns bool."
+  return bool(dpath_str) and dpath_str.startswith("/")
+
+def _join_write_file(dpath_str: str ,fname_str: str)-> str:
+  "Given dir path string and filename string. Does join. Returns POSIX path string or ''."
+  if not _is_abs_dpath(dpath_str): return ""
+  if not fname_str or "/" in fname_str: return ""
+  return (Path(dpath_str)/fname_str).as_posix()
+
+# ===== Core data types =====
+
+@dataclass(slots=True)
+class Command:
+  """
+  Command — a single planned operation.
+
+  Given a command name and an argument map (native values).
+  Does hold the op name, owns a distinct args map, accumulates errors for this op.
+  Returns serializable mapping via to_map().
+  """
+  name_str: str
+  args_map: dict[str,Any] = field(default_factory=dict)
+  errors_list: list[str] = field(default_factory=list)
+
+  def add_error(self ,msg_str: str)-> None:
+    "Given message. Does append to errors_list. Returns None."
+    self.errors_list.append(msg_str)
+
+  def to_map(self)-> dict[str,Any]:
+    "Given self. Does convert to a plain dict. Returns {'op','args_map','errors_list'}."
+    return {
+      "op": self.name_str
+      ,"args_map": dict(self.args_map)
+      ,"errors_list": list(self.errors_list)
+    }
+
+@dataclass(slots=True)
+class PlannerContext:
+  """
+  PlannerContext — per-config provenance and defaults.
+
+  Given: stage_root_dpath, read_file_rel_fpath, default write_file location/name,
+         default owner name, default permission (int or '0644'), optional default content.
+  Does:  provide ambient defaults and provenance to Planner methods.
+  Returns: n/a (data holder).
+  """
+  stage_root_dpath: Path
+  read_file_rel_fpath: Path
+  default_write_file_dpath_str: str
+  default_write_file_fname_str: str
+  default_owner_name_str: str
+  default_mode_int: int|None = None
+  default_mode_octal_str: str|None = None
+  default_content_bytes: bytes|None = None
+
+  @staticmethod
+  def from_values(stage_root_dpath: Path
+                  ,read_file_rel_fpath: Path
+                  ,write_file_dpath_str: str
+                  ,write_file_fname_str: str
+                  ,owner_name_str: str
+                  ,perm: int|str
+                  ,content: bytes|str|None
+  )-> PlannerContext:
+    "Given raw values. Does normalize perm and content. Returns PlannerContext."
+    if isinstance(content ,str):
+      content_b = content.encode("utf-8")
+    else:
+      content_b = content
+    perm_norm = _norm_perm(perm)
+    if perm_norm is None:
+      m_int ,m_oct = None ,None
+    else:
+      m_int ,m_oct = perm_norm
+    return PlannerContext(
+      stage_root_dpath=stage_root_dpath
+      ,read_file_rel_fpath=read_file_rel_fpath
+      ,default_write_file_dpath_str=write_file_dpath_str
+      ,default_write_file_fname_str=write_file_fname_str
+      ,default_owner_name_str=owner_name_str
+      ,default_mode_int=m_int
+      ,default_mode_octal_str=m_oct
+      ,default_content_bytes=content_b
+    )
+
+@dataclass(slots=True)
+class Journal:
+  """
+  Journal — ordered list of Commands plus provenance metadata.
+
+  Given optional meta map.
+  Does append commands, expose entries, produce plain or CBOR encodings, and rebuild from CBOR.
+  Returns plain dict via to_map(), bytes via to_CBOR_bytes(), Journal via from_CBOR_bytes().
+  """
+  meta_map: dict[str,Any] = field(default_factory=dict)
+  commands_list: list[Command] = field(default_factory=list)
+
+  def set_meta(self ,**kv)-> None:
+    "Given keyword meta. Does merge into meta_map. Returns None."
+    self.meta_map.update(kv)
+
+  def append(self ,cmd: Command)-> None:
+    "Given Command. Does append to commands_list. Returns None."
+    self.commands_list.append(cmd)
+
+  def entries_list(self)-> list[dict[str,Any]]:
+    "Given n/a. Does return list of entry dicts (copy). Returns list[dict]."
+    return [c.to_map() for c in self.commands_list]
+
+  def to_map(self)-> dict[str,Any]:
+    "Given n/a. Does package a plan map (ready for CBOR). Returns dict."
+    return {
+      "version_int": 1
+      ,"meta_map": dict(self.meta_map)
+      ,"entries_list": self.entries_list()
+    }
+
+  def to_CBOR_bytes(self ,canonical_bool: bool=True)-> bytes:
+    "Given n/a. Does CBOR-encode to bytes (requires cbor2). Returns bytes."
+    try:
+      import cbor2
+    except Exception as e:
+      raise RuntimeError(f"package cbor2 required for to_CBOR_bytes: {e}")
+    return cbor2.dumps(self.to_map() ,canonical=canonical_bool)
+
+  @staticmethod
+  def from_CBOR_bytes(data_bytes: bytes)-> Journal:
+    "Given CBOR bytes. Does decode and rebuild a Journal (Commands + meta). Returns Journal."
+    try:
+      import cbor2
+    except Exception as e:
+      raise RuntimeError(f"package cbor2 required for from_CBOR_bytes: {e}")
+    obj = cbor2.loads(data_bytes)
+    if not isinstance(obj ,dict): raise ValueError("CBOR root must be a map")
+    meta = dict(obj.get("meta_map") or {})
+    entries = obj.get("entries_list") or []
+    j = Journal(meta_map=meta)
+    for e in entries:
+      if not isinstance(e ,dict): continue
+      op = e.get("op") or "?"
+      args = e.get("args_map") or {}
+      errs = e.get("errors_list") or []
+      j.append(Command(name_str=op ,args_map=dict(args) ,errors_list=list(errs)))
+    return j
+
+# ===== Planner =====
+
+class Planner:
+  """
+  Planner — constructs a Journal of Commands from config scripts.
+
+  Given: PlannerContext (provenance + defaults).
+  Does:  maintains a Journal; command methods (copy/displace/delete) create Command objects,
+         fill missing args from context defaults, preflight minimal shape checks, then append.
+  Returns: accessors for Journal and meta; no I/O or privilege here.
+  """
+  def __init__(self ,ctx: PlannerContext)-> None:
+    self._ctx = ctx
+    self._journal = Journal()
+    # seed provenance; outer tools can add more later
+    self._journal.set_meta(
+      source_read_file_rel_fpath_str=ctx.read_file_rel_fpath.as_posix()
+      ,stage_root_dpath_str=str(ctx.stage_root_dpath)
+    )
+
+  # --- Context management ---
+
+  def set_context(self ,ctx: PlannerContext)-> None:
+    "Given PlannerContext. Does replace current context. Returns None."
+    self._ctx = ctx
+
+  def context(self)-> PlannerContext:
+    "Given n/a. Does return current context. Returns PlannerContext."
+    return self._ctx
+
+  # --- Journal access ---
+
+  def journal(self)-> Journal:
+    "Given n/a. Does return the Journal (live). Returns Journal."
+    return self._journal
+
+  # --- Helpers ---
+
+  def _resolve_write_file(self ,write_file_dpath_str: str|None ,write_file_fname_str: str|None)-> tuple[str,str]:
+    "Given optional write_file dpath/fname. Does fill from context; '.' fname → read_file basename. Returns (dpath,fname)."
+    dpath_str = write_file_dpath_str if write_file_dpath_str is not None else self._ctx.default_write_file_dpath_str
+    fname_str = write_file_fname_str if write_file_fname_str is not None else self._ctx.default_write_file_fname_str
+    if fname_str == ".":
+      fname_str = self._ctx.read_file_rel_fpath.name
+    return dpath_str ,fname_str
+
+  def _resolve_owner(self ,owner_name_str: str|None)-> str:
+    "Given optional owner. Does fill from context. Returns owner string."
+    return owner_name_str if owner_name_str is not None else self._ctx.default_owner_name_str
+
+  def _resolve_mode(self ,perm: int|str|None)-> tuple[int|None,str|None]:
+    "Given optional perm. Does normalize or fall back to context. Returns (mode_int,mode_octal_str)."
+    if perm is None:
+      return self._ctx.default_mode_int ,self._ctx.default_mode_octal_str
+    norm = _norm_perm(perm)
+    return (norm if norm is not None else (None ,None))
+
+  def _resolve_content(self ,content: bytes|str|None)-> bytes|None:
+    "Given optional content (bytes or str). Does normalize or fall back to context. Returns bytes|None."
+    if content is None:
+      return self._ctx.default_content_bytes
+    if isinstance(content ,str):
+      return content.encode("utf-8")
+    return content
+
+  # --- Command builders ---
+
+  def copy(self
+           ,*
+           ,write_file_dpath_str: str|None=None
+           ,write_file_fname_str: str|None=None
+           ,owner_name_str: str|None=None
+           ,perm: int|str|None=None
+           ,content: bytes|str|None=None
+           ,read_file_rel_fpath: Path|None=None
+  )-> Command:
+    """
+    Given: optional overrides for write_file (dpath,fname,owner,perm), content, and read_file_rel_fpath.
+    Does:  build a 'copy' command entry (content is embedded; read_file path kept as provenance).
+    Returns: Command (also appended to Journal).
+    """
+    cmd = Command("copy")
+    # resolve basics
+    wf_dpath_str ,wf_fname_str = self._resolve_write_file(write_file_dpath_str ,write_file_fname_str)
+    owner_str = self._resolve_owner(owner_name_str)
+    mode_int ,mode_oct = self._resolve_mode(perm)
+    content_b = self._resolve_content(content)
+    read_rel = (read_file_rel_fpath if read_file_rel_fpath is not None else self._ctx.read_file_rel_fpath)
+
+    # minimal shape checks (well-formedness, not policy)
+    if not _is_abs_dpath(wf_dpath_str):
+      cmd.add_error("write_file_dpath_str must be absolute and non-empty")
+    if not wf_fname_str or "/" in wf_fname_str:
+      cmd.add_error("write_file_fname_str must be a simple filename (no '/')")
+    if not owner_str:
+      cmd.add_error("owner_name_str must be non-empty")
+    if (mode_int ,mode_oct) == (None ,None):
+      cmd.add_error("perm must be an int <= 0o7777 or a 4-digit octal string")
+    if content_b is None:
+      cmd.add_error("content is required for copy() (bytes or str)")
+
+    cmd.args_map.update({
+      "write_file_dpath_str": wf_dpath_str
+      ,"write_file_fname_str": wf_fname_str
+      ,"owner_name_str": owner_str
+      ,"mode_int": mode_int
+      ,"mode_octal_str": mode_oct
+      ,"content_bytes": content_b
+      ,"read_file_rel_fpath_str": read_rel.as_posix()
+    })
+    self._journal.append(cmd)
+    return cmd
+
+  def displace(self
+               ,*
+               ,write_file_dpath_str: str|None=None
+               ,write_file_fname_str: str|None=None
+  )-> Command:
+    """
+    Given: optional write_file dpath/fname overrides.
+    Does:  build a 'displace' command (rename existing write_file in-place with UTC suffix).
+    Returns: Command (appended).
+    """
+    cmd = Command("displace")
+    wf_dpath_str ,wf_fname_str = self._resolve_write_file(write_file_dpath_str ,write_file_fname_str)
+
+    if not _is_abs_dpath(wf_dpath_str):
+      cmd.add_error("write_file_dpath_str must be absolute and non-empty")
+    if not wf_fname_str or "/" in wf_fname_str:
+      cmd.add_error("write_file_fname_str must be a simple filename (no '/')")
+
+    cmd.args_map.update({
+      "write_file_dpath_str": wf_dpath_str
+      ,"write_file_fname_str": wf_fname_str
+    })
+    self._journal.append(cmd)
+    return cmd
+
+  def delete(self
+             ,*
+             ,write_file_dpath_str: str|None=None
+             ,write_file_fname_str: str|None=None
+  )-> Command:
+    """
+    Given: optional write_file dpath/fname overrides.
+    Does:  build a 'delete' command (unlink if present).
+    Returns: Command (appended).
+    """
+    cmd = Command("delete")
+    wf_dpath_str ,wf_fname_str = self._resolve_write_file(write_file_dpath_str ,write_file_fname_str)
+
+    if not _is_abs_dpath(wf_dpath_str):
+      cmd.add_error("write_file_dpath_str must be absolute and non-empty")
+    if not wf_fname_str or "/" in wf_fname_str:
+      cmd.add_error("write_file_fname_str must be a simple filename (no '/')")
+
+    cmd.args_map.update({
+      "write_file_dpath_str": wf_dpath_str
+      ,"write_file_fname_str": wf_fname_str
+    })
+    self._journal.append(cmd)
+    return cmd
diff --git a/developer/source/DNS/deprecated/stage_ls.py b/developer/source/DNS/deprecated/stage_ls.py
new file mode 100755 (executable)
index 0000000..93dd3d2
--- /dev/null
@@ -0,0 +1,193 @@
+#!/usr/bin/env -S python3 -B
+"""
+ls_stage.py — list staged files and their header-declared install metadata.
+
+Header line format (first line of each file):
+  <owner> <permissions> <write_file_name> <target_directory_path>
+
+- owner:               username string (need not exist until install time)
+- permissions:         four octal digits, e.g. 0644
+- write_file_name:     '.' means use the read file's basename, else use the given POSIX filename
+- target_directory_path: POSIX directory path (usually absolute, e.g. /etc/unbound)
+
+Output formats:
+- list (default):  "read_file_path: owner permissions write_file_name target_directory_path"
+- table:           columns aligned for readability
+"""
+
+from __future__ import annotations
+
+# never write bytecode (root/sudo friendly)
+import sys ,os
+sys.dont_write_bytecode = True
+os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
+
+from dataclasses import dataclass
+from pathlib import Path
+import argparse
+import re
+
+# === Stage utilities (importable) ===
+
+def stage_read_file_paths(stage_root: Path)-> list[Path]:
+  """Given:   stage_root directory.
+     Does:    recursively enumerate regular files (follows symlinks to files), keep paths relative to stage_root.
+     Returns: list[Path] of POSIX-order sorted relative paths (no leading slash).
+  """
+  rels: list[Path] = []
+  for p in stage_root.rglob("*"):
+    try:
+      if p.is_file():  # follows symlink-to-file
+        rels.append(p.relative_to(stage_root))
+    except (FileNotFoundError ,RuntimeError):
+      # broken link or race; skip conservatively
+      continue
+  return sorted(rels ,key=lambda x: x.as_posix())
+
+@dataclass
+class StageRow:
+  read_rel: Path                 # e.g. Path("etc/unbound/unbound.conf.staged")
+  owner: str                     # token[0]
+  perm_octal_str: str            # token[1], exactly as in header (validated ####)
+  perm_int: int                  # token[1] parsed as base-8
+  write_name: str                # token[2] ('.' resolved to read_rel.name)
+  target_dir: Path               # token[3] (Path)
+  header_raw: str                # original header line (sans newline)
+
+    # convenience
+  def write_abs(self ,root: Path)-> Path:
+    return (root / self.target_dir.relative_to("/")) if self.target_dir.is_absolute() else (root / self.target_dir) / self.write_name
+
+# header parsing rules
+_PERM_RE = re.compile(r"^[0-7]{4}$")
+
+def parse_stage_header_line(header: str ,read_rel: Path)-> tuple[StageRow|None ,str|None]:
+  """Given:   raw first line of a staged file and its stage-relative path.
+     Does:    parse '<owner> <perm> <write_name> <target_dir>' with max 4 tokens (target_dir may contain spaces if quoted not required).
+     Returns: (StageRow, None) on success, or (None, error_message) on failure. Does NOT touch filesystem.
+  """
+  # strip BOM and trailing newline/spaces
+  h = header.lstrip("\ufeff").strip()
+  if not h:
+    return None ,f"empty header line in {read_rel}"
+  parts = h.split(maxsplit=3)
+  if len(parts) != 4:
+    return None ,f"malformed header in {read_rel}: expected 4 fields, got {len(parts)}"
+  owner ,perm_s ,write_name ,target_dir_s = parts
+
+  if not _PERM_RE.fullmatch(perm_s):
+    return None ,f"invalid permissions '{perm_s}' in {read_rel}: must be four octal digits"
+
+  # resolve '.' → basename
+  resolved_write_name = read_rel.name if write_name == "." else write_name
+
+  # MVP guard: write_name should be a single filename (no '/')
+  if "/" in resolved_write_name:
+    return None ,f"write_file_name must not contain '/': got '{resolved_write_name}' in {read_rel}"
+
+  # target dir may be absolute (recommended) or relative (we treat relative as under the install root)
+  target_dir = Path(target_dir_s)
+
+  try:
+    row = StageRow(
+      read_rel = read_rel
+      ,owner = owner
+      ,perm_octal_str = perm_s
+      ,perm_int = int(perm_s ,8)
+      ,write_name = resolved_write_name
+      ,target_dir = target_dir
+      ,header_raw = h
+    )
+    return row ,None
+  except Exception as e:
+    return None ,f"internal parse error in {read_rel}: {e}"
+
+def read_first_line(p: Path)-> str:
+  """Return the first line (sans newline). UTF-8 with BOM tolerant."""
+  with open(p ,"r" ,encoding="utf-8" ,errors="replace") as fh:
+    line = fh.readline()
+  return line.rstrip("\n\r")
+
+def scan_stage(stage_root: Path)-> tuple[list[StageRow] ,list[str]]:
+  """Given:   stage_root.
+     Does:    enumerate files, parse each header line, collect rows and errors.
+     Returns: (rows, errors)
+  """
+  rows: list[StageRow] = []
+  errs: list[str] = []
+  for rel in stage_read_file_paths(stage_root):
+    abs_path = stage_root / rel
+    try:
+      header = read_first_line(abs_path)
+    except Exception as e:
+      errs.append(f"read error in {rel}: {e}")
+      continue
+    row ,err = parse_stage_header_line(header ,rel)
+    if err:
+      errs.append(err)
+    else:
+      rows.append(row)  # type: ignore[arg-type]
+  return rows ,errs
+
+# === Printers ===
+
+def print_list(rows: list[StageRow])-> None:
+  """Print: 'read_file_path: owner permissions write_file_name target_directory_path' per line."""
+  for r in rows:
+    print(f"{r.read_rel.as_posix()}: {r.owner} {r.perm_octal_str} {r.write_name} {r.target_dir}")
+
+def print_table(rows: list[StageRow])-> None:
+  """Aligned table printer (no headers, just data in columns)."""
+  if not rows:
+    return
+  a = [r.read_rel.as_posix() for r in rows]
+  b = [r.owner for r in rows]
+  c = [r.perm_octal_str for r in rows]
+  d = [r.write_name for r in rows]
+  e = [str(r.target_dir) for r in rows]
+  wa = max(len(s) for s in a)
+  wb = max(len(s) for s in b)
+  wc = max(len(s) for s in c)
+  wd = max(len(s) for s in d)
+  # e (target_dir) left ragged
+  for sa ,sb ,sc ,sd ,se in zip(a ,b ,c ,d ,e):
+    print(f"{sa:<{wa}}  {sb:<{wb}}  {sc:<{wc}}  {sd:<{wd}}  {se}")
+
+# === Orchestrator ===
+
+def ls_stage(stage_root: Path ,fmt: str="list")-> int:
+  """Given:   stage_root and output format ('list'|'table').
+     Does:    scan and parse staged files, print in the requested format; report syntax errors to stderr.
+     Returns: 0 on success; 1 if any syntax errors were encountered.
+  """
+  rows ,errs = scan_stage(stage_root)
+  if fmt == "table":
+    print_table(rows)
+  else:
+    print_list(rows)
+  if errs:
+    print("\nerror(s):" ,file=sys.stderr)
+    for e in errs:
+      print(f"  - {e}" ,file=sys.stderr)
+    return 1
+  return 0
+
+# === CLI ===
+
+def main(argv: list[str] | None=None)-> int:
+  ap = argparse.ArgumentParser(
+    prog="ls_stage.py"
+    ,description="List staged files and their header-declared install metadata."
+  )
+  ap.add_argument("--stage" ,default="stage",help="stage directory (default: ./stage)")
+  ap.add_argument("--format" ,choices=["list" ,"table"] ,default="list"
+                 ,help="output format (default: list)")
+  args = ap.parse_args(argv)
+  stage_root = Path(args.stage)
+  if not stage_root.exists() or not stage_root.is_dir():
+    print(f"error: stage directory not found or not a directory: {stage_root}" ,file=sys.stderr)
+    return 2
+  return ls_stage(stage_root ,fmt=args.format)
+
+if __name__ == "__main__":
+  sys.exit(main())
diff --git a/developer/source/DNS/plan_show.py b/developer/source/DNS/plan_show.py
new file mode 100755 (executable)
index 0000000..780d34a
--- /dev/null
@@ -0,0 +1,273 @@
+#!/usr/bin/env -S python3 -B
+"""
+plan_show.py — build and display a staged plan (UNPRIVILEGED).
+
+Given: a stage directory of config scripts (*.stage.py by default).
+Does:  executes each script with a pre-created Planner (P) and PlannerContext,
+       aggregates Commands into a single Journal, by default prints from the CBOR
+       round-trip (encode→decode) so the human view matches what will be shipped
+       to stage_cp; runs well-formed (WF) invariant checks. Can emit CBOR if requested.
+Returns: exit status 0 on success; 2 on WF errors or usage errors.
+"""
+
+from __future__ import annotations
+
+# no bytecode anywhere
+import sys ,os
+sys.dont_write_bytecode = True
+os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
+
+from pathlib import Path
+import argparse
+import datetime as _dt
+import getpass
+import runpy
+
+# local module (same dir): Planner
+from Planner import Planner ,PlannerContext ,Journal ,Command
+
+# ===== Utilities (general / reusable) =====
+
+def iso_utc_now_str()-> str:
+  "Given n/a. Does return compact UTC timestamp. Returns YYYYMMDDTHHMMSSZ."
+  return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
+
+def find_configs(stage_root_dpath: Path ,glob_pat_str: str)-> list[Path]:
+  "Given stage root and glob. Does find matching files under stage. Returns list of absolute Paths."
+  root = stage_root_dpath.resolve()
+  return sorted((p for p in root.glob(glob_pat_str) if p.is_file()) ,key=lambda p: p.as_posix())
+
+def human_size(n: int)-> str:
+  "Given byte count. Does format human size. Returns string."
+  units = ["B","KB","MB","GB","TB"]
+  i = 0
+  x = float(max(0 ,n))
+  while x >= 1024 and i < len(units)-1:
+    x /= 1024.0
+    i += 1
+  return f"{x:.1f} {units[i]}"
+
+def _dst_path_str(args_map: dict)-> str:
+  "Given args map. Does join write_file path. Returns POSIX path or '?'."
+  d = args_map.get("write_file_dpath_str") or ""
+  f = args_map.get("write_file_fname_str") or ""
+  try:
+    if d and f and "/" not in f:
+      return (Path(d)/f).as_posix()
+  except Exception:
+    pass
+  return "?"
+
+# ===== WF invariants (MVP) =====
+# These are “well-formedness” rules (shape/encoding/domain), not policy or privilege checks.
+
+def wf_check(journal: Journal)-> list[str]:
+  """
+  Given Journal. Does run invariant checks on meta and each Command entry. Returns list of error strings.
+  Invariants (MVP):
+    - meta_map: must include generator identity and stage_root_dpath_str.
+    - entry.op ∈ {'copy','displace','delete'}
+    - all ops: write_file_dpath_str absolute; write_file_fname_str is bare filename.
+    - copy: owner_name_str non-empty; mode_int ∈ [0..0o7777] and no suid/sgid; content_bytes present (bytes).
+  """
+  errs: list[str] = []
+  meta = journal.meta_map or {}
+
+  # meta presence (light placeholder)
+  if not isinstance(meta ,dict):
+    errs.append("WF_META: meta_map must be a map")
+  else:
+    if not meta.get("stage_root_dpath_str"):
+      errs.append("WF_META: missing stage_root_dpath_str")
+    if not meta.get("generator_prog_str"):
+      errs.append("WF_META: missing generator_prog_str")
+
+  # entries
+  for idx ,cmd in enumerate(journal.commands_list ,1):
+    prefix = f"WF[{idx:02d}]"
+    if not isinstance(cmd ,Command):
+      errs.append(f"{prefix}: entry is not Command")
+      continue
+    op = cmd.name_str
+    if op not in {"copy","displace","delete"}:
+      errs.append(f"{prefix}: unknown op '{op}'")
+      continue
+    am = cmd.args_map or {}
+    dpath = am.get("write_file_dpath_str")
+    fname = am.get("write_file_fname_str")
+    if not isinstance(dpath ,str) or not dpath.startswith("/"):
+      errs.append(f"{prefix}: write_file_dpath_str must be absolute")
+    if not isinstance(fname ,str) or not fname or "/" in fname:
+      errs.append(f"{prefix}: write_file_fname_str must be a bare filename")
+
+    if op == "copy":
+      owner = am.get("owner_name_str")
+      mode  = am.get("mode_int")
+      data  = am.get("content_bytes")
+      if not isinstance(owner ,str) or not owner.strip():
+        errs.append(f"{prefix}: owner_name_str must be non-empty")
+      if not isinstance(mode ,int) or not (0 <= mode <= 0o7777):
+        errs.append(f"{prefix}: mode_int must be int in [0..0o7777]")
+      elif (mode & 0o6000):
+        errs.append(f"{prefix}: mode_int suid/sgid not allowed in MVP")
+      if not isinstance(data ,(bytes,bytearray)):
+        errs.append(f"{prefix}: content_bytes must be bytes")
+  return errs
+
+# ===== Planner execution =====
+
+def _run_one_config(config_abs_fpath: Path ,stage_root_dpath: Path)-> Planner:
+  """
+  Given abs path to a config script and stage root. Does construct a PlannerContext and Planner,
+  then executes the script with 'P' (Planner instance) bound in globals. Returns Planner with Journal.
+  Notes:
+    - Defaults are intentionally spartan; config should refine them via P.set_context(...).
+    - This is UNPRIVILEGED; no filesystem changes are performed here.
+  """
+  read_rel = config_abs_fpath.resolve().relative_to(stage_root_dpath.resolve())
+  ctx = PlannerContext.from_values(
+    stage_root_dpath=stage_root_dpath
+    ,read_file_rel_fpath=read_rel
+    ,write_file_dpath_str="/"
+    ,write_file_fname_str="."
+    ,owner_name_str=getpass.getuser()
+    ,perm=0o644
+    ,content=None
+  )
+  P = Planner(ctx)
+  g = {"Planner": Planner ,"PlannerContext": PlannerContext ,"P": P}
+  runpy.run_path(str(config_abs_fpath) ,init_globals=g)
+  return P
+
+def _aggregate_journal(planners_list: list[Planner] ,stage_root_dpath: Path)-> Journal:
+  "Given planners and stage root. Does aggregate Commands into a single Journal with meta. Returns Journal."
+  J = Journal()
+  J.set_meta(
+    version_int=1
+    ,generator_prog_str="plan_show.py"
+    ,generated_at_utc_str=iso_utc_now_str()
+    ,user_name_str=getpass.getuser()
+    ,host_name_str=os.uname().nodename if hasattr(os ,"uname") else "unknown"
+    ,stage_root_dpath_str=str(stage_root_dpath.resolve())
+    ,configs_list=[p.context().read_file_rel_fpath.as_posix() for p in planners_list]
+  )
+  for p in planners_list:
+    for cmd in p.journal().commands_list:
+      J.append(cmd)
+  return J
+
+def _print_plan(journal: Journal)-> None:
+  "Given Journal. Does print a readable summary. Returns None."
+  meta = journal.meta_map or {}
+  print(f"Stage: {meta.get('stage_root_dpath_str','?')}")
+  print(f"Generated: {meta.get('generated_at_utc_str','?')} by {meta.get('user_name_str','?')}@{meta.get('host_name_str','?')}\n")
+
+  entries = journal.commands_list
+  if not entries:
+    print("(plan is empty)")
+    return
+
+  n_copy = sum(1 for c in entries if c.name_str=="copy")
+  n_disp = sum(1 for c in entries if c.name_str=="displace")
+  n_del  = sum(1 for c in entries if c.name_str=="delete")
+  print(f"Entries: {len(entries)}  copy:{n_copy}  displace:{n_disp}  delete:{n_del}\n")
+
+  for i ,cmd in enumerate(entries ,1):
+    am = cmd.args_map
+    dst = _dst_path_str(am)
+    if cmd.name_str == "copy":
+      size = len(am.get("content_bytes") or b"")
+      mode = am.get("mode_int")
+      owner = am.get("owner_name_str")
+      print(f"{i:02d}. copy     -> {dst}  mode {mode:04o} owner {owner} bytes {size} ({human_size(size)})")
+    elif cmd.name_str == "displace":
+      print(f"{i:02d}. displace -> {dst}")
+    elif cmd.name_str == "delete":
+      print(f"{i:02d}. delete   -> {dst}")
+    else:
+      print(f"{i:02d}. ?op?     -> {dst}")
+
+def _maybe_emit_CBOR(journal: Journal ,emit_CBOR_fpath: Path|None)-> None:
+  "Given Journal and optional path. Does write CBOR if requested. Returns None."
+  if not emit_CBOR_fpath:
+    return
+  try:
+    data = journal.to_CBOR_bytes(canonical_bool=True)
+  except Exception as e:
+    print(f"error: CBOR encode failed: {e}" ,file=sys.stderr)
+    raise
+  emit_CBOR_fpath.parent.mkdir(parents=True ,exist_ok=True)
+  with open(emit_CBOR_fpath ,"wb") as fh:
+    fh.write(data)
+  print(f"\nWrote CBOR plan: {emit_CBOR_fpath}  ({len(data)} bytes)")
+
+# ===== CLI =====
+
+def main(argv: list[str]|None=None)-> int:
+  "Given CLI. Does discover configs, build plan, (optionally) CBOR round-trip before printing, run WF, optionally emit CBOR. Returns exit code."
+  ap = argparse.ArgumentParser(prog="plan_show.py"
+   ,description="Build and show a staged plan (no privilege, no apply).")
+  ap.add_argument("--stage",default="stage",help="stage directory root (default: ./stage)")
+  ap.add_argument("--glob",default="**/*.stage.py",help="glob for config scripts under --stage")
+  ap.add_argument("--emit-CBOR",default=None,help="write CBOR plan to this path (optional)")
+  ap.add_argument("--print-from-journal",action="store_true"
+                 ,help="print directly from in-memory Journal (skip CBOR round-trip)")
+  args = ap.parse_args(argv)
+
+  stage_root_dpath = Path(args.stage)
+  if not stage_root_dpath.is_dir():
+    print(f"error: --stage not a directory: {stage_root_dpath}" ,file=sys.stderr)
+    return 2
+
+  configs = find_configs(stage_root_dpath ,args.glob)
+  if not configs:
+    print("No config scripts found.")
+    return 0
+
+  planners: list[Planner] = []
+  for cfg in configs:
+    try:
+      planners.append(_run_one_config(cfg ,stage_root_dpath))
+    except SystemExit:
+      raise
+    except Exception as e:
+      print(f"error: executing {cfg}: {e}" ,file=sys.stderr)
+      return 2
+
+  journal_src = _aggregate_journal(planners ,stage_root_dpath)
+
+  if not args.print_from_journal:
+    try:
+      cbor_bytes = journal_src.to_CBOR_bytes(canonical_bool=True)
+      journal = Journal.from_CBOR_bytes(cbor_bytes)
+    except Exception as e:
+      print(f"error: CBOR round-trip failed: {e}" ,file=sys.stderr)
+      return 2
+  else:
+    journal = journal_src
+
+  _print_plan(journal)
+
+  errs = wf_check(journal)
+  if errs:
+    print("\nerror(s):" ,file=sys.stderr)
+    for e in errs:
+      print(f"  - {e}" ,file=sys.stderr)
+    return 2
+
+  emit = Path(args.emit_CBOR) if args.emit_CBOR else None
+  if emit:
+    try:
+      data = (cbor_bytes if not args.print_from_journal else journal_src.to_CBOR_bytes(canonical_bool=True))
+      emit.parent.mkdir(parents=True ,exist_ok=True)
+      with open(emit ,"wb") as fh:
+        fh.write(data)
+      print(f"\nWrote CBOR plan: {emit}  ({len(data)} bytes)")
+    except Exception as e:
+      print(f"error: failed to write CBOR: {e}" ,file=sys.stderr)
+      return 2
+
+  return 0
+
+if __name__ == "__main__":
+  sys.exit(main())
diff --git a/developer/source/DNS/stage_cp.py b/developer/source/DNS/stage_cp.py
new file mode 100644 (file)
index 0000000..0114853
--- /dev/null
@@ -0,0 +1,322 @@
+#!/usr/bin/env -S python3 -B
+"""
+stage_cp.py — build a CBOR plan from staged configs; show, validate, and apply with privilege.
+
+Given: a stage root directory.
+Does:  (user) run configs → build native plan → WF checks → summarize → encode plan → sudo re-exec
+       (root) decode plan → VALID + SANITY → apply ops (displace/copy/delete) safely.
+Returns: exit code.
+
+Requires: pip install cbor2
+"""
+from __future__ import annotations
+import sys ,os
+sys.dont_write_bytecode = True
+os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
+
+from pathlib import Path
+import argparse ,importlib.util ,runpy ,socket ,getpass ,time ,tempfile ,subprocess ,pwd
+from typing import Any
+import cbor2
+
+# ---------- small utils ----------
+
+def _load_stage_module(stage_root_dpath: Path):
+  "Given: stage root path. Does: load Stage.py as module 'Stage'. Returns: module."
+  mod_fpath = stage_root_dpath/"Stage.py"
+  if not mod_fpath.exists():
+    raise FileNotFoundError(f"Stage.py not found at {mod_fpath}")
+  spec = importlib.util.spec_from_file_location("Stage" ,str(mod_fpath))
+  mod = importlib.util.module_from_spec(spec)
+  sys.modules["Stage"] = mod
+  assert spec and spec.loader
+  spec.loader.exec_module(mod)  # type: ignore
+  return mod
+
+def _config_rel_fpaths(stage_root_dpath: Path)-> list[Path]:
+  "Given: stage root. Does: collect *.py (excluding Stage.py) as relative file paths. Returns: list[Path]."
+  rel_fpath_list: list[Path] = []
+  for p in stage_root_dpath.rglob("*.py"):
+    if p.name == "Stage.py": continue
+    if p.is_file():
+      rel_fpath_list.append(p.relative_to(stage_root_dpath))
+  return sorted(rel_fpath_list ,key=lambda x: x.as_posix())
+
+def _sha256_bytes(b: bytes)-> bytes:
+  "Given: bytes. Does: sha256. Returns: 32-byte digest."
+  return hashlib.sha256(b).digest()
+
+def _dst_fpath_str(dst_dpath_str: str ,dst_fname_str: str)-> str:
+  "Given: a directory path string and a filename string. Does: join. Returns: combined POSIX path string."
+  if "/" in dst_fname_str:
+    return ""  # invalid; WF will flag
+  return str((Path(dst_dpath_str)/dst_fname_str))
+
+# ---------- WF / VALID / SANITY ----------
+
+_ALLOWLIST_PREFIXES_LIST = ["/etc" ,"/usr/local" ,"/etc/systemd/system"]
+
+def wf_check(plan_map: dict[str,Any])-> list[str]:
+  "Given: plan map. Does: shape/lexical checks only. Returns: list of error strings."
+  errs_list: list[str] = []
+  if plan_map.get("version_int") != 1:
+    errs_list.append("WF_VERSION: unsupported plan version")
+  entries_list = plan_map.get("entries_list")
+  if not isinstance(entries_list ,list):
+    errs_list.append("WF_ENTRIES: 'entries_list' missing or not a list")
+    return errs_list
+  for i ,e_map in enumerate(entries_list ,1):
+    op = e_map.get("op")
+    dst_dpath_str = e_map.get("dst_dpath")
+    dst_fname_str = e_map.get("dst_fname")
+    where = f"entry {i}"
+    if op not in ("copy","displace","delete"):
+      errs_list.append(f"WF_OP:{where}: invalid op {op!r}")
+      continue
+    if not isinstance(dst_dpath_str ,str) or not dst_dpath_str:
+      errs_list.append(f"WF_DST_DPATH:{where}: dst_dpath missing or not str")
+    if not isinstance(dst_fname_str ,str) or not dst_fname_str:
+      errs_list.append(f"WF_DST_FNAME:{where}: dst_fname missing or not str")
+    if isinstance(dst_fname_str ,str) and "/" in dst_fname_str:
+      errs_list.append(f"WF_DST_FNAME:{where}: dst_fname must not contain '/'")
+    if isinstance(dst_dpath_str ,str) and not dst_dpath_str.startswith("/"):
+      errs_list.append(f"WF_DST_DPATH:{where}: dst_dpath must be absolute")
+    full_fpath_str = _dst_fpath_str(dst_dpath_str or "" ,dst_fname_str or "")
+    if not full_fpath_str or not full_fpath_str.startswith("/"):
+      errs_list.append(f"WF_PATH:{where}: failed to construct absolute path from dst_dpath/fname")
+    if op == "copy":
+      mode_int = e_map.get("mode_int")
+      if not isinstance(mode_int ,int) or not (0 <= mode_int <= 0o7777):
+        errs_list.append(f"WF_MODE:{where}: mode_int must be int in [0..0o7777]")
+      if isinstance(mode_int ,int) and (mode_int & 0o6000):
+        errs_list.append(f"WF_MODE:{where}: suid/sgid bits not allowed in MVP")
+      owner_name = e_map.get("owner_name")
+      if not isinstance(owner_name ,str) or not owner_name:
+        errs_list.append(f"WF_OWNER:{where}: owner_name must be non-empty username string")
+      content_bytes = e_map.get("content_bytes")
+      if not (isinstance(content_bytes ,(bytes,bytearray)) and len(content_bytes) >= 0):
+        errs_list.append(f"WF_CONTENT:{where}: content_bytes must be bytes (may be empty)")
+      sha = e_map.get("sha256_bytes")
+      if sha is not None:
+        if not isinstance(sha ,(bytes,bytearray)) or len(sha)!=32:
+          errs_list.append(f"WF_SHA256:{where}: sha256_bytes must be 32-byte digest if present")
+        elif isinstance(content_bytes ,(bytes,bytearray)) and sha != _sha256_bytes(content_bytes):
+          errs_list.append(f"WF_SHA256_MISMATCH:{where}: sha256_bytes does not match content_bytes")
+  return errs_list
+
+def valid_check(plan_map: dict[str,Any])-> list[str]:
+  "Given: plan map. Does: environment (read-only) checks. Returns: list of error strings."
+  errs_list: list[str] = []
+  for i ,e_map in enumerate(plan_map.get("entries_list") or [] ,1):
+    op = e_map.get("op")
+    dst_fpath_str = _dst_fpath_str(e_map.get("dst_dpath","/") ,e_map.get("dst_fname",""))
+    where = f"entry {i}"
+    try:
+      parent_dpath = Path(dst_fpath_str).parent
+      if not parent_dpath.exists():
+        errs_list.append(f"VAL_PARENT_MISSING:{where}: parent dir does not exist: {parent_dpath}")
+      elif not parent_dpath.is_dir():
+        errs_list.append(f"VAL_PARENT_NOT_DIR:{where}: parent is not a directory: {parent_dpath}")
+      if Path(dst_fpath_str).is_dir():
+        errs_list.append(f"VAL_DST_IS_DIR:{where}: destination exists as a directory: {dst_fpath_str}")
+      if op == "copy":
+        owner_name = e_map.get("owner_name")
+        try:
+          pw = pwd.getpwnam(owner_name)  # may raise KeyError
+          e_map["_resolved_uid_int"] = pw.pw_uid
+          e_map["_resolved_gid_int"] = pw.pw_gid
+        except Exception:
+          errs_list.append(f"VAL_OWNER_UNKNOWN:{where}: user not found: {owner_name!r}")
+    except Exception as x:
+      errs_list.append(f"VAL_EXCEPTION:{where}: {x}")
+  return errs_list
+
+def sanity_check(plan_map: dict[str,Any])-> list[str]:
+  "Given: plan map. Does: policy checks (allowlist, denials). Returns: list of error strings."
+  errs_list: list[str] = []
+  for i ,e_map in enumerate(plan_map.get("entries_list",[]) ,1):
+    dst_fpath_str = _dst_fpath_str(e_map.get("dst_dpath","/") ,e_map.get("dst_fname",""))
+    where = f"entry {i}"
+    if not any(dst_fpath_str.startswith(pref + "/") or dst_fpath_str==pref for pref in _ALLOWLIST_PREFIXES_LIST):
+      errs_list.append(f"POL_PATH_DENY:{where}: destination outside allowlist: {dst_fpath_str}")
+  return errs_list
+
+# ---------- APPLY (root) ----------
+
+def _utc_str()-> str:
+  "Given: n/a. Does: current UTC compact. Returns: string."
+  import datetime as _dt
+  return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
+
+def _ensure_parent_dirs(dst_fpath: Path)-> None:
+  "Given: destination file path. Does: create parents. Returns: None."
+  dst_fpath.parent.mkdir(parents=True ,exist_ok=True)
+
+def _displace_in_place(dst_fpath: Path)-> None:
+  "Given: destination file path. Does: rename existing file/symlink to add UTC suffix. Returns: None."
+  try:
+    if dst_fpath.exists() or dst_fpath.is_symlink():
+      suffix = "_" + _utc_str()
+      dst_fpath.rename(dst_fpath.with_name(dst_fpath.name + suffix))
+  except FileNotFoundError:
+    pass
+
+def _apply_copy(dst_fpath: Path ,content_bytes: bytes ,mode_int: int ,uid_int: int ,gid_int: int)-> None:
+  "Given: target, bytes, mode, uid, gid. Does: write temp, fsync, chmod/chown, atomic replace. Returns: None."
+  _ensure_parent_dirs(dst_fpath)
+  _displace_in_place(dst_fpath)
+  tmp_fpath = dst_fpath.with_name("." + dst_fpath.name + ".stage_tmp")
+  with open(tmp_fpath ,"wb") as fh:
+    fh.write(content_bytes)
+    fh.flush()
+    os.fsync(fh.fileno())
+  try:
+    os.chmod(tmp_fpath ,mode_int & 0o777)
+  except Exception:
+    pass
+  try:
+    os.chown(tmp_fpath ,uid_int ,gid_int)
+  except Exception:
+    pass
+  os.replace(tmp_fpath ,dst_fpath)  # atomic within same dir/device
+
+def _apply_delete(dst_fpath: Path)-> None:
+  "Given: target file path. Does: unlink file/symlink if present. Returns: None."
+  try:
+    if dst_fpath.is_symlink() or dst_fpath.is_file():
+      dst_fpath.unlink()
+  except FileNotFoundError:
+    pass
+
+def apply_plan(plan_map: dict[str,Any] ,dry_run_bool: bool=False)-> int:
+  "Given: plan map and dry flag. Does: execute ops sequentially. Returns: exit code."
+  for i ,e_map in enumerate(plan_map.get("entries_list") or [] ,1):
+    op = e_map.get("op")
+    dst_fpath = Path(_dst_fpath_str(e_map.get("dst_dpath","/") ,e_map.get("dst_fname","")))
+    if op == "displace":
+      print(f"+ displace {dst_fpath}")
+      if not dry_run_bool:
+        _displace_in_place(dst_fpath)
+    elif op == "delete":
+      print(f"+ delete   {dst_fpath}")
+      if not dry_run_bool:
+        _apply_delete(dst_fpath)
+    elif op == "copy":
+      mode_int = e_map.get("mode_int") or 0o644
+      uid_int  = e_map.get("_resolved_uid_int" ,0)
+      gid_int  = e_map.get("_resolved_gid_int" ,0)
+      content_bytes = e_map.get("content_bytes") or b""
+      print(f"+ copy     {dst_fpath}  mode {mode_int:04o} uid {uid_int} gid {gid_int} bytes {len(content_bytes)}")
+      if not dry_run_bool:
+        _apply_copy(dst_fpath ,content_bytes ,mode_int ,uid_int ,gid_int)
+    else:
+      print(f"! unknown op {op} (skipping)")
+      return 2
+  return 0
+
+# ---------- orchestration ----------
+
+def _build_plan_unpriv(stage_root_dpath: Path)-> dict[str,Any]:
+  "Given: stage root. Does: execute configs, accumulate entries, add sha256. Returns: plan map."
+  StageMod = _load_stage_module(stage_root_dpath)
+  Stage = StageMod.Stage
+  Stage._reset()
+  Stage.set_meta(
+    planner_user_name=getpass.getuser()
+    ,planner_uid_int=os.getuid()
+    ,planner_gid_int=os.getgid()
+    ,host_name=socket.gethostname()
+    ,created_utc_str=time.strftime("%Y-%m-%dT%H:%M:%SZ",time.gmtime())
+  )
+  for rel_fpath in _config_rel_fpaths(stage_root_dpath):
+    Stage._begin(read_rel_fpath=rel_fpath ,stage_root_dpath=stage_root_dpath)
+    runpy.run_path(str(stage_root_dpath/rel_fpath) ,run_name="__main__")
+    Stage._end()
+  for e_map in Stage.plan_entries():
+    if e_map.get("op") == "copy" and isinstance(e_map.get("content_bytes") ,(bytes,bytearray)):
+      e_map["sha256_bytes"] = _sha256_bytes(e_map["content_bytes"])
+  return Stage.plan_object()
+
+def _sudo_apply_self(plan_fpath: Path ,dry_run_bool: bool)-> int:
+  "Given: plan file path and dry flag. Does: sudo re-exec current script with --apply. Returns: exit code."
+  cmd_list = ["sudo",sys.executable,os.path.abspath(__file__),"--apply"
+             ,"--plan",str(plan_fpath)]
+  if dry_run_bool:
+    cmd_list.append("--dry-run")
+  return subprocess.call(cmd_list)
+
+def main(argv: list[str]|None=None)-> int:
+  "Given: CLI. Does: plan, WF (user) then VALID+SANITY+APPLY (root). Returns: exit code."
+  ap = argparse.ArgumentParser(prog="stage_cp.py"
+   ,description="Plan staged config application and apply with sudo.")
+  ap.add_argument("--stage",default="stage",help="stage directory (default: ./stage)")
+  ap.add_argument("--dry-run",action="store_true",help="validate and show actions, do not change files")
+  ap.add_argument("--apply",action="store_true",help=argparse.SUPPRESS)     # internal (root path)
+  ap.add_argument("--plan",default=None,help=argparse.SUPPRESS)             # internal (root path)
+  args = ap.parse_args(argv)
+
+  # Root path (apply)
+  if args.apply:
+    if os.geteuid() != 0:
+      print("error: --apply requires root" ,file=sys.stderr)
+      return 2
+    if not args.plan:
+      print("error: --plan path required for --apply" ,file=sys.stderr)
+      return 2
+    with open(args.plan ,"rb") as fh:
+      plan_map = cbor2.load(fh)
+    val_errs = valid_check(plan_map)
+    pol_errs = sanity_check(plan_map)
+    if val_errs or pol_errs:
+      print("error(s) during validation/sanity:" ,file=sys.stderr)
+      for e in val_errs: print(f"  - {e}" ,file=sys.stderr)
+      for e in pol_errs: print(f"  - {e}" ,file=sys.stderr)
+      return 2
+    rc = apply_plan(plan_map ,dry_run_bool=args.dry_run)
+    return rc
+
+  # User path (plan + summarize + escalate)
+  stage_root_dpath = Path(args.stage)
+  plan_map = _build_plan_unpriv(stage_root_dpath)
+
+  entries_list = plan_map.get("entries_list" ,[])
+  print(f"Built plan with {len(entries_list)} entr{'y' if len(entries_list)==1 else 'ies'}")
+
+  total_bytes_int = sum(len(e_map.get("content_bytes") or b"")
+                        for e_map in entries_list if e_map.get("op")=="copy")
+  print(f"Total bytes to write: {total_bytes_int}")
+  if args.dry_run:
+    print("\n--dry-run: would perform the following:")
+
+  for i ,e_map in enumerate(entries_list ,1):
+    op = e_map.get("op")
+    dst_fpath_str = _dst_fpath_str(e_map.get("dst_dpath") ,e_map.get("dst_fname"))
+    if op=="copy":
+      mode_int = e_map.get("mode_int") or 0o644
+      owner_name = e_map.get("owner_name") or "?"
+      size = len(e_map.get("content_bytes") or b"")
+      print(f"{i:02d}. copy     -> {dst_fpath_str}  mode {mode_int:04o} owner {owner_name} bytes {size}")
+    elif op=="displace":
+      print(f"{i:02d}. displace -> {dst_fpath_str}")
+    elif op=="delete":
+      print(f"{i:02d}. delete   -> {dst_fpath_str}")
+    else:
+      print(f"{i:02d}. ?op?     -> {dst_fpath_str}")
+
+  with tempfile.NamedTemporaryFile(prefix="plan_" ,suffix=".cbor" ,delete=False) as tf:
+    cbor2.dump(plan_map ,tf)
+    plan_fpath = Path(tf.name)
+  try:
+    if args.dry_run:
+      return _sudo_apply_self(plan_fpath ,dry_run_bool=True)
+    ans = input("\nProceed with apply under sudo? [y/N] ").strip().lower()
+    if ans not in ("y","yes"):
+      print("Aborted.")
+      return 0
+    return _sudo_apply_self(plan_fpath ,dry_run_bool=False)
+  finally:
+    try: os.unlink(plan_fpath)
+    except Exception: pass
+
+if __name__ == "__main__":
+  sys.exit(main())
diff --git a/developer/source/DNS/stage_ls.py b/developer/source/DNS/stage_ls.py
deleted file mode 100755 (executable)
index 93dd3d2..0000000
+++ /dev/null
@@ -1,193 +0,0 @@
-#!/usr/bin/env -S python3 -B
-"""
-ls_stage.py — list staged files and their header-declared install metadata.
-
-Header line format (first line of each file):
-  <owner> <permissions> <write_file_name> <target_directory_path>
-
-- owner:               username string (need not exist until install time)
-- permissions:         four octal digits, e.g. 0644
-- write_file_name:     '.' means use the read file's basename, else use the given POSIX filename
-- target_directory_path: POSIX directory path (usually absolute, e.g. /etc/unbound)
-
-Output formats:
-- list (default):  "read_file_path: owner permissions write_file_name target_directory_path"
-- table:           columns aligned for readability
-"""
-
-from __future__ import annotations
-
-# never write bytecode (root/sudo friendly)
-import sys ,os
-sys.dont_write_bytecode = True
-os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
-
-from dataclasses import dataclass
-from pathlib import Path
-import argparse
-import re
-
-# === Stage utilities (importable) ===
-
-def stage_read_file_paths(stage_root: Path)-> list[Path]:
-  """Given:   stage_root directory.
-     Does:    recursively enumerate regular files (follows symlinks to files), keep paths relative to stage_root.
-     Returns: list[Path] of POSIX-order sorted relative paths (no leading slash).
-  """
-  rels: list[Path] = []
-  for p in stage_root.rglob("*"):
-    try:
-      if p.is_file():  # follows symlink-to-file
-        rels.append(p.relative_to(stage_root))
-    except (FileNotFoundError ,RuntimeError):
-      # broken link or race; skip conservatively
-      continue
-  return sorted(rels ,key=lambda x: x.as_posix())
-
-@dataclass
-class StageRow:
-  read_rel: Path                 # e.g. Path("etc/unbound/unbound.conf.staged")
-  owner: str                     # token[0]
-  perm_octal_str: str            # token[1], exactly as in header (validated ####)
-  perm_int: int                  # token[1] parsed as base-8
-  write_name: str                # token[2] ('.' resolved to read_rel.name)
-  target_dir: Path               # token[3] (Path)
-  header_raw: str                # original header line (sans newline)
-
-    # convenience
-  def write_abs(self ,root: Path)-> Path:
-    return (root / self.target_dir.relative_to("/")) if self.target_dir.is_absolute() else (root / self.target_dir) / self.write_name
-
-# header parsing rules
-_PERM_RE = re.compile(r"^[0-7]{4}$")
-
-def parse_stage_header_line(header: str ,read_rel: Path)-> tuple[StageRow|None ,str|None]:
-  """Given:   raw first line of a staged file and its stage-relative path.
-     Does:    parse '<owner> <perm> <write_name> <target_dir>' with max 4 tokens (target_dir may contain spaces if quoted not required).
-     Returns: (StageRow, None) on success, or (None, error_message) on failure. Does NOT touch filesystem.
-  """
-  # strip BOM and trailing newline/spaces
-  h = header.lstrip("\ufeff").strip()
-  if not h:
-    return None ,f"empty header line in {read_rel}"
-  parts = h.split(maxsplit=3)
-  if len(parts) != 4:
-    return None ,f"malformed header in {read_rel}: expected 4 fields, got {len(parts)}"
-  owner ,perm_s ,write_name ,target_dir_s = parts
-
-  if not _PERM_RE.fullmatch(perm_s):
-    return None ,f"invalid permissions '{perm_s}' in {read_rel}: must be four octal digits"
-
-  # resolve '.' → basename
-  resolved_write_name = read_rel.name if write_name == "." else write_name
-
-  # MVP guard: write_name should be a single filename (no '/')
-  if "/" in resolved_write_name:
-    return None ,f"write_file_name must not contain '/': got '{resolved_write_name}' in {read_rel}"
-
-  # target dir may be absolute (recommended) or relative (we treat relative as under the install root)
-  target_dir = Path(target_dir_s)
-
-  try:
-    row = StageRow(
-      read_rel = read_rel
-      ,owner = owner
-      ,perm_octal_str = perm_s
-      ,perm_int = int(perm_s ,8)
-      ,write_name = resolved_write_name
-      ,target_dir = target_dir
-      ,header_raw = h
-    )
-    return row ,None
-  except Exception as e:
-    return None ,f"internal parse error in {read_rel}: {e}"
-
-def read_first_line(p: Path)-> str:
-  """Return the first line (sans newline). UTF-8 with BOM tolerant."""
-  with open(p ,"r" ,encoding="utf-8" ,errors="replace") as fh:
-    line = fh.readline()
-  return line.rstrip("\n\r")
-
-def scan_stage(stage_root: Path)-> tuple[list[StageRow] ,list[str]]:
-  """Given:   stage_root.
-     Does:    enumerate files, parse each header line, collect rows and errors.
-     Returns: (rows, errors)
-  """
-  rows: list[StageRow] = []
-  errs: list[str] = []
-  for rel in stage_read_file_paths(stage_root):
-    abs_path = stage_root / rel
-    try:
-      header = read_first_line(abs_path)
-    except Exception as e:
-      errs.append(f"read error in {rel}: {e}")
-      continue
-    row ,err = parse_stage_header_line(header ,rel)
-    if err:
-      errs.append(err)
-    else:
-      rows.append(row)  # type: ignore[arg-type]
-  return rows ,errs
-
-# === Printers ===
-
-def print_list(rows: list[StageRow])-> None:
-  """Print: 'read_file_path: owner permissions write_file_name target_directory_path' per line."""
-  for r in rows:
-    print(f"{r.read_rel.as_posix()}: {r.owner} {r.perm_octal_str} {r.write_name} {r.target_dir}")
-
-def print_table(rows: list[StageRow])-> None:
-  """Aligned table printer (no headers, just data in columns)."""
-  if not rows:
-    return
-  a = [r.read_rel.as_posix() for r in rows]
-  b = [r.owner for r in rows]
-  c = [r.perm_octal_str for r in rows]
-  d = [r.write_name for r in rows]
-  e = [str(r.target_dir) for r in rows]
-  wa = max(len(s) for s in a)
-  wb = max(len(s) for s in b)
-  wc = max(len(s) for s in c)
-  wd = max(len(s) for s in d)
-  # e (target_dir) left ragged
-  for sa ,sb ,sc ,sd ,se in zip(a ,b ,c ,d ,e):
-    print(f"{sa:<{wa}}  {sb:<{wb}}  {sc:<{wc}}  {sd:<{wd}}  {se}")
-
-# === Orchestrator ===
-
-def ls_stage(stage_root: Path ,fmt: str="list")-> int:
-  """Given:   stage_root and output format ('list'|'table').
-     Does:    scan and parse staged files, print in the requested format; report syntax errors to stderr.
-     Returns: 0 on success; 1 if any syntax errors were encountered.
-  """
-  rows ,errs = scan_stage(stage_root)
-  if fmt == "table":
-    print_table(rows)
-  else:
-    print_list(rows)
-  if errs:
-    print("\nerror(s):" ,file=sys.stderr)
-    for e in errs:
-      print(f"  - {e}" ,file=sys.stderr)
-    return 1
-  return 0
-
-# === CLI ===
-
-def main(argv: list[str] | None=None)-> int:
-  ap = argparse.ArgumentParser(
-    prog="ls_stage.py"
-    ,description="List staged files and their header-declared install metadata."
-  )
-  ap.add_argument("--stage" ,default="stage",help="stage directory (default: ./stage)")
-  ap.add_argument("--format" ,choices=["list" ,"table"] ,default="list"
-                 ,help="output format (default: list)")
-  args = ap.parse_args(argv)
-  stage_root = Path(args.stage)
-  if not stage_root.exists() or not stage_root.is_dir():
-    print(f"error: stage directory not found or not a directory: {stage_root}" ,file=sys.stderr)
-    return 2
-  return ls_stage(stage_root ,fmt=args.format)
-
-if __name__ == "__main__":
-  sys.exit(main())
diff --git a/developer/source/DNS/stage_show_plan.py b/developer/source/DNS/stage_show_plan.py
new file mode 100644 (file)
index 0000000..075e65b
--- /dev/null
@@ -0,0 +1,97 @@
+#!/usr/bin/env -S python3 -B
+"""
+stage_show_plan.py — run staged configs (UNPRIVILEGED) and print the plan.
+
+Given: a stage root directory.
+Does:  loads Stage.py, executes each config, builds a native plan map, summarizes it.
+Returns: exit code 0 on success, non-zero on error.
+"""
+from __future__ import annotations
+import sys ,os
+sys.dont_write_bytecode = True
+os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
+
+from pathlib import Path
+import argparse ,importlib.util ,runpy ,socket ,getpass ,time ,hashlib
+
+# ---------- helpers ----------
+
+def _load_stage_module(stage_root_dpath: Path):
+  "Given: stage root path. Does: load Stage.py as module 'Stage'. Returns: module."
+  mod_fpath = stage_root_dpath/"Stage.py"
+  if not mod_fpath.exists():
+    raise FileNotFoundError(f"Stage.py not found at {mod_fpath}")
+  spec = importlib.util.spec_from_file_location("Stage" ,str(mod_fpath))
+  mod = importlib.util.module_from_spec(spec)
+  sys.modules["Stage"] = mod
+  assert spec and spec.loader
+  spec.loader.exec_module(mod)  # type: ignore
+  return mod
+
+def _config_rel_fpaths(stage_root_dpath: Path)-> list[Path]:
+  "Given: stage root. Does: collect *.py (excluding Stage.py) as relative file paths. Returns: list[Path]."
+  rel_fpath_list: list[Path] = []
+  for p in stage_root_dpath.rglob("*.py"):
+    if p.name == "Stage.py": continue
+    if p.is_file():
+      rel_fpath_list.append(p.relative_to(stage_root_dpath))
+  return sorted(rel_fpath_list ,key=lambda x: x.as_posix())
+
+def _sha256_hex(b: bytes)-> str:
+  "Given: bytes. Does: sha256. Returns: hex string."
+  return hashlib.sha256(b).hexdigest()
+
+# ---------- main ----------
+
+def main(argv: list[str]|None=None)-> int:
+  "Given: CLI. Does: show plan. Returns: exit code."
+  ap = argparse.ArgumentParser(prog="stage_show_plan.py"
+   ,description="Run staged config scripts and print the resulting plan.")
+  ap.add_argument("--stage",default="stage",help="stage directory (default: ./stage)")
+  args = ap.parse_args(argv)
+
+  stage_root_dpath = Path(args.stage)
+  StageMod = _load_stage_module(stage_root_dpath)
+  Stage = StageMod.Stage
+  Stage._reset()
+  Stage.set_meta(
+    planner_user_name=getpass.getuser()
+    ,planner_uid_int=os.getuid()
+    ,planner_gid_int=os.getgid()
+    ,host_name=socket.gethostname()
+    ,created_utc_str=time.strftime("%Y-%m-%dT%H:%M:%SZ",time.gmtime())
+  )
+
+  for rel_fpath in _config_rel_fpaths(stage_root_dpath):
+    Stage._begin(read_rel_fpath=rel_fpath ,stage_root_dpath=stage_root_dpath)
+    runpy.run_path(str(stage_root_dpath/rel_fpath) ,run_name="__main__")
+    Stage._end()
+
+  plan_map = Stage.plan_object()
+  entries_list = plan_map["entries_list"]
+  print(f"Plan version: {plan_map['version_int']}")
+  print(f"Planner: {plan_map['meta_map'].get('planner_user_name')}@{plan_map['meta_map'].get('host_name')}  "
+        f"UID:{plan_map['meta_map'].get('planner_uid_int')} GID:{plan_map['meta_map'].get('planner_gid_int')}")
+  print(f"Created: {plan_map['meta_map'].get('created_utc_str')}")
+  print(f"Entries: {len(entries_list)}\n")
+
+  for i ,e_map in enumerate(entries_list ,1):
+    op = e_map.get("op")
+    dst_fpath_str = f"{e_map.get('dst_dpath')}/{e_map.get('dst_fname')}"
+    if op == "copy":
+      content = e_map.get("content_bytes") or b""
+      sz = len(content)
+      mode = e_map.get("mode_octal_str") or "????"
+      owner = e_map.get("owner_name") or "?"
+      h = _sha256_hex(content)
+      print(f"{i:02d}. copy     -> {dst_fpath_str}  mode {mode} owner {owner}  bytes {sz} sha256 {h[:16]}…")
+    elif op == "displace":
+      print(f"{i:02d}. displace -> {dst_fpath_str}")
+    elif op == "delete":
+      print(f"{i:02d}. delete   -> {dst_fpath_str}")
+    else:
+      print(f"{i:02d}. ?op?     -> {dst_fpath_str}  ({op})")
+  return 0
+
+if __name__ == "__main__":
+  sys.exit(main())
diff --git a/developer/source/DNS/stage_test_0/example.py b/developer/source/DNS/stage_test_0/example.py
new file mode 100644 (file)
index 0000000..6f2c257
--- /dev/null
@@ -0,0 +1,24 @@
+#!/usr/bin/env -S python3 -B
+import Stage
+
+# You can compute these with arbitrary Python if you like
+svc = "unbound"
+zone = "US"
+fname = f"unbound-{zone}.conf"
+
+Stage.init(
+  write_file_name="."                 # '.' → use basename of this file -> 'example_dns.py'
+, write_file_directory_path="/etc/unbound"
+, write_file_owner="root"
+, write_file_permissions=0o644        # or "0644"
+, read_file_contents="""\
+# generated config (example)
+server:
+  verbosity: 1
+  interface: 127.0.0.1
+"""
+)
+
+# declare the desired operations (no effect in 'noop'/'dry' without a copier)
+Stage.displace()
+Stage.copy()
diff --git a/developer/source/DNS/stage_test_0/stage_ls.py b/developer/source/DNS/stage_test_0/stage_ls.py
new file mode 100644 (file)
index 0000000..1e27c26
--- /dev/null
@@ -0,0 +1,169 @@
+#!/usr/bin/env -S python3 -B
+"""
+stage_ls.py — execute staged Python programs with Stage in 'noop' mode and list metadata.
+
+For each *.py under --stage (recursively, excluding Stage.py), this tool:
+  1) loads Stage.py from the stage root,
+  2) switches mode to 'noop' (no side effects, no printing),
+  3) executes the program via runpy.run_path(...) with the proper __file__,
+  4) collects the resolved write_file_* metadata and declared ops,
+  5) prints either list or aligned table,
+  6) reports any collected errors.
+
+This lets admins compute metadata with arbitrary Python while guaranteeing no writes.
+"""
+
+from __future__ import annotations
+
+import sys ,os
+sys.dont_write_bytecode = True
+os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
+
+from dataclasses import dataclass
+from pathlib import Path
+import argparse
+import importlib.util ,runpy
+import traceback
+
+# --- utility dataclass (for printing) ---
+
+@dataclass
+class Row:
+  read_rel: Path
+  owner: str|None
+  perm: str|None
+  write_name: str|None
+  target_dir: Path|None
+  ops: list[str]
+  errors: list[str]
+
+# --- helpers ---
+
+def _load_stage_module(stage_root: Path):
+  """Load Stage.py from stage_root into sys.modules['Stage'] (overwriting if present). Returns the Stage module."""
+  stage_py = stage_root/"Stage.py"
+  if not stage_py.exists():
+    raise FileNotFoundError(f"Stage.py not found at {stage_py} — place Stage.py in the stage root.")
+  spec = importlib.util.spec_from_file_location("Stage" ,str(stage_py))
+  if spec is None or spec.loader is None:
+    raise RuntimeError(f"cannot load Stage module from {stage_py}")
+  mod = importlib.util.module_from_spec(spec)
+  sys.modules["Stage"] = mod
+  spec.loader.exec_module(mod)  # type: ignore[union-attr]
+  return mod
+
+def _stage_program_paths(stage_root: Path)-> list[Path]:
+  rels: list[Path] = []
+  for p in stage_root.rglob("*.py"):
+    if p.name == "Stage.py":
+      continue
+    try:
+      if p.is_file():
+        rels.append(p.relative_to(stage_root))
+    except Exception:
+      continue
+  return sorted(rels ,key=lambda x: x.as_posix())
+
+def print_list(rows: list[Row])-> None:
+  for r in rows:
+    owner = r.owner or "?"
+    perm  = r.perm or "????"
+    name  = r.write_name or "?"
+    tdir  = str(r.target_dir) if r.target_dir is not None else "?"
+    print(f"{r.read_rel.as_posix()}: {owner} {perm} {name} {tdir}")
+
+def print_table(rows: list[Row])-> None:
+  if not rows:
+    return
+  a = [r.read_rel.as_posix() for r in rows]
+  b = [(r.owner or "?") for r in rows]
+  c = [(r.perm or "????") for r in rows]
+  d = [(r.write_name or "?") for r in rows]
+  e = [str(r.target_dir) if r.target_dir is not None else "?" for r in rows]
+  wa = max(len(s) for s in a)
+  wb = max(len(s) for s in b)
+  wc = max(len(s) for s in c)
+  wd = max(len(s) for s in d)
+  for sa ,sb ,sc ,sd ,se in zip(a ,b ,c ,d ,e):
+    print(f"{sa:<{wa}}  {sb:<{wb}}  {sc:<{wc}}  {sd:<{wd}}  {se}")
+
+# --- core ---
+
+def ls_stage(stage_root: Path ,fmt: str="list")-> int:
+  Stage = _load_stage_module(stage_root)
+  Stage.Stage.set_mode("noop")  # hard safety for this tool
+
+  rows: list[Row] = []
+  errs: list[str] = []
+
+  for rel in _stage_program_paths(stage_root):
+    abs_path = stage_root/rel
+    try:
+      # isolate per-run state
+      Stage.Stage._current = None
+      Stage.Stage._all_records.clear()
+      Stage.Stage._begin(read_rel=rel ,stage_root=stage_root)
+
+      # execute the staged program under its real path
+      runpy.run_path(str(abs_path) ,run_name="__main__")
+
+      rec = Stage.Stage._end()
+      if rec is None:
+        errs.append(f"{rel}: program executed but Stage.init(...) was never called")
+        continue
+
+      rows.append(
+        Row(
+          read_rel=rel
+        , owner=rec.owner
+        , perm=rec.perm_octal_str
+        , write_name=rec.write_name
+        , target_dir=rec.target_dir
+        , ops=list(rec.ops)
+        , errors=list(rec.errors)
+        )
+      )
+
+    except SystemExit as e:
+      errs.append(f"{rel}: program called sys.exit({e.code}) during listing")
+    except Exception:
+      tb = traceback.format_exc(limit=2)
+      errs.append(f"{rel}: exception during execution:\n{tb}")
+
+  # print data
+  if fmt == "table":
+    print_table(rows)
+  else:
+    print_list(rows)
+
+  # print per-row Stage errors
+  row_errs = [f"{r.read_rel}: {msg}" for r in rows for msg in r.errors]
+  all_errs = row_errs + errs
+  if all_errs:
+    print("\nerror(s):" ,file=sys.stderr)
+    for e in all_errs:
+      print(f"  - {e}" ,file=sys.stderr)
+    return 1
+  return 0
+
+# --- CLI ---
+
+def main(argv: list[str] | None=None)-> int:
+  import argparse
+  ap = argparse.ArgumentParser(
+    prog="stage_ls.py"
+  , description="Execute staged Python configs with Stage in 'noop' mode and list resolved metadata."
+  )
+  ap.add_argument("--stage" ,default="stage" ,help="stage directory (default: ./stage)")
+  ap.add_argument("--format" ,choices=["list","table"] ,default="list" ,help="output format")
+  args = ap.parse_args(argv)
+
+  stage_root = Path(args.stage)
+  if not stage_root.exists() or not stage_root.is_dir():
+    print(f"error: stage directory not found or not a directory: {stage_root}" ,file=sys.stderr)
+    return 2
+
+  return ls_stage(stage_root ,fmt=args.format)
+
+if __name__ == "__main__":
+  sys.exit(main())