"""
Planner.py — plan builder for staged configuration (UNPRIVILEGED).
-The Planner accumulates Command objects into a Journal.
-
-Journal building is orchestrated by the outer runner (e.g., stage_show_plan, stage_cp)
-which constructs a Planner per config file and invokes Planner command methods.
-
-Defaults and provenance come from a PlannerContext instance. You can replace the
-context at any time via set_context(ctx).
-
-The Journal can be exported as CBOR via Journal.to_CBOR_bytes(), and reconstructed
-on the privileged side via Journal.from_CBOR_bytes().
-
-On-wire field names are snake_case and use explicit suffixes (_str,_bytes,_int, etc.)
-to avoid ambiguity.
+Given: runner-side provenance (PlanProvenance) and optional defaults (WriteFileMeta).
+Does: expose Planner whose command methods (copy/displace/delete) build Command entries,
+ resolving arguments with precedence: kwarg > per-call WriteFileMeta > planner default
+ (and for filename, fallback to provenance-derived basename). On any argument error,
+ the Command is returned with errors and NOT appended to the Journal.
+Returns: Journal (model only; dict in/out) via planner.journal().
"""
from __future__ import annotations
# no bytecode anywhere (works under sudo/root shells too)
import sys ,os
sys.dont_write_bytecode = True
os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
import getpass
from pathlib import Path
+
# ===== Utilities =====
-def _norm_perm(value: int|str)-> tuple[int,str]|None:
- "Given int or 4-char octal string. Does validate/normalize. Returns (int,'%04o') or None."
+def norm_perm(value: int|str)-> tuple[int,str]|None:
+ "Given int or 3/4-char octal string (optionally 0o-prefixed). Does validate/normalize. Returns (int,'%04o') or None."
if isinstance(value ,int):
if 0 <= value <= 0o7777:
return value ,f"{value:04o}"
return None
if isinstance(value ,str):
- s = value.strip()
- if len(s)==4 and all(ch in "01234567" for ch in s):
+ s = value.strip().lower()
+ if s.startswith("0o"):
+ try:
+ v = int(s ,8)
+ return v ,f"{v:04o}"
+ except Exception:
+ return None
+ if len(s) in (3 ,4) and all(ch in "01234567" for ch in s):
try:
v = int(s ,8)
- return v ,s
+ return v ,f"{v:04o}"
except Exception:
return None
return None
-def _is_abs_dpath(dpath_str: str)-> bool:
+def is_abs_dpath(dpath_str: str|None)-> bool:
"Given path string. Does quick abs dir check. Returns bool."
- return bool(dpath_str) and dpath_str.startswith("/")
+ return isinstance(dpath_str ,str) and dpath_str.startswith("/") and "\x00" not in dpath_str
+
def norm_abs_dpath_str(value: str|Path|None)-> str|None:
    "Given str/Path/None. Does coerce to a string and accept only absolute dir paths. Returns str or None."
    if value is None:
        return None
    candidate = value.as_posix() if isinstance(value ,Path) else str(value)
    if is_abs_dpath(candidate):
        return candidate
    return None
+
+def norm_fname_or_none(value: str|None)-> str|None:
+ "Given candidate filename or None. Does validate bare filename. Returns str or None."
+ if value is None: return None
+ s = str(value)
+ if not s: return None
+ if "/" in s or s in ("." ,"..") or "\x00" in s: return None
+ return s
+
+def norm_nonempty_owner(value: str|None)-> str|None:
+ "Given owner string or None. Does minimally validate (non-empty). Returns str or None."
+ if value is None: return None
+ s = str(value).strip()
+ return s if s else None
+
def parse_mode(value: int|str|None)-> tuple[int|None ,str|None]:
    "Given int/str/None. Does delegate validation to norm_perm. Returns (mode_int,'%04o') or (None,None)."
    if value is None:
        return None ,None
    normalized = norm_perm(value)
    if normalized is None:
        return None ,None
    return normalized
+
+def norm_content_bytes(value: bytes|str|None)-> bytes|None:
+ "Given bytes/str/None. Does normalize to UTF-8 bytes or None. Returns bytes|None."
+ if value is None: return None
+ if isinstance(value ,bytes): return value
+ return value.encode("utf-8")
+
+def norm_dpath_str(value: str|Path|None)-> str|None:
+ "Given str/Path/None. Does minimal sanitize; allows relative. Returns str or None."
+ if value is None: return None
+ s = value.as_posix() if isinstance(value ,Path) else str(value)
+ if not s or "\x00" in s: return None
+ return s
+
+
+# ===== Wire-ready model types (no CBOR here) =====
-def _join_write_file(dpath_str: str ,fname_str: str)-> str:
- "Given dir path string and filename string. Does join. Returns POSIX path string or ''."
- if not _is_abs_dpath(dpath_str): return ""
- if not fname_str or "/" in fname_str: return ""
- return (Path(dpath_str)/fname_str).as_posix()
-
-# ===== Core data types =====
-
-@dataclass(slots=True)
class Command:
"""
Command — a single planned operation.
- Given a command name and an argument map (native values).
- Does hold the op name, owns a distinct args map, accumulates errors for this op.
- Returns serializable mapping via to_map().
+ Given name_str ('copy'|'displace'|'delete'), optional arg_dict, optional errors_list.
+ Does hold op name, own a fresh arg_dict, collect per-entry errors.
+ Returns dictionary via as_dictionary().
"""
- name_str: str
- args_map: dict[str,Any] = field(default_factory=dict)
- errors_list: list[str] = field(default_factory=list)
+ __slots__ = ("name_str" ,"arg_dict" ,"errors_list")
+
+ def __init__(self ,name_str: str ,arg_dict: dict|None=None ,errors_list: list[str]|None=None)-> None:
+ self.name_str = name_str
+ self.arg_dict = dict(arg_dict) if arg_dict is not None else {}
+ self.errors_list = list(errors_list) if errors_list is not None else []
def add_error(self ,msg_str: str)-> None:
- "Given message. Does append to errors_list. Returns None."
self.errors_list.append(msg_str)
- def to_map(self)-> dict[str,Any]:
- "Given self. Does convert to a plain dict. Returns {'op','args_map','errors_list'}."
+ def as_dictionary(self)-> dict:
return {
"op": self.name_str
- ,"args_map": dict(self.args_map)
+ ,"arg_dict": dict(self.arg_dict)
,"errors_list": list(self.errors_list)
}
-@dataclass(slots=True)
-class PlannerContext:
- """
- PlannerContext — per-config provenance and defaults.
+ def print(self, *, index: int|None=None, file=None)-> None:
+ """
+ Given: optional index for numbering and optional file-like (defaults to stdout).
+ Does: print a compact, human-readable one-line summary of this command; prints any errors indented below.
+ Returns: None.
+ """
+ if file is None:
+ import sys as _sys
+ file = _sys.stdout
- Given: stage_root_dpath, read_file_rel_fpath, default write_file location/name,
- default owner name, default permission (int or '0644'), optional default content.
- Does: provide ambient defaults and provenance to Planner methods.
- Returns: n/a (data holder).
- """
- stage_root_dpath: Path
- read_file_rel_fpath: Path
- default_write_file_dpath_str: str
- default_write_file_fname_str: str
- default_owner_name_str: str
- default_mode_int: int|None = None
- default_mode_octal_str: str|None = None
- default_content_bytes: bytes|None = None
-
- @staticmethod
- def from_values(stage_root_dpath: Path
- ,read_file_rel_fpath: Path
- ,write_file_dpath_str: str
- ,write_file_fname_str: str
- ,owner_name_str: str
- ,perm: int|str
- ,content: bytes|str|None
- )-> PlannerContext:
- "Given raw values. Does normalize perm and content. Returns PlannerContext."
- if isinstance(content ,str):
- content_b = content.encode("utf-8")
- else:
- content_b = content
- perm_norm = _norm_perm(perm)
- if perm_norm is None:
- m_int ,m_oct = None ,None
+ op = self.name_str
+ ad = self.arg_dict or {}
+
+ # Compose destination path for display
+ d = ad.get("write_file_dpath_str") or ""
+ f = ad.get("write_file_fname") or ""
+ try:
+ from pathlib import Path as _Path
+ dst = (_Path(d)/f).as_posix() if d and f and "/" not in f else "?"
+ except Exception:
+ dst = "?"
+
+ # Numbering prefix
+ prefix = f"{index:02d}. " if index is not None else ""
+
+ if op == "copy":
+ mode = ad.get("mode_int")
+ owner = ad.get("owner_name")
+ size = len(ad.get("content_bytes") or b"")
+ line = f"{prefix}copy -> {dst} mode {mode:04o} owner {owner} bytes {size}"
+ elif op == "displace":
+ line = f"{prefix}displace -> {dst}"
+ elif op == "delete":
+ line = f"{prefix}delete -> {dst}"
else:
- m_int ,m_oct = perm_norm
- return PlannerContext(
- stage_root_dpath=stage_root_dpath
- ,read_file_rel_fpath=read_file_rel_fpath
- ,default_write_file_dpath_str=write_file_dpath_str
- ,default_write_file_fname_str=write_file_fname_str
- ,default_owner_name_str=owner_name_str
- ,default_mode_int=m_int
- ,default_mode_octal_str=m_oct
- ,default_content_bytes=content_b
- )
+ line = f"{prefix}?op? -> {dst}"
+
+ print(line, file=file)
+
+ # Print any per-entry errors underneath
+ for err in self.errors_list:
+ print(f" ! {err}", file=file)
+
-@dataclass(slots=True)
class Journal:
"""
- Journal — ordered list of Commands plus provenance metadata.
+ Journal — ordered list of Command plus provenance metadata (model only; no CBOR).
- Given optional meta map.
- Does append commands, expose entries, produce plain or CBOR encodings, and rebuild from CBOR.
- Returns plain dict via to_map(), bytes via to_CBOR_bytes(), Journal via from_CBOR_bytes().
+ Given optional plan_dict in wire shape (for reconstruction).
+ Does manage meta, append commands, expose entries, and pack to dict.
+ Returns dict via as_dictionary().
"""
- meta_map: dict[str,Any] = field(default_factory=dict)
- commands_list: list[Command] = field(default_factory=list)
+ __slots__ = ("meta_dict" ,"command_list")
+
+ def __init__(self ,plan_dict: dict|None=None)-> None:
+ self.meta_dict = {}
+ self.command_list = []
+ if plan_dict is not None:
+ self._init_from_dict(plan_dict)
+
+ def _init_from_dict(self ,plan_dict: dict)-> None:
+ if not isinstance(plan_dict ,dict):
+ raise ValueError("plan_dict must be a dict")
+ meta = dict(plan_dict.get("meta_dict") or {})
+ entries = plan_dict.get("entries_list") or []
+ self.meta_dict.update(meta)
+ for e in entries:
+ if not isinstance(e ,dict):
+ continue
+ op = e.get("op") or "?"
+ args = e.get("arg_dict") or {}
+ errs = e.get("errors_list") or []
+ self.command_list.append(Command(name_str=op ,arg_dict=dict(args) ,errors_list=list(errs)))
def set_meta(self ,**kv)-> None:
- "Given keyword meta. Does merge into meta_map. Returns None."
- self.meta_map.update(kv)
+ self.meta_dict.update(kv)
def append(self ,cmd: Command)-> None:
- "Given Command. Does append to commands_list. Returns None."
- self.commands_list.append(cmd)
+ self.command_list.append(cmd)
- def entries_list(self)-> list[dict[str,Any]]:
- "Given n/a. Does return list of entry dicts (copy). Returns list[dict]."
- return [c.to_map() for c in self.commands_list]
+ def entries_list(self)-> list[dict]:
+ return [c.as_dictionary() for c in self.command_list]
- def to_map(self)-> dict[str,Any]:
- "Given n/a. Does package a plan map (ready for CBOR). Returns dict."
+ def as_dictionary(self)-> dict:
return {
"version_int": 1
- ,"meta_map": dict(self.meta_map)
+ ,"meta_dict": dict(self.meta_dict)
,"entries_list": self.entries_list()
}
- def to_CBOR_bytes(self ,canonical_bool: bool=True)-> bytes:
- "Given n/a. Does CBOR-encode to bytes (requires cbor2). Returns bytes."
- try:
- import cbor2
- except Exception as e:
- raise RuntimeError(f"package cbor2 required for to_CBOR_bytes: {e}")
- return cbor2.dumps(self.to_map() ,canonical=canonical_bool)
-
- @staticmethod
- def from_CBOR_bytes(data_bytes: bytes)-> Journal:
- "Given CBOR bytes. Does decode and rebuild a Journal (Commands + meta). Returns Journal."
+ def print(self, *, index_start: int = 1, file=None) -> None:
+ """
+ Given: optional starting index and optional file-like (defaults to stdout).
+ Does: print each Command on a single line via Command.print(), numbered.
+ Returns: None.
+ """
+ if file is None:
+ import sys as _sys
+ file = _sys.stdout
+
+ if not self.command_list:
+ print("(plan is empty)", file=file)
+ return
+
+ for i, cmd in enumerate(self.command_list, start=index_start):
+ cmd.print(index=i, file=file)
+
# ===== Runner-provided provenance =====

class PlanProvenance:
    """
    PlanProvenance — runner-provided, read-only provenance for a single config script.

    Given: stage_root and config_path (both Path) for one config file.
    Does: resolve absolute paths, derive the stage-relative path, the config's
    directory (anchor for relative write dirs), and the "py-less" read filename.
    Returns: n/a (data holder with a print() helper).
    """
    __slots__ = ("stage_root_dpath","config_abs_fpath","config_rel_fpath",
                 "read_dir_dpath","read_fname")

    def __init__(self, *, stage_root: Path, config_path: Path):
        self.stage_root_dpath = stage_root.resolve()
        self.config_abs_fpath = config_path.resolve()
        try:
            self.config_rel_fpath = self.config_abs_fpath.relative_to(self.stage_root_dpath)
        except ValueError:
            # BUG FIX: was `except Exception`, which could mask real errors;
            # relative_to() signals "not under stage root" with ValueError only.
            # Fall back to the bare filename when outside the stage root.
            self.config_rel_fpath = Path(self.config_abs_fpath.name)

        # Where the config file lives (used to anchor relative write dirs)
        self.read_dir_dpath = self.config_abs_fpath.parent

        # "py-less" filename: strip .stage.py, else .py, else keep name
        name = self.config_abs_fpath.name
        if name.endswith(".stage.py"):
            self.read_fname = name[:-len(".stage.py")]
        elif name.endswith(".py"):
            self.read_fname = name[:-3]
        else:
            self.read_fname = name

    def print(self, *, file=None) -> None:
        """
        Given: optional file-like (defaults to stdout).
        Does: print a readable, multi-line summary of provenance.
        Returns: None.
        """
        if file is None:
            import sys as _sys
            file = _sys.stdout
        print(f"Stage root: {self.stage_root_dpath}", file=file)
        print(f"Config (rel): {self.config_rel_fpath.as_posix()}", file=file)
        print(f"Config (abs): {self.config_abs_fpath}", file=file)
        print(f"Read dir: {self.read_dir_dpath}", file=file)
        print(f"Read fname: {self.read_fname}", file=file)
+
+
+
# ===== Admin-facing defaults carrier =====

class WriteFileMeta:
    """
    WriteFileMeta — per-call or planner-default write-file attributes.

    Given dpath (abs str/Path) ,fname (bare name or None) ,owner (str)
    ,mode (int|'0644') ,content (bytes|str|None).
    Does normalize into fields (may remain None if absent/invalid).
    Returns object suitable for providing defaults to Planner methods.
    """
    __slots__ = ("dpath_str" ,"fname" ,"owner_name_str" ,"mode_int" ,"mode_octal_str" ,"content_bytes")

    def __init__(self
                 ,*
                 ,dpath="/"
                 ,fname=None     # None or "." → let Planner resolve (provenance fallback)
                 ,owner="root"   # "." → current process user (resolved by Planner)
                 ,mode=0o444
                 ,content=None
                 ):
        # dpath: minimally sanitized; relative allowed (Planner anchors it later)
        self.dpath_str = norm_dpath_str(dpath)
        # fname/owner: "." is a pass-through sentinel the Planner resolves
        self.fname = "." if fname == "." else norm_fname_or_none(fname)
        self.owner_name_str = "." if owner == "." else norm_nonempty_owner(owner)
        self.mode_int ,self.mode_octal_str = parse_mode(mode)
        # str content is stored UTF-8 encoded, hence the *_bytes field name
        self.content_bytes = norm_content_bytes(content)

    def print(self, *, label: str | None = None, file=None) -> None:
        """
        Given: optional label and optional file-like (defaults to stdout).
        Does: print a single-line summary of defaults/overrides.
        Returns: None.
        """
        if file is None:
            import sys as _sys
            file = _sys.stdout
        shown_dpath = self.dpath_str or "?"
        shown_fname = self.fname or "?"
        shown_owner = self.owner_name_str or "?"
        shown_mode = f"{self.mode_int:04o}" if isinstance(self.mode_int, int) else (self.mode_octal_str or "?")
        shown_size = len(self.content_bytes) if isinstance(self.content_bytes, (bytes, bytearray)) else 0
        head = (label + ": ") if label else ""
        print(f"{head}dpath={shown_dpath} fname={shown_fname} owner={shown_owner} mode={shown_mode} bytes={shown_size}", file=file)
+
# ===== Planner =====

class Planner:
    """
    Planner — constructs a Journal of Commands from config scripts.

    Given provenance (PlanProvenance) and optional default WriteFileMeta.
    Does resolve command parameters by precedence: kwarg > per-call WriteFileMeta > planner default,
    with a final filename fallback to provenance basename if still missing.
    On any argument error, returns the Command with errors and DOES NOT append it to Journal.
    Returns live Journal via journal().
    """
    # NOTE(review): the incoming source had all of the methods below written at
    # method indentation but was missing the `class Planner:` header itself
    # (a SyntaxError in the assembled file); the header is restored here.
    __slots__ = ("_prov" ,"_defaults" ,"_journal")

    def __init__(self ,provenance: PlanProvenance ,defaults: WriteFileMeta|None=None)-> None:
        "Given provenance + optional defaults. Does seed defaults (owner root, mode 0444, provenance fname) and journal meta. Returns None."
        self._prov = provenance
        self._defaults = defaults if defaults is not None else WriteFileMeta(
            dpath="/"
            ,fname=provenance.read_fname
            ,owner="root"
            ,mode=0o444
            ,content=None
        )
        self._journal = Journal()
        self._journal.set_meta(
            stage_root_dpath_str=str(self._prov.stage_root_dpath)
            ,config_rel_fpath_str=self._prov.config_rel_fpath.as_posix()
        )

    # --- defaults management / access ---

    def set_defaults(self ,defaults: WriteFileMeta)-> None:
        "Given WriteFileMeta. Does replace planner defaults. Returns None."
        self._defaults = defaults

    def defaults(self)-> WriteFileMeta:
        "Given n/a. Does return current WriteFileMeta defaults. Returns WriteFileMeta."
        return self._defaults

    def journal(self)-> Journal:
        "Given n/a. Returns Journal reference (live, still being modified here)."
        return self._journal

    # --- resolution helpers ---

    def _pick(self ,kw ,meta_attr ,default_attr):
        "Given three sources in precedence order. Does pick first non-None. Returns value or None."
        if kw is not None:
            return kw
        if meta_attr is not None:
            return meta_attr
        return default_attr

    def _resolve_write_file(self ,wfm: WriteFileMeta|None ,dpath ,fname)-> tuple[str|None ,str|None]:
        "Given optional meta + kwarg overrides. Does resolve target dir/file with fallbacks. Returns (dpath,fname)."
        # normalize explicit kwargs (allow "." to pass through untouched)
        dpath_str = norm_dpath_str(dpath) if dpath is not None else None
        if fname is not None and fname != ".":
            fname = norm_fname_or_none(fname)

        dpath_val = self._pick(dpath_str ,(wfm.dpath_str if wfm else None) ,self._defaults.dpath_str)
        fname_val = self._pick(fname ,(wfm.fname if wfm else None) ,self._defaults.fname)

        # final fallback for filename: "." or None → derive from config name
        if fname_val == "." or fname_val is None:
            fname_val = self._prov.read_fname

        # anchor relative dpaths against the config's directory
        if dpath_val is not None and not is_abs_dpath(dpath_val):
            dpath_val = (self._prov.read_dir_dpath / dpath_val).as_posix()

        return dpath_val ,fname_val

    def _resolve_owner_mode_content(self
                                    ,wfm: WriteFileMeta|None
                                    ,owner: str|None
                                    ,mode: int|str|None
                                    ,content: bytes|str|None
                                    )-> tuple[str|None ,tuple[int|None ,str|None] ,bytes|None]:
        "Given optional meta + kwarg overrides. Does resolve owner/mode/content by precedence. Returns (owner,(mode_int,mode_oct),content)."
        owner_norm = norm_nonempty_owner(owner) if (owner is not None and owner != ".") else owner
        content_b = norm_content_bytes(content) if content is not None else None

        owner_v = self._pick(owner_norm ,(wfm.owner_name_str if wfm else None) ,self._defaults.owner_name_str)
        # resolve "." → current process user
        if owner_v == ".":
            owner_v = getpass.getuser()

        # mode precedence: explicit kwarg > per-call meta > planner default.
        # BUG FIX: previously a per-call WriteFileMeta whose mode was unset or
        # unparsable ((None,None)) stopped the fall-through to planner defaults,
        # contradicting the documented precedence chain.
        mode_v = parse_mode(mode) if mode is not None else (None ,None)
        if mode_v == (None ,None) and wfm is not None:
            mode_v = (wfm.mode_int ,wfm.mode_octal_str)
        if mode_v == (None ,None):
            mode_v = (self._defaults.mode_int ,self._defaults.mode_octal_str)

        content_v = self._pick(content_b ,(wfm.content_bytes if wfm else None) ,self._defaults.content_bytes)
        return owner_v ,mode_v ,content_v

    def print(self, *, show_journal: bool = True, file=None) -> None:
        """
        Given: flags (show_journal) and optional file-like (defaults to stdout).
        Does: print provenance, defaults, and optionally the journal via delegation.
        Returns: None.
        """
        if file is None:
            import sys as _sys
            file = _sys.stdout

        print("== Provenance ==", file=file)
        self._prov.print(file=file)

        print("\n== Defaults ==", file=file)
        self._defaults.print(label="defaults", file=file)

        if show_journal:
            entries = getattr(self._journal, "command_list", [])
            n_total = len(entries)
            n_copy = sum(1 for c in entries if getattr(c, "name_str", None) == "copy")
            n_disp = sum(1 for c in entries if getattr(c, "name_str", None) == "displace")
            n_del = sum(1 for c in entries if getattr(c, "name_str", None) == "delete")

            print("\n== Journal ==", file=file)
            print(f"entries: {n_total} copy:{n_copy} displace:{n_disp} delete:{n_del}", file=file)
            if n_total:
                self._journal.print(index_start=1, file=file)
            else:
                print("(plan is empty)", file=file)

    # --- Command builders (first arg may be WriteFileMeta) ---

    def copy(self
             ,wfm: WriteFileMeta|None=None
             ,*
             ,write_file_dpath: str|Path|None=None
             ,write_file_fname: str|None=None
             ,owner: str|None=None
             ,mode: int|str|None=None
             ,content: bytes|str|None=None
             )-> Command:
        """
        Given optional WriteFileMeta plus keyword overrides.
        Does build a 'copy' command; on any argument error the command is returned with errors and NOT appended.
        Returns Command.
        """
        cmd = Command("copy")
        dpath ,fname = self._resolve_write_file(wfm ,write_file_dpath ,write_file_fname)
        owner_v ,(mode_int ,mode_oct) ,content_b = self._resolve_owner_mode_content(wfm ,owner ,mode ,content)

        # well-formed checks (shape only; policy lives on the privileged side)
        if not is_abs_dpath(dpath): cmd.add_error("write_file_dpath must be absolute")
        if norm_fname_or_none(fname) is None: cmd.add_error("write_file_fname must be a bare filename")
        if not owner_v: cmd.add_error("owner must be non-empty")
        if (mode_int ,mode_oct) == (None ,None):
            cmd.add_error("mode must be int <= 0o7777 or 3/4-digit octal string")
        if content_b is None:
            cmd.add_error("content is required for copy() (bytes or str)")

        cmd.arg_dict.update({
            "write_file_dpath_str": dpath,
            "write_file_fname": fname,
            "owner_name": owner_v,
            "mode_int": mode_int,
            "mode_octal_str": mode_oct,
            "content_bytes": content_b,
            "provenance_config_rel_fpath_str": self._prov.config_rel_fpath.as_posix(),
        })

        if not cmd.errors_list:
            self._journal.append(cmd)
        return cmd

    def _target_only(self ,name_str: str ,wfm: WriteFileMeta|None
                     ,write_file_dpath: str|Path|None ,write_file_fname: str|None)-> Command:
        "Given op name + target overrides. Does build a dpath/fname-only command, append iff error-free. Returns Command."
        cmd = Command(name_str)
        dpath ,fname = self._resolve_write_file(wfm ,write_file_dpath ,write_file_fname)
        if not is_abs_dpath(dpath): cmd.add_error("write_file_dpath must be absolute")
        if norm_fname_or_none(fname) is None: cmd.add_error("write_file_fname must be a bare filename")
        cmd.arg_dict.update({
            "write_file_dpath_str": dpath,
            "write_file_fname": fname,
        })
        if not cmd.errors_list:
            self._journal.append(cmd)
        return cmd

    def displace(self
                 ,wfm: WriteFileMeta|None=None
                 ,*
                 ,write_file_dpath: str|Path|None=None
                 ,write_file_fname: str|None=None
                 )-> Command:
        "Given optional WriteFileMeta plus overrides. Does build 'displace' entry or return errors. Returns Command."
        return self._target_only("displace" ,wfm ,write_file_dpath ,write_file_fname)

    def delete(self
               ,wfm: WriteFileMeta|None=None
               ,*
               ,write_file_dpath: str|Path|None=None
               ,write_file_fname: str|None=None
               )-> Command:
        "Given optional WriteFileMeta plus overrides. Does build 'delete' entry or return errors. Returns Command."
        return self._target_only("delete" ,wfm ,write_file_dpath ,write_file_fname)
+
+
+
+++ /dev/null
-#!/usr/bin/env python3
-"""
-deploy.py — Deploy staged DNS bundle (Unbound per-subu + nft redirect)
-RT-v2025.09.15.4
-
-What it does
- - Installs the staged tree under ./stage into /
- - systemctl daemon-reload
- - nft -f /etc/nftables.conf (relies on: include "/etc/nftables.d/*.nft")
- - enable + restart unbound@<instance> for each instance (default: US x6)
-
-Assumptions
- - This file lives next to install_staged_tree.py
- - Stage contains:
- stage/etc/nftables.d/10-block-IPv6.nft
- stage/etc/nftables.d/20-SUBU-ports.nft
- stage/etc/systemd/system/unbound@.service
- stage/etc/unbound/unbound-US.conf (127.0.0.1@5301)
- stage/etc/unbound/unbound-x6.conf (127.0.0.1@5302)
- - /etc/nftables.conf has: include "/etc/nftables.d/*.nft"
-
-Exit codes
- 0 = success, 2 = preflight/deploy error
-"""
-from __future__ import annotations
-from pathlib import Path
-import argparse
-import importlib
-import os
-import subprocess
-import sys
-
-ROOT = Path(__file__).resolve().parent
-STAGE = ROOT / "stage"
-
-def _run(cmd: list[str]) -> tuple[int, str, str]:
- cp = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- return (cp.returncode, cp.stdout.strip(), cp.stderr.strip())
-
-def _preflight_errors() -> list[str]:
- errs = []
- if os.geteuid() != 0:
- errs.append("must be run as root (sudo)")
- if not STAGE.exists():
- errs.append(f"stage dir missing: {STAGE}")
- return errs
-
-def _install_stage(stage_root: Path) -> list[str]:
- """
- Call install_staged_tree.install_staged_tree(stage_root=..., dest_root=/, create_dirs=True)
- and return its log lines.
- """
- sys.path.insert(0, str(ROOT))
- try:
- ist = importlib.import_module("install_staged_tree")
- except Exception as e:
- raise RuntimeError(f"failed to import install_staged_tree: {e}")
-
- # Expect signature: (stage_root, dest_root, create_dirs=False, skip_identical=True) -> (logs, ifaces)
- try:
- logs, _ifaces = ist.install_staged_tree(
- stage_root=stage_root,
- dest_root=Path("/"),
- create_dirs=True,
- skip_identical=True,
- )
- except TypeError as te:
- # Fallback for older two-arg signature: install_staged_tree(stage_root, dest_root)
- try:
- logs, _ifaces = ist.install_staged_tree(stage_root, Path("/"))
- except Exception as e2:
- raise RuntimeError(f"install_staged_tree() call failed: {e2}") from te
- return logs
-
-def deploy(instances: list[str]) -> list[str]:
- logs: list[str] = []
-
- # Plan
- logs.append("Deploy DNS plan:")
- logs.append(f" instances: {', '.join(instances)}")
- logs.append(f" stage: {STAGE}")
- logs.append(f" root: /")
- logs.append("")
- logs.append("Installing staged artifacts…")
-
- # Install staged files
- install_logs = _install_stage(STAGE)
- logs.extend(install_logs)
-
- # Reload systemd units (for unbound@.service changes)
- _run(["systemctl", "daemon-reload"])
-
- # Apply nftables from the main config (which includes drop-ins)
- rc, out, err = _run(["/usr/sbin/nft", "-f", "/etc/nftables.conf"])
- if rc != 0:
- raise RuntimeError(f"nftables apply failed:\n{err or out}")
-
- # Sanity: verify our tables are present
- rc2, out2, err2 = _run(["/usr/sbin/nft", "list", "tables"])
- if rc2 != 0:
- raise RuntimeError(f"nftables list tables failed:\n{err2 or out2}")
-
- required = {"inet NO-IPV6", "inet SUBU-DNS-REDIRECT", "inet SUBU-PORT-EGRESS"}
- present = set()
- for line in out2.splitlines():
- parts = line.strip().split()
- # lines look like: "table inet FOO"
- if len(parts) == 3 and parts[0] == "table":
- present.add(f"{parts[1]} {parts[2]}")
- missing = required - present
- if missing:
- raise RuntimeError(f"nftables missing tables: {', '.join(sorted(missing))}")
-
- # Enable + restart unbound instances
- for inst in instances:
- unit = f"unbound@{inst}.service"
- _run(["systemctl", "enable", unit])
- _run(["systemctl", "restart", unit])
- rcA, _, _ = _run(["systemctl", "is-active", unit])
- logs.append(f"{unit}: {'active' if rcA == 0 else 'inactive'}")
-
- logs.append("")
- logs.append("✓ DNS deploy complete.")
- return logs
-
-def main(argv=None) -> int:
- ap = argparse.ArgumentParser(description="Deploy staged DNS (Unbound per-subu + nft redirect).")
- ap.add_argument("--instances", nargs="+", default=["US", "x6"],
- help="Unbound instances to enable (default: US x6)")
- args = ap.parse_args(argv)
-
- errs = _preflight_errors()
- if errs:
- print("❌ deploy preflight found issue(s):", file=sys.stderr)
- for e in errs:
- print(f" - {e}", file=sys.stderr)
- return 2
-
- try:
- logs = deploy(args.instances)
- print("\n".join(logs))
- return 0
- except Exception as e:
- print(f"❌ deploy failed: {e}", file=sys.stderr)
- return 2
-
-if __name__ == "__main__":
- sys.exit(main())
--- /dev/null
+table inet NO-IPV6 {
+  chain input {
+    type filter hook input priority raw; policy accept;
+    meta nfproto ipv6 counter drop comment "drop all IPv6 inbound"
+  }
+
+  chain output {
+    type filter hook output priority raw; policy accept;
+    meta nfproto ipv6 counter drop comment "drop all IPv6 outbound"
+  }
+
+  chain forward {
+    type filter hook forward priority raw; policy accept;
+    meta nfproto ipv6 counter drop comment "drop all IPv6 forward"
+  }
+}
--- /dev/null
+table inet SUBU-DNS-REDIRECT {
+ chain output {
+ type nat hook output priority -100; policy accept;
+
+ # Redirect DNS for the subu UIDs to local Unbound listeners
+ meta skuid 2017 udp dport 53 redirect to :5301
+ meta skuid 2018 udp dport 53 redirect to :5302
+ meta skuid 2017 tcp dport 53 redirect to :5301
+ meta skuid 2018 tcp dport 53 redirect to :5302
+ }
+}
+
+table inet SUBU-PORT-EGRESS {
+  chain output {
+    type filter hook output priority 0; policy accept;
+
+    # Always allow loopback on egress
+    oifname "lo" accept
+
+    # No IPv6 for subu (until you reintroduce v6)
+    meta skuid {2017,2018} meta nfproto ipv6 counter drop comment "no IPv6 for subu"
+
+    ##### x6 (UID 2018)
+    # Block some exfil channels regardless of iface
+    meta skuid 2018 tcp dport {25,465,587} counter drop comment "block SMTP/Submission"
+    meta skuid 2018 udp dport {3478,5349,19302-19309} counter drop comment "block STUN/TURN"
+    meta skuid 2018 tcp dport 853 counter drop comment "block DoT (TCP/853)"
+
+    # (Optional) allow ICMP echo out via x6
+    meta skuid 2018 oifname "x6" ip protocol icmp icmp type echo-request accept
+
+    # Enforce interface binding
+    meta skuid 2018 oifname "x6" accept
+    meta skuid 2018 oifname != "x6" counter drop comment "x6 must use wg x6"
+
+    ##### US (UID 2017)
+    meta skuid 2017 tcp dport {25,465,587} counter drop comment "block SMTP/Submission"
+    meta skuid 2017 udp dport {3478,5349,19302-19309} counter drop comment "block STUN/TURN"
+    meta skuid 2017 tcp dport 853 counter drop comment "block DoT (TCP/853)"
+
+    # (Optional) ICMP via US
+    meta skuid 2017 oifname "US" ip protocol icmp icmp type echo-request accept
+
+    meta skuid 2017 oifname "US" accept
+    meta skuid 2017 oifname != "US" counter drop comment "US must use wg US"
+  }
+}
--- /dev/null
+[Unit]
+Description=Unbound DNS instance for %i (per-subu tunnel egress)
+After=network-online.target wg-quick@%i.service
+Requires=wg-quick@%i.service
+Wants=network-online.target
+
+[Service]
+Type=simple
+ExecStart=/usr/sbin/unbound -d -p -c /etc/unbound/unbound-%i.conf
+User=unbound
+Group=unbound
+Restart=on-failure
+RestartSec=2s
+AmbientCapabilities=CAP_NET_BIND_SERVICE
+CapabilityBoundingSet=CAP_NET_BIND_SERVICE
+NoNewPrivileges=true
+
+[Install]
+WantedBy=multi-user.target
--- /dev/null
+server:
+ username: "unbound"
+ chroot: ""
+ directory: "/etc/unbound"
+ do-daemonize: no
+ interface: 127.0.0.1@5301
+ hide-identity: yes
+ hide-version: yes
+ harden-glue: yes
+ harden-dnssec-stripped: yes
+ qname-minimisation: yes
+ prefetch: yes
+ outgoing-interface: 10.0.0.1
+
+forward-zone:
+ name: "."
+ forward-addr: 1.1.1.1
+ forward-addr: 1.0.0.1
--- /dev/null
+server:
+ username: "unbound"
+ chroot: ""
+ directory: "/etc/unbound"
+ do-daemonize: no
+ interface: 127.0.0.1@5302
+ hide-identity: yes
+ hide-version: yes
+ harden-glue: yes
+ harden-dnssec-stripped: yes
+ qname-minimisation: yes
+ prefetch: yes
+ outgoing-interface: 10.8.0.2
+
+forward-zone:
+ name: "."
+ forward-addr: 1.1.1.1
+ forward-addr: 1.0.0.1
--- /dev/null
+#!/usr/bin/env bash
+set -euo pipefail
+echo "== DNS status =="
+# NOTE(review): no "DNS-redirect" unit is staged in this tree (only unbound@ and
+# wg-quick@ units are referenced) — stale name? confirm or drop it from this line.
+systemctl --no-pager --full status DNS-redirect unbound@US unbound@x6 || true
+echo
+echo "== nftables =="
+nft list table inet SUBU-DNS-REDIRECT || true
+echo
+echo "== Unbound logs (last 50 lines each) =="
+journalctl -u unbound@US -n 50 --no-pager || true
+echo
+journalctl -u unbound@x6 -n 50 --no-pager || true
--- /dev/null
+#!/usr/bin/env -S python3 -B
+"""
+stage_show_plan.py — run staged configs (UNPRIVILEGED) and print the plan.
+
+Given: a stage root directory.
+Does: loads Stage.py, executes each config, builds a native plan map, summarizes it.
+Returns: exit code 0 on success, non-zero on error.
+"""
+from __future__ import annotations
+import sys ,os
+sys.dont_write_bytecode = True
+os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
+
+from pathlib import Path
+import argparse ,importlib.util ,runpy ,socket ,getpass ,time ,hashlib
+
+# ---------- helpers ----------
+
+def _load_stage_module(stage_root_dpath: Path):
+ "Given: stage root path. Does: load Stage.py as module 'Stage'. Returns: module."
+ mod_fpath = stage_root_dpath/"Stage.py"
+ if not mod_fpath.exists():
+ raise FileNotFoundError(f"Stage.py not found at {mod_fpath}")
+ spec = importlib.util.spec_from_file_location("Stage" ,str(mod_fpath))
+ mod = importlib.util.module_from_spec(spec)
+ sys.modules["Stage"] = mod
+ assert spec and spec.loader
+ spec.loader.exec_module(mod) # type: ignore
+ return mod
+
+def _config_rel_fpaths(stage_root_dpath: Path)-> list[Path]:
+ "Given: stage root. Does: collect *.py (excluding Stage.py) as relative file paths. Returns: list[Path]."
+ rel_fpath_list: list[Path] = []
+ for p in stage_root_dpath.rglob("*.py"):
+ if p.name == "Stage.py": continue
+ if p.is_file():
+ rel_fpath_list.append(p.relative_to(stage_root_dpath))
+ return sorted(rel_fpath_list ,key=lambda x: x.as_posix())
+
+def _sha256_hex(b: bytes)-> str:
+ "Given: bytes. Does: sha256. Returns: hex string."
+ return hashlib.sha256(b).hexdigest()
+
+# ---------- main ----------
+
+def main(argv: list[str]|None=None)-> int:
+ "Given: CLI. Does: show plan. Returns: exit code."
+ ap = argparse.ArgumentParser(prog="stage_show_plan.py"
+ ,description="Run staged config scripts and print the resulting plan.")
+ ap.add_argument("--stage",default="stage",help="stage directory (default: ./stage)")
+ args = ap.parse_args(argv)
+
+ stage_root_dpath = Path(args.stage)
+ StageMod = _load_stage_module(stage_root_dpath)
+ Stage = StageMod.Stage
+ Stage._reset()
+ Stage.set_meta(
+ planner_user_name=getpass.getuser()
+ ,planner_uid_int=os.getuid()
+ ,planner_gid_int=os.getgid()
+ ,host_name=socket.gethostname()
+ ,created_utc_str=time.strftime("%Y-%m-%dT%H:%M:%SZ",time.gmtime())
+ )
+
+ for rel_fpath in _config_rel_fpaths(stage_root_dpath):
+ Stage._begin(read_rel_fpath=rel_fpath ,stage_root_dpath=stage_root_dpath)
+ runpy.run_path(str(stage_root_dpath/rel_fpath) ,run_name="__main__")
+ Stage._end()
+
+ plan_map = Stage.plan_object()
+ entries_list = plan_map["entries_list"]
+ print(f"Plan version: {plan_map['version_int']}")
+ print(f"Planner: {plan_map['meta_map'].get('planner_user_name')}@{plan_map['meta_map'].get('host_name')} "
+ f"UID:{plan_map['meta_map'].get('planner_uid_int')} GID:{plan_map['meta_map'].get('planner_gid_int')}")
+ print(f"Created: {plan_map['meta_map'].get('created_utc_str')}")
+ print(f"Entries: {len(entries_list)}\n")
+
+ for i ,e_map in enumerate(entries_list ,1):
+ op = e_map.get("op")
+ dst_fpath_str = f"{e_map.get('dst_dpath')}/{e_map.get('dst_fname')}"
+ if op == "copy":
+ content = e_map.get("content_bytes") or b""
+ sz = len(content)
+ mode = e_map.get("mode_octal_str") or "????"
+ owner = e_map.get("owner_name") or "?"
+ h = _sha256_hex(content)
+ print(f"{i:02d}. copy -> {dst_fpath_str} mode {mode} owner {owner} bytes {sz} sha256 {h[:16]}…")
+ elif op == "displace":
+ print(f"{i:02d}. displace -> {dst_fpath_str}")
+ elif op == "delete":
+ print(f"{i:02d}. delete -> {dst_fpath_str}")
+ else:
+ print(f"{i:02d}. ?op? -> {dst_fpath_str} ({op})")
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
--- /dev/null
+#!/usr/bin/env -S python3 -B
+"""
+executor.py — StageHand outer/inner executor (MVP; UNPRIVILEGED for now)
+
+Phase 1 (outer):
+ - Build a combined plan by executing each config's `configure(prov, planner, WriteFileMeta)`.
+ - Optionally print the plan via Planner.print().
+ - Optionally stop.
+
+Phase 2 (inner shim in same program for now; no privilege yet):
+ - Encode combined plan to CBOR and pass to inner path.
+ - Inner decodes back to a Journal and optionally prints it.
+ - Optionally stop.
+
+Discovery:
+ - --stage (default: ./stage) points at the stage directory root.
+ - By default, *every file* under --stage (recursively) is executed as a config,
+ regardless of extension. Editors can still use .py for highlighting; we strip
+ only a trailing ".py" to derive prov.read_fname.
+
+"""
+
+from __future__ import annotations
+
+# no bytecode anywhere
+import sys, os
+sys.dont_write_bytecode = True
+os.environ.setdefault("PYTHONDONTWRITEBYTECODE", "1")
+
+from pathlib import Path
+import argparse
+import getpass
+import tempfile
+import runpy
+import subprocess
+import datetime as _dt
+import os, fnmatch, stat
+
+
+# Local module: Planner.py (same directory)
+from Planner import (
+ Planner, PlanProvenance, WriteFileMeta, Journal, Command,
+)
+
+# -------- utilities --------
+
+def iso_utc_now_str() -> str:
+ return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
+
+def _split_globs(glob_arg: str) -> list[str]:
+ parts = [g.strip() for g in (glob_arg or "").split(",") if g.strip()]
+ # Default includes both deep and top-level files
+ return parts or ["**/*", "*"]
+
+def find_config_paths(stage_root: Path, glob_arg: str) -> list[Path]:
+ """
+ Given stage root and comma-glob string, return sorted list of files (regular or symlink).
+    NOTE(review): "match ALL files, including top-level" holds only when glob_arg is
+    empty (the ["**/*", "*"] fallback); the CLI default "**/*" requires a "/" under
+    fnmatch, so top-level files directly under --stage are skipped — confirm intent.
+ """
+ root = stage_root.resolve()
+ patterns = _split_globs(glob_arg)
+ out: set[Path] = set()
+
+ for dirpath, dirnames, filenames in os.walk(root, followlinks=False):
+ # (optional) prune symlinked dirs to avoid cycles; files can still be symlinks
+ dirnames[:] = [d for d in dirnames if not os.path.islink(os.path.join(dirpath, d))]
+
+ for fname in filenames:
+ f_abs = Path(dirpath, fname)
+ rel = f_abs.relative_to(root).as_posix()
+ if any(fnmatch.fnmatch(rel, pat) for pat in patterns):
+ try:
+ st = f_abs.lstat()
+ if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
+ out.add(f_abs)
+ except Exception:
+ # unreadable/broken entries are skipped
+ pass
+
+ return sorted(out, key=lambda p: p.as_posix())
+
+
+
+def _run_one_config(config_path: Path, stage_root: Path) -> Planner:
+ """Execute a single config's `configure(prov, planner, WriteFileMeta)` and return that config's Planner."""
+ prov = PlanProvenance(stage_root=stage_root, config_path=config_path)
+ per_planner = Planner(provenance=prov) # defaults derive from this file's provenance
+ env = runpy.run_path(str(config_path))
+ fn = env.get("configure")
+ if not callable(fn):
+ raise RuntimeError(f"{config_path}: missing callable configure(prov, planner, WriteFileMeta)")
+ fn(prov, per_planner, WriteFileMeta)
+ return per_planner
+
+def _aggregate_into_master(stage_root: Path, planners: list[Planner]) -> Planner:
+ """Create a master Planner and copy all Commands from per-config planners into it."""
+ # Synthetic provenance for the master planner (used only for display/meta)
+ fake_config = stage_root / "(aggregate).py"
+ master = Planner(PlanProvenance(stage_root=stage_root, config_path=fake_config))
+
+ # annotate meta
+ master.journal().set_meta(
+ generator_prog_str="executor.py",
+ generated_at_utc_str=iso_utc_now_str(),
+ user_name_str=getpass.getuser(),
+ host_name_str=os.uname().nodename if hasattr(os, "uname") else "unknown",
+ stage_root_dpath_str=str(stage_root.resolve()),
+ configs_list=[p._prov.config_rel_fpath.as_posix() for p in planners],
+ )
+
+ # copy commands
+ out_j = master.journal()
+ for p in planners:
+ for cmd in p.journal().command_list:
+ out_j.append(cmd) # keep Command objects as-is
+ return master
+
+# ----- CBOR “matchbox” (simple wrapper kept local to executor) -----
+
+def _plan_to_cbor_bytes(planner: Planner) -> bytes:
+ """Serialize a Planner's Journal to CBOR bytes."""
+ try:
+ import cbor2
+ except Exception as e:
+ raise RuntimeError(f"cbor2 is required: {e}")
+ plan_dict = planner.journal().as_dictionary()
+ return cbor2.dumps(plan_dict, canonical=True)
+
+def _journal_from_cbor_bytes(data: bytes) -> Journal:
+ """Rebuild a Journal from CBOR bytes."""
+ try:
+ import cbor2
+ except Exception as e:
+ raise RuntimeError(f"cbor2 is required: {e}")
+ obj = cbor2.loads(data)
+ if not isinstance(obj, dict):
+ raise ValueError("CBOR root must be a dict")
+ return Journal(plan_dict=obj)
+
+# -------- inner executor (phase 2) --------
+
+def _inner_main(plan_path: Path, phase2_print: bool, phase2_then_stop: bool) -> int:
+ """Inner executor path: decode CBOR → Journal; optionally print; (apply TBD)."""
+ try:
+ data = Path(plan_path).read_bytes()
+ except Exception as e:
+ print(f"error: failed to read plan file: {e}", file=sys.stderr)
+ return 2
+
+ try:
+ journal = _journal_from_cbor_bytes(data)
+ except Exception as e:
+ print(f"error: failed to decode CBOR: {e}", file=sys.stderr)
+ return 2
+
+ if phase2_print:
+ journal.print()
+
+ if phase2_then_stop:
+ return 0
+
+    # (Phase 3 apply would go here; omitted in MVP)
+ return 0
+
+# -------- outer executor (phase 1 & handoff) --------
+
+def _outer_main(args) -> int:
+ stage_root = Path(args.stage)
+ if not stage_root.is_dir():
+ print(f"error: --stage not a directory: {stage_root}", file=sys.stderr)
+ return 2
+
+ cfgs = find_config_paths(stage_root, args.glob)
+ if not cfgs:
+ print("No configuration files found.")
+ return 0
+
+ # Execute each config into its own planner
+ per_planners: list[Planner] = []
+ for cfg in cfgs:
+ try:
+ per_planners.append(_run_one_config(cfg, stage_root))
+ except SystemExit:
+ raise
+ except Exception as e:
+ print(f"error: executing {cfg}: {e}", file=sys.stderr)
+ return 2
+
+ # Aggregate into a single master planner for printing/CBOR
+ master = _aggregate_into_master(stage_root, per_planners)
+
+ if args.phase_1_print:
+ master.print()
+
+ if args.phase_1_then_stop:
+ return 0
+
+ # Phase 2: encode CBOR and invoke inner path (same script, --inner)
+ try:
+ cbor_bytes = _plan_to_cbor_bytes(master)
+ except Exception as e:
+ print(f"error: CBOR encode failed: {e}", file=sys.stderr)
+ return 2
+
+ with tempfile.NamedTemporaryFile(prefix="stagehand_plan_", suffix=".cbor", delete=False) as tf:
+ tf.write(cbor_bytes)
+ plan_path = tf.name
+
+ try:
+ cmd = [
+ sys.executable,
+ str(Path(__file__).resolve()),
+ "--inner",
+ "--plan", plan_path,
+ ]
+ if args.phase_2_print:
+ cmd.append("--phase-2-print")
+ if args.phase_2_then_stop:
+ cmd.append("--phase-2-then-stop")
+
+ proc = subprocess.run(cmd)
+ return proc.returncode
+ finally:
+ try:
+ os.unlink(plan_path)
+ except Exception:
+ pass
+
+# -------- CLI --------
+
+def main(argv: list[str] | None = None) -> int:
+ ap = argparse.ArgumentParser(
+ prog="executor.py",
+ description="StageHand outer/inner executor (plan → CBOR → decode).",
+ )
+ ap.add_argument("--stage", default="stage", help="stage root directory (default: ./stage)")
+
+ ap.add_argument("--glob", default="**/*",
+ help="glob for config scripts under --stage (default: '**/*' = all files)")
+
+ # ap.add_argument("--glob",
+ # default="**/*",
+ # help="comma-separated globs under --stage (default: **/*; every file is a config)")
+
+ # Phase-1 (outer) controls
+ ap.add_argument("--phase-1-print", action="store_true", help="print master planner (phase 1)")
+ ap.add_argument("--phase-1-then-stop", action="store_true", help="stop after phase 1")
+
+ # Phase-2 (inner) controls (outer forwards these to inner)
+ ap.add_argument("--phase-2-print", action="store_true", help="print decoded journal (phase 2)")
+ ap.add_argument("--phase-2-then-stop", action="store_true", help="stop after phase 2 decode")
+
+ # Inner-only flags (not for users)
+ ap.add_argument("--inner", action="store_true", help=argparse.SUPPRESS)
+ ap.add_argument("--plan", default=None, help=argparse.SUPPRESS)
+
+ args = ap.parse_args(argv)
+
+ if args.inner:
+ if not args.plan:
+ print("error: --inner requires --plan <file>", file=sys.stderr)
+ return 2
+ return _inner_main(Path(args.plan),
+ phase2_print=args.phase_2_print,
+ phase2_then_stop=args.phase_2_then_stop)
+
+ return _outer_main(args)
+
+
+if __name__ == "__main__":
+ sys.exit(main())
+++ /dev/null
-#!/usr/bin/env python3
-"""
-install_staged_tree.py
-RT-v2025.09.15.2
-
-A dumb installer: copy staged files into the target root with backups and
-deterministic permissions. No systemd stop/start, no daemon-reload.
-
-- Extended whitelist to include DNS bundle assets:
- * /etc/unbound/*.conf -> 0644
- * /etc/nftables.d/*.nft -> 0644
- * /usr/local/sbin/* -> 0500
- * /etc/systemd/system/*.service -> 0644
-- Keeps existing WireGuard/iproute2 handling.
-- API unchanged:
- install_staged_tree(stage_root: Path, dest_root: Path,
- create_dirs=False, skip_identical=True)
- -> returns (logs: list[str], detected_ifaces: list[str])
-"""
-
-from __future__ import annotations
-from pathlib import Path
-from typing import List, Optional, Sequence, Tuple
-import argparse
-import datetime as dt
-import hashlib
-import os
-import shutil
-import sys
-
-ROOT = Path(__file__).resolve().parent
-DEFAULT_STAGE = ROOT / "stage"
-
-def _sha256(path: Path) -> str:
- h = hashlib.sha256()
- with path.open("rb") as f:
- for chunk in iter(lambda: f.read(1<<20), b""):
- h.update(chunk)
- return h.hexdigest()
-
-def _ensure_parents(dest_root: Path, rel: Path, create: bool) -> None:
- parent = (dest_root / rel).parent
- if parent.exists():
- return
- if not create:
- raise RuntimeError(f"missing parent directory: {parent}")
- parent.mkdir(parents=True, exist_ok=True)
-
-def _backup_existing_to_stage(stage_root: Path, dest_root: Path, rel: Path) -> Optional[Path]:
- """If target exists, copy it back into stage/_backups/<ts>/<rel> and return backup path."""
- target = dest_root / rel
- if not target.exists():
- return None
- ts = dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
- backup = stage_root / "_backups" / ts / rel
- backup.parent.mkdir(parents=True, exist_ok=True)
- shutil.copy2(target, backup)
- return backup
-
-def _atomic_install(src: Path, dst: Path, mode: int) -> None:
- tmp = dst.with_suffix(dst.suffix + ".tmp")
- shutil.copyfile(src, tmp)
- os.chmod(tmp, mode)
- try:
- os.chown(tmp, 0, 0) # best-effort
- except PermissionError:
- pass
- os.replace(tmp, dst)
-
-def _mode_for_rel(rel: Path) -> Optional[int]:
- """Choose a mode based on the relative path bucket."""
- s = str(rel)
-
- # Existing buckets
- if s.startswith("usr/local/bin/"):
- return 0o500
- if s.startswith("etc/wireguard/") and rel.suffix == ".conf":
- return 0o600
- if s == "etc/iproute2/rt_tables":
- return 0o644
- if s.startswith("etc/systemd/system/") and s.endswith(".conf"):
- return 0o644
-
- # NEW: DNS bundle buckets
- if s.startswith("usr/local/sbin/"):
- return 0o500
- if s.startswith("etc/unbound/") and s.endswith(".conf"):
- return 0o644
- if s.startswith("etc/nftables.d/") and s.endswith(".nft"):
- return 0o644
- if s.startswith("etc/systemd/system/") and s.endswith(".service"):
- return 0o644
-
- return None
-
-def _iter_stage_targets(stage_root: Path) -> List[Path]:
- """Return a list of *relative* paths under stage that match our whitelist."""
- rels: List[Path] = []
-
- # /usr/local/bin/*
- bin_dir = stage_root / "usr" / "local" / "bin"
- if bin_dir.is_dir():
- for p in sorted(bin_dir.glob("*")):
- if p.is_file():
- rels.append(p.relative_to(stage_root))
-
- # NEW: /usr/local/sbin/*
- sbin_dir = stage_root / "usr" / "local" / "sbin"
- if sbin_dir.is_dir():
- for p in sorted(sbin_dir.glob("*")):
- if p.is_file():
- rels.append(p.relative_to(stage_root))
-
- # /etc/wireguard/*.conf
- wg_dir = stage_root / "etc" / "wireguard"
- if wg_dir.is_dir():
- for p in sorted(wg_dir.glob("*.conf")):
- rels.append(p.relative_to(stage_root))
-
- # /etc/systemd/system/wg-quick@*.service.d/*.conf
- sysd_dir = stage_root / "etc" / "systemd" / "system"
- if sysd_dir.is_dir():
- for p in sorted(sysd_dir.rglob("wg-quick@*.service.d/*.conf")):
- rels.append(p.relative_to(stage_root))
-
- # NEW: /etc/systemd/system/*.service
- if sysd_dir.is_dir():
- for p in sorted(sysd_dir.glob("*.service")):
- if p.is_file():
- rels.append(p.relative_to(stage_root))
-
- # /etc/iproute2/rt_tables
- rt = stage_root / "etc" / "iproute2" / "rt_tables"
- if rt.is_file():
- rels.append(rt.relative_to(stage_root))
-
- # NEW: /etc/unbound/*.conf
- ub_dir = stage_root / "etc" / "unbound"
- if ub_dir.is_dir():
- for p in sorted(ub_dir.glob("*.conf")):
- rels.append(p.relative_to(stage_root))
-
- # NEW: /etc/nftables.d/*.nft
- nft_dir = stage_root / "etc" / "nftables.d"
- if nft_dir.is_dir():
- for p in sorted(nft_dir.glob("*.nft")):
- rels.append(p.relative_to(stage_root))
-
- return rels
-
-def _discover_ifaces_from_stage(stage_root: Path) -> List[str]:
- """Peek into staged artifacts to guess iface names (for friendly next-steps)."""
- names = set()
-
- # from /etc/wireguard/<iface>.conf
- wg_dir = stage_root / "etc" / "wireguard"
- if wg_dir.is_dir():
- for p in wg_dir.glob("*.conf"):
- names.add(p.stem)
-
- # from /etc/systemd/system/wg-quick@<iface>.service.d/
- sysd = stage_root / "etc" / "systemd" / "system"
- if sysd.is_dir():
- for d in sysd.glob("wg-quick@*.service.d"):
- name = d.name
- at = name.find("@")
- dot = name.find(".service.d")
- if at != -1 and dot != -1 and dot > at:
- names.add(name[at+1:dot])
-
- return sorted(names)
-
-def install_staged_tree(
- stage_root: Path,
- dest_root: Path,
- create_dirs: bool = False,
- skip_identical: bool = True,
-) -> Tuple[List[str], List[str]]:
- """
- Copy files from stage_root to dest_root.
- Returns (logs, detected_ifaces).
- """
- old_umask = os.umask(0o077)
- logs: List[str] = []
- try:
- staged = _iter_stage_targets(stage_root)
- if not staged:
- raise RuntimeError("nothing to install (stage is empty or whitelist didn’t match)")
-
- for rel in staged:
- src = stage_root / rel
- dst = dest_root / rel
-
- mode = _mode_for_rel(rel)
- if mode is None:
- logs.append(f"skip (not whitelisted): {rel}")
- continue
-
- _ensure_parents(dest_root, rel, create_dirs)
-
- backup = _backup_existing_to_stage(stage_root, dest_root, rel)
- if backup:
- logs.append(f"backup: {dst} -> {backup}")
-
- if skip_identical and dst.exists():
- try:
- if _sha256(src) == _sha256(dst):
- logs.append(f"identical: skip {rel}")
- continue
- except Exception:
- pass
-
- _atomic_install(src, dst, mode)
- logs.append(f"install: {rel} (mode {oct(mode)})")
-
- ifaces = _discover_ifaces_from_stage(stage_root)
- return (logs, ifaces)
- finally:
- os.umask(old_umask)
-
-def _require_root(allow_nonroot: bool) -> None:
- if not allow_nonroot and os.geteuid() != 0:
- raise RuntimeError("must run as root (use --force-nonroot to override)")
-
-def main(argv: Optional[Sequence[str]] = None) -> int:
- ap = argparse.ArgumentParser(description="Install staged artifacts into a target root. No service control.")
- ap.add_argument("--stage", default=str(DEFAULT_STAGE))
- ap.add_argument("--root", default="/")
- ap.add_argument("--create-dirs", action="store_true", help="create missing parent directories")
- ap.add_argument("--no-skip-identical", action="store_true", help="always replace even if content identical")
- ap.add_argument("--force-nonroot", action="store_true", help="allow non-root install (ownership may be wrong)")
- args = ap.parse_args(argv)
-
- try:
- _require_root(allow_nonroot=args.force_nonroot)
- logs, ifaces = install_staged_tree(
- stage_root=Path(args.stage),
- dest_root=Path(args.root),
- create_dirs=args.create_dirs,
- skip_identical=(not args.no_skip_identical),
- )
- for line in logs:
- print(line)
-
- print("\n=== Summary ===")
- print(f"Installed {sum(1 for l in logs if l.startswith('install:'))} file(s).")
- if ifaces:
- lst = " ".join(ifaces)
- print(f"Detected interfaces from stage: {lst}")
- print("\nNext steps:")
- print(f" sudo ./start_iface.py {lst}")
- else:
- print("No interfaces detected in staged artifacts.")
- print("\nNext steps:")
- print(" sudo ./start_iface.py <iface> [more ifaces]")
- return 0
- except Exception as e:
- print(f"❌ install failed: {e}", file=sys.stderr)
- return 2
-
-if __name__ == "__main__":
- sys.exit(main())
+++ /dev/null
-#!/usr/bin/env -S python3 -B
-"""
-plan_show.py — build and display a staged plan (UNPRIVILEGED).
-
-Given: a stage directory of config scripts (*.stage.py by default).
-Does: executes each script with a pre-created Planner (P) and PlannerContext,
- aggregates Commands into a single Journal, by default prints from the CBOR
- round-trip (encode→decode) so the human view matches what will be shipped
- to stage_cp; runs well-formed (WF) invariant checks. Can emit CBOR if requested.
-Returns: exit status 0 on success; 2 on WF errors or usage errors.
-"""
-
-from __future__ import annotations
-
-# no bytecode anywhere
-import sys ,os
-sys.dont_write_bytecode = True
-os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
-
-from pathlib import Path
-import argparse
-import datetime as _dt
-import getpass
-import runpy
-
-# local module (same dir): Planner
-from Planner import Planner ,PlannerContext ,Journal ,Command
-
-# ===== Utilities (general / reusable) =====
-
-def iso_utc_now_str()-> str:
- "Given n/a. Does return compact UTC timestamp. Returns YYYYMMDDTHHMMSSZ."
- return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
-
-def find_configs(stage_root_dpath: Path ,glob_pat_str: str)-> list[Path]:
- "Given stage root and glob. Does find matching files under stage. Returns list of absolute Paths."
- root = stage_root_dpath.resolve()
- return sorted((p for p in root.glob(glob_pat_str) if p.is_file()) ,key=lambda p: p.as_posix())
-
-def human_size(n: int)-> str:
- "Given byte count. Does format human size. Returns string."
- units = ["B","KB","MB","GB","TB"]
- i = 0
- x = float(max(0 ,n))
- while x >= 1024 and i < len(units)-1:
- x /= 1024.0
- i += 1
- return f"{x:.1f} {units[i]}"
-
-def _dst_path_str(args_map: dict)-> str:
- "Given args map. Does join write_file path. Returns POSIX path or '?'."
- d = args_map.get("write_file_dpath_str") or ""
- f = args_map.get("write_file_fname_str") or ""
- try:
- if d and f and "/" not in f:
- return (Path(d)/f).as_posix()
- except Exception:
- pass
- return "?"
-
-# ===== WF invariants (MVP) =====
-# These are “well-formedness” rules (shape/encoding/domain), not policy or privilege checks.
-
-def wf_check(journal: Journal)-> list[str]:
- """
- Given Journal. Does run invariant checks on meta and each Command entry. Returns list of error strings.
- Invariants (MVP):
- - meta_map: must include generator identity and stage_root_dpath_str.
- - entry.op ∈ {'copy','displace','delete'}
- - all ops: write_file_dpath_str absolute; write_file_fname_str is bare filename.
- - copy: owner_name_str non-empty; mode_int ∈ [0..0o7777] and no suid/sgid; content_bytes present (bytes).
- """
- errs: list[str] = []
- meta = journal.meta_map or {}
-
- # meta presence (light placeholder)
- if not isinstance(meta ,dict):
- errs.append("WF_META: meta_map must be a map")
- else:
- if not meta.get("stage_root_dpath_str"):
- errs.append("WF_META: missing stage_root_dpath_str")
- if not meta.get("generator_prog_str"):
- errs.append("WF_META: missing generator_prog_str")
-
- # entries
- for idx ,cmd in enumerate(journal.commands_list ,1):
- prefix = f"WF[{idx:02d}]"
- if not isinstance(cmd ,Command):
- errs.append(f"{prefix}: entry is not Command")
- continue
- op = cmd.name_str
- if op not in {"copy","displace","delete"}:
- errs.append(f"{prefix}: unknown op '{op}'")
- continue
- am = cmd.args_map or {}
- dpath = am.get("write_file_dpath_str")
- fname = am.get("write_file_fname_str")
- if not isinstance(dpath ,str) or not dpath.startswith("/"):
- errs.append(f"{prefix}: write_file_dpath_str must be absolute")
- if not isinstance(fname ,str) or not fname or "/" in fname:
- errs.append(f"{prefix}: write_file_fname_str must be a bare filename")
-
- if op == "copy":
- owner = am.get("owner_name_str")
- mode = am.get("mode_int")
- data = am.get("content_bytes")
- if not isinstance(owner ,str) or not owner.strip():
- errs.append(f"{prefix}: owner_name_str must be non-empty")
- if not isinstance(mode ,int) or not (0 <= mode <= 0o7777):
- errs.append(f"{prefix}: mode_int must be int in [0..0o7777]")
- elif (mode & 0o6000):
- errs.append(f"{prefix}: mode_int suid/sgid not allowed in MVP")
- if not isinstance(data ,(bytes,bytearray)):
- errs.append(f"{prefix}: content_bytes must be bytes")
- return errs
-
-# ===== Planner execution =====
-
-def _run_one_config(config_abs_fpath: Path ,stage_root_dpath: Path)-> Planner:
- """
- Given abs path to a config script and stage root. Does construct a PlannerContext and Planner,
- then executes the script with 'P' (Planner instance) bound in globals. Returns Planner with Journal.
- Notes:
- - Defaults are intentionally spartan; config should refine them via P.set_context(...).
- - This is UNPRIVILEGED; no filesystem changes are performed here.
- """
- read_rel = config_abs_fpath.resolve().relative_to(stage_root_dpath.resolve())
- ctx = PlannerContext.from_values(
- stage_root_dpath=stage_root_dpath
- ,read_file_rel_fpath=read_rel
- ,write_file_dpath_str="/"
- ,write_file_fname_str="."
- ,owner_name_str=getpass.getuser()
- ,perm=0o644
- ,content=None
- )
- P = Planner(ctx)
- g = {"Planner": Planner ,"PlannerContext": PlannerContext ,"P": P}
- runpy.run_path(str(config_abs_fpath) ,init_globals=g)
- return P
-
-def _aggregate_journal(planners_list: list[Planner] ,stage_root_dpath: Path)-> Journal:
- "Given planners and stage root. Does aggregate Commands into a single Journal with meta. Returns Journal."
- J = Journal()
- J.set_meta(
- version_int=1
- ,generator_prog_str="plan_show.py"
- ,generated_at_utc_str=iso_utc_now_str()
- ,user_name_str=getpass.getuser()
- ,host_name_str=os.uname().nodename if hasattr(os ,"uname") else "unknown"
- ,stage_root_dpath_str=str(stage_root_dpath.resolve())
- ,configs_list=[p.context().read_file_rel_fpath.as_posix() for p in planners_list]
- )
- for p in planners_list:
- for cmd in p.journal().commands_list:
- J.append(cmd)
- return J
-
-def _print_plan(journal: Journal)-> None:
- "Given Journal. Does print a readable summary. Returns None."
- meta = journal.meta_map or {}
- print(f"Stage: {meta.get('stage_root_dpath_str','?')}")
- print(f"Generated: {meta.get('generated_at_utc_str','?')} by {meta.get('user_name_str','?')}@{meta.get('host_name_str','?')}\n")
-
- entries = journal.commands_list
- if not entries:
- print("(plan is empty)")
- return
-
- n_copy = sum(1 for c in entries if c.name_str=="copy")
- n_disp = sum(1 for c in entries if c.name_str=="displace")
- n_del = sum(1 for c in entries if c.name_str=="delete")
- print(f"Entries: {len(entries)} copy:{n_copy} displace:{n_disp} delete:{n_del}\n")
-
- for i ,cmd in enumerate(entries ,1):
- am = cmd.args_map
- dst = _dst_path_str(am)
- if cmd.name_str == "copy":
- size = len(am.get("content_bytes") or b"")
- mode = am.get("mode_int")
- owner = am.get("owner_name_str")
- print(f"{i:02d}. copy -> {dst} mode {mode:04o} owner {owner} bytes {size} ({human_size(size)})")
- elif cmd.name_str == "displace":
- print(f"{i:02d}. displace -> {dst}")
- elif cmd.name_str == "delete":
- print(f"{i:02d}. delete -> {dst}")
- else:
- print(f"{i:02d}. ?op? -> {dst}")
-
-def _maybe_emit_CBOR(journal: Journal ,emit_CBOR_fpath: Path|None)-> None:
- "Given Journal and optional path. Does write CBOR if requested. Returns None."
- if not emit_CBOR_fpath:
- return
- try:
- data = journal.to_CBOR_bytes(canonical_bool=True)
- except Exception as e:
- print(f"error: CBOR encode failed: {e}" ,file=sys.stderr)
- raise
- emit_CBOR_fpath.parent.mkdir(parents=True ,exist_ok=True)
- with open(emit_CBOR_fpath ,"wb") as fh:
- fh.write(data)
- print(f"\nWrote CBOR plan: {emit_CBOR_fpath} ({len(data)} bytes)")
-
-# ===== CLI =====
-
-def main(argv: list[str]|None=None)-> int:
- "Given CLI. Does discover configs, build plan, (optionally) CBOR round-trip before printing, run WF, optionally emit CBOR. Returns exit code."
- ap = argparse.ArgumentParser(prog="plan_show.py"
- ,description="Build and show a staged plan (no privilege, no apply).")
- ap.add_argument("--stage",default="stage",help="stage directory root (default: ./stage)")
- ap.add_argument("--glob",default="**/*.stage.py",help="glob for config scripts under --stage")
- ap.add_argument("--emit-CBOR",default=None,help="write CBOR plan to this path (optional)")
- ap.add_argument("--print-from-journal",action="store_true"
- ,help="print directly from in-memory Journal (skip CBOR round-trip)")
- args = ap.parse_args(argv)
-
- stage_root_dpath = Path(args.stage)
- if not stage_root_dpath.is_dir():
- print(f"error: --stage not a directory: {stage_root_dpath}" ,file=sys.stderr)
- return 2
-
- configs = find_configs(stage_root_dpath ,args.glob)
- if not configs:
- print("No config scripts found.")
- return 0
-
- planners: list[Planner] = []
- for cfg in configs:
- try:
- planners.append(_run_one_config(cfg ,stage_root_dpath))
- except SystemExit:
- raise
- except Exception as e:
- print(f"error: executing {cfg}: {e}" ,file=sys.stderr)
- return 2
-
- journal_src = _aggregate_journal(planners ,stage_root_dpath)
-
- if not args.print_from_journal:
- try:
- cbor_bytes = journal_src.to_CBOR_bytes(canonical_bool=True)
- journal = Journal.from_CBOR_bytes(cbor_bytes)
- except Exception as e:
- print(f"error: CBOR round-trip failed: {e}" ,file=sys.stderr)
- return 2
- else:
- journal = journal_src
-
- _print_plan(journal)
-
- errs = wf_check(journal)
- if errs:
- print("\nerror(s):" ,file=sys.stderr)
- for e in errs:
- print(f" - {e}" ,file=sys.stderr)
- return 2
-
- emit = Path(args.emit_CBOR) if args.emit_CBOR else None
- if emit:
- try:
- data = (cbor_bytes if not args.print_from_journal else journal_src.to_CBOR_bytes(canonical_bool=True))
- emit.parent.mkdir(parents=True ,exist_ok=True)
- with open(emit ,"wb") as fh:
- fh.write(data)
- print(f"\nWrote CBOR plan: {emit} ({len(data)} bytes)")
- except Exception as e:
- print(f"error: failed to write CBOR: {e}" ,file=sys.stderr)
- return 2
-
- return 0
-
-if __name__ == "__main__":
- sys.exit(main())
+++ /dev/null
-stage_test_0/
\ No newline at end of file
+++ /dev/null
-#!/usr/bin/env -S python3 -B
-"""
-stage_cp.py — build a CBOR plan from staged configs; show, validate, and apply with privilege.
-
-Given: a stage root directory.
-Does: (user) run configs → build native plan → WF checks → summarize → encode plan → sudo re-exec
- (root) decode plan → VALID + SANITY → apply ops (displace/copy/delete) safely.
-Returns: exit code.
-
-Requires: pip install cbor2
-"""
-from __future__ import annotations
-import sys ,os
-sys.dont_write_bytecode = True
-os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
-
-from pathlib import Path
-import argparse ,importlib.util ,runpy ,socket ,getpass ,time ,tempfile ,subprocess ,pwd
-from typing import Any
-import cbor2
-
-# ---------- small utils ----------
-
-def _load_stage_module(stage_root_dpath: Path):
- "Given: stage root path. Does: load Stage.py as module 'Stage'. Returns: module."
- mod_fpath = stage_root_dpath/"Stage.py"
- if not mod_fpath.exists():
- raise FileNotFoundError(f"Stage.py not found at {mod_fpath}")
- spec = importlib.util.spec_from_file_location("Stage" ,str(mod_fpath))
- mod = importlib.util.module_from_spec(spec)
- sys.modules["Stage"] = mod
- assert spec and spec.loader
- spec.loader.exec_module(mod) # type: ignore
- return mod
-
-def _config_rel_fpaths(stage_root_dpath: Path)-> list[Path]:
- "Given: stage root. Does: collect *.py (excluding Stage.py) as relative file paths. Returns: list[Path]."
- rel_fpath_list: list[Path] = []
- for p in stage_root_dpath.rglob("*.py"):
- if p.name == "Stage.py": continue
- if p.is_file():
- rel_fpath_list.append(p.relative_to(stage_root_dpath))
- return sorted(rel_fpath_list ,key=lambda x: x.as_posix())
-
-def _sha256_bytes(b: bytes)-> bytes:
- "Given: bytes. Does: sha256. Returns: 32-byte digest."
- return hashlib.sha256(b).digest()
-
-def _dst_fpath_str(dst_dpath_str: str ,dst_fname_str: str)-> str:
- "Given: a directory path string and a filename string. Does: join. Returns: combined POSIX path string."
- if "/" in dst_fname_str:
- return "" # invalid; WF will flag
- return str((Path(dst_dpath_str)/dst_fname_str))
-
-# ---------- WF / VALID / SANITY ----------
-
-_ALLOWLIST_PREFIXES_LIST = ["/etc" ,"/usr/local" ,"/etc/systemd/system"]
-
-def wf_check(plan_map: dict[str,Any])-> list[str]:
- "Given: plan map. Does: shape/lexical checks only. Returns: list of error strings."
- errs_list: list[str] = []
- if plan_map.get("version_int") != 1:
- errs_list.append("WF_VERSION: unsupported plan version")
- entries_list = plan_map.get("entries_list")
- if not isinstance(entries_list ,list):
- errs_list.append("WF_ENTRIES: 'entries_list' missing or not a list")
- return errs_list
- for i ,e_map in enumerate(entries_list ,1):
- op = e_map.get("op")
- dst_dpath_str = e_map.get("dst_dpath")
- dst_fname_str = e_map.get("dst_fname")
- where = f"entry {i}"
- if op not in ("copy","displace","delete"):
- errs_list.append(f"WF_OP:{where}: invalid op {op!r}")
- continue
- if not isinstance(dst_dpath_str ,str) or not dst_dpath_str:
- errs_list.append(f"WF_DST_DPATH:{where}: dst_dpath missing or not str")
- if not isinstance(dst_fname_str ,str) or not dst_fname_str:
- errs_list.append(f"WF_DST_FNAME:{where}: dst_fname missing or not str")
- if isinstance(dst_fname_str ,str) and "/" in dst_fname_str:
- errs_list.append(f"WF_DST_FNAME:{where}: dst_fname must not contain '/'")
- if isinstance(dst_dpath_str ,str) and not dst_dpath_str.startswith("/"):
- errs_list.append(f"WF_DST_DPATH:{where}: dst_dpath must be absolute")
- full_fpath_str = _dst_fpath_str(dst_dpath_str or "" ,dst_fname_str or "")
- if not full_fpath_str or not full_fpath_str.startswith("/"):
- errs_list.append(f"WF_PATH:{where}: failed to construct absolute path from dst_dpath/fname")
- if op == "copy":
- mode_int = e_map.get("mode_int")
- if not isinstance(mode_int ,int) or not (0 <= mode_int <= 0o7777):
- errs_list.append(f"WF_MODE:{where}: mode_int must be int in [0..0o7777]")
- if isinstance(mode_int ,int) and (mode_int & 0o6000):
- errs_list.append(f"WF_MODE:{where}: suid/sgid bits not allowed in MVP")
- owner_name = e_map.get("owner_name")
- if not isinstance(owner_name ,str) or not owner_name:
- errs_list.append(f"WF_OWNER:{where}: owner_name must be non-empty username string")
- content_bytes = e_map.get("content_bytes")
- if not (isinstance(content_bytes ,(bytes,bytearray)) and len(content_bytes) >= 0):
- errs_list.append(f"WF_CONTENT:{where}: content_bytes must be bytes (may be empty)")
- sha = e_map.get("sha256_bytes")
- if sha is not None:
- if not isinstance(sha ,(bytes,bytearray)) or len(sha)!=32:
- errs_list.append(f"WF_SHA256:{where}: sha256_bytes must be 32-byte digest if present")
- elif isinstance(content_bytes ,(bytes,bytearray)) and sha != _sha256_bytes(content_bytes):
- errs_list.append(f"WF_SHA256_MISMATCH:{where}: sha256_bytes does not match content_bytes")
- return errs_list
-
-def valid_check(plan_map: dict[str,Any])-> list[str]:
- "Given: plan map. Does: environment (read-only) checks. Returns: list of error strings."
- errs_list: list[str] = []
- for i ,e_map in enumerate(plan_map.get("entries_list") or [] ,1):
- op = e_map.get("op")
- dst_fpath_str = _dst_fpath_str(e_map.get("dst_dpath","/") ,e_map.get("dst_fname",""))
- where = f"entry {i}"
- try:
- parent_dpath = Path(dst_fpath_str).parent
- if not parent_dpath.exists():
- errs_list.append(f"VAL_PARENT_MISSING:{where}: parent dir does not exist: {parent_dpath}")
- elif not parent_dpath.is_dir():
- errs_list.append(f"VAL_PARENT_NOT_DIR:{where}: parent is not a directory: {parent_dpath}")
- if Path(dst_fpath_str).is_dir():
- errs_list.append(f"VAL_DST_IS_DIR:{where}: destination exists as a directory: {dst_fpath_str}")
- if op == "copy":
- owner_name = e_map.get("owner_name")
- try:
- pw = pwd.getpwnam(owner_name) # may raise KeyError
- e_map["_resolved_uid_int"] = pw.pw_uid
- e_map["_resolved_gid_int"] = pw.pw_gid
- except Exception:
- errs_list.append(f"VAL_OWNER_UNKNOWN:{where}: user not found: {owner_name!r}")
- except Exception as x:
- errs_list.append(f"VAL_EXCEPTION:{where}: {x}")
- return errs_list
-
-def sanity_check(plan_map: dict[str,Any])-> list[str]:
- "Given: plan map. Does: policy checks (allowlist, denials). Returns: list of error strings."
- errs_list: list[str] = []
- for i ,e_map in enumerate(plan_map.get("entries_list",[]) ,1):
- dst_fpath_str = _dst_fpath_str(e_map.get("dst_dpath","/") ,e_map.get("dst_fname",""))
- where = f"entry {i}"
- if not any(dst_fpath_str.startswith(pref + "/") or dst_fpath_str==pref for pref in _ALLOWLIST_PREFIXES_LIST):
- errs_list.append(f"POL_PATH_DENY:{where}: destination outside allowlist: {dst_fpath_str}")
- return errs_list
-
-# ---------- APPLY (root) ----------
-
-def _utc_str()-> str:
- "Given: n/a. Does: current UTC compact. Returns: string."
- import datetime as _dt
- return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
-
-def _ensure_parent_dirs(dst_fpath: Path)-> None:
- "Given: destination file path. Does: create parents. Returns: None."
- dst_fpath.parent.mkdir(parents=True ,exist_ok=True)
-
-def _displace_in_place(dst_fpath: Path)-> None:
- "Given: destination file path. Does: rename existing file/symlink to add UTC suffix. Returns: None."
- try:
- if dst_fpath.exists() or dst_fpath.is_symlink():
- suffix = "_" + _utc_str()
- dst_fpath.rename(dst_fpath.with_name(dst_fpath.name + suffix))
- except FileNotFoundError:
- pass
-
-def _apply_copy(dst_fpath: Path ,content_bytes: bytes ,mode_int: int ,uid_int: int ,gid_int: int)-> None:
- "Given: target, bytes, mode, uid, gid. Does: write temp, fsync, chmod/chown, atomic replace. Returns: None."
- _ensure_parent_dirs(dst_fpath)
- _displace_in_place(dst_fpath)
- tmp_fpath = dst_fpath.with_name("." + dst_fpath.name + ".stage_tmp")
- with open(tmp_fpath ,"wb") as fh:
- fh.write(content_bytes)
- fh.flush()
- os.fsync(fh.fileno())
- try:
- os.chmod(tmp_fpath ,mode_int & 0o777)
- except Exception:
- pass
- try:
- os.chown(tmp_fpath ,uid_int ,gid_int)
- except Exception:
- pass
- os.replace(tmp_fpath ,dst_fpath) # atomic within same dir/device
-
-def _apply_delete(dst_fpath: Path)-> None:
- "Given: target file path. Does: unlink file/symlink if present. Returns: None."
- try:
- if dst_fpath.is_symlink() or dst_fpath.is_file():
- dst_fpath.unlink()
- except FileNotFoundError:
- pass
-
-def apply_plan(plan_map: dict[str,Any] ,dry_run_bool: bool=False)-> int:
- "Given: plan map and dry flag. Does: execute ops sequentially. Returns: exit code."
- for i ,e_map in enumerate(plan_map.get("entries_list") or [] ,1):
- op = e_map.get("op")
- dst_fpath = Path(_dst_fpath_str(e_map.get("dst_dpath","/") ,e_map.get("dst_fname","")))
- if op == "displace":
- print(f"+ displace {dst_fpath}")
- if not dry_run_bool:
- _displace_in_place(dst_fpath)
- elif op == "delete":
- print(f"+ delete {dst_fpath}")
- if not dry_run_bool:
- _apply_delete(dst_fpath)
- elif op == "copy":
- mode_int = e_map.get("mode_int") or 0o644
- uid_int = e_map.get("_resolved_uid_int" ,0)
- gid_int = e_map.get("_resolved_gid_int" ,0)
- content_bytes = e_map.get("content_bytes") or b""
- print(f"+ copy {dst_fpath} mode {mode_int:04o} uid {uid_int} gid {gid_int} bytes {len(content_bytes)}")
- if not dry_run_bool:
- _apply_copy(dst_fpath ,content_bytes ,mode_int ,uid_int ,gid_int)
- else:
- print(f"! unknown op {op} (skipping)")
- return 2
- return 0
-
-# ---------- orchestration ----------
-
-def _build_plan_unpriv(stage_root_dpath: Path)-> dict[str,Any]:
- "Given: stage root. Does: execute configs, accumulate entries, add sha256. Returns: plan map."
- StageMod = _load_stage_module(stage_root_dpath)
- Stage = StageMod.Stage
- Stage._reset()
- Stage.set_meta(
- planner_user_name=getpass.getuser()
- ,planner_uid_int=os.getuid()
- ,planner_gid_int=os.getgid()
- ,host_name=socket.gethostname()
- ,created_utc_str=time.strftime("%Y-%m-%dT%H:%M:%SZ",time.gmtime())
- )
- for rel_fpath in _config_rel_fpaths(stage_root_dpath):
- Stage._begin(read_rel_fpath=rel_fpath ,stage_root_dpath=stage_root_dpath)
- runpy.run_path(str(stage_root_dpath/rel_fpath) ,run_name="__main__")
- Stage._end()
- for e_map in Stage.plan_entries():
- if e_map.get("op") == "copy" and isinstance(e_map.get("content_bytes") ,(bytes,bytearray)):
- e_map["sha256_bytes"] = _sha256_bytes(e_map["content_bytes"])
- return Stage.plan_object()
-
-def _sudo_apply_self(plan_fpath: Path ,dry_run_bool: bool)-> int:
- "Given: plan file path and dry flag. Does: sudo re-exec current script with --apply. Returns: exit code."
- cmd_list = ["sudo",sys.executable,os.path.abspath(__file__),"--apply"
- ,"--plan",str(plan_fpath)]
- if dry_run_bool:
- cmd_list.append("--dry-run")
- return subprocess.call(cmd_list)
-
-def main(argv: list[str]|None=None)-> int:
- "Given: CLI. Does: plan, WF (user) then VALID+SANITY+APPLY (root). Returns: exit code."
- ap = argparse.ArgumentParser(prog="stage_cp.py"
- ,description="Plan staged config application and apply with sudo.")
- ap.add_argument("--stage",default="stage",help="stage directory (default: ./stage)")
- ap.add_argument("--dry-run",action="store_true",help="validate and show actions, do not change files")
- ap.add_argument("--apply",action="store_true",help=argparse.SUPPRESS) # internal (root path)
- ap.add_argument("--plan",default=None,help=argparse.SUPPRESS) # internal (root path)
- args = ap.parse_args(argv)
-
- # Root path (apply)
- if args.apply:
- if os.geteuid() != 0:
- print("error: --apply requires root" ,file=sys.stderr)
- return 2
- if not args.plan:
- print("error: --plan path required for --apply" ,file=sys.stderr)
- return 2
- with open(args.plan ,"rb") as fh:
- plan_map = cbor2.load(fh)
- val_errs = valid_check(plan_map)
- pol_errs = sanity_check(plan_map)
- if val_errs or pol_errs:
- print("error(s) during validation/sanity:" ,file=sys.stderr)
- for e in val_errs: print(f" - {e}" ,file=sys.stderr)
- for e in pol_errs: print(f" - {e}" ,file=sys.stderr)
- return 2
- rc = apply_plan(plan_map ,dry_run_bool=args.dry_run)
- return rc
-
- # User path (plan + summarize + escalate)
- stage_root_dpath = Path(args.stage)
- plan_map = _build_plan_unpriv(stage_root_dpath)
-
- entries_list = plan_map.get("entries_list" ,[])
- print(f"Built plan with {len(entries_list)} entr{'y' if len(entries_list)==1 else 'ies'}")
-
- total_bytes_int = sum(len(e_map.get("content_bytes") or b"")
- for e_map in entries_list if e_map.get("op")=="copy")
- print(f"Total bytes to write: {total_bytes_int}")
- if args.dry_run:
- print("\n--dry-run: would perform the following:")
-
- for i ,e_map in enumerate(entries_list ,1):
- op = e_map.get("op")
- dst_fpath_str = _dst_fpath_str(e_map.get("dst_dpath") ,e_map.get("dst_fname"))
- if op=="copy":
- mode_int = e_map.get("mode_int") or 0o644
- owner_name = e_map.get("owner_name") or "?"
- size = len(e_map.get("content_bytes") or b"")
- print(f"{i:02d}. copy -> {dst_fpath_str} mode {mode_int:04o} owner {owner_name} bytes {size}")
- elif op=="displace":
- print(f"{i:02d}. displace -> {dst_fpath_str}")
- elif op=="delete":
- print(f"{i:02d}. delete -> {dst_fpath_str}")
- else:
- print(f"{i:02d}. ?op? -> {dst_fpath_str}")
-
- with tempfile.NamedTemporaryFile(prefix="plan_" ,suffix=".cbor" ,delete=False) as tf:
- cbor2.dump(plan_map ,tf)
- plan_fpath = Path(tf.name)
- try:
- if args.dry_run:
- return _sudo_apply_self(plan_fpath ,dry_run_bool=True)
- ans = input("\nProceed with apply under sudo? [y/N] ").strip().lower()
- if ans not in ("y","yes"):
- print("Aborted.")
- return 0
- return _sudo_apply_self(plan_fpath ,dry_run_bool=False)
- finally:
- try: os.unlink(plan_fpath)
- except Exception: pass
-
-if __name__ == "__main__":
- sys.exit(main())
+++ /dev/null
-table inet NO-IPV6 {
- chain input {
- type filter hook input priority raw; policy accept;
- meta nfproto ipv6 counter comment "drop all IPv6 inbound" drop
- }
-
- chain output {
- type filter hook output priority raw; policy accept;
- meta nfproto ipv6 counter comment "drop all IPv6 outbound" drop
- }
-
- chain forward {
- type filter hook forward priority raw; policy accept;
- meta nfproto ipv6 counter comment "drop all IPv6 forward" drop
- }
-}
+++ /dev/null
-table inet SUBU-DNS-REDIRECT {
- chain output {
- type nat hook output priority -100; policy accept;
-
- # Redirect DNS for the subu UIDs to local Unbound listeners
- meta skuid 2017 udp dport 53 redirect to :5301
- meta skuid 2018 udp dport 53 redirect to :5302
- meta skuid 2017 tcp dport 53 redirect to :5301
- meta skuid 2018 tcp dport 53 redirect to :5302
- }
-}
-
-table inet SUBU-PORT-EGRESS {
- chain output {
- type filter hook output priority 0; policy accept;
-
- # Always allow loopback on egress
- oifname "lo" accept
-
- # No IPv6 for subu (until you reintroduce v6)
- meta skuid {2017,2018} meta nfproto ipv6 counter comment "no IPv6 for subu" drop
-
- ##### x6 (UID 2018)
- # Block some exfil channels regardless of iface
- meta skuid 2018 tcp dport {25,465,587} counter comment "block SMTP/Submission" drop
- meta skuid 2018 udp dport {3478,5349,19302-19309} counter comment "block STUN/TURN" drop
- meta skuid 2018 tcp dport 853 counter comment "block DoT (TCP/853)" drop
-
- # (Optional) allow ICMP echo out via x6
- meta skuid 2018 oifname "x6" ip protocol icmp icmp type echo-request accept
-
- # Enforce interface binding
- meta skuid 2018 oifname "x6" accept
- meta skuid 2018 oifname != "x6" counter comment "x6 must use wg x6" drop
-
- ##### US (UID 2017)
- meta skuid 2017 tcp dport {25,465,587} counter drop comment "block SMTP/Submission"
- meta skuid 2017 udp dport {3478,5349,19302-19309} counter drop comment "block STUN/TURN"
- meta skuid 2017 tcp dport 853 counter drop comment "block DoT (TCP/853)"
-
- # (Optional) ICMP via US
- meta skuid 2017 oifname "US" ip protocol icmp icmp type echo-request accept
-
- meta skuid 2017 oifname "US" accept
- meta skuid 2017 oifname != "US" counter comment "US must use wg US" drop
- }
-}
+++ /dev/null
-[Unit]
-Description=Unbound DNS instance for %i (per-subu tunnel egress)
-After=network-online.target wg-quick@%i.service
-Requires=wg-quick@%i.service
-Wants=network-online.target
-
-[Service]
-Type=simple
-ExecStart=/usr/sbin/unbound -d -p -c /etc/unbound/unbound-%i.conf
-User=unbound
-Group=unbound
-Restart=on-failure
-RestartSec=2s
-AmbientCapabilities=CAP_NET_BIND_SERVICE
-CapabilityBoundingSet=CAP_NET_BIND_SERVICE
-NoNewPrivileges=true
-
-[Install]
-WantedBy=multi-user.target
+++ /dev/null
-server:
- username: "unbound"
- chroot: ""
- directory: "/etc/unbound"
- do-daemonize: no
- interface: 127.0.0.1@5301
- hide-identity: yes
- hide-version: yes
- harden-glue: yes
- harden-dnssec-stripped: yes
- qname-minimisation: yes
- prefetch: yes
- outgoing-interface: 10.0.0.1
-
-forward-zone:
- name: "."
- forward-addr: 1.1.1.1
- forward-addr: 1.0.0.1
+++ /dev/null
-server:
- username: "unbound"
- chroot: ""
- directory: "/etc/unbound"
- do-daemonize: no
- interface: 127.0.0.1@5302
- hide-identity: yes
- hide-version: yes
- harden-glue: yes
- harden-dnssec-stripped: yes
- qname-minimisation: yes
- prefetch: yes
- outgoing-interface: 10.8.0.2
-
-forward-zone:
- name: "."
- forward-addr: 1.1.1.1
- forward-addr: 1.0.0.1
+++ /dev/null
-#!/usr/bin/env bash
-set -euo pipefail
-echo "== DNS status =="
-systemctl --no-pager --full status DNS-redirect unbound@US unbound@x6 || true
-echo
-echo "== nftables =="
-nft list table inet NAT-DNS-REDIRECT || true
-echo
-echo "== Unbound logs (last 50 lines each) =="
-journalctl -u unbound@US -n 50 --no-pager || true
-echo
-journalctl -u unbound@x6 -n 50 --no-pager || true
+++ /dev/null
-#!/usr/bin/env -S python3 -B
-"""
-stage_show_plan.py — run staged configs (UNPRIVILEGED) and print the plan.
-
-Given: a stage root directory.
-Does: loads Stage.py, executes each config, builds a native plan map, summarizes it.
-Returns: exit code 0 on success, non-zero on error.
-"""
-from __future__ import annotations
-import sys ,os
-sys.dont_write_bytecode = True
-os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
-
-from pathlib import Path
-import argparse ,importlib.util ,runpy ,socket ,getpass ,time ,hashlib
-
-# ---------- helpers ----------
-
-def _load_stage_module(stage_root_dpath: Path):
- "Given: stage root path. Does: load Stage.py as module 'Stage'. Returns: module."
- mod_fpath = stage_root_dpath/"Stage.py"
- if not mod_fpath.exists():
- raise FileNotFoundError(f"Stage.py not found at {mod_fpath}")
- spec = importlib.util.spec_from_file_location("Stage" ,str(mod_fpath))
- mod = importlib.util.module_from_spec(spec)
- sys.modules["Stage"] = mod
- assert spec and spec.loader
- spec.loader.exec_module(mod) # type: ignore
- return mod
-
-def _config_rel_fpaths(stage_root_dpath: Path)-> list[Path]:
- "Given: stage root. Does: collect *.py (excluding Stage.py) as relative file paths. Returns: list[Path]."
- rel_fpath_list: list[Path] = []
- for p in stage_root_dpath.rglob("*.py"):
- if p.name == "Stage.py": continue
- if p.is_file():
- rel_fpath_list.append(p.relative_to(stage_root_dpath))
- return sorted(rel_fpath_list ,key=lambda x: x.as_posix())
-
-def _sha256_hex(b: bytes)-> str:
- "Given: bytes. Does: sha256. Returns: hex string."
- return hashlib.sha256(b).hexdigest()
-
-# ---------- main ----------
-
-def main(argv: list[str]|None=None)-> int:
- "Given: CLI. Does: show plan. Returns: exit code."
- ap = argparse.ArgumentParser(prog="stage_show_plan.py"
- ,description="Run staged config scripts and print the resulting plan.")
- ap.add_argument("--stage",default="stage",help="stage directory (default: ./stage)")
- args = ap.parse_args(argv)
-
- stage_root_dpath = Path(args.stage)
- StageMod = _load_stage_module(stage_root_dpath)
- Stage = StageMod.Stage
- Stage._reset()
- Stage.set_meta(
- planner_user_name=getpass.getuser()
- ,planner_uid_int=os.getuid()
- ,planner_gid_int=os.getgid()
- ,host_name=socket.gethostname()
- ,created_utc_str=time.strftime("%Y-%m-%dT%H:%M:%SZ",time.gmtime())
- )
-
- for rel_fpath in _config_rel_fpaths(stage_root_dpath):
- Stage._begin(read_rel_fpath=rel_fpath ,stage_root_dpath=stage_root_dpath)
- runpy.run_path(str(stage_root_dpath/rel_fpath) ,run_name="__main__")
- Stage._end()
-
- plan_map = Stage.plan_object()
- entries_list = plan_map["entries_list"]
- print(f"Plan version: {plan_map['version_int']}")
- print(f"Planner: {plan_map['meta_map'].get('planner_user_name')}@{plan_map['meta_map'].get('host_name')} "
- f"UID:{plan_map['meta_map'].get('planner_uid_int')} GID:{plan_map['meta_map'].get('planner_gid_int')}")
- print(f"Created: {plan_map['meta_map'].get('created_utc_str')}")
- print(f"Entries: {len(entries_list)}\n")
-
- for i ,e_map in enumerate(entries_list ,1):
- op = e_map.get("op")
- dst_fpath_str = f"{e_map.get('dst_dpath')}/{e_map.get('dst_fname')}"
- if op == "copy":
- content = e_map.get("content_bytes") or b""
- sz = len(content)
- mode = e_map.get("mode_octal_str") or "????"
- owner = e_map.get("owner_name") or "?"
- h = _sha256_hex(content)
- print(f"{i:02d}. copy -> {dst_fpath_str} mode {mode} owner {owner} bytes {sz} sha256 {h[:16]}…")
- elif op == "displace":
- print(f"{i:02d}. displace -> {dst_fpath_str}")
- elif op == "delete":
- print(f"{i:02d}. delete -> {dst_fpath_str}")
- else:
- print(f"{i:02d}. ?op? -> {dst_fpath_str} ({op})")
- return 0
-
-if __name__ == "__main__":
- sys.exit(main())
+++ /dev/null
-Thomas-developer 0x444 . stage_test_0_out
\ No newline at end of file
+++ /dev/null
-Thomas-developer 0640 . stage_test_0_out
\ No newline at end of file
+++ /dev/null
-Thomas-developer 0444 . stage_test_0_out
\ No newline at end of file
+++ /dev/null
-#!/usr/bin/env -S python3 -B
-import Stage
-
-# You can compute these with arbitrary Python if you like
-svc = "unbound"
-zone = "US"
-fname = f"unbound-{zone}.conf"
-
-Stage.init(
- write_file_name="." # '.' → use basename of this file -> 'example_dns.py'
-, write_file_directory_path="/etc/unbound"
-, write_file_owner="root"
-, write_file_permissions=0o644 # or "0644"
-, read_file_contents="""\
-# generated config (example)
-server:
- verbosity: 1
- interface: 127.0.0.1
-"""
-)
-
-# declare the desired operations (no effect in 'noop'/'dry' without a copier)
-Stage.displace()
-Stage.copy()
+++ /dev/null
-Thomas-developer 0444 . stage_test_0_out
\ No newline at end of file
+++ /dev/null
-#!/usr/bin/env -S python3 -B
-"""
-stage_ls.py — execute staged Python programs with Stage in 'noop' mode and list metadata.
-
-For each *.py under --stage (recursively, excluding Stage.py), this tool:
- 1) loads Stage.py from the stage root,
- 2) switches mode to 'noop' (no side effects, no printing),
- 3) executes the program via runpy.run_path(...) with the proper __file__,
- 4) collects the resolved write_file_* metadata and declared ops,
- 5) prints either list or aligned table,
- 6) reports any collected errors.
-
-This lets admins compute metadata with arbitrary Python while guaranteeing no writes.
-"""
-
-from __future__ import annotations
-
-import sys ,os
-sys.dont_write_bytecode = True
-os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
-
-from dataclasses import dataclass
-from pathlib import Path
-import argparse
-import importlib.util ,runpy
-import traceback
-
-# --- utility dataclass (for printing) ---
-
-@dataclass
-class Row:
- read_rel: Path
- owner: str|None
- perm: str|None
- write_name: str|None
- target_dir: Path|None
- ops: list[str]
- errors: list[str]
-
-# --- helpers ---
-
-def _load_stage_module(stage_root: Path):
- """Load Stage.py from stage_root into sys.modules['Stage'] (overwriting if present). Returns the Stage module."""
- stage_py = stage_root/"Stage.py"
- if not stage_py.exists():
- raise FileNotFoundError(f"Stage.py not found at {stage_py} — place Stage.py in the stage root.")
- spec = importlib.util.spec_from_file_location("Stage" ,str(stage_py))
- if spec is None or spec.loader is None:
- raise RuntimeError(f"cannot load Stage module from {stage_py}")
- mod = importlib.util.module_from_spec(spec)
- sys.modules["Stage"] = mod
- spec.loader.exec_module(mod) # type: ignore[union-attr]
- return mod
-
-def _stage_program_paths(stage_root: Path)-> list[Path]:
- rels: list[Path] = []
- for p in stage_root.rglob("*.py"):
- if p.name == "Stage.py":
- continue
- try:
- if p.is_file():
- rels.append(p.relative_to(stage_root))
- except Exception:
- continue
- return sorted(rels ,key=lambda x: x.as_posix())
-
-def print_list(rows: list[Row])-> None:
- for r in rows:
- owner = r.owner or "?"
- perm = r.perm or "????"
- name = r.write_name or "?"
- tdir = str(r.target_dir) if r.target_dir is not None else "?"
- print(f"{r.read_rel.as_posix()}: {owner} {perm} {name} {tdir}")
-
-def print_table(rows: list[Row])-> None:
- if not rows:
- return
- a = [r.read_rel.as_posix() for r in rows]
- b = [(r.owner or "?") for r in rows]
- c = [(r.perm or "????") for r in rows]
- d = [(r.write_name or "?") for r in rows]
- e = [str(r.target_dir) if r.target_dir is not None else "?" for r in rows]
- wa = max(len(s) for s in a)
- wb = max(len(s) for s in b)
- wc = max(len(s) for s in c)
- wd = max(len(s) for s in d)
- for sa ,sb ,sc ,sd ,se in zip(a ,b ,c ,d ,e):
- print(f"{sa:<{wa}} {sb:<{wb}} {sc:<{wc}} {sd:<{wd}} {se}")
-
-# --- core ---
-
-def ls_stage(stage_root: Path ,fmt: str="list")-> int:
- Stage = _load_stage_module(stage_root)
- Stage.Stage.set_mode("noop") # hard safety for this tool
-
- rows: list[Row] = []
- errs: list[str] = []
-
- for rel in _stage_program_paths(stage_root):
- abs_path = stage_root/rel
- try:
- # isolate per-run state
- Stage.Stage._current = None
- Stage.Stage._all_records.clear()
- Stage.Stage._begin(read_rel=rel ,stage_root=stage_root)
-
- # execute the staged program under its real path
- runpy.run_path(str(abs_path) ,run_name="__main__")
-
- rec = Stage.Stage._end()
- if rec is None:
- errs.append(f"{rel}: program executed but Stage.init(...) was never called")
- continue
-
- rows.append(
- Row(
- read_rel=rel
- , owner=rec.owner
- , perm=rec.perm_octal_str
- , write_name=rec.write_name
- , target_dir=rec.target_dir
- , ops=list(rec.ops)
- , errors=list(rec.errors)
- )
- )
-
- except SystemExit as e:
- errs.append(f"{rel}: program called sys.exit({e.code}) during listing")
- except Exception:
- tb = traceback.format_exc(limit=2)
- errs.append(f"{rel}: exception during execution:\n{tb}")
-
- # print data
- if fmt == "table":
- print_table(rows)
- else:
- print_list(rows)
-
- # print per-row Stage errors
- row_errs = [f"{r.read_rel}: {msg}" for r in rows for msg in r.errors]
- all_errs = row_errs + errs
- if all_errs:
- print("\nerror(s):" ,file=sys.stderr)
- for e in all_errs:
- print(f" - {e}" ,file=sys.stderr)
- return 1
- return 0
-
-# --- CLI ---
-
-def main(argv: list[str] | None=None)-> int:
- import argparse
- ap = argparse.ArgumentParser(
- prog="stage_ls.py"
- , description="Execute staged Python configs with Stage in 'noop' mode and list resolved metadata."
- )
- ap.add_argument("--stage" ,default="stage" ,help="stage directory (default: ./stage)")
- ap.add_argument("--format" ,choices=["list","table"] ,default="list" ,help="output format")
- args = ap.parse_args(argv)
-
- stage_root = Path(args.stage)
- if not stage_root.exists() or not stage_root.is_dir():
- print(f"error: stage directory not found or not a directory: {stage_root}" ,file=sys.stderr)
- return 2
-
- return ls_stage(stage_root ,fmt=args.format)
-
-if __name__ == "__main__":
- sys.exit(main())
--- /dev/null
+# example unbound.conf
+def configure(prov, planner, WriteFileMeta):
+ # use the current user as the owner, and this script's name without its .py suffix as the filename
+
+ # owner defaults to root (this is a configuration file installer)
+ # owner "." means the owner of the process running Stagehand
+ # owner "." is good for testing
+
+ # fname "." means the write file has the same name as the read file (with a trailing .py stripped, if present)
+ # fname "." is the default, so specifying it here is redundant. "." also remains valid in call arguments, even when the wfm changes the fname.
+
+ wfm = WriteFileMeta(dpath="stage_test_0_out", fname=".", owner=".")
+ planner.displace(wfm)
+ planner.copy(wfm, content="server:\n do-ip6: no\n")
+