Runner-provided, read-only provenance for a single config script.
"""
__slots__ = ("stage_root_dpath","config_abs_fpath","config_rel_fpath",
- "read_dir_dpath","read_fname")
+ "read_dir_dpath","read_fname","process_user")
def __init__(self, *, stage_root: Path, config_path: Path):
+ import getpass
self.stage_root_dpath = stage_root.resolve()
self.config_abs_fpath = config_path.resolve()
try:
except Exception:
self.config_rel_fpath = Path(self.config_abs_fpath.name)
- # Where the config file lives (used to anchor relative write dirs)
self.read_dir_dpath = self.config_abs_fpath.parent
- # “py-less” filename: strip .stage.py, else .py, else keep name
name = self.config_abs_fpath.name
if name.endswith(".stage.py"):
self.read_fname = name[:-len(".stage.py")]
else:
self.read_fname = name
+ # NEW: owner of the StageHand process
+ self.process_user = getpass.getuser()
+
def print(self, *, file=None) -> None:
- """
- Given: optional file-like (defaults to stdout).
- Does: print a readable, multi-line summary of provenance.
- Returns: None.
- """
if file is None:
import sys as _sys
file = _sys.stdout
-
print(f"Stage root: {self.stage_root_dpath}", file=file)
print(f"Config (rel): {self.config_rel_fpath.as_posix()}", file=file)
print(f"Config (abs): {self.config_abs_fpath}", file=file)
print(f"Read dir: {self.read_dir_dpath}", file=file)
print(f"Read fname: {self.read_fname}", file=file)
-
-
+ print(f"Process user: {self.process_user}", file=file) # NEW
# ===== Admin-facing defaults carrier =====
def __init__(self
,*
,dpath="/"
- ,fname=None # None or "." → let Planner resolve (provenance fallback)
- ,owner="root" # "." → current process user (resolved by Planner)
+ ,fname=None # None → let Planner/provenance choose
+ ,owner="root"
,mode=0o444
,content=None
):
self.dpath_str = norm_dpath_str(dpath)
- # keep "." as a sentinel; otherwise validate the filename
- if fname == ".":
- self.fname = "."
- else:
- self.fname = norm_fname_or_none(fname)
-
- # keep "." as a sentinel; otherwise normalize owner
- if owner == ".":
- self.owner_name_str = "."
- else:
- self.owner_name_str = norm_nonempty_owner(owner)
-
+ self.fname = norm_fname_or_none(fname) # '.' no longer special → None
+ self.owner_name_str = norm_nonempty_owner(owner) # '.' rejected → None
self.mode_int, self.mode_octal_str = parse_mode(mode)
- # content_'bytes' due to UTF8 encoding
- self.content_bytes = norm_content_bytes(content)
+ self.content_bytes = norm_content_bytes(content)
def print(self, *, label: str | None = None, file=None) -> None:
"""
# --- defaults management / access ---
+ # in Planner.py, inside class Planner
+ def set_provenance(self, prov: PlanProvenance) -> None:
+ """Switch the current provenance used for fallbacks & per-command provenance tagging."""
+ self._prov = prov
+
def set_defaults(self ,defaults: WriteFileMeta)-> None:
"Given WriteFileMeta. Does replace planner defaults. Returns None."
self._defaults = defaults
"Given three sources. Does pick first non-None. Returns value or None."
return kw if kw is not None else (meta_attr if meta_attr is not None else default_attr)
- # Planner.py (inside Planner)
def _resolve_write_file(self, wfm, dpath, fname) -> tuple[str|None, str|None]:
- # normalize explicit kwargs (allow "." to pass through untouched)
dpath_str = norm_dpath_str(dpath) if dpath is not None else None
- if fname is not None and fname != ".":
- fname = norm_fname_or_none(fname)
+ fname = norm_fname_or_none(fname) if fname is not None else None
dpath_val = self._pick(dpath_str, (wfm.dpath_str if wfm else None), self._defaults.dpath_str)
fname_val = self._pick(fname, (wfm.fname if wfm else None), self._defaults.fname)
- # final fallback for filename: "." or None → derive from config name
- if fname_val == "." or fname_val is None:
+ # final fallback for filename: derive from config name
+ if fname_val is None:
fname_val = self._prov.read_fname
# anchor relative dpaths against the config’s directory
,mode: int|str|None
,content: bytes|str|None
)-> tuple[str|None ,tuple[int|None ,str|None] ,bytes|None]:
- owner_norm = norm_nonempty_owner(owner) if (owner is not None and owner != ".") else owner
- mode_norm = parse_mode(mode) if mode is not None else (None, None)
+ owner_norm = norm_nonempty_owner(owner) if owner is not None else None
+ mode_norm = parse_mode(mode) if mode is not None else (None ,None)
content_b = norm_content_bytes(content) if content is not None else None
owner_v = self._pick(owner_norm, (wfm.owner_name_str if wfm else None), self._defaults.owner_name_str)
-
- # resolve "." → current process user
- if owner_v == ".":
- owner_v = getpass.getuser()
-
- mode_v = (mode_norm if mode_norm != (None, None) else
- ((wfm.mode_int, wfm.mode_octal_str) if wfm else (self._defaults.mode_int, self._defaults.mode_octal_str)))
- content_v = self._pick(content_b, (wfm.content_bytes if wfm else None), self._defaults.content_bytes)
- return owner_v, mode_v, content_v
+ mode_v = (mode_norm if mode_norm != (None ,None) else
+ ((wfm.mode_int ,wfm.mode_octal_str) if wfm else (self._defaults.mode_int ,self._defaults.mode_octal_str)))
+ content_v = self._pick(content_b ,(wfm.content_bytes if wfm else None) ,self._defaults.content_bytes)
+ return owner_v ,mode_v ,content_v
def print(self, *, show_journal: bool = True, file=None) -> None:
"""
"""
executor.py — StageHand outer/inner executor (MVP; UNPRIVILEGED for now)
+Phase 0 (bootstrap):
+ - Ensure filter program exists (create default in CWD if --filter omitted)
+ - Validate --stage exists
+ - If --phase-0-then-stop: exit here (no scan, no execution)
+
Phase 1 (outer):
- - Build a combined plan by executing each config's `configure(prov, planner, WriteFileMeta)`.
- - Optionally print the plan via Planner.print().
- - Optionally stop.
+ - Discover every file under --stage; acceptance filter decides which to include
+ - Execute each config’s configure(prov, planner, WriteFileMeta) into ONE Planner
+ - Optionally print the planner; optionally stop
Phase 2 (inner shim in same program for now; no privilege yet):
- - Encode combined plan to CBOR and pass to inner path.
- - Inner decodes back to a Journal and optionally prints it.
- - Optionally stop.
-
-Discovery:
- - --stage (default: ./stage) points at the stage directory root.
- - By default, *every file* under --stage (recursively) is executed as a config,
- regardless of extension. Editors can still use .py for highlighting; we strip
- only a trailing ".py" to derive prov.read_fname.
-
+ - Encode plan to CBOR and hand to inner path
+ - Inner decodes to a Journal and can print it
"""
from __future__ import annotations
import runpy
import subprocess
import datetime as _dt
-import os, fnmatch, stat
-
+import stat
# Local module: Planner.py (same directory)
from Planner import (
Planner, PlanProvenance, WriteFileMeta, Journal, Command,
)
+# -------- default filter template (written to CWD when --filter not provided) --------
+
+DEFAULT_FILTER_FILENAME = "stagehand_filter.py"
+
+DEFAULT_FILTER_SOURCE = """# StageHand acceptance filter (default template)
+# Return True to include a config file, False to skip it.
+# You receive a PlanProvenance object named `prov`.
+#
+# prov fields commonly used here:
+# prov.stage_root_dpath : Path → absolute path to the stage root
+# prov.config_abs_fpath : Path → absolute path to the candidate file
+# prov.config_rel_fpath : Path → path relative to the stage root
+# prov.read_dir_dpath : Path → directory of the candidate file
+# prov.read_fname : str → filename with trailing '.py' stripped (if present)
+#
+# Examples:
+#
+# 1) Accept everything (default behavior):
+# def accept(prov):
+# return True
+#
+# 2) Only accept configs in a 'dns/' namespace under the stage:
+# def accept(prov):
+# return prov.config_rel_fpath.as_posix().startswith("dns/")
+#
+# 3) Exclude editor backup files:
+# def accept(prov):
+# rel = prov.config_rel_fpath.as_posix()
+# return not (rel.endswith("~") or rel.endswith(".swp"))
+#
+# 4) Only accept Python files + a few non-Python names:
+# def accept(prov):
+# name = prov.config_abs_fpath.name
+# return name.endswith(".py") or name in {"hosts", "resolv.conf"}
+#
+# Choose ONE 'accept' definition. Below is the default:
+
+def accept(prov):
+ return True
+"""
+
# -------- utilities --------
def iso_utc_now_str() -> str:
return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
-def _split_globs(glob_arg: str) -> list[str]:
- parts = [g.strip() for g in (glob_arg or "").split(",") if g.strip()]
- # Default includes both deep and top-level files
- return parts or ["**/*", "*"]
-
-def find_config_paths(stage_root: Path, glob_arg: str) -> list[Path]:
- """
- Given stage root and comma-glob string, return sorted list of files (regular or symlink).
- Defaults to match ALL files under stage, including top-level ones.
- """
- root = stage_root.resolve()
- patterns = _split_globs(glob_arg)
- out: set[Path] = set()
-
- for dirpath, dirnames, filenames in os.walk(root, followlinks=False):
- # (optional) prune symlinked dirs to avoid cycles; files can still be symlinks
- dirnames[:] = [d for d in dirnames if not os.path.islink(os.path.join(dirpath, d))]
-
- for fname in filenames:
- f_abs = Path(dirpath, fname)
- rel = f_abs.relative_to(root).as_posix()
- if any(fnmatch.fnmatch(rel, pat) for pat in patterns):
- try:
- st = f_abs.lstat()
- if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
- out.add(f_abs)
- except Exception:
- # unreadable/broken entries are skipped
- pass
-
- return sorted(out, key=lambda p: p.as_posix())
-
-
-
-def _run_one_config(config_path: Path, stage_root: Path) -> Planner:
- """Execute a single config's `configure(prov, planner, WriteFileMeta)` and return that config's Planner."""
- prov = PlanProvenance(stage_root=stage_root, config_path=config_path)
- per_planner = Planner(provenance=prov) # defaults derive from this file's provenance
- env = runpy.run_path(str(config_path))
- fn = env.get("configure")
+def _ensure_filter_file(filter_arg: str|None) -> Path:
+ """
+ If --filter is provided, return that path (must exist).
+ Otherwise, create ./stagehand_filter.py in the CWD if missing (writing a helpful template),
+ and return its path.
+ """
+ if filter_arg:
+ p = Path(filter_arg)
+ if not p.is_file():
+ raise RuntimeError(f"--filter file not found: {p}")
+ return p
+
+ p = Path.cwd() / DEFAULT_FILTER_FILENAME
+ if not p.exists():
+ try:
+ p.write_text(DEFAULT_FILTER_SOURCE, encoding="utf-8")
+ print(f"(created default filter at {p})")
+ except Exception as e:
+ raise RuntimeError(f"failed to create default filter {p}: {e}")
+ return p
+
+def _load_accept_func(filter_path: Path):
+ env = runpy.run_path(str(filter_path))
+ fn = env.get("accept")
if not callable(fn):
- raise RuntimeError(f"{config_path}: missing callable configure(prov, planner, WriteFileMeta)")
- fn(prov, per_planner, WriteFileMeta)
- return per_planner
-
-def _aggregate_into_master(stage_root: Path, planners: list[Planner]) -> Planner:
- """Create a master Planner and copy all Commands from per-config planners into it."""
- # Synthetic provenance for the master planner (used only for display/meta)
- fake_config = stage_root / "(aggregate).py"
- master = Planner(PlanProvenance(stage_root=stage_root, config_path=fake_config))
-
- # annotate meta
- master.journal().set_meta(
+ raise RuntimeError(f"{filter_path}: missing callable 'accept(prov)'")
+ return fn
+
+def _walk_all_files(stage_root: Path):
+ """
+ Yield every file (regular or symlink) under stage_root recursively.
+ We do not follow symlinked directories to avoid cycles.
+ """
+ root = stage_root.resolve()
+ for dirpath, dirnames, filenames in os.walk(root, followlinks=False):
+ # prune symlinked dirs (files can still be symlinks)
+ dirnames[:] = [d for d in dirnames if not os.path.islink(os.path.join(dirpath, d))]
+ for fname in filenames:
+ p = Path(dirpath, fname)
+ try:
+ st = p.lstat()
+ if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
+ yield p.resolve()
+ except Exception:
+ # unreadable/broken entries skipped
+ continue
+
+def find_config_paths(stage_root: Path, accept_func) -> list[Path]:
+ """
+ Return files accepted by the Python acceptance function: accept(prov) → True/False.
+ """
+ out: list[Path] = []
+ for p in _walk_all_files(stage_root):
+ prov = PlanProvenance(stage_root=stage_root, config_path=p)
+ try:
+ if accept_func(prov):
+ out.append(p)
+ except Exception as e:
+ raise RuntimeError(f"accept() failed on {prov.config_rel_fpath.as_posix()}: {e}")
+ return sorted(out, key=lambda q: q.as_posix())
+
+# --- run all configs into ONE planner ---
+
+def _run_all_configs_into_single_planner(stage_root: Path, cfgs: list[Path]) -> Planner:
+ """
+ Create a single Planner and execute each config's configure(prov, planner, WriteFileMeta)
+ against it. Returns that single Planner containing the entire plan.
+ """
+ # seed with synthetic provenance; we overwrite per config before execution
+ aggregate_prov = PlanProvenance(stage_root=stage_root, config_path=stage_root / "(aggregate).py")
+ planner = Planner(provenance=aggregate_prov)
+
+ for cfg in cfgs:
+ prov = PlanProvenance(stage_root=stage_root, config_path=cfg)
+ planner.set_provenance(prov)
+
+ env = runpy.run_path(str(cfg))
+ fn = env.get("configure")
+ if not callable(fn):
+ raise RuntimeError(f"{cfg}: missing callable configure(prov, planner, WriteFileMeta)")
+
+ fn(prov, planner, WriteFileMeta)
+
+ # annotate meta once, on the single planner's journal
+ j = planner.journal()
+ j.set_meta(
generator_prog_str="executor.py",
generated_at_utc_str=iso_utc_now_str(),
user_name_str=getpass.getuser(),
host_name_str=os.uname().nodename if hasattr(os, "uname") else "unknown",
stage_root_dpath_str=str(stage_root.resolve()),
- configs_list=[p._prov.config_rel_fpath.as_posix() for p in planners],
+ configs_list=[str(p.resolve().relative_to(stage_root.resolve())) for p in cfgs],
)
-
- # copy commands
- out_j = master.journal()
- for p in planners:
- for cmd in p.journal().command_list:
- out_j.append(cmd) # keep Command objects as-is
- return master
+ return planner
# ----- CBOR “matchbox” (simple wrapper kept local to executor) -----
# -------- outer executor (phase 1 & handoff) --------
-def _outer_main(args) -> int:
- stage_root = Path(args.stage)
+def _outer_main(stage_root: Path, accept_func, args) -> int:
if not stage_root.is_dir():
print(f"error: --stage not a directory: {stage_root}", file=sys.stderr)
return 2
- cfgs = find_config_paths(stage_root, args.glob)
+ cfgs = find_config_paths(stage_root, accept_func)
if not cfgs:
print("No configuration files found.")
return 0
- # Execute each config into its own planner
- per_planners: list[Planner] = []
- for cfg in cfgs:
- try:
- per_planners.append(_run_one_config(cfg, stage_root))
- except SystemExit:
- raise
- except Exception as e:
- print(f"error: executing {cfg}: {e}", file=sys.stderr)
- return 2
-
- # Aggregate into a single master planner for printing/CBOR
- master = _aggregate_into_master(stage_root, per_planners)
+ try:
+ master = _run_all_configs_into_single_planner(stage_root, cfgs)
+ except SystemExit:
+ raise
+ except Exception as e:
+ print(f"error: executing configs: {e}", file=sys.stderr)
+ return 2
if args.phase_1_print:
master.print()
prog="executor.py",
description="StageHand outer/inner executor (plan → CBOR → decode).",
)
- ap.add_argument("--stage", default="stage", help="stage root directory (default: ./stage)")
-
- ap.add_argument("--glob", default="**/*",
- help="glob for config scripts under --stage (default: '**/*' = all files)")
-
- # ap.add_argument("--glob",
- # default="**/*",
- # help="comma-separated globs under --stage (default: **/*; every file is a config)")
+ ap.add_argument("--stage", default="stage",
+ help="stage root directory (default: ./stage)")
+ ap.add_argument(
+ "--filter",
+ default="",
+ help=f"path to acceptance filter program exporting accept(prov) "
+ f"(default: ./{DEFAULT_FILTER_FILENAME}; created if missing)"
+ )
+ ap.add_argument(
+ "--phase-0-then-stop",
+ action="store_true",
+ help="stop after arg checks & filter bootstrap (no stage scan)"
+ )
# Phase-1 (outer) controls
ap.add_argument("--phase-1-print", action="store_true", help="print master planner (phase 1)")
args = ap.parse_args(argv)
+ # Inner path
if args.inner:
if not args.plan:
print("error: --inner requires --plan <file>", file=sys.stderr)
phase2_print=args.phase_2_print,
phase2_then_stop=args.phase_2_then_stop)
- return _outer_main(args)
+ # Phase 0: bootstrap & stop (no scan)
+ stage_root = Path(args.stage)
+ try:
+ filter_path = _ensure_filter_file(args.filter or None)
+ except Exception as e:
+ print(f"error: {e}", file=sys.stderr)
+ return 2
+
+ if not stage_root.exists():
+ print(f"error: --stage not found: {stage_root}", file=sys.stderr)
+ return 2
+ if not stage_root.is_dir():
+ print(f"error: --stage is not a directory: {stage_root}", file=sys.stderr)
+ return 2
+
+ if args.phase_0_then_stop:
+ print(f"phase-0 OK: stage at {stage_root.resolve()} and filter at {filter_path}")
+ return 0
+
+ # Load acceptance function and proceed with outer
+ try:
+ accept_func = _load_accept_func(filter_path)
+ except Exception as e:
+ print(f"error: {e}", file=sys.stderr)
+ return 2
+ return _outer_main(stage_root, accept_func, args)
if __name__ == "__main__":
sys.exit(main())