check point working on staging
authorThomas Walker Lynch <eknp9n@reasoningtechnology.com>
Thu, 18 Sep 2025 15:27:58 +0000 (08:27 -0700)
committerThomas Walker Lynch <eknp9n@reasoningtechnology.com>
Thu, 18 Sep 2025 15:27:58 +0000 (08:27 -0700)
28 files changed:
developer/source/DNS/Planner.py
developer/source/DNS/deploy.py [deleted file]
developer/source/DNS/deprecated/stage_orig/etc/nftables.d/10-block-IPv6.nft [new file with mode: 0644]
developer/source/DNS/deprecated/stage_orig/etc/nftables.d/20-SUBU-ports.nft [new file with mode: 0644]
developer/source/DNS/deprecated/stage_orig/etc/systemd/system/unbound@.service [new file with mode: 0644]
developer/source/DNS/deprecated/stage_orig/etc/unbound/unbound-US.conf [new file with mode: 0644]
developer/source/DNS/deprecated/stage_orig/etc/unbound/unbound-x6.conf [new file with mode: 0644]
developer/source/DNS/deprecated/stage_orig/usr/local/sbin/DNS_status.sh [new file with mode: 0755]
developer/source/DNS/deprecated/stage_show_plan.py [new file with mode: 0644]
developer/source/DNS/executor.py [new file with mode: 0755]
developer/source/DNS/install_staged_tree.py [deleted file]
developer/source/DNS/plan_show.py [deleted file]
developer/source/DNS/stage [deleted symlink]
developer/source/DNS/stage_cp.py [deleted file]
developer/source/DNS/stage_orig/etc/nftables.d/10-block-IPv6.nft [deleted file]
developer/source/DNS/stage_orig/etc/nftables.d/20-SUBU-ports.nft [deleted file]
developer/source/DNS/stage_orig/etc/systemd/system/unbound@.service [deleted file]
developer/source/DNS/stage_orig/etc/unbound/unbound-US.conf [deleted file]
developer/source/DNS/stage_orig/etc/unbound/unbound-x6.conf [deleted file]
developer/source/DNS/stage_orig/usr/local/sbin/DNS_status.sh [deleted file]
developer/source/DNS/stage_show_plan.py [deleted file]
developer/source/DNS/stage_test_0/a.conf [deleted file]
developer/source/DNS/stage_test_0/b.conf [deleted file]
developer/source/DNS/stage_test_0/c.conf [deleted file]
developer/source/DNS/stage_test_0/example.py [deleted file]
developer/source/DNS/stage_test_0/name_space/c.conf [deleted file]
developer/source/DNS/stage_test_0/stage_ls.py [deleted file]
developer/source/DNS/stage_test_0/unbound_conf.py [new file with mode: 0644]

index 0da1384..0782707 100644 (file)
@@ -2,19 +2,12 @@
 """
 Planner.py — plan builder for staged configuration (UNPRIVILEGED).
 
-The Planner accumulates Command objects into a Journal.
-
-Journal building is orchestrated by the outer runner (e.g., stage_show_plan, stage_cp)
-which constructs a Planner per config file and invokes Planner command methods.
-
-Defaults and provenance come from a PlannerContext instance. You can replace the
-context at any time via set_context(ctx).
-
-The Journal can be exported as CBOR via Journal.to_CBOR_bytes(), and reconstructed
-on the privileged side via Journal.from_CBOR_bytes().
-
-On-wire field names are snake_case and use explicit suffixes (_str,_bytes,_int, etc.)
-to avoid ambiguity.
+Given:  runner-side provenance (PlanProvenance) and optional defaults (WriteFileMeta).
+Does:   expose Planner whose command methods (copy/displace/delete) build Command entries,
+        resolving arguments with precedence: kwarg > per-call WriteFileMeta > planner default
+        (and for filename, fallback to provenance-derived basename). On any argument error,
+        the Command is returned with errors and NOT appended to the Journal.
+Returns: Journal (model only; dict in/out) via planner.journal().
 """
 
 from __future__ import annotations
@@ -22,175 +15,318 @@ from __future__ import annotations
 # no bytecode anywhere (works under sudo/root shells too)
 import sys ,os
 sys.dont_write_bytecode = True
-os.environ.setdefault("PYTHONDONTWRITEBYTECODE","1")
+os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
 
-from dataclasses import dataclass ,field
 from pathlib import Path
-from typing import Any
+import getpass
+
 
 # ===== Utilities =====
 
-def _norm_perm(value: int|str)-> tuple[int,str]|None:
-  "Given int or 4-char octal string. Does validate/normalize. Returns (int,'%04o') or None."
+def norm_perm(value: int|str)-> tuple[int,str]|None:
+  "Given int or 3/4-char octal string (optionally 0o-prefixed). Does validate/normalize. Returns (int,'%04o') or None."
   if isinstance(value ,int):
     if 0 <= value <= 0o7777:
       return value ,f"{value:04o}"
     return None
   if isinstance(value ,str):
-    s = value.strip()
-    if len(s)==4 and all(ch in "01234567" for ch in s):
+    s = value.strip().lower()
+    if s.startswith("0o"):
+      try:
+        v = int(s ,8)
+        return v ,f"{v:04o}"
+      except Exception:
+        return None
+    if len(s) in (3 ,4) and all(ch in "01234567" for ch in s):
       try:
         v = int(s ,8)
-        return v ,s
+        return v ,f"{v:04o}"
       except Exception:
         return None
   return None
 
-def _is_abs_dpath(dpath_str: str)-> bool:
+def is_abs_dpath(dpath_str: str|None)-> bool:
   "Given path string. Does quick abs dir check. Returns bool."
-  return bool(dpath_str) and dpath_str.startswith("/")
+  return isinstance(dpath_str ,str) and dpath_str.startswith("/") and "\x00" not in dpath_str
+
+def norm_abs_dpath_str(value: str|Path|None)-> str|None:
+  "Given str/Path/None. Does normalize absolute dir path string. Returns str or None."
+  if value is None: return None
+  s = value.as_posix() if isinstance(value ,Path) else str(value)
+  return s if is_abs_dpath(s) else None
+
+def norm_fname_or_none(value: str|None)-> str|None:
+  "Given candidate filename or None. Does validate bare filename. Returns str or None."
+  if value is None: return None
+  s = str(value)
+  if not s: return None
+  if "/" in s or s in ("." ,"..") or "\x00" in s: return None
+  return s
+
+def norm_nonempty_owner(value: str|None)-> str|None:
+  "Given owner string or None. Does minimally validate (non-empty). Returns str or None."
+  if value is None: return None
+  s = str(value).strip()
+  return s if s else None
+
+def parse_mode(value: int|str|None)-> tuple[int|None ,str|None]:
+  "Given int/str/None. Does normalize via norm_perm. Returns (int,'%04o') or (None,None)."
+  if value is None: return None ,None
+  r = norm_perm(value)
+  return r if r is not None else (None ,None)
+
+def norm_content_bytes(value: bytes|str|None)-> bytes|None:
+  "Given bytes/str/None. Does normalize to UTF-8 bytes or None. Returns bytes|None."
+  if value is None: return None
+  if isinstance(value ,bytes): return value
+  return value.encode("utf-8")
+
+def norm_dpath_str(value: str|Path|None)-> str|None:
+  "Given str/Path/None. Does minimal sanitize; allows relative. Returns str or None."
+  if value is None: return None
+  s = value.as_posix() if isinstance(value ,Path) else str(value)
+  if not s or "\x00" in s: return None
+  return s
+
+
+# ===== Wire-ready model types (no CBOR here) =====
 
-def _join_write_file(dpath_str: str ,fname_str: str)-> str:
-  "Given dir path string and filename string. Does join. Returns POSIX path string or ''."
-  if not _is_abs_dpath(dpath_str): return ""
-  if not fname_str or "/" in fname_str: return ""
-  return (Path(dpath_str)/fname_str).as_posix()
-
-# ===== Core data types =====
-
-@dataclass(slots=True)
 class Command:
   """
   Command — a single planned operation.
 
-  Given a command name and an argument map (native values).
-  Does hold the op name, owns a distinct args map, accumulates errors for this op.
-  Returns serializable mapping via to_map().
+  Given name_str ('copy'|'displace'|'delete'), optional arg_dict, optional errors_list.
+  Does hold op name, own a fresh arg_dict, collect per-entry errors.
+  Returns dictionary via as_dictionary().
   """
-  name_str: str
-  args_map: dict[str,Any] = field(default_factory=dict)
-  errors_list: list[str] = field(default_factory=list)
+  __slots__ = ("name_str" ,"arg_dict" ,"errors_list")
+
+  def __init__(self ,name_str: str ,arg_dict: dict|None=None ,errors_list: list[str]|None=None)-> None:
+    self.name_str = name_str
+    self.arg_dict = dict(arg_dict) if arg_dict is not None else {}
+    self.errors_list = list(errors_list) if errors_list is not None else []
 
   def add_error(self ,msg_str: str)-> None:
-    "Given message. Does append to errors_list. Returns None."
     self.errors_list.append(msg_str)
 
-  def to_map(self)-> dict[str,Any]:
-    "Given self. Does convert to a plain dict. Returns {'op','args_map','errors_list'}."
+  def as_dictionary(self)-> dict:
     return {
       "op": self.name_str
-      ,"args_map": dict(self.args_map)
+      ,"arg_dict": dict(self.arg_dict)
       ,"errors_list": list(self.errors_list)
     }
 
-@dataclass(slots=True)
-class PlannerContext:
-  """
-  PlannerContext — per-config provenance and defaults.
+  def print(self, *, index: int|None=None, file=None)-> None:
+    """
+    Given: optional index for numbering and optional file-like (defaults to stdout).
+    Does:  print a compact, human-readable one-line summary of this command; prints any errors indented below.
+    Returns: None.
+    """
+    if file is None:
+      import sys as _sys
+      file = _sys.stdout
 
-  Given: stage_root_dpath, read_file_rel_fpath, default write_file location/name,
-         default owner name, default permission (int or '0644'), optional default content.
-  Does:  provide ambient defaults and provenance to Planner methods.
-  Returns: n/a (data holder).
-  """
-  stage_root_dpath: Path
-  read_file_rel_fpath: Path
-  default_write_file_dpath_str: str
-  default_write_file_fname_str: str
-  default_owner_name_str: str
-  default_mode_int: int|None = None
-  default_mode_octal_str: str|None = None
-  default_content_bytes: bytes|None = None
-
-  @staticmethod
-  def from_values(stage_root_dpath: Path
-                  ,read_file_rel_fpath: Path
-                  ,write_file_dpath_str: str
-                  ,write_file_fname_str: str
-                  ,owner_name_str: str
-                  ,perm: int|str
-                  ,content: bytes|str|None
-  )-> PlannerContext:
-    "Given raw values. Does normalize perm and content. Returns PlannerContext."
-    if isinstance(content ,str):
-      content_b = content.encode("utf-8")
-    else:
-      content_b = content
-    perm_norm = _norm_perm(perm)
-    if perm_norm is None:
-      m_int ,m_oct = None ,None
+    op = self.name_str
+    ad = self.arg_dict or {}
+
+    # Compose destination path for display
+    d = ad.get("write_file_dpath_str") or ""
+    f = ad.get("write_file_fname") or ""
+    try:
+      from pathlib import Path as _Path
+      dst = (_Path(d)/f).as_posix() if d and f and "/" not in f else "?"
+    except Exception:
+      dst = "?"
+
+    # Numbering prefix
+    prefix = f"{index:02d}. " if index is not None else ""
+
+    if op == "copy":
+      mode  = ad.get("mode_int")
+      owner = ad.get("owner_name")
+      size  = len(ad.get("content_bytes") or b"")
+      line  = f"{prefix}copy     -> {dst}  mode {mode:04o} owner {owner} bytes {size}"
+    elif op == "displace":
+      line  = f"{prefix}displace -> {dst}"
+    elif op == "delete":
+      line  = f"{prefix}delete   -> {dst}"
     else:
-      m_int ,m_oct = perm_norm
-    return PlannerContext(
-      stage_root_dpath=stage_root_dpath
-      ,read_file_rel_fpath=read_file_rel_fpath
-      ,default_write_file_dpath_str=write_file_dpath_str
-      ,default_write_file_fname_str=write_file_fname_str
-      ,default_owner_name_str=owner_name_str
-      ,default_mode_int=m_int
-      ,default_mode_octal_str=m_oct
-      ,default_content_bytes=content_b
-    )
+      line  = f"{prefix}?op?     -> {dst}"
+
+    print(line, file=file)
+
+    # Print any per-entry errors underneath
+    for err in self.errors_list:
+      print(f"    ! {err}", file=file)
+
 
-@dataclass(slots=True)
 class Journal:
   """
-  Journal — ordered list of Commands plus provenance metadata.
+  Journal — ordered list of Command plus provenance metadata (model only; no CBOR).
 
-  Given optional meta map.
-  Does append commands, expose entries, produce plain or CBOR encodings, and rebuild from CBOR.
-  Returns plain dict via to_map(), bytes via to_CBOR_bytes(), Journal via from_CBOR_bytes().
+  Given optional plan_dict in wire shape (for reconstruction).
+  Does manage meta, append commands, expose entries, and pack to dict.
+  Returns dict via as_dictionary().
   """
-  meta_map: dict[str,Any] = field(default_factory=dict)
-  commands_list: list[Command] = field(default_factory=list)
+  __slots__ = ("meta_dict" ,"command_list")
+
+  def __init__(self ,plan_dict: dict|None=None)-> None:
+    self.meta_dict = {}
+    self.command_list = []
+    if plan_dict is not None:
+      self._init_from_dict(plan_dict)
+
+  def _init_from_dict(self ,plan_dict: dict)-> None:
+    if not isinstance(plan_dict ,dict):
+      raise ValueError("plan_dict must be a dict")
+    meta = dict(plan_dict.get("meta_dict") or {})
+    entries = plan_dict.get("entries_list") or []
+    self.meta_dict.update(meta)
+    for e in entries:
+      if not isinstance(e ,dict): 
+        continue
+      op   = e.get("op") or "?"
+      args = e.get("arg_dict") or {}
+      errs = e.get("errors_list") or []
+      self.command_list.append(Command(name_str=op ,arg_dict=dict(args) ,errors_list=list(errs)))
 
   def set_meta(self ,**kv)-> None:
-    "Given keyword meta. Does merge into meta_map. Returns None."
-    self.meta_map.update(kv)
+    self.meta_dict.update(kv)
 
   def append(self ,cmd: Command)-> None:
-    "Given Command. Does append to commands_list. Returns None."
-    self.commands_list.append(cmd)
+    self.command_list.append(cmd)
 
-  def entries_list(self)-> list[dict[str,Any]]:
-    "Given n/a. Does return list of entry dicts (copy). Returns list[dict]."
-    return [c.to_map() for c in self.commands_list]
+  def entries_list(self)-> list[dict]:
+    return [c.as_dictionary() for c in self.command_list]
 
-  def to_map(self)-> dict[str,Any]:
-    "Given n/a. Does package a plan map (ready for CBOR). Returns dict."
+  def as_dictionary(self)-> dict:
     return {
       "version_int": 1
-      ,"meta_map": dict(self.meta_map)
+      ,"meta_dict": dict(self.meta_dict)
       ,"entries_list": self.entries_list()
     }
 
-  def to_CBOR_bytes(self ,canonical_bool: bool=True)-> bytes:
-    "Given n/a. Does CBOR-encode to bytes (requires cbor2). Returns bytes."
-    try:
-      import cbor2
-    except Exception as e:
-      raise RuntimeError(f"package cbor2 required for to_CBOR_bytes: {e}")
-    return cbor2.dumps(self.to_map() ,canonical=canonical_bool)
-
-  @staticmethod
-  def from_CBOR_bytes(data_bytes: bytes)-> Journal:
-    "Given CBOR bytes. Does decode and rebuild a Journal (Commands + meta). Returns Journal."
+  def print(self, *, index_start: int = 1, file=None) -> None:
+    """
+    Given: optional starting index and optional file-like (defaults to stdout).
+    Does:  print each Command on a single line via Command.print(), numbered.
+    Returns: None.
+    """
+    if file is None:
+      import sys as _sys
+      file = _sys.stdout
+
+    if not self.command_list:
+      print("(plan is empty)", file=file)
+      return
+
+    for i, cmd in enumerate(self.command_list, start=index_start):
+      cmd.print(index=i, file=file)
+
+# ===== Runner-provided provenance =====
+
+# Planner.py
+class PlanProvenance:
+  """
+  Runner-provided, read-only provenance for a single config script.
+  """
+  __slots__ = ("stage_root_dpath","config_abs_fpath","config_rel_fpath",
+               "read_dir_dpath","read_fname")
+
+  def __init__(self, *, stage_root: Path, config_path: Path):
+    self.stage_root_dpath = stage_root.resolve()
+    self.config_abs_fpath = config_path.resolve()
     try:
-      import cbor2
-    except Exception as e:
-      raise RuntimeError(f"package cbor2 required for from_CBOR_bytes: {e}")
-    obj = cbor2.loads(data_bytes)
-    if not isinstance(obj ,dict): raise ValueError("CBOR root must be a map")
-    meta = dict(obj.get("meta_map") or {})
-    entries = obj.get("entries_list") or []
-    j = Journal(meta_map=meta)
-    for e in entries:
-      if not isinstance(e ,dict): continue
-      op = e.get("op") or "?"
-      args = e.get("args_map") or {}
-      errs = e.get("errors_list") or []
-      j.append(Command(name_str=op ,args_map=dict(args) ,errors_list=list(errs)))
-    return j
+      self.config_rel_fpath = self.config_abs_fpath.relative_to(self.stage_root_dpath)
+    except Exception:
+      self.config_rel_fpath = Path(self.config_abs_fpath.name)
+
+    # Where the config file lives (used to anchor relative write dirs)
+    self.read_dir_dpath = self.config_abs_fpath.parent
+
+    # “py-less” filename: strip .stage.py, else .py, else keep name
+    name = self.config_abs_fpath.name
+    if name.endswith(".stage.py"):
+      self.read_fname = name[:-len(".stage.py")]
+    elif name.endswith(".py"):
+      self.read_fname = name[:-3]
+    else:
+      self.read_fname = name
+
+  def print(self, *, file=None) -> None:
+    """
+    Given: optional file-like (defaults to stdout).
+    Does:  print a readable, multi-line summary of provenance.
+    Returns: None.
+    """
+    if file is None:
+      import sys as _sys
+      file = _sys.stdout
+
+    print(f"Stage root:   {self.stage_root_dpath}", file=file)
+    print(f"Config (rel): {self.config_rel_fpath.as_posix()}", file=file)
+    print(f"Config (abs): {self.config_abs_fpath}", file=file)
+    print(f"Read dir:     {self.read_dir_dpath}", file=file)
+    print(f"Read fname:   {self.read_fname}", file=file)
+
+
+
+# ===== Admin-facing defaults carrier =====
+
+class WriteFileMeta:
+  """
+  WriteFileMeta — per-call or planner-default write-file attributes.
+
+  Given dpath (abs str/Path) ,fname (bare name or None) ,owner (str)
+        ,mode (int|'0644') ,content (bytes|str|None).
+  Does normalize into fields (may remain None if absent/invalid).
+  Returns object suitable for providing defaults to Planner methods.
+  """
+  __slots__ = ("dpath_str" ,"fname" ,"owner_name_str" ,"mode_int" ,"mode_octal_str" ,"content_bytes")
+
+  def __init__(self
+    ,*
+    ,dpath="/"
+    ,fname=None            # None or "." → let Planner resolve (provenance fallback)
+    ,owner="root"          # "." → current process user (resolved by Planner)
+    ,mode=0o444
+    ,content=None
+  ):
+    self.dpath_str           = norm_dpath_str(dpath)
+    # keep "." as a sentinel; otherwise validate the filename
+    if fname == ".":
+      self.fname = "."
+    else:
+      self.fname = norm_fname_or_none(fname)
+
+    # keep "." as a sentinel; otherwise normalize owner
+    if owner == ".":
+      self.owner_name_str = "."
+    else:
+      self.owner_name_str = norm_nonempty_owner(owner)
+
+    self.mode_int, self.mode_octal_str = parse_mode(mode)
+    # content_'bytes' due to UTF8 encoding
+    self.content_bytes = norm_content_bytes(content)
+
+  def print(self, *, label: str | None = None, file=None) -> None:
+    """
+    Given: optional label and optional file-like (defaults to stdout).
+    Does:  print a single-line summary of defaults/overrides.
+    Returns: None.
+    """
+    if file is None:
+      import sys as _sys
+      file = _sys.stdout
+
+    dpath = self.dpath_str or "?"
+    fname = self.fname or "?"
+    owner = self.owner_name_str or "?"
+    mode_str = f"{self.mode_int:04o}" if isinstance(self.mode_int, int) else (self.mode_octal_str or "?")
+    size = len(self.content_bytes) if isinstance(self.content_bytes, (bytes, bytearray)) else 0
+    prefix = (label + ": ") if label else ""
+    print(f"{prefix}dpath={dpath} fname={fname} owner={owner} mode={mode_str} bytes={size}", file=file)
+
 
 # ===== Planner =====
 
@@ -198,159 +334,200 @@ class Planner:
   """
   Planner — constructs a Journal of Commands from config scripts.
 
-  Given: PlannerContext (provenance + defaults).
-  Does:  maintains a Journal; command methods (copy/displace/delete) create Command objects,
-         fill missing args from context defaults, preflight minimal shape checks, then append.
-  Returns: accessors for Journal and meta; no I/O or privilege here.
+  Given provenance (PlanProvenance) and optional default WriteFileMeta.
+  Does resolve command parameters by precedence: kwarg > per-call WriteFileMeta > planner default,
+      with a final filename fallback to provenance basename if still missing.
+      On any argument error, returns the Command with errors and DOES NOT append it to Journal.
+  Returns live Journal via journal().
   """
-  def __init__(self ,ctx: PlannerContext)-> None:
-    self._ctx = ctx
+  __slots__ = ("_prov" ,"_defaults" ,"_journal")
+
+  def __init__(self ,provenance: PlanProvenance ,defaults: WriteFileMeta|None=None)-> None:
+    self._prov = provenance
+    self._defaults = defaults if defaults is not None else WriteFileMeta(
+      dpath="/"
+      ,fname=provenance.read_fname
+      ,owner="root"
+      ,mode=0o444
+      ,content=None
+    )
     self._journal = Journal()
-    # seed provenance; outer tools can add more later
     self._journal.set_meta(
-      source_read_file_rel_fpath_str=ctx.read_file_rel_fpath.as_posix()
-      ,stage_root_dpath_str=str(ctx.stage_root_dpath)
+      stage_root_dpath_str=str(self._prov.stage_root_dpath)
+      ,config_rel_fpath_str=self._prov.config_rel_fpath.as_posix()
     )
 
-  # --- Context management ---
-
-  def set_context(self ,ctx: PlannerContext)-> None:
-    "Given PlannerContext. Does replace current context. Returns None."
-    self._ctx = ctx
+  # --- defaults management / access ---
 
-  def context(self)-> PlannerContext:
-    "Given n/a. Does return current context. Returns PlannerContext."
-    return self._ctx
+  def set_defaults(self ,defaults: WriteFileMeta)-> None:
+    "Given WriteFileMeta. Does replace planner defaults. Returns None."
+    self._defaults = defaults
 
-  # --- Journal access ---
+  def defaults(self)-> WriteFileMeta:
+    "Given n/a. Does return current WriteFileMeta defaults. Returns WriteFileMeta."
+    return self._defaults
 
   def journal(self)-> Journal:
-    "Given n/a. Does return the Journal (live). Returns Journal."
+    "Given n/a.  Returns Journal reference (live, still being modified here)."
     return self._journal
 
-  # --- Helpers ---
-
-  def _resolve_write_file(self ,write_file_dpath_str: str|None ,write_file_fname_str: str|None)-> tuple[str,str]:
-    "Given optional write_file dpath/fname. Does fill from context; '.' fname → read_file basename. Returns (dpath,fname)."
-    dpath_str = write_file_dpath_str if write_file_dpath_str is not None else self._ctx.default_write_file_dpath_str
-    fname_str = write_file_fname_str if write_file_fname_str is not None else self._ctx.default_write_file_fname_str
-    if fname_str == ".":
-      fname_str = self._ctx.read_file_rel_fpath.name
-    return dpath_str ,fname_str
-
-  def _resolve_owner(self ,owner_name_str: str|None)-> str:
-    "Given optional owner. Does fill from context. Returns owner string."
-    return owner_name_str if owner_name_str is not None else self._ctx.default_owner_name_str
-
-  def _resolve_mode(self ,perm: int|str|None)-> tuple[int|None,str|None]:
-    "Given optional perm. Does normalize or fall back to context. Returns (mode_int,mode_octal_str)."
-    if perm is None:
-      return self._ctx.default_mode_int ,self._ctx.default_mode_octal_str
-    norm = _norm_perm(perm)
-    return (norm if norm is not None else (None ,None))
-
-  def _resolve_content(self ,content: bytes|str|None)-> bytes|None:
-    "Given optional content (bytes or str). Does normalize or fall back to context. Returns bytes|None."
-    if content is None:
-      return self._ctx.default_content_bytes
-    if isinstance(content ,str):
-      return content.encode("utf-8")
-    return content
-
-  # --- Command builders ---
+  # --- resolution helpers ---
+
+  def _pick(self ,kw ,meta_attr ,default_attr):
+    "Given three sources. Does pick first non-None. Returns value or None."
+    return kw if kw is not None else (meta_attr if meta_attr is not None else default_attr)
+
+  # Planner.py (inside Planner)
+  def _resolve_write_file(self, wfm, dpath, fname) -> tuple[str|None, str|None]:
+    # normalize explicit kwargs (allow "." to pass through untouched)
+    dpath_str = norm_dpath_str(dpath) if dpath is not None else None
+    if fname is not None and fname != ".":
+      fname = norm_fname_or_none(fname)
+
+    dpath_val = self._pick(dpath_str, (wfm.dpath_str if wfm else None), self._defaults.dpath_str)
+    fname_val = self._pick(fname,     (wfm.fname     if wfm else None), self._defaults.fname)
+
+    # final fallback for filename: "." or None → derive from config name
+    if fname_val == "." or fname_val is None:
+      fname_val = self._prov.read_fname
+
+    # anchor relative dpaths against the config’s directory
+    if dpath_val is not None and not is_abs_dpath(dpath_val):
+      dpath_val = (self._prov.read_dir_dpath / dpath_val).as_posix()
+
+    return dpath_val, fname_val
+
+  def _resolve_owner_mode_content(self
+    ,wfm: WriteFileMeta|None
+    ,owner: str|None
+    ,mode: int|str|None
+    ,content: bytes|str|None
+  )-> tuple[str|None ,tuple[int|None ,str|None] ,bytes|None]:
+    owner_norm = norm_nonempty_owner(owner) if (owner is not None and owner != ".") else owner
+    mode_norm  = parse_mode(mode) if mode is not None else (None, None)
+    content_b  = norm_content_bytes(content) if content is not None else None
+
+    owner_v = self._pick(owner_norm, (wfm.owner_name_str if wfm else None), self._defaults.owner_name_str)
+
+    # resolve "." → current process user
+    if owner_v == ".":
+      owner_v = getpass.getuser()
+
+    mode_v = (mode_norm if mode_norm != (None, None) else
+              ((wfm.mode_int, wfm.mode_octal_str) if wfm else (self._defaults.mode_int, self._defaults.mode_octal_str)))
+    content_v = self._pick(content_b, (wfm.content_bytes if wfm else None), self._defaults.content_bytes)
+    return owner_v, mode_v, content_v
+
+  def print(self, *, show_journal: bool = True, file=None) -> None:
+    """
+    Given: flags (show_journal) and optional file-like (defaults to stdout).
+    Does:  print provenance, defaults, and optionally the journal via delegation.
+    Returns: None.
+    """
+    if file is None:
+      import sys as _sys
+      file = _sys.stdout
+
+    print("== Provenance ==", file=file)
+    self._prov.print(file=file)
+
+    print("\n== Defaults ==", file=file)
+    self._defaults.print(label="defaults", file=file)
+
+    if show_journal:
+      entries = getattr(self._journal, "command_list", [])
+      n_total = len(entries)
+      n_copy = sum(1 for c in entries if getattr(c, "name_str", None) == "copy")
+      n_disp = sum(1 for c in entries if getattr(c, "name_str", None) == "displace")
+      n_del  = sum(1 for c in entries if getattr(c, "name_str", None) == "delete")
+
+      print("\n== Journal ==", file=file)
+      print(f"entries: {n_total}  copy:{n_copy}  displace:{n_disp}  delete:{n_del}", file=file)
+      if n_total:
+        self._journal.print(index_start=1, file=file)
+      else:
+        print("(plan is empty)", file=file)
+
+  # --- Command builders (first arg may be WriteFileMeta) ---
 
   def copy(self
-           ,*
-           ,write_file_dpath_str: str|None=None
-           ,write_file_fname_str: str|None=None
-           ,owner_name_str: str|None=None
-           ,perm: int|str|None=None
-           ,content: bytes|str|None=None
-           ,read_file_rel_fpath: Path|None=None
+    ,wfm: WriteFileMeta|None=None
+    ,*
+    ,write_file_dpath: str|Path|None=None
+    ,write_file_fname: str|None=None
+    ,owner: str|None=None
+    ,mode: int|str|None=None
+    ,content: bytes|str|None=None
   )-> Command:
     """
-    Given: optional overrides for write_file (dpath,fname,owner,perm), content, and read_file_rel_fpath.
-    Does:  build a 'copy' command entry (content is embedded; read_file path kept as provenance).
-    Returns: Command (also appended to Journal).
+    Given optional WriteFileMeta plus keyword overrides.
+    Does build a 'copy' command; on any argument error the command is returned with errors and NOT appended.
+    Returns Command.
     """
     cmd = Command("copy")
-    # resolve basics
-    wf_dpath_str ,wf_fname_str = self._resolve_write_file(write_file_dpath_str ,write_file_fname_str)
-    owner_str = self._resolve_owner(owner_name_str)
-    mode_int ,mode_oct = self._resolve_mode(perm)
-    content_b = self._resolve_content(content)
-    read_rel = (read_file_rel_fpath if read_file_rel_fpath is not None else self._ctx.read_file_rel_fpath)
-
-    # minimal shape checks (well-formedness, not policy)
-    if not _is_abs_dpath(wf_dpath_str):
-      cmd.add_error("write_file_dpath_str must be absolute and non-empty")
-    if not wf_fname_str or "/" in wf_fname_str:
-      cmd.add_error("write_file_fname_str must be a simple filename (no '/')")
-    if not owner_str:
-      cmd.add_error("owner_name_str must be non-empty")
+    dpath ,fname = self._resolve_write_file(wfm ,write_file_dpath ,write_file_fname)
+    owner_v ,(mode_int ,mode_oct) ,content_b = self._resolve_owner_mode_content(wfm ,owner ,mode ,content)
+
+    # well-formed checks
+    if not is_abs_dpath(dpath):            cmd.add_error("write_file_dpath must be absolute")
+    if norm_fname_or_none(fname) is None:  cmd.add_error("write_file_fname must be a bare filename")
+    if not owner_v:                        cmd.add_error("owner must be non-empty")
     if (mode_int ,mode_oct) == (None ,None):
-      cmd.add_error("perm must be an int <= 0o7777 or a 4-digit octal string")
+      cmd.add_error("mode must be int <= 0o7777 or 3/4-digit octal string")
     if content_b is None:
       cmd.add_error("content is required for copy() (bytes or str)")
 
-    cmd.args_map.update({
-      "write_file_dpath_str": wf_dpath_str
-      ,"write_file_fname_str": wf_fname_str
-      ,"owner_name_str": owner_str
-      ,"mode_int": mode_int
-      ,"mode_octal_str": mode_oct
-      ,"content_bytes": content_b
-      ,"read_file_rel_fpath_str": read_rel.as_posix()
+    cmd.arg_dict.update({
+      "write_file_dpath_str": dpath,
+      "write_file_fname": fname,           # was write_file_fname
+      "owner_name": owner_v,               # was owner_name_str
+      "mode_int": mode_int,
+      "mode_octal_str": mode_oct,
+      "content_bytes": content_b,
+      "provenance_config_rel_fpath_str": self._prov.config_rel_fpath.as_posix(),
     })
-    self._journal.append(cmd)
+
+    if not cmd.errors_list:
+      self._journal.append(cmd)
     return cmd
 
   def displace(self
-               ,*
-               ,write_file_dpath_str: str|None=None
-               ,write_file_fname_str: str|None=None
+    ,wfm: WriteFileMeta|None=None
+    ,*
+    ,write_file_dpath: str|Path|None=None
+    ,write_file_fname: str|None=None
   )-> Command:
-    """
-    Given: optional write_file dpath/fname overrides.
-    Does:  build a 'displace' command (rename existing write_file in-place with UTC suffix).
-    Returns: Command (appended).
-    """
+    "Given optional WriteFileMeta plus overrides. Does build 'displace' entry or return errors. Returns Command."
     cmd = Command("displace")
-    wf_dpath_str ,wf_fname_str = self._resolve_write_file(write_file_dpath_str ,write_file_fname_str)
-
-    if not _is_abs_dpath(wf_dpath_str):
-      cmd.add_error("write_file_dpath_str must be absolute and non-empty")
-    if not wf_fname_str or "/" in wf_fname_str:
-      cmd.add_error("write_file_fname_str must be a simple filename (no '/')")
-
-    cmd.args_map.update({
-      "write_file_dpath_str": wf_dpath_str
-      ,"write_file_fname_str": wf_fname_str
+    dpath ,fname = self._resolve_write_file(wfm ,write_file_dpath ,write_file_fname)
+    if not is_abs_dpath(dpath):            cmd.add_error("write_file_dpath must be absolute")
+    if norm_fname_or_none(fname) is None:  cmd.add_error("write_file_fname must be a bare filename")
+    cmd.arg_dict.update({
+      "write_file_dpath_str": dpath,
+      "write_file_fname": fname,
     })
-    self._journal.append(cmd)
+    if not cmd.errors_list:
+      self._journal.append(cmd)
     return cmd
 
   def delete(self
-             ,*
-             ,write_file_dpath_str: str|None=None
-             ,write_file_fname_str: str|None=None
+    ,wfm: WriteFileMeta|None=None
+    ,*
+    ,write_file_dpath: str|Path|None=None
+    ,write_file_fname: str|None=None
   )-> Command:
-    """
-    Given: optional write_file dpath/fname overrides.
-    Does:  build a 'delete' command (unlink if present).
-    Returns: Command (appended).
-    """
+    "Given optional WriteFileMeta plus overrides. Does build 'delete' entry or return errors. Returns Command."
     cmd = Command("delete")
-    wf_dpath_str ,wf_fname_str = self._resolve_write_file(write_file_dpath_str ,write_file_fname_str)
-
-    if not _is_abs_dpath(wf_dpath_str):
-      cmd.add_error("write_file_dpath_str must be absolute and non-empty")
-    if not wf_fname_str or "/" in wf_fname_str:
-      cmd.add_error("write_file_fname_str must be a simple filename (no '/')")
-
-    cmd.args_map.update({
-      "write_file_dpath_str": wf_dpath_str
-      ,"write_file_fname_str": wf_fname_str
+    dpath ,fname = self._resolve_write_file(wfm ,write_file_dpath ,write_file_fname)
+    if not is_abs_dpath(dpath):            cmd.add_error("write_file_dpath must be absolute")
+    if norm_fname_or_none(fname) is None:  cmd.add_error("write_file_fname must be a bare filename")
+    cmd.arg_dict.update({
+      "write_file_dpath_str": dpath,
+      "write_file_fname": fname,
     })
-    self._journal.append(cmd)
+    if not cmd.errors_list:
+      self._journal.append(cmd)
     return cmd
+
+
+  
diff --git a/developer/source/DNS/deploy.py b/developer/source/DNS/deploy.py
deleted file mode 100755 (executable)
index 5f12397..0000000
+++ /dev/null
@@ -1,148 +0,0 @@
-#!/usr/bin/env python3
-"""
-deploy.py — Deploy staged DNS bundle (Unbound per-subu + nft redirect)
-RT-v2025.09.15.4
-
-What it does
-  - Installs the staged tree under ./stage into /
-  - systemctl daemon-reload
-  - nft -f /etc/nftables.conf   (relies on: include "/etc/nftables.d/*.nft")
-  - enable + restart unbound@<instance> for each instance (default: US x6)
-
-Assumptions
-  - This file lives next to install_staged_tree.py
-  - Stage contains:
-      stage/etc/nftables.d/10-block-IPv6.nft
-      stage/etc/nftables.d/20-SUBU-ports.nft
-      stage/etc/systemd/system/unbound@.service
-      stage/etc/unbound/unbound-US.conf (127.0.0.1@5301)
-      stage/etc/unbound/unbound-x6.conf (127.0.0.1@5302)
-  - /etc/nftables.conf has:  include "/etc/nftables.d/*.nft"
-
-Exit codes
-  0 = success, 2 = preflight/deploy error
-"""
-from __future__ import annotations
-from pathlib import Path
-import argparse
-import importlib
-import os
-import subprocess
-import sys
-
-ROOT = Path(__file__).resolve().parent
-STAGE = ROOT / "stage"
-
-def _run(cmd: list[str]) -> tuple[int, str, str]:
-  cp = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-  return (cp.returncode, cp.stdout.strip(), cp.stderr.strip())
-
-def _preflight_errors() -> list[str]:
-  errs = []
-  if os.geteuid() != 0:
-    errs.append("must be run as root (sudo)")
-  if not STAGE.exists():
-    errs.append(f"stage dir missing: {STAGE}")
-  return errs
-
-def _install_stage(stage_root: Path) -> list[str]:
-  """
-  Call install_staged_tree.install_staged_tree(stage_root=..., dest_root=/, create_dirs=True)
-  and return its log lines.
-  """
-  sys.path.insert(0, str(ROOT))
-  try:
-    ist = importlib.import_module("install_staged_tree")
-  except Exception as e:
-    raise RuntimeError(f"failed to import install_staged_tree: {e}")
-
-  # Expect signature: (stage_root, dest_root, create_dirs=False, skip_identical=True) -> (logs, ifaces)
-  try:
-    logs, _ifaces = ist.install_staged_tree(
-      stage_root=stage_root,
-      dest_root=Path("/"),
-      create_dirs=True,
-      skip_identical=True,
-    )
-  except TypeError as te:
-    # Fallback for older two-arg signature: install_staged_tree(stage_root, dest_root)
-    try:
-      logs, _ifaces = ist.install_staged_tree(stage_root, Path("/"))
-    except Exception as e2:
-      raise RuntimeError(f"install_staged_tree() call failed: {e2}") from te
-  return logs
-
-def deploy(instances: list[str]) -> list[str]:
-  logs: list[str] = []
-
-  # Plan
-  logs.append("Deploy DNS plan:")
-  logs.append(f"  instances: {', '.join(instances)}")
-  logs.append(f"  stage: {STAGE}")
-  logs.append(f"  root:  /")
-  logs.append("")
-  logs.append("Installing staged artifacts…")
-
-  # Install staged files
-  install_logs = _install_stage(STAGE)
-  logs.extend(install_logs)
-
-  # Reload systemd units (for unbound@.service changes)
-  _run(["systemctl", "daemon-reload"])
-
-  # Apply nftables from the main config (which includes drop-ins)
-  rc, out, err = _run(["/usr/sbin/nft", "-f", "/etc/nftables.conf"])
-  if rc != 0:
-    raise RuntimeError(f"nftables apply failed:\n{err or out}")
-
-  # Sanity: verify our tables are present
-  rc2, out2, err2 = _run(["/usr/sbin/nft", "list", "tables"])
-  if rc2 != 0:
-    raise RuntimeError(f"nftables list tables failed:\n{err2 or out2}")
-
-  required = {"inet NO-IPV6", "inet SUBU-DNS-REDIRECT", "inet SUBU-PORT-EGRESS"}
-  present = set()
-  for line in out2.splitlines():
-    parts = line.strip().split()
-    # lines look like: "table inet FOO"
-    if len(parts) == 3 and parts[0] == "table":
-      present.add(f"{parts[1]} {parts[2]}")
-  missing = required - present
-  if missing:
-    raise RuntimeError(f"nftables missing tables: {', '.join(sorted(missing))}")
-
-  # Enable + restart unbound instances
-  for inst in instances:
-    unit = f"unbound@{inst}.service"
-    _run(["systemctl", "enable", unit])
-    _run(["systemctl", "restart", unit])
-    rcA, _, _ = _run(["systemctl", "is-active", unit])
-    logs.append(f"{unit}: {'active' if rcA == 0 else 'inactive'}")
-
-  logs.append("")
-  logs.append("✓ DNS deploy complete.")
-  return logs
-
-def main(argv=None) -> int:
-  ap = argparse.ArgumentParser(description="Deploy staged DNS (Unbound per-subu + nft redirect).")
-  ap.add_argument("--instances", nargs="+", default=["US", "x6"],
-                  help="Unbound instances to enable (default: US x6)")
-  args = ap.parse_args(argv)
-
-  errs = _preflight_errors()
-  if errs:
-    print("❌ deploy preflight found issue(s):", file=sys.stderr)
-    for e in errs:
-      print(f"  - {e}", file=sys.stderr)
-    return 2
-
-  try:
-    logs = deploy(args.instances)
-    print("\n".join(logs))
-    return 0
-  except Exception as e:
-    print(f"❌ deploy failed: {e}", file=sys.stderr)
-    return 2
-
-if __name__ == "__main__":
-  sys.exit(main())
diff --git a/developer/source/DNS/deprecated/stage_orig/etc/nftables.d/10-block-IPv6.nft b/developer/source/DNS/deprecated/stage_orig/etc/nftables.d/10-block-IPv6.nft
new file mode 100644 (file)
index 0000000..eaee5be
--- /dev/null
@@ -0,0 +1,16 @@
+table inet NO-IPV6 {
+  chain input {
+    type filter hook input priority raw; policy accept;
+    meta nfproto ipv6 counter comment "drop all IPv6 inbound" drop
+  }
+
+  chain output {
+    type filter hook output priority raw; policy accept;
+    meta nfproto ipv6 counter comment "drop all IPv6 outbound" drop
+  }
+
+  chain forward {
+    type filter hook forward priority raw; policy accept;
+    meta nfproto ipv6 counter comment "drop all IPv6 forward" drop
+  }
+}
diff --git a/developer/source/DNS/deprecated/stage_orig/etc/nftables.d/20-SUBU-ports.nft b/developer/source/DNS/deprecated/stage_orig/etc/nftables.d/20-SUBU-ports.nft
new file mode 100644 (file)
index 0000000..6c31446
--- /dev/null
@@ -0,0 +1,47 @@
+table inet SUBU-DNS-REDIRECT {
+  chain output {
+    type nat hook output priority -100; policy accept;
+
+    # Redirect DNS for the subu UIDs to local Unbound listeners
+    meta skuid 2017 udp dport 53 redirect to :5301
+    meta skuid 2018 udp dport 53 redirect to :5302
+    meta skuid 2017 tcp dport 53 redirect to :5301
+    meta skuid 2018 tcp dport 53 redirect to :5302
+  }
+}
+
+table inet SUBU-PORT-EGRESS {
+  chain output {
+    type filter hook output priority 0; policy accept;
+
+    # Always allow loopback on egress
+    oifname "lo" accept
+
+    # No IPv6 for subu (until you reintroduce v6)
+    meta skuid {2017,2018} meta nfproto ipv6 counter comment "no IPv6 for subu" drop
+
+    ##### x6 (UID 2018)
+    # Block some exfil channels regardless of iface
+    meta skuid 2018 tcp dport {25,465,587}  counter comment "block SMTP/Submission" drop
+    meta skuid 2018 udp dport {3478,5349,19302-19309} counter comment "block STUN/TURN" drop
+    meta skuid 2018 tcp dport 853            counter comment "block DoT (TCP/853)" drop
+
+    # (Optional) allow ICMP echo out via x6
+    meta skuid 2018 oifname "x6" ip protocol icmp icmp type echo-request accept
+
+    # Enforce interface binding
+    meta skuid 2018 oifname "x6" accept
+    meta skuid 2018 oifname != "x6" counter comment "x6 must use wg x6" drop
+
+    ##### US (UID 2017)
+    meta skuid 2017 tcp dport {25,465,587}  counter drop comment "block SMTP/Submission"
+    meta skuid 2017 udp dport {3478,5349,19302-19309} counter drop comment "block STUN/TURN"
+    meta skuid 2017 tcp dport 853            counter drop comment "block DoT (TCP/853)"
+
+    # (Optional) ICMP via US
+    meta skuid 2017 oifname "US" ip protocol icmp icmp type echo-request accept
+
+    meta skuid 2017 oifname "US" accept
+    meta skuid 2017 oifname != "US" counter comment "US must use wg US" drop
+  }
+}
diff --git a/developer/source/DNS/deprecated/stage_orig/etc/systemd/system/unbound@.service b/developer/source/DNS/deprecated/stage_orig/etc/systemd/system/unbound@.service
new file mode 100644 (file)
index 0000000..ba2919b
--- /dev/null
@@ -0,0 +1,19 @@
+[Unit]
+Description=Unbound DNS instance for %i (per-subu tunnel egress)
+After=network-online.target wg-quick@%i.service
+Requires=wg-quick@%i.service
+Wants=network-online.target
+
+[Service]
+Type=simple
+ExecStart=/usr/sbin/unbound -d -p -c /etc/unbound/unbound-%i.conf
+User=unbound
+Group=unbound
+Restart=on-failure
+RestartSec=2s
+AmbientCapabilities=CAP_NET_BIND_SERVICE
+CapabilityBoundingSet=CAP_NET_BIND_SERVICE
+NoNewPrivileges=true
+
+[Install]
+WantedBy=multi-user.target
diff --git a/developer/source/DNS/deprecated/stage_orig/etc/unbound/unbound-US.conf b/developer/source/DNS/deprecated/stage_orig/etc/unbound/unbound-US.conf
new file mode 100644 (file)
index 0000000..1995438
--- /dev/null
@@ -0,0 +1,18 @@
+server:
+  username: "unbound"
+  chroot: ""
+  directory: "/etc/unbound"
+  do-daemonize: no
+  interface: 127.0.0.1@5301
+  hide-identity: yes
+  hide-version: yes
+  harden-glue: yes
+  harden-dnssec-stripped: yes
+  qname-minimisation: yes
+  prefetch: yes
+  outgoing-interface: 10.0.0.1
+
+forward-zone:
+  name: "."
+  forward-addr: 1.1.1.1
+  forward-addr: 1.0.0.1
diff --git a/developer/source/DNS/deprecated/stage_orig/etc/unbound/unbound-x6.conf b/developer/source/DNS/deprecated/stage_orig/etc/unbound/unbound-x6.conf
new file mode 100644 (file)
index 0000000..ed49241
--- /dev/null
@@ -0,0 +1,18 @@
+server:
+  username: "unbound"
+  chroot: ""
+  directory: "/etc/unbound"
+  do-daemonize: no
+  interface: 127.0.0.1@5302
+  hide-identity: yes
+  hide-version: yes
+  harden-glue: yes
+  harden-dnssec-stripped: yes
+  qname-minimisation: yes
+  prefetch: yes
+  outgoing-interface: 10.8.0.2
+
+forward-zone:
+  name: "."
+  forward-addr: 1.1.1.1
+  forward-addr: 1.0.0.1
diff --git a/developer/source/DNS/deprecated/stage_orig/usr/local/sbin/DNS_status.sh b/developer/source/DNS/deprecated/stage_orig/usr/local/sbin/DNS_status.sh
new file mode 100755 (executable)
index 0000000..d4db58e
--- /dev/null
@@ -0,0 +1,12 @@
+#!/usr/bin/env bash
+set -euo pipefail
+echo "== DNS status =="
+systemctl --no-pager --full status DNS-redirect unbound@US unbound@x6 || true
+echo
+echo "== nftables =="
+nft list table inet NAT-DNS-REDIRECT || true
+echo
+echo "== Unbound logs (last 50 lines each) =="
+journalctl -u unbound@US -n 50 --no-pager || true
+echo
+journalctl -u unbound@x6 -n 50 --no-pager || true
diff --git a/developer/source/DNS/deprecated/stage_show_plan.py b/developer/source/DNS/deprecated/stage_show_plan.py
new file mode 100644 (file)
index 0000000..075e65b
--- /dev/null
@@ -0,0 +1,97 @@
+#!/usr/bin/env -S python3 -B
+"""
+stage_show_plan.py — run staged configs (UNPRIVILEGED) and print the plan.
+
+Given: a stage root directory.
+Does:  loads Stage.py, executes each config, builds a native plan map, summarizes it.
+Returns: exit code 0 on success, non-zero on error.
+"""
+from __future__ import annotations
+import sys ,os
+sys.dont_write_bytecode = True
+os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
+
+from pathlib import Path
+import argparse ,importlib.util ,runpy ,socket ,getpass ,time ,hashlib
+
+# ---------- helpers ----------
+
+def _load_stage_module(stage_root_dpath: Path):
+  "Given: stage root path. Does: load Stage.py as module 'Stage'. Returns: module."
+  mod_fpath = stage_root_dpath/"Stage.py"
+  if not mod_fpath.exists():
+    raise FileNotFoundError(f"Stage.py not found at {mod_fpath}")
+  spec = importlib.util.spec_from_file_location("Stage" ,str(mod_fpath))
+  mod = importlib.util.module_from_spec(spec)
+  sys.modules["Stage"] = mod
+  assert spec and spec.loader
+  spec.loader.exec_module(mod)  # type: ignore
+  return mod
+
+def _config_rel_fpaths(stage_root_dpath: Path)-> list[Path]:
+  "Given: stage root. Does: collect *.py (excluding Stage.py) as relative file paths. Returns: list[Path]."
+  rel_fpath_list: list[Path] = []
+  for p in stage_root_dpath.rglob("*.py"):
+    if p.name == "Stage.py": continue
+    if p.is_file():
+      rel_fpath_list.append(p.relative_to(stage_root_dpath))
+  return sorted(rel_fpath_list ,key=lambda x: x.as_posix())
+
+def _sha256_hex(b: bytes)-> str:
+  "Given: bytes. Does: sha256. Returns: hex string."
+  return hashlib.sha256(b).hexdigest()
+
+# ---------- main ----------
+
+def main(argv: list[str]|None=None)-> int:
+  "Given: CLI. Does: show plan. Returns: exit code."
+  ap = argparse.ArgumentParser(prog="stage_show_plan.py"
+   ,description="Run staged config scripts and print the resulting plan.")
+  ap.add_argument("--stage",default="stage",help="stage directory (default: ./stage)")
+  args = ap.parse_args(argv)
+
+  stage_root_dpath = Path(args.stage)
+  StageMod = _load_stage_module(stage_root_dpath)
+  Stage = StageMod.Stage
+  Stage._reset()
+  Stage.set_meta(
+    planner_user_name=getpass.getuser()
+    ,planner_uid_int=os.getuid()
+    ,planner_gid_int=os.getgid()
+    ,host_name=socket.gethostname()
+    ,created_utc_str=time.strftime("%Y-%m-%dT%H:%M:%SZ",time.gmtime())
+  )
+
+  for rel_fpath in _config_rel_fpaths(stage_root_dpath):
+    Stage._begin(read_rel_fpath=rel_fpath ,stage_root_dpath=stage_root_dpath)
+    runpy.run_path(str(stage_root_dpath/rel_fpath) ,run_name="__main__")
+    Stage._end()
+
+  plan_map = Stage.plan_object()
+  entries_list = plan_map["entries_list"]
+  print(f"Plan version: {plan_map['version_int']}")
+  print(f"Planner: {plan_map['meta_map'].get('planner_user_name')}@{plan_map['meta_map'].get('host_name')}  "
+        f"UID:{plan_map['meta_map'].get('planner_uid_int')} GID:{plan_map['meta_map'].get('planner_gid_int')}")
+  print(f"Created: {plan_map['meta_map'].get('created_utc_str')}")
+  print(f"Entries: {len(entries_list)}\n")
+
+  for i ,e_map in enumerate(entries_list ,1):
+    op = e_map.get("op")
+    dst_fpath_str = f"{e_map.get('dst_dpath')}/{e_map.get('dst_fname')}"
+    if op == "copy":
+      content = e_map.get("content_bytes") or b""
+      sz = len(content)
+      mode = e_map.get("mode_octal_str") or "????"
+      owner = e_map.get("owner_name") or "?"
+      h = _sha256_hex(content)
+      print(f"{i:02d}. copy     -> {dst_fpath_str}  mode {mode} owner {owner}  bytes {sz} sha256 {h[:16]}…")
+    elif op == "displace":
+      print(f"{i:02d}. displace -> {dst_fpath_str}")
+    elif op == "delete":
+      print(f"{i:02d}. delete   -> {dst_fpath_str}")
+    else:
+      print(f"{i:02d}. ?op?     -> {dst_fpath_str}  ({op})")
+  return 0
+
+if __name__ == "__main__":
+  sys.exit(main())
diff --git a/developer/source/DNS/executor.py b/developer/source/DNS/executor.py
new file mode 100755 (executable)
index 0000000..fffa490
--- /dev/null
@@ -0,0 +1,271 @@
+#!/usr/bin/env -S python3 -B
+"""
+executor.py — StageHand outer/inner executor (MVP; UNPRIVILEGED for now)
+
+Phase 1 (outer):
+  - Build a combined plan by executing each config's `configure(prov, planner, WriteFileMeta)`.
+  - Optionally print the plan via Planner.print().
+  - Optionally stop.
+
+Phase 2 (inner shim in same program for now; no privilege yet):
+  - Encode combined plan to CBOR and pass to inner path.
+  - Inner decodes back to a Journal and optionally prints it.
+  - Optionally stop.
+
+Discovery:
+  - --stage (default: ./stage) points at the stage directory root.
+  - By default, *every file* under --stage (recursively) is executed as a config,
+    regardless of extension. Editors can still use .py for highlighting; we strip
+    only a trailing ".py" to derive prov.read_fname.
+
+"""
+
+from __future__ import annotations
+
+# no bytecode anywhere
+import sys, os
+sys.dont_write_bytecode = True
+os.environ.setdefault("PYTHONDONTWRITEBYTECODE", "1")
+
+from pathlib import Path
+import argparse
+import getpass
+import tempfile
+import runpy
+import subprocess
+import datetime as _dt
+import os, fnmatch, stat
+
+
+# Local module: Planner.py (same directory)
+from Planner import (
+  Planner, PlanProvenance, WriteFileMeta, Journal, Command,
+)
+
+# -------- utilities --------
+
+def iso_utc_now_str() -> str:
+  return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
+
+def _split_globs(glob_arg: str) -> list[str]:
+    parts = [g.strip() for g in (glob_arg or "").split(",") if g.strip()]
+    # Default includes both deep and top-level files
+    return parts or ["**/*", "*"]
+
+def find_config_paths(stage_root: Path, glob_arg: str) -> list[Path]:
+    """
+    Given stage root and comma-glob string, return sorted list of files (regular or symlink).
+    Defaults to match ALL files under stage, including top-level ones.
+    """
+    root = stage_root.resolve()
+    patterns = _split_globs(glob_arg)
+    out: set[Path] = set()
+
+    for dirpath, dirnames, filenames in os.walk(root, followlinks=False):
+        # (optional) prune symlinked dirs to avoid cycles; files can still be symlinks
+        dirnames[:] = [d for d in dirnames if not os.path.islink(os.path.join(dirpath, d))]
+
+        for fname in filenames:
+            f_abs = Path(dirpath, fname)
+            rel = f_abs.relative_to(root).as_posix()
+            if any(fnmatch.fnmatch(rel, pat) for pat in patterns):
+                try:
+                    st = f_abs.lstat()
+                    if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
+                        out.add(f_abs)
+                except Exception:
+                    # unreadable/broken entries are skipped
+                    pass
+
+    return sorted(out, key=lambda p: p.as_posix())
+
+
+
+def _run_one_config(config_path: Path, stage_root: Path) -> Planner:
+  """Execute a single config's `configure(prov, planner, WriteFileMeta)` and return that config's Planner."""
+  prov = PlanProvenance(stage_root=stage_root, config_path=config_path)
+  per_planner = Planner(provenance=prov)  # defaults derive from this file's provenance
+  env = runpy.run_path(str(config_path))
+  fn = env.get("configure")
+  if not callable(fn):
+    raise RuntimeError(f"{config_path}: missing callable configure(prov, planner, WriteFileMeta)")
+  fn(prov, per_planner, WriteFileMeta)
+  return per_planner
+
+def _aggregate_into_master(stage_root: Path, planners: list[Planner]) -> Planner:
+  """Create a master Planner and copy all Commands from per-config planners into it."""
+  # Synthetic provenance for the master planner (used only for display/meta)
+  fake_config = stage_root / "(aggregate).py"
+  master = Planner(PlanProvenance(stage_root=stage_root, config_path=fake_config))
+
+  # annotate meta
+  master.journal().set_meta(
+    generator_prog_str="executor.py",
+    generated_at_utc_str=iso_utc_now_str(),
+    user_name_str=getpass.getuser(),
+    host_name_str=os.uname().nodename if hasattr(os, "uname") else "unknown",
+    stage_root_dpath_str=str(stage_root.resolve()),
+    configs_list=[p._prov.config_rel_fpath.as_posix() for p in planners],
+  )
+
+  # copy commands
+  out_j = master.journal()
+  for p in planners:
+    for cmd in p.journal().command_list:
+      out_j.append(cmd)  # keep Command objects as-is
+  return master
+
+# ----- CBOR “matchbox” (simple wrapper kept local to executor) -----
+
+def _plan_to_cbor_bytes(planner: Planner) -> bytes:
+  """Serialize a Planner's Journal to CBOR bytes."""
+  try:
+    import cbor2
+  except Exception as e:
+    raise RuntimeError(f"cbor2 is required: {e}")
+  plan_dict = planner.journal().as_dictionary()
+  return cbor2.dumps(plan_dict, canonical=True)
+
+def _journal_from_cbor_bytes(data: bytes) -> Journal:
+  """Rebuild a Journal from CBOR bytes."""
+  try:
+    import cbor2
+  except Exception as e:
+    raise RuntimeError(f"cbor2 is required: {e}")
+  obj = cbor2.loads(data)
+  if not isinstance(obj, dict):
+    raise ValueError("CBOR root must be a dict")
+  return Journal(plan_dict=obj)
+
+# -------- inner executor (phase 2) --------
+
+def _inner_main(plan_path: Path, phase2_print: bool, phase2_then_stop: bool) -> int:
+  """Inner executor path: decode CBOR → Journal; optionally print; (apply TBD)."""
+  try:
+    data = Path(plan_path).read_bytes()
+  except Exception as e:
+    print(f"error: failed to read plan file: {e}", file=sys.stderr)
+    return 2
+
+  try:
+    journal = _journal_from_cbor_bytes(data)
+  except Exception as e:
+    print(f"error: failed to decode CBOR: {e}", file=sys.stderr)
+    return 2
+
+  if phase2_print:
+    journal.print()
+
+  if phase2_then_stop:
+    return 0
+
+  # (Stage 3 apply would go here; omitted in MVP)
+  return 0
+
+# -------- outer executor (phase 1 & handoff) --------
+
+def _outer_main(args) -> int:
+  stage_root = Path(args.stage)
+  if not stage_root.is_dir():
+    print(f"error: --stage not a directory: {stage_root}", file=sys.stderr)
+    return 2
+
+  cfgs = find_config_paths(stage_root, args.glob)
+  if not cfgs:
+    print("No configuration files found.")
+    return 0
+
+  # Execute each config into its own planner
+  per_planners: list[Planner] = []
+  for cfg in cfgs:
+    try:
+      per_planners.append(_run_one_config(cfg, stage_root))
+    except SystemExit:
+      raise
+    except Exception as e:
+      print(f"error: executing {cfg}: {e}", file=sys.stderr)
+      return 2
+
+  # Aggregate into a single master planner for printing/CBOR
+  master = _aggregate_into_master(stage_root, per_planners)
+
+  if args.phase_1_print:
+    master.print()
+
+  if args.phase_1_then_stop:
+    return 0
+
+  # Phase 2: encode CBOR and invoke inner path (same script, --inner)
+  try:
+    cbor_bytes = _plan_to_cbor_bytes(master)
+  except Exception as e:
+    print(f"error: CBOR encode failed: {e}", file=sys.stderr)
+    return 2
+
+  with tempfile.NamedTemporaryFile(prefix="stagehand_plan_", suffix=".cbor", delete=False) as tf:
+    tf.write(cbor_bytes)
+    plan_path = tf.name
+
+  try:
+    cmd = [
+      sys.executable,
+      str(Path(__file__).resolve()),
+      "--inner",
+      "--plan", plan_path,
+    ]
+    if args.phase_2_print:
+      cmd.append("--phase-2-print")
+    if args.phase_2_then_stop:
+      cmd.append("--phase-2-then-stop")
+
+    proc = subprocess.run(cmd)
+    return proc.returncode
+  finally:
+    try:
+      os.unlink(plan_path)
+    except Exception:
+      pass
+
+# -------- CLI --------
+
+def main(argv: list[str] | None = None) -> int:
+  ap = argparse.ArgumentParser(
+    prog="executor.py",
+    description="StageHand outer/inner executor (plan → CBOR → decode).",
+  )
+  ap.add_argument("--stage", default="stage", help="stage root directory (default: ./stage)")
+
+  ap.add_argument("--glob", default="**/*",
+                  help="glob for config scripts under --stage (default: '**/*' = all files)")
+
+  # ap.add_argument("--glob",
+  #                 default="**/*",
+  #                 help="comma-separated globs under --stage (default: **/*; every file is a config)")
+
+  # Phase-1 (outer) controls
+  ap.add_argument("--phase-1-print", action="store_true", help="print master planner (phase 1)")
+  ap.add_argument("--phase-1-then-stop", action="store_true", help="stop after phase 1")
+
+  # Phase-2 (inner) controls (outer forwards these to inner)
+  ap.add_argument("--phase-2-print", action="store_true", help="print decoded journal (phase 2)")
+  ap.add_argument("--phase-2-then-stop", action="store_true", help="stop after phase 2 decode")
+
+  # Inner-only flags (not for users)
+  ap.add_argument("--inner", action="store_true", help=argparse.SUPPRESS)
+  ap.add_argument("--plan", default=None, help=argparse.SUPPRESS)
+
+  args = ap.parse_args(argv)
+
+  if args.inner:
+    if not args.plan:
+      print("error: --inner requires --plan <file>", file=sys.stderr)
+      return 2
+    return _inner_main(Path(args.plan),
+                       phase2_print=args.phase_2_print,
+                       phase2_then_stop=args.phase_2_then_stop)
+
+  return _outer_main(args)
+
+
+if __name__ == "__main__":
+  sys.exit(main())
diff --git a/developer/source/DNS/install_staged_tree.py b/developer/source/DNS/install_staged_tree.py
deleted file mode 100755 (executable)
index 7c1786f..0000000
+++ /dev/null
@@ -1,262 +0,0 @@
-#!/usr/bin/env python3
-"""
-install_staged_tree.py
-RT-v2025.09.15.2
-
-A dumb installer: copy staged files into the target root with backups and
-deterministic permissions. No systemd stop/start, no daemon-reload.
-
-- Extended whitelist to include DNS bundle assets:
-  * /etc/unbound/*.conf             -> 0644
-  * /etc/nftables.d/*.nft           -> 0644
-  * /usr/local/sbin/*               -> 0500
-  * /etc/systemd/system/*.service   -> 0644
-- Keeps existing WireGuard/iproute2 handling.
-- API unchanged:
-    install_staged_tree(stage_root: Path, dest_root: Path,
-                        create_dirs=False, skip_identical=True)
-  -> returns (logs: list[str], detected_ifaces: list[str])
-"""
-
-from __future__ import annotations
-from pathlib import Path
-from typing import List, Optional, Sequence, Tuple
-import argparse
-import datetime as dt
-import hashlib
-import os
-import shutil
-import sys
-
-ROOT = Path(__file__).resolve().parent
-DEFAULT_STAGE = ROOT / "stage"
-
-def _sha256(path: Path) -> str:
-  h = hashlib.sha256()
-  with path.open("rb") as f:
-    for chunk in iter(lambda: f.read(1<<20), b""):
-      h.update(chunk)
-  return h.hexdigest()
-
-def _ensure_parents(dest_root: Path, rel: Path, create: bool) -> None:
-  parent = (dest_root / rel).parent
-  if parent.exists():
-    return
-  if not create:
-    raise RuntimeError(f"missing parent directory: {parent}")
-  parent.mkdir(parents=True, exist_ok=True)
-
-def _backup_existing_to_stage(stage_root: Path, dest_root: Path, rel: Path) -> Optional[Path]:
-  """If target exists, copy it back into stage/_backups/<ts>/<rel> and return backup path."""
-  target = dest_root / rel
-  if not target.exists():
-    return None
-  ts = dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
-  backup = stage_root / "_backups" / ts / rel
-  backup.parent.mkdir(parents=True, exist_ok=True)
-  shutil.copy2(target, backup)
-  return backup
-
-def _atomic_install(src: Path, dst: Path, mode: int) -> None:
-  tmp = dst.with_suffix(dst.suffix + ".tmp")
-  shutil.copyfile(src, tmp)
-  os.chmod(tmp, mode)
-  try:
-    os.chown(tmp, 0, 0)  # best-effort
-  except PermissionError:
-    pass
-  os.replace(tmp, dst)
-
-def _mode_for_rel(rel: Path) -> Optional[int]:
-  """Choose a mode based on the relative path bucket."""
-  s = str(rel)
-
-  # Existing buckets
-  if s.startswith("usr/local/bin/"):
-    return 0o500
-  if s.startswith("etc/wireguard/") and rel.suffix == ".conf":
-    return 0o600
-  if s == "etc/iproute2/rt_tables":
-    return 0o644
-  if s.startswith("etc/systemd/system/") and s.endswith(".conf"):
-    return 0o644
-
-  # NEW: DNS bundle buckets
-  if s.startswith("usr/local/sbin/"):
-    return 0o500
-  if s.startswith("etc/unbound/") and s.endswith(".conf"):
-    return 0o644
-  if s.startswith("etc/nftables.d/") and s.endswith(".nft"):
-    return 0o644
-  if s.startswith("etc/systemd/system/") and s.endswith(".service"):
-    return 0o644
-
-  return None
-
-def _iter_stage_targets(stage_root: Path) -> List[Path]:
-  """Return a list of *relative* paths under stage that match our whitelist."""
-  rels: List[Path] = []
-
-  # /usr/local/bin/*
-  bin_dir = stage_root / "usr" / "local" / "bin"
-  if bin_dir.is_dir():
-    for p in sorted(bin_dir.glob("*")):
-      if p.is_file():
-        rels.append(p.relative_to(stage_root))
-
-  # NEW: /usr/local/sbin/*
-  sbin_dir = stage_root / "usr" / "local" / "sbin"
-  if sbin_dir.is_dir():
-    for p in sorted(sbin_dir.glob("*")):
-      if p.is_file():
-        rels.append(p.relative_to(stage_root))
-
-  # /etc/wireguard/*.conf
-  wg_dir = stage_root / "etc" / "wireguard"
-  if wg_dir.is_dir():
-    for p in sorted(wg_dir.glob("*.conf")):
-      rels.append(p.relative_to(stage_root))
-
-  # /etc/systemd/system/wg-quick@*.service.d/*.conf
-  sysd_dir = stage_root / "etc" / "systemd" / "system"
-  if sysd_dir.is_dir():
-    for p in sorted(sysd_dir.rglob("wg-quick@*.service.d/*.conf")):
-      rels.append(p.relative_to(stage_root))
-
-  # NEW: /etc/systemd/system/*.service
-  if sysd_dir.is_dir():
-    for p in sorted(sysd_dir.glob("*.service")):
-      if p.is_file():
-        rels.append(p.relative_to(stage_root))
-
-  # /etc/iproute2/rt_tables
-  rt = stage_root / "etc" / "iproute2" / "rt_tables"
-  if rt.is_file():
-    rels.append(rt.relative_to(stage_root))
-
-  # NEW: /etc/unbound/*.conf
-  ub_dir = stage_root / "etc" / "unbound"
-  if ub_dir.is_dir():
-    for p in sorted(ub_dir.glob("*.conf")):
-      rels.append(p.relative_to(stage_root))
-
-  # NEW: /etc/nftables.d/*.nft
-  nft_dir = stage_root / "etc" / "nftables.d"
-  if nft_dir.is_dir():
-    for p in sorted(nft_dir.glob("*.nft")):
-      rels.append(p.relative_to(stage_root))
-
-  return rels
-
-def _discover_ifaces_from_stage(stage_root: Path) -> List[str]:
-  """Peek into staged artifacts to guess iface names (for friendly next-steps)."""
-  names = set()
-
-  # from /etc/wireguard/<iface>.conf
-  wg_dir = stage_root / "etc" / "wireguard"
-  if wg_dir.is_dir():
-    for p in wg_dir.glob("*.conf"):
-      names.add(p.stem)
-
-  # from /etc/systemd/system/wg-quick@<iface>.service.d/
-  sysd = stage_root / "etc" / "systemd" / "system"
-  if sysd.is_dir():
-    for d in sysd.glob("wg-quick@*.service.d"):
-      name = d.name
-      at = name.find("@")
-      dot = name.find(".service.d")
-      if at != -1 and dot != -1 and dot > at:
-        names.add(name[at+1:dot])
-
-  return sorted(names)
-
-def install_staged_tree(
-   stage_root: Path,
-   dest_root: Path,
-   create_dirs: bool = False,
-   skip_identical: bool = True,
-) -> Tuple[List[str], List[str]]:
-  """
-  Copy files from stage_root to dest_root.
-  Returns (logs, detected_ifaces).
-  """
-  old_umask = os.umask(0o077)
-  logs: List[str] = []
-  try:
-    staged = _iter_stage_targets(stage_root)
-    if not staged:
-      raise RuntimeError("nothing to install (stage is empty or whitelist didn’t match)")
-
-    for rel in staged:
-      src = stage_root / rel
-      dst = dest_root / rel
-
-      mode = _mode_for_rel(rel)
-      if mode is None:
-        logs.append(f"skip (not whitelisted): {rel}")
-        continue
-
-      _ensure_parents(dest_root, rel, create_dirs)
-
-      backup = _backup_existing_to_stage(stage_root, dest_root, rel)
-      if backup:
-        logs.append(f"backup: {dst} -> {backup}")
-
-      if skip_identical and dst.exists():
-        try:
-          if _sha256(src) == _sha256(dst):
-            logs.append(f"identical: skip {rel}")
-            continue
-        except Exception:
-          pass
-
-      _atomic_install(src, dst, mode)
-      logs.append(f"install: {rel} (mode {oct(mode)})")
-
-    ifaces = _discover_ifaces_from_stage(stage_root)
-    return (logs, ifaces)
-  finally:
-    os.umask(old_umask)
-
-def _require_root(allow_nonroot: bool) -> None:
-  if not allow_nonroot and os.geteuid() != 0:
-    raise RuntimeError("must run as root (use --force-nonroot to override)")
-
-def main(argv: Optional[Sequence[str]] = None) -> int:
-  ap = argparse.ArgumentParser(description="Install staged artifacts into a target root. No service control.")
-  ap.add_argument("--stage", default=str(DEFAULT_STAGE))
-  ap.add_argument("--root",  default="/")
-  ap.add_argument("--create-dirs", action="store_true", help="create missing parent directories")
-  ap.add_argument("--no-skip-identical", action="store_true", help="always replace even if content identical")
-  ap.add_argument("--force-nonroot", action="store_true", help="allow non-root install (ownership may be wrong)")
-  args = ap.parse_args(argv)
-
-  try:
-    _require_root(allow_nonroot=args.force_nonroot)
-    logs, ifaces = install_staged_tree(
-      stage_root=Path(args.stage),
-      dest_root=Path(args.root),
-      create_dirs=args.create_dirs,
-      skip_identical=(not args.no_skip_identical),
-    )
-    for line in logs:
-      print(line)
-
-    print("\n=== Summary ===")
-    print(f"Installed {sum(1 for l in logs if l.startswith('install:'))} file(s).")
-    if ifaces:
-      lst = " ".join(ifaces)
-      print(f"Detected interfaces from stage: {lst}")
-      print("\nNext steps:")
-      print(f"  sudo ./start_iface.py {lst}")
-    else:
-      print("No interfaces detected in staged artifacts.")
-      print("\nNext steps:")
-      print("  sudo ./start_iface.py <iface> [more ifaces]")
-    return 0
-  except Exception as e:
-    print(f"❌ install failed: {e}", file=sys.stderr)
-    return 2
-
-if __name__ == "__main__":
-  sys.exit(main())
diff --git a/developer/source/DNS/plan_show.py b/developer/source/DNS/plan_show.py
deleted file mode 100755 (executable)
index 780d34a..0000000
+++ /dev/null
@@ -1,273 +0,0 @@
-#!/usr/bin/env -S python3 -B
-"""
-plan_show.py — build and display a staged plan (UNPRIVILEGED).
-
-Given: a stage directory of config scripts (*.stage.py by default).
-Does:  executes each script with a pre-created Planner (P) and PlannerContext,
-       aggregates Commands into a single Journal, by default prints from the CBOR
-       round-trip (encode→decode) so the human view matches what will be shipped
-       to stage_cp; runs well-formed (WF) invariant checks. Can emit CBOR if requested.
-Returns: exit status 0 on success; 2 on WF errors or usage errors.
-"""
-
-from __future__ import annotations
-
-# no bytecode anywhere
-import sys ,os
-sys.dont_write_bytecode = True
-os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
-
-from pathlib import Path
-import argparse
-import datetime as _dt
-import getpass
-import runpy
-
-# local module (same dir): Planner
-from Planner import Planner ,PlannerContext ,Journal ,Command
-
-# ===== Utilities (general / reusable) =====
-
-def iso_utc_now_str()-> str:
-  "Given n/a. Does return compact UTC timestamp. Returns YYYYMMDDTHHMMSSZ."
-  return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
-
-def find_configs(stage_root_dpath: Path ,glob_pat_str: str)-> list[Path]:
-  "Given stage root and glob. Does find matching files under stage. Returns list of absolute Paths."
-  root = stage_root_dpath.resolve()
-  return sorted((p for p in root.glob(glob_pat_str) if p.is_file()) ,key=lambda p: p.as_posix())
-
-def human_size(n: int)-> str:
-  "Given byte count. Does format human size. Returns string."
-  units = ["B","KB","MB","GB","TB"]
-  i = 0
-  x = float(max(0 ,n))
-  while x >= 1024 and i < len(units)-1:
-    x /= 1024.0
-    i += 1
-  return f"{x:.1f} {units[i]}"
-
-def _dst_path_str(args_map: dict)-> str:
-  "Given args map. Does join write_file path. Returns POSIX path or '?'."
-  d = args_map.get("write_file_dpath_str") or ""
-  f = args_map.get("write_file_fname_str") or ""
-  try:
-    if d and f and "/" not in f:
-      return (Path(d)/f).as_posix()
-  except Exception:
-    pass
-  return "?"
-
-# ===== WF invariants (MVP) =====
-# These are “well-formedness” rules (shape/encoding/domain), not policy or privilege checks.
-
-def wf_check(journal: Journal)-> list[str]:
-  """
-  Given Journal. Does run invariant checks on meta and each Command entry. Returns list of error strings.
-  Invariants (MVP):
-    - meta_map: must include generator identity and stage_root_dpath_str.
-    - entry.op ∈ {'copy','displace','delete'}
-    - all ops: write_file_dpath_str absolute; write_file_fname_str is bare filename.
-    - copy: owner_name_str non-empty; mode_int ∈ [0..0o7777] and no suid/sgid; content_bytes present (bytes).
-  """
-  errs: list[str] = []
-  meta = journal.meta_map or {}
-
-  # meta presence (light placeholder)
-  if not isinstance(meta ,dict):
-    errs.append("WF_META: meta_map must be a map")
-  else:
-    if not meta.get("stage_root_dpath_str"):
-      errs.append("WF_META: missing stage_root_dpath_str")
-    if not meta.get("generator_prog_str"):
-      errs.append("WF_META: missing generator_prog_str")
-
-  # entries
-  for idx ,cmd in enumerate(journal.commands_list ,1):
-    prefix = f"WF[{idx:02d}]"
-    if not isinstance(cmd ,Command):
-      errs.append(f"{prefix}: entry is not Command")
-      continue
-    op = cmd.name_str
-    if op not in {"copy","displace","delete"}:
-      errs.append(f"{prefix}: unknown op '{op}'")
-      continue
-    am = cmd.args_map or {}
-    dpath = am.get("write_file_dpath_str")
-    fname = am.get("write_file_fname_str")
-    if not isinstance(dpath ,str) or not dpath.startswith("/"):
-      errs.append(f"{prefix}: write_file_dpath_str must be absolute")
-    if not isinstance(fname ,str) or not fname or "/" in fname:
-      errs.append(f"{prefix}: write_file_fname_str must be a bare filename")
-
-    if op == "copy":
-      owner = am.get("owner_name_str")
-      mode  = am.get("mode_int")
-      data  = am.get("content_bytes")
-      if not isinstance(owner ,str) or not owner.strip():
-        errs.append(f"{prefix}: owner_name_str must be non-empty")
-      if not isinstance(mode ,int) or not (0 <= mode <= 0o7777):
-        errs.append(f"{prefix}: mode_int must be int in [0..0o7777]")
-      elif (mode & 0o6000):
-        errs.append(f"{prefix}: mode_int suid/sgid not allowed in MVP")
-      if not isinstance(data ,(bytes,bytearray)):
-        errs.append(f"{prefix}: content_bytes must be bytes")
-  return errs
-
-# ===== Planner execution =====
-
-def _run_one_config(config_abs_fpath: Path ,stage_root_dpath: Path)-> Planner:
-  """
-  Given abs path to a config script and stage root. Does construct a PlannerContext and Planner,
-  then executes the script with 'P' (Planner instance) bound in globals. Returns Planner with Journal.
-  Notes:
-    - Defaults are intentionally spartan; config should refine them via P.set_context(...).
-    - This is UNPRIVILEGED; no filesystem changes are performed here.
-  """
-  read_rel = config_abs_fpath.resolve().relative_to(stage_root_dpath.resolve())
-  ctx = PlannerContext.from_values(
-    stage_root_dpath=stage_root_dpath
-    ,read_file_rel_fpath=read_rel
-    ,write_file_dpath_str="/"
-    ,write_file_fname_str="."
-    ,owner_name_str=getpass.getuser()
-    ,perm=0o644
-    ,content=None
-  )
-  P = Planner(ctx)
-  g = {"Planner": Planner ,"PlannerContext": PlannerContext ,"P": P}
-  runpy.run_path(str(config_abs_fpath) ,init_globals=g)
-  return P
-
-def _aggregate_journal(planners_list: list[Planner] ,stage_root_dpath: Path)-> Journal:
-  "Given planners and stage root. Does aggregate Commands into a single Journal with meta. Returns Journal."
-  J = Journal()
-  J.set_meta(
-    version_int=1
-    ,generator_prog_str="plan_show.py"
-    ,generated_at_utc_str=iso_utc_now_str()
-    ,user_name_str=getpass.getuser()
-    ,host_name_str=os.uname().nodename if hasattr(os ,"uname") else "unknown"
-    ,stage_root_dpath_str=str(stage_root_dpath.resolve())
-    ,configs_list=[p.context().read_file_rel_fpath.as_posix() for p in planners_list]
-  )
-  for p in planners_list:
-    for cmd in p.journal().commands_list:
-      J.append(cmd)
-  return J
-
-def _print_plan(journal: Journal)-> None:
-  "Given Journal. Does print a readable summary. Returns None."
-  meta = journal.meta_map or {}
-  print(f"Stage: {meta.get('stage_root_dpath_str','?')}")
-  print(f"Generated: {meta.get('generated_at_utc_str','?')} by {meta.get('user_name_str','?')}@{meta.get('host_name_str','?')}\n")
-
-  entries = journal.commands_list
-  if not entries:
-    print("(plan is empty)")
-    return
-
-  n_copy = sum(1 for c in entries if c.name_str=="copy")
-  n_disp = sum(1 for c in entries if c.name_str=="displace")
-  n_del  = sum(1 for c in entries if c.name_str=="delete")
-  print(f"Entries: {len(entries)}  copy:{n_copy}  displace:{n_disp}  delete:{n_del}\n")
-
-  for i ,cmd in enumerate(entries ,1):
-    am = cmd.args_map
-    dst = _dst_path_str(am)
-    if cmd.name_str == "copy":
-      size = len(am.get("content_bytes") or b"")
-      mode = am.get("mode_int")
-      owner = am.get("owner_name_str")
-      print(f"{i:02d}. copy     -> {dst}  mode {mode:04o} owner {owner} bytes {size} ({human_size(size)})")
-    elif cmd.name_str == "displace":
-      print(f"{i:02d}. displace -> {dst}")
-    elif cmd.name_str == "delete":
-      print(f"{i:02d}. delete   -> {dst}")
-    else:
-      print(f"{i:02d}. ?op?     -> {dst}")
-
-def _maybe_emit_CBOR(journal: Journal ,emit_CBOR_fpath: Path|None)-> None:
-  "Given Journal and optional path. Does write CBOR if requested. Returns None."
-  if not emit_CBOR_fpath:
-    return
-  try:
-    data = journal.to_CBOR_bytes(canonical_bool=True)
-  except Exception as e:
-    print(f"error: CBOR encode failed: {e}" ,file=sys.stderr)
-    raise
-  emit_CBOR_fpath.parent.mkdir(parents=True ,exist_ok=True)
-  with open(emit_CBOR_fpath ,"wb") as fh:
-    fh.write(data)
-  print(f"\nWrote CBOR plan: {emit_CBOR_fpath}  ({len(data)} bytes)")
-
-# ===== CLI =====
-
-def main(argv: list[str]|None=None)-> int:
-  "Given CLI. Does discover configs, build plan, (optionally) CBOR round-trip before printing, run WF, optionally emit CBOR. Returns exit code."
-  ap = argparse.ArgumentParser(prog="plan_show.py"
-   ,description="Build and show a staged plan (no privilege, no apply).")
-  ap.add_argument("--stage",default="stage",help="stage directory root (default: ./stage)")
-  ap.add_argument("--glob",default="**/*.stage.py",help="glob for config scripts under --stage")
-  ap.add_argument("--emit-CBOR",default=None,help="write CBOR plan to this path (optional)")
-  ap.add_argument("--print-from-journal",action="store_true"
-                 ,help="print directly from in-memory Journal (skip CBOR round-trip)")
-  args = ap.parse_args(argv)
-
-  stage_root_dpath = Path(args.stage)
-  if not stage_root_dpath.is_dir():
-    print(f"error: --stage not a directory: {stage_root_dpath}" ,file=sys.stderr)
-    return 2
-
-  configs = find_configs(stage_root_dpath ,args.glob)
-  if not configs:
-    print("No config scripts found.")
-    return 0
-
-  planners: list[Planner] = []
-  for cfg in configs:
-    try:
-      planners.append(_run_one_config(cfg ,stage_root_dpath))
-    except SystemExit:
-      raise
-    except Exception as e:
-      print(f"error: executing {cfg}: {e}" ,file=sys.stderr)
-      return 2
-
-  journal_src = _aggregate_journal(planners ,stage_root_dpath)
-
-  if not args.print_from_journal:
-    try:
-      cbor_bytes = journal_src.to_CBOR_bytes(canonical_bool=True)
-      journal = Journal.from_CBOR_bytes(cbor_bytes)
-    except Exception as e:
-      print(f"error: CBOR round-trip failed: {e}" ,file=sys.stderr)
-      return 2
-  else:
-    journal = journal_src
-
-  _print_plan(journal)
-
-  errs = wf_check(journal)
-  if errs:
-    print("\nerror(s):" ,file=sys.stderr)
-    for e in errs:
-      print(f"  - {e}" ,file=sys.stderr)
-    return 2
-
-  emit = Path(args.emit_CBOR) if args.emit_CBOR else None
-  if emit:
-    try:
-      data = (cbor_bytes if not args.print_from_journal else journal_src.to_CBOR_bytes(canonical_bool=True))
-      emit.parent.mkdir(parents=True ,exist_ok=True)
-      with open(emit ,"wb") as fh:
-        fh.write(data)
-      print(f"\nWrote CBOR plan: {emit}  ({len(data)} bytes)")
-    except Exception as e:
-      print(f"error: failed to write CBOR: {e}" ,file=sys.stderr)
-      return 2
-
-  return 0
-
-if __name__ == "__main__":
-  sys.exit(main())
diff --git a/developer/source/DNS/stage b/developer/source/DNS/stage
deleted file mode 120000 (symlink)
index 6b42c53..0000000
+++ /dev/null
@@ -1 +0,0 @@
-stage_test_0/
\ No newline at end of file
diff --git a/developer/source/DNS/stage_cp.py b/developer/source/DNS/stage_cp.py
deleted file mode 100644 (file)
index 0114853..0000000
+++ /dev/null
@@ -1,322 +0,0 @@
-#!/usr/bin/env -S python3 -B
-"""
-stage_cp.py — build a CBOR plan from staged configs; show, validate, and apply with privilege.
-
-Given: a stage root directory.
-Does:  (user) run configs → build native plan → WF checks → summarize → encode plan → sudo re-exec
-       (root) decode plan → VALID + SANITY → apply ops (displace/copy/delete) safely.
-Returns: exit code.
-
-Requires: pip install cbor2
-"""
-from __future__ import annotations
-import sys ,os
-sys.dont_write_bytecode = True
-os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
-
-from pathlib import Path
-import argparse ,importlib.util ,runpy ,socket ,getpass ,time ,tempfile ,subprocess ,pwd
-from typing import Any
-import cbor2
-
-# ---------- small utils ----------
-
-def _load_stage_module(stage_root_dpath: Path):
-  "Given: stage root path. Does: load Stage.py as module 'Stage'. Returns: module."
-  mod_fpath = stage_root_dpath/"Stage.py"
-  if not mod_fpath.exists():
-    raise FileNotFoundError(f"Stage.py not found at {mod_fpath}")
-  spec = importlib.util.spec_from_file_location("Stage" ,str(mod_fpath))
-  mod = importlib.util.module_from_spec(spec)
-  sys.modules["Stage"] = mod
-  assert spec and spec.loader
-  spec.loader.exec_module(mod)  # type: ignore
-  return mod
-
-def _config_rel_fpaths(stage_root_dpath: Path)-> list[Path]:
-  "Given: stage root. Does: collect *.py (excluding Stage.py) as relative file paths. Returns: list[Path]."
-  rel_fpath_list: list[Path] = []
-  for p in stage_root_dpath.rglob("*.py"):
-    if p.name == "Stage.py": continue
-    if p.is_file():
-      rel_fpath_list.append(p.relative_to(stage_root_dpath))
-  return sorted(rel_fpath_list ,key=lambda x: x.as_posix())
-
-def _sha256_bytes(b: bytes)-> bytes:
-  "Given: bytes. Does: sha256. Returns: 32-byte digest."
-  return hashlib.sha256(b).digest()
-
-def _dst_fpath_str(dst_dpath_str: str ,dst_fname_str: str)-> str:
-  "Given: a directory path string and a filename string. Does: join. Returns: combined POSIX path string."
-  if "/" in dst_fname_str:
-    return ""  # invalid; WF will flag
-  return str((Path(dst_dpath_str)/dst_fname_str))
-
-# ---------- WF / VALID / SANITY ----------
-
-_ALLOWLIST_PREFIXES_LIST = ["/etc" ,"/usr/local" ,"/etc/systemd/system"]
-
-def wf_check(plan_map: dict[str,Any])-> list[str]:
-  "Given: plan map. Does: shape/lexical checks only. Returns: list of error strings."
-  errs_list: list[str] = []
-  if plan_map.get("version_int") != 1:
-    errs_list.append("WF_VERSION: unsupported plan version")
-  entries_list = plan_map.get("entries_list")
-  if not isinstance(entries_list ,list):
-    errs_list.append("WF_ENTRIES: 'entries_list' missing or not a list")
-    return errs_list
-  for i ,e_map in enumerate(entries_list ,1):
-    op = e_map.get("op")
-    dst_dpath_str = e_map.get("dst_dpath")
-    dst_fname_str = e_map.get("dst_fname")
-    where = f"entry {i}"
-    if op not in ("copy","displace","delete"):
-      errs_list.append(f"WF_OP:{where}: invalid op {op!r}")
-      continue
-    if not isinstance(dst_dpath_str ,str) or not dst_dpath_str:
-      errs_list.append(f"WF_DST_DPATH:{where}: dst_dpath missing or not str")
-    if not isinstance(dst_fname_str ,str) or not dst_fname_str:
-      errs_list.append(f"WF_DST_FNAME:{where}: dst_fname missing or not str")
-    if isinstance(dst_fname_str ,str) and "/" in dst_fname_str:
-      errs_list.append(f"WF_DST_FNAME:{where}: dst_fname must not contain '/'")
-    if isinstance(dst_dpath_str ,str) and not dst_dpath_str.startswith("/"):
-      errs_list.append(f"WF_DST_DPATH:{where}: dst_dpath must be absolute")
-    full_fpath_str = _dst_fpath_str(dst_dpath_str or "" ,dst_fname_str or "")
-    if not full_fpath_str or not full_fpath_str.startswith("/"):
-      errs_list.append(f"WF_PATH:{where}: failed to construct absolute path from dst_dpath/fname")
-    if op == "copy":
-      mode_int = e_map.get("mode_int")
-      if not isinstance(mode_int ,int) or not (0 <= mode_int <= 0o7777):
-        errs_list.append(f"WF_MODE:{where}: mode_int must be int in [0..0o7777]")
-      if isinstance(mode_int ,int) and (mode_int & 0o6000):
-        errs_list.append(f"WF_MODE:{where}: suid/sgid bits not allowed in MVP")
-      owner_name = e_map.get("owner_name")
-      if not isinstance(owner_name ,str) or not owner_name:
-        errs_list.append(f"WF_OWNER:{where}: owner_name must be non-empty username string")
-      content_bytes = e_map.get("content_bytes")
-      if not (isinstance(content_bytes ,(bytes,bytearray)) and len(content_bytes) >= 0):
-        errs_list.append(f"WF_CONTENT:{where}: content_bytes must be bytes (may be empty)")
-      sha = e_map.get("sha256_bytes")
-      if sha is not None:
-        if not isinstance(sha ,(bytes,bytearray)) or len(sha)!=32:
-          errs_list.append(f"WF_SHA256:{where}: sha256_bytes must be 32-byte digest if present")
-        elif isinstance(content_bytes ,(bytes,bytearray)) and sha != _sha256_bytes(content_bytes):
-          errs_list.append(f"WF_SHA256_MISMATCH:{where}: sha256_bytes does not match content_bytes")
-  return errs_list
-
-def valid_check(plan_map: dict[str,Any])-> list[str]:
-  "Given: plan map. Does: environment (read-only) checks. Returns: list of error strings."
-  errs_list: list[str] = []
-  for i ,e_map in enumerate(plan_map.get("entries_list") or [] ,1):
-    op = e_map.get("op")
-    dst_fpath_str = _dst_fpath_str(e_map.get("dst_dpath","/") ,e_map.get("dst_fname",""))
-    where = f"entry {i}"
-    try:
-      parent_dpath = Path(dst_fpath_str).parent
-      if not parent_dpath.exists():
-        errs_list.append(f"VAL_PARENT_MISSING:{where}: parent dir does not exist: {parent_dpath}")
-      elif not parent_dpath.is_dir():
-        errs_list.append(f"VAL_PARENT_NOT_DIR:{where}: parent is not a directory: {parent_dpath}")
-      if Path(dst_fpath_str).is_dir():
-        errs_list.append(f"VAL_DST_IS_DIR:{where}: destination exists as a directory: {dst_fpath_str}")
-      if op == "copy":
-        owner_name = e_map.get("owner_name")
-        try:
-          pw = pwd.getpwnam(owner_name)  # may raise KeyError
-          e_map["_resolved_uid_int"] = pw.pw_uid
-          e_map["_resolved_gid_int"] = pw.pw_gid
-        except Exception:
-          errs_list.append(f"VAL_OWNER_UNKNOWN:{where}: user not found: {owner_name!r}")
-    except Exception as x:
-      errs_list.append(f"VAL_EXCEPTION:{where}: {x}")
-  return errs_list
-
-def sanity_check(plan_map: dict[str,Any])-> list[str]:
-  "Given: plan map. Does: policy checks (allowlist, denials). Returns: list of error strings."
-  errs_list: list[str] = []
-  for i ,e_map in enumerate(plan_map.get("entries_list",[]) ,1):
-    dst_fpath_str = _dst_fpath_str(e_map.get("dst_dpath","/") ,e_map.get("dst_fname",""))
-    where = f"entry {i}"
-    if not any(dst_fpath_str.startswith(pref + "/") or dst_fpath_str==pref for pref in _ALLOWLIST_PREFIXES_LIST):
-      errs_list.append(f"POL_PATH_DENY:{where}: destination outside allowlist: {dst_fpath_str}")
-  return errs_list
-
-# ---------- APPLY (root) ----------
-
-def _utc_str()-> str:
-  "Given: n/a. Does: current UTC compact. Returns: string."
-  import datetime as _dt
-  return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
-
-def _ensure_parent_dirs(dst_fpath: Path)-> None:
-  "Given: destination file path. Does: create parents. Returns: None."
-  dst_fpath.parent.mkdir(parents=True ,exist_ok=True)
-
-def _displace_in_place(dst_fpath: Path)-> None:
-  "Given: destination file path. Does: rename existing file/symlink to add UTC suffix. Returns: None."
-  try:
-    if dst_fpath.exists() or dst_fpath.is_symlink():
-      suffix = "_" + _utc_str()
-      dst_fpath.rename(dst_fpath.with_name(dst_fpath.name + suffix))
-  except FileNotFoundError:
-    pass
-
-def _apply_copy(dst_fpath: Path ,content_bytes: bytes ,mode_int: int ,uid_int: int ,gid_int: int)-> None:
-  "Given: target, bytes, mode, uid, gid. Does: write temp, fsync, chmod/chown, atomic replace. Returns: None."
-  _ensure_parent_dirs(dst_fpath)
-  _displace_in_place(dst_fpath)
-  tmp_fpath = dst_fpath.with_name("." + dst_fpath.name + ".stage_tmp")
-  with open(tmp_fpath ,"wb") as fh:
-    fh.write(content_bytes)
-    fh.flush()
-    os.fsync(fh.fileno())
-  try:
-    os.chmod(tmp_fpath ,mode_int & 0o777)
-  except Exception:
-    pass
-  try:
-    os.chown(tmp_fpath ,uid_int ,gid_int)
-  except Exception:
-    pass
-  os.replace(tmp_fpath ,dst_fpath)  # atomic within same dir/device
-
-def _apply_delete(dst_fpath: Path)-> None:
-  "Given: target file path. Does: unlink file/symlink if present. Returns: None."
-  try:
-    if dst_fpath.is_symlink() or dst_fpath.is_file():
-      dst_fpath.unlink()
-  except FileNotFoundError:
-    pass
-
-def apply_plan(plan_map: dict[str,Any] ,dry_run_bool: bool=False)-> int:
-  "Given: plan map and dry flag. Does: execute ops sequentially. Returns: exit code."
-  for i ,e_map in enumerate(plan_map.get("entries_list") or [] ,1):
-    op = e_map.get("op")
-    dst_fpath = Path(_dst_fpath_str(e_map.get("dst_dpath","/") ,e_map.get("dst_fname","")))
-    if op == "displace":
-      print(f"+ displace {dst_fpath}")
-      if not dry_run_bool:
-        _displace_in_place(dst_fpath)
-    elif op == "delete":
-      print(f"+ delete   {dst_fpath}")
-      if not dry_run_bool:
-        _apply_delete(dst_fpath)
-    elif op == "copy":
-      mode_int = e_map.get("mode_int") or 0o644
-      uid_int  = e_map.get("_resolved_uid_int" ,0)
-      gid_int  = e_map.get("_resolved_gid_int" ,0)
-      content_bytes = e_map.get("content_bytes") or b""
-      print(f"+ copy     {dst_fpath}  mode {mode_int:04o} uid {uid_int} gid {gid_int} bytes {len(content_bytes)}")
-      if not dry_run_bool:
-        _apply_copy(dst_fpath ,content_bytes ,mode_int ,uid_int ,gid_int)
-    else:
-      print(f"! unknown op {op} (skipping)")
-      return 2
-  return 0
-
-# ---------- orchestration ----------
-
-def _build_plan_unpriv(stage_root_dpath: Path)-> dict[str,Any]:
-  "Given: stage root. Does: execute configs, accumulate entries, add sha256. Returns: plan map."
-  StageMod = _load_stage_module(stage_root_dpath)
-  Stage = StageMod.Stage
-  Stage._reset()
-  Stage.set_meta(
-    planner_user_name=getpass.getuser()
-    ,planner_uid_int=os.getuid()
-    ,planner_gid_int=os.getgid()
-    ,host_name=socket.gethostname()
-    ,created_utc_str=time.strftime("%Y-%m-%dT%H:%M:%SZ",time.gmtime())
-  )
-  for rel_fpath in _config_rel_fpaths(stage_root_dpath):
-    Stage._begin(read_rel_fpath=rel_fpath ,stage_root_dpath=stage_root_dpath)
-    runpy.run_path(str(stage_root_dpath/rel_fpath) ,run_name="__main__")
-    Stage._end()
-  for e_map in Stage.plan_entries():
-    if e_map.get("op") == "copy" and isinstance(e_map.get("content_bytes") ,(bytes,bytearray)):
-      e_map["sha256_bytes"] = _sha256_bytes(e_map["content_bytes"])
-  return Stage.plan_object()
-
-def _sudo_apply_self(plan_fpath: Path ,dry_run_bool: bool)-> int:
-  "Given: plan file path and dry flag. Does: sudo re-exec current script with --apply. Returns: exit code."
-  cmd_list = ["sudo",sys.executable,os.path.abspath(__file__),"--apply"
-             ,"--plan",str(plan_fpath)]
-  if dry_run_bool:
-    cmd_list.append("--dry-run")
-  return subprocess.call(cmd_list)
-
-def main(argv: list[str]|None=None)-> int:
-  "Given: CLI. Does: plan, WF (user) then VALID+SANITY+APPLY (root). Returns: exit code."
-  ap = argparse.ArgumentParser(prog="stage_cp.py"
-   ,description="Plan staged config application and apply with sudo.")
-  ap.add_argument("--stage",default="stage",help="stage directory (default: ./stage)")
-  ap.add_argument("--dry-run",action="store_true",help="validate and show actions, do not change files")
-  ap.add_argument("--apply",action="store_true",help=argparse.SUPPRESS)     # internal (root path)
-  ap.add_argument("--plan",default=None,help=argparse.SUPPRESS)             # internal (root path)
-  args = ap.parse_args(argv)
-
-  # Root path (apply)
-  if args.apply:
-    if os.geteuid() != 0:
-      print("error: --apply requires root" ,file=sys.stderr)
-      return 2
-    if not args.plan:
-      print("error: --plan path required for --apply" ,file=sys.stderr)
-      return 2
-    with open(args.plan ,"rb") as fh:
-      plan_map = cbor2.load(fh)
-    val_errs = valid_check(plan_map)
-    pol_errs = sanity_check(plan_map)
-    if val_errs or pol_errs:
-      print("error(s) during validation/sanity:" ,file=sys.stderr)
-      for e in val_errs: print(f"  - {e}" ,file=sys.stderr)
-      for e in pol_errs: print(f"  - {e}" ,file=sys.stderr)
-      return 2
-    rc = apply_plan(plan_map ,dry_run_bool=args.dry_run)
-    return rc
-
-  # User path (plan + summarize + escalate)
-  stage_root_dpath = Path(args.stage)
-  plan_map = _build_plan_unpriv(stage_root_dpath)
-
-  entries_list = plan_map.get("entries_list" ,[])
-  print(f"Built plan with {len(entries_list)} entr{'y' if len(entries_list)==1 else 'ies'}")
-
-  total_bytes_int = sum(len(e_map.get("content_bytes") or b"")
-                        for e_map in entries_list if e_map.get("op")=="copy")
-  print(f"Total bytes to write: {total_bytes_int}")
-  if args.dry_run:
-    print("\n--dry-run: would perform the following:")
-
-  for i ,e_map in enumerate(entries_list ,1):
-    op = e_map.get("op")
-    dst_fpath_str = _dst_fpath_str(e_map.get("dst_dpath") ,e_map.get("dst_fname"))
-    if op=="copy":
-      mode_int = e_map.get("mode_int") or 0o644
-      owner_name = e_map.get("owner_name") or "?"
-      size = len(e_map.get("content_bytes") or b"")
-      print(f"{i:02d}. copy     -> {dst_fpath_str}  mode {mode_int:04o} owner {owner_name} bytes {size}")
-    elif op=="displace":
-      print(f"{i:02d}. displace -> {dst_fpath_str}")
-    elif op=="delete":
-      print(f"{i:02d}. delete   -> {dst_fpath_str}")
-    else:
-      print(f"{i:02d}. ?op?     -> {dst_fpath_str}")
-
-  with tempfile.NamedTemporaryFile(prefix="plan_" ,suffix=".cbor" ,delete=False) as tf:
-    cbor2.dump(plan_map ,tf)
-    plan_fpath = Path(tf.name)
-  try:
-    if args.dry_run:
-      return _sudo_apply_self(plan_fpath ,dry_run_bool=True)
-    ans = input("\nProceed with apply under sudo? [y/N] ").strip().lower()
-    if ans not in ("y","yes"):
-      print("Aborted.")
-      return 0
-    return _sudo_apply_self(plan_fpath ,dry_run_bool=False)
-  finally:
-    try: os.unlink(plan_fpath)
-    except Exception: pass
-
-if __name__ == "__main__":
-  sys.exit(main())
diff --git a/developer/source/DNS/stage_orig/etc/nftables.d/10-block-IPv6.nft b/developer/source/DNS/stage_orig/etc/nftables.d/10-block-IPv6.nft
deleted file mode 100644 (file)
index eaee5be..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-table inet NO-IPV6 {
-  chain input {
-    type filter hook input priority raw; policy accept;
-    meta nfproto ipv6 counter comment "drop all IPv6 inbound" drop
-  }
-
-  chain output {
-    type filter hook output priority raw; policy accept;
-    meta nfproto ipv6 counter comment "drop all IPv6 outbound" drop
-  }
-
-  chain forward {
-    type filter hook forward priority raw; policy accept;
-    meta nfproto ipv6 counter comment "drop all IPv6 forward" drop
-  }
-}
diff --git a/developer/source/DNS/stage_orig/etc/nftables.d/20-SUBU-ports.nft b/developer/source/DNS/stage_orig/etc/nftables.d/20-SUBU-ports.nft
deleted file mode 100644 (file)
index 6c31446..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-table inet SUBU-DNS-REDIRECT {
-  chain output {
-    type nat hook output priority -100; policy accept;
-
-    # Redirect DNS for the subu UIDs to local Unbound listeners
-    meta skuid 2017 udp dport 53 redirect to :5301
-    meta skuid 2018 udp dport 53 redirect to :5302
-    meta skuid 2017 tcp dport 53 redirect to :5301
-    meta skuid 2018 tcp dport 53 redirect to :5302
-  }
-}
-
-table inet SUBU-PORT-EGRESS {
-  chain output {
-    type filter hook output priority 0; policy accept;
-
-    # Always allow loopback on egress
-    oifname "lo" accept
-
-    # No IPv6 for subu (until you reintroduce v6)
-    meta skuid {2017,2018} meta nfproto ipv6 counter comment "no IPv6 for subu" drop
-
-    ##### x6 (UID 2018)
-    # Block some exfil channels regardless of iface
-    meta skuid 2018 tcp dport {25,465,587}  counter comment "block SMTP/Submission" drop
-    meta skuid 2018 udp dport {3478,5349,19302-19309} counter comment "block STUN/TURN" drop
-    meta skuid 2018 tcp dport 853            counter comment "block DoT (TCP/853)" drop
-
-    # (Optional) allow ICMP echo out via x6
-    meta skuid 2018 oifname "x6" ip protocol icmp icmp type echo-request accept
-
-    # Enforce interface binding
-    meta skuid 2018 oifname "x6" accept
-    meta skuid 2018 oifname != "x6" counter comment "x6 must use wg x6" drop
-
-    ##### US (UID 2017)
-    meta skuid 2017 tcp dport {25,465,587}  counter drop comment "block SMTP/Submission"
-    meta skuid 2017 udp dport {3478,5349,19302-19309} counter drop comment "block STUN/TURN"
-    meta skuid 2017 tcp dport 853            counter drop comment "block DoT (TCP/853)"
-
-    # (Optional) ICMP via US
-    meta skuid 2017 oifname "US" ip protocol icmp icmp type echo-request accept
-
-    meta skuid 2017 oifname "US" accept
-    meta skuid 2017 oifname != "US" counter comment "US must use wg US" drop
-  }
-}
diff --git a/developer/source/DNS/stage_orig/etc/systemd/system/unbound@.service b/developer/source/DNS/stage_orig/etc/systemd/system/unbound@.service
deleted file mode 100644 (file)
index ba2919b..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-[Unit]
-Description=Unbound DNS instance for %i (per-subu tunnel egress)
-After=network-online.target wg-quick@%i.service
-Requires=wg-quick@%i.service
-Wants=network-online.target
-
-[Service]
-Type=simple
-ExecStart=/usr/sbin/unbound -d -p -c /etc/unbound/unbound-%i.conf
-User=unbound
-Group=unbound
-Restart=on-failure
-RestartSec=2s
-AmbientCapabilities=CAP_NET_BIND_SERVICE
-CapabilityBoundingSet=CAP_NET_BIND_SERVICE
-NoNewPrivileges=true
-
-[Install]
-WantedBy=multi-user.target
diff --git a/developer/source/DNS/stage_orig/etc/unbound/unbound-US.conf b/developer/source/DNS/stage_orig/etc/unbound/unbound-US.conf
deleted file mode 100644 (file)
index 1995438..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-server:
-  username: "unbound"
-  chroot: ""
-  directory: "/etc/unbound"
-  do-daemonize: no
-  interface: 127.0.0.1@5301
-  hide-identity: yes
-  hide-version: yes
-  harden-glue: yes
-  harden-dnssec-stripped: yes
-  qname-minimisation: yes
-  prefetch: yes
-  outgoing-interface: 10.0.0.1
-
-forward-zone:
-  name: "."
-  forward-addr: 1.1.1.1
-  forward-addr: 1.0.0.1
diff --git a/developer/source/DNS/stage_orig/etc/unbound/unbound-x6.conf b/developer/source/DNS/stage_orig/etc/unbound/unbound-x6.conf
deleted file mode 100644 (file)
index ed49241..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-server:
-  username: "unbound"
-  chroot: ""
-  directory: "/etc/unbound"
-  do-daemonize: no
-  interface: 127.0.0.1@5302
-  hide-identity: yes
-  hide-version: yes
-  harden-glue: yes
-  harden-dnssec-stripped: yes
-  qname-minimisation: yes
-  prefetch: yes
-  outgoing-interface: 10.8.0.2
-
-forward-zone:
-  name: "."
-  forward-addr: 1.1.1.1
-  forward-addr: 1.0.0.1
diff --git a/developer/source/DNS/stage_orig/usr/local/sbin/DNS_status.sh b/developer/source/DNS/stage_orig/usr/local/sbin/DNS_status.sh
deleted file mode 100755 (executable)
index d4db58e..0000000
+++ /dev/null
@@ -1,12 +0,0 @@
-#!/usr/bin/env bash
-set -euo pipefail
-echo "== DNS status =="
-systemctl --no-pager --full status DNS-redirect unbound@US unbound@x6 || true
-echo
-echo "== nftables =="
-nft list table inet NAT-DNS-REDIRECT || true
-echo
-echo "== Unbound logs (last 50 lines each) =="
-journalctl -u unbound@US -n 50 --no-pager || true
-echo
-journalctl -u unbound@x6 -n 50 --no-pager || true
diff --git a/developer/source/DNS/stage_show_plan.py b/developer/source/DNS/stage_show_plan.py
deleted file mode 100644 (file)
index 075e65b..0000000
+++ /dev/null
@@ -1,97 +0,0 @@
-#!/usr/bin/env -S python3 -B
-"""
-stage_show_plan.py — run staged configs (UNPRIVILEGED) and print the plan.
-
-Given: a stage root directory.
-Does:  loads Stage.py, executes each config, builds a native plan map, summarizes it.
-Returns: exit code 0 on success, non-zero on error.
-"""
-from __future__ import annotations
-import sys ,os
-sys.dont_write_bytecode = True
-os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
-
-from pathlib import Path
-import argparse ,importlib.util ,runpy ,socket ,getpass ,time ,hashlib
-
-# ---------- helpers ----------
-
-def _load_stage_module(stage_root_dpath: Path):
-  "Given: stage root path. Does: load Stage.py as module 'Stage'. Returns: module."
-  mod_fpath = stage_root_dpath/"Stage.py"
-  if not mod_fpath.exists():
-    raise FileNotFoundError(f"Stage.py not found at {mod_fpath}")
-  spec = importlib.util.spec_from_file_location("Stage" ,str(mod_fpath))
-  mod = importlib.util.module_from_spec(spec)
-  sys.modules["Stage"] = mod
-  assert spec and spec.loader
-  spec.loader.exec_module(mod)  # type: ignore
-  return mod
-
-def _config_rel_fpaths(stage_root_dpath: Path)-> list[Path]:
-  "Given: stage root. Does: collect *.py (excluding Stage.py) as relative file paths. Returns: list[Path]."
-  rel_fpath_list: list[Path] = []
-  for p in stage_root_dpath.rglob("*.py"):
-    if p.name == "Stage.py": continue
-    if p.is_file():
-      rel_fpath_list.append(p.relative_to(stage_root_dpath))
-  return sorted(rel_fpath_list ,key=lambda x: x.as_posix())
-
-def _sha256_hex(b: bytes)-> str:
-  "Given: bytes. Does: sha256. Returns: hex string."
-  return hashlib.sha256(b).hexdigest()
-
-# ---------- main ----------
-
-def main(argv: list[str]|None=None)-> int:
-  "Given: CLI. Does: show plan. Returns: exit code."
-  ap = argparse.ArgumentParser(prog="stage_show_plan.py"
-   ,description="Run staged config scripts and print the resulting plan.")
-  ap.add_argument("--stage",default="stage",help="stage directory (default: ./stage)")
-  args = ap.parse_args(argv)
-
-  stage_root_dpath = Path(args.stage)
-  StageMod = _load_stage_module(stage_root_dpath)
-  Stage = StageMod.Stage
-  Stage._reset()
-  Stage.set_meta(
-    planner_user_name=getpass.getuser()
-    ,planner_uid_int=os.getuid()
-    ,planner_gid_int=os.getgid()
-    ,host_name=socket.gethostname()
-    ,created_utc_str=time.strftime("%Y-%m-%dT%H:%M:%SZ",time.gmtime())
-  )
-
-  for rel_fpath in _config_rel_fpaths(stage_root_dpath):
-    Stage._begin(read_rel_fpath=rel_fpath ,stage_root_dpath=stage_root_dpath)
-    runpy.run_path(str(stage_root_dpath/rel_fpath) ,run_name="__main__")
-    Stage._end()
-
-  plan_map = Stage.plan_object()
-  entries_list = plan_map["entries_list"]
-  print(f"Plan version: {plan_map['version_int']}")
-  print(f"Planner: {plan_map['meta_map'].get('planner_user_name')}@{plan_map['meta_map'].get('host_name')}  "
-        f"UID:{plan_map['meta_map'].get('planner_uid_int')} GID:{plan_map['meta_map'].get('planner_gid_int')}")
-  print(f"Created: {plan_map['meta_map'].get('created_utc_str')}")
-  print(f"Entries: {len(entries_list)}\n")
-
-  for i ,e_map in enumerate(entries_list ,1):
-    op = e_map.get("op")
-    dst_fpath_str = f"{e_map.get('dst_dpath')}/{e_map.get('dst_fname')}"
-    if op == "copy":
-      content = e_map.get("content_bytes") or b""
-      sz = len(content)
-      mode = e_map.get("mode_octal_str") or "????"
-      owner = e_map.get("owner_name") or "?"
-      h = _sha256_hex(content)
-      print(f"{i:02d}. copy     -> {dst_fpath_str}  mode {mode} owner {owner}  bytes {sz} sha256 {h[:16]}…")
-    elif op == "displace":
-      print(f"{i:02d}. displace -> {dst_fpath_str}")
-    elif op == "delete":
-      print(f"{i:02d}. delete   -> {dst_fpath_str}")
-    else:
-      print(f"{i:02d}. ?op?     -> {dst_fpath_str}  ({op})")
-  return 0
-
-if __name__ == "__main__":
-  sys.exit(main())
diff --git a/developer/source/DNS/stage_test_0/a.conf b/developer/source/DNS/stage_test_0/a.conf
deleted file mode 100644 (file)
index a3153de..0000000
+++ /dev/null
@@ -1 +0,0 @@
-Thomas-developer 0x444 . stage_test_0_out
\ No newline at end of file
diff --git a/developer/source/DNS/stage_test_0/b.conf b/developer/source/DNS/stage_test_0/b.conf
deleted file mode 100644 (file)
index 9e8fc11..0000000
+++ /dev/null
@@ -1 +0,0 @@
-Thomas-developer 0640 . stage_test_0_out
\ No newline at end of file
diff --git a/developer/source/DNS/stage_test_0/c.conf b/developer/source/DNS/stage_test_0/c.conf
deleted file mode 100644 (file)
index 7d4330b..0000000
+++ /dev/null
@@ -1 +0,0 @@
-Thomas-developer 0444 . stage_test_0_out
\ No newline at end of file
diff --git a/developer/source/DNS/stage_test_0/example.py b/developer/source/DNS/stage_test_0/example.py
deleted file mode 100644 (file)
index 6f2c257..0000000
+++ /dev/null
@@ -1,24 +0,0 @@
-#!/usr/bin/env -S python3 -B
-import Stage
-
-# You can compute these with arbitrary Python if you like
-svc = "unbound"
-zone = "US"
-fname = f"unbound-{zone}.conf"
-
-Stage.init(
-  write_file_name="."                 # '.' → use basename of this file -> 'example_dns.py'
-, write_file_directory_path="/etc/unbound"
-, write_file_owner="root"
-, write_file_permissions=0o644        # or "0644"
-, read_file_contents="""\
-# generated config (example)
-server:
-  verbosity: 1
-  interface: 127.0.0.1
-"""
-)
-
-# declare the desired operations (no effect in 'noop'/'dry' without a copier)
-Stage.displace()
-Stage.copy()
diff --git a/developer/source/DNS/stage_test_0/name_space/c.conf b/developer/source/DNS/stage_test_0/name_space/c.conf
deleted file mode 100644 (file)
index 7d4330b..0000000
+++ /dev/null
@@ -1 +0,0 @@
-Thomas-developer 0444 . stage_test_0_out
\ No newline at end of file
diff --git a/developer/source/DNS/stage_test_0/stage_ls.py b/developer/source/DNS/stage_test_0/stage_ls.py
deleted file mode 100644 (file)
index 1e27c26..0000000
+++ /dev/null
@@ -1,169 +0,0 @@
-#!/usr/bin/env -S python3 -B
-"""
-stage_ls.py — execute staged Python programs with Stage in 'noop' mode and list metadata.
-
-For each *.py under --stage (recursively, excluding Stage.py), this tool:
-  1) loads Stage.py from the stage root,
-  2) switches mode to 'noop' (no side effects, no printing),
-  3) executes the program via runpy.run_path(...) with the proper __file__,
-  4) collects the resolved write_file_* metadata and declared ops,
-  5) prints either list or aligned table,
-  6) reports any collected errors.
-
-This lets admins compute metadata with arbitrary Python while guaranteeing no writes.
-"""
-
-from __future__ import annotations
-
-import sys ,os
-sys.dont_write_bytecode = True
-os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
-
-from dataclasses import dataclass
-from pathlib import Path
-import argparse
-import importlib.util ,runpy
-import traceback
-
-# --- utility dataclass (for printing) ---
-
-@dataclass
-class Row:
-  read_rel: Path
-  owner: str|None
-  perm: str|None
-  write_name: str|None
-  target_dir: Path|None
-  ops: list[str]
-  errors: list[str]
-
-# --- helpers ---
-
-def _load_stage_module(stage_root: Path):
-  """Load Stage.py from stage_root into sys.modules['Stage'] (overwriting if present). Returns the Stage module."""
-  stage_py = stage_root/"Stage.py"
-  if not stage_py.exists():
-    raise FileNotFoundError(f"Stage.py not found at {stage_py} — place Stage.py in the stage root.")
-  spec = importlib.util.spec_from_file_location("Stage" ,str(stage_py))
-  if spec is None or spec.loader is None:
-    raise RuntimeError(f"cannot load Stage module from {stage_py}")
-  mod = importlib.util.module_from_spec(spec)
-  sys.modules["Stage"] = mod
-  spec.loader.exec_module(mod)  # type: ignore[union-attr]
-  return mod
-
-def _stage_program_paths(stage_root: Path)-> list[Path]:
-  rels: list[Path] = []
-  for p in stage_root.rglob("*.py"):
-    if p.name == "Stage.py":
-      continue
-    try:
-      if p.is_file():
-        rels.append(p.relative_to(stage_root))
-    except Exception:
-      continue
-  return sorted(rels ,key=lambda x: x.as_posix())
-
-def print_list(rows: list[Row])-> None:
-  for r in rows:
-    owner = r.owner or "?"
-    perm  = r.perm or "????"
-    name  = r.write_name or "?"
-    tdir  = str(r.target_dir) if r.target_dir is not None else "?"
-    print(f"{r.read_rel.as_posix()}: {owner} {perm} {name} {tdir}")
-
-def print_table(rows: list[Row])-> None:
-  if not rows:
-    return
-  a = [r.read_rel.as_posix() for r in rows]
-  b = [(r.owner or "?") for r in rows]
-  c = [(r.perm or "????") for r in rows]
-  d = [(r.write_name or "?") for r in rows]
-  e = [str(r.target_dir) if r.target_dir is not None else "?" for r in rows]
-  wa = max(len(s) for s in a)
-  wb = max(len(s) for s in b)
-  wc = max(len(s) for s in c)
-  wd = max(len(s) for s in d)
-  for sa ,sb ,sc ,sd ,se in zip(a ,b ,c ,d ,e):
-    print(f"{sa:<{wa}}  {sb:<{wb}}  {sc:<{wc}}  {sd:<{wd}}  {se}")
-
-# --- core ---
-
-def ls_stage(stage_root: Path ,fmt: str="list")-> int:
-  Stage = _load_stage_module(stage_root)
-  Stage.Stage.set_mode("noop")  # hard safety for this tool
-
-  rows: list[Row] = []
-  errs: list[str] = []
-
-  for rel in _stage_program_paths(stage_root):
-    abs_path = stage_root/rel
-    try:
-      # isolate per-run state
-      Stage.Stage._current = None
-      Stage.Stage._all_records.clear()
-      Stage.Stage._begin(read_rel=rel ,stage_root=stage_root)
-
-      # execute the staged program under its real path
-      runpy.run_path(str(abs_path) ,run_name="__main__")
-
-      rec = Stage.Stage._end()
-      if rec is None:
-        errs.append(f"{rel}: program executed but Stage.init(...) was never called")
-        continue
-
-      rows.append(
-        Row(
-          read_rel=rel
-        , owner=rec.owner
-        , perm=rec.perm_octal_str
-        , write_name=rec.write_name
-        , target_dir=rec.target_dir
-        , ops=list(rec.ops)
-        , errors=list(rec.errors)
-        )
-      )
-
-    except SystemExit as e:
-      errs.append(f"{rel}: program called sys.exit({e.code}) during listing")
-    except Exception:
-      tb = traceback.format_exc(limit=2)
-      errs.append(f"{rel}: exception during execution:\n{tb}")
-
-  # print data
-  if fmt == "table":
-    print_table(rows)
-  else:
-    print_list(rows)
-
-  # print per-row Stage errors
-  row_errs = [f"{r.read_rel}: {msg}" for r in rows for msg in r.errors]
-  all_errs = row_errs + errs
-  if all_errs:
-    print("\nerror(s):" ,file=sys.stderr)
-    for e in all_errs:
-      print(f"  - {e}" ,file=sys.stderr)
-    return 1
-  return 0
-
-# --- CLI ---
-
-def main(argv: list[str] | None=None)-> int:
-  import argparse
-  ap = argparse.ArgumentParser(
-    prog="stage_ls.py"
-  , description="Execute staged Python configs with Stage in 'noop' mode and list resolved metadata."
-  )
-  ap.add_argument("--stage" ,default="stage" ,help="stage directory (default: ./stage)")
-  ap.add_argument("--format" ,choices=["list","table"] ,default="list" ,help="output format")
-  args = ap.parse_args(argv)
-
-  stage_root = Path(args.stage)
-  if not stage_root.exists() or not stage_root.is_dir():
-    print(f"error: stage directory not found or not a directory: {stage_root}" ,file=sys.stderr)
-    return 2
-
-  return ls_stage(stage_root ,fmt=args.format)
-
-if __name__ == "__main__":
-  sys.exit(main())
diff --git a/developer/source/DNS/stage_test_0/unbound_conf.py b/developer/source/DNS/stage_test_0/unbound_conf.py
new file mode 100644 (file)
index 0000000..4f57794
--- /dev/null
@@ -0,0 +1,15 @@
+# example unbound.conf
+def configure(prov, planner, WriteFileMeta):
+  # use current user for owner, and use this script’s py-less name for the filename
+
+  # owner defaults to root (this is a configuration file installer)
+  # owner "." means owner of the process running Stagehane
+  # owner "." is good for testing
+
+  # fname "." means write file has the same name as read file (without .py if it has .py)
+  # fname "." is the default, so it is redundant here. "." still works in args, even when wfm changes the fname.
+
+  wfm = WriteFileMeta(dpath="stage_test_0_out", fname=".", owner=".")
+  planner.displace(wfm)
+  planner.copy(wfm, content="server:\n  do-ip6: no\n")
+