from pathlib import Path
import getpass
-
# ===== Utilities =====
-def norm_perm(value: int|str)-> tuple[int,str]|None:
- "Given int or 3/4-char octal string (optionally 0o-prefixed). Does validate/normalize. Returns (int,'%04o') or None."
+def norm_perm(value: int|str)-> tuple[int ,str]|None:
+ "Given int or 3/4-char octal string (optionally 0o-prefixed). Does validate/normalize. Returns (int ,'%04o') or None."
if isinstance(value ,int):
if 0 <= value <= 0o7777:
return value ,f"{value:04o}"
s = value.as_posix() if isinstance(value ,Path) else str(value)
return s if is_abs_dpath(s) else None
+def norm_dpath_str(value: str|Path|None)-> str|None:
+ "Given str/Path/None. Does minimal sanitize; allows relative. Returns str or None."
+ if value is None: return None
+ s = value.as_posix() if isinstance(value ,Path) else str(value)
+ if not s or "\x00" in s: return None
+ return s
+
def norm_fname_or_none(value: str|None)-> str|None:
"Given candidate filename or None. Does validate bare filename. Returns str or None."
if value is None: return None
return s if s else None
def parse_mode(value: int|str|None)-> tuple[int|None ,str|None]:
- "Given int/str/None. Does normalize via norm_perm. Returns (int,'%04o') or (None,None)."
+ "Given int/str/None. Does normalize via norm_perm. Returns (int ,'%04o') or (None ,None)."
if value is None: return None ,None
r = norm_perm(value)
return r if r is not None else (None ,None)
if isinstance(value ,bytes): return value
return value.encode("utf-8")
-def norm_dpath_str(value: str|Path|None)-> str|None:
- "Given str/Path/None. Does minimal sanitize; allows relative. Returns str or None."
- if value is None: return None
- s = value.as_posix() if isinstance(value ,Path) else str(value)
- if not s or "\x00" in s: return None
- return s
-
-
# ===== Wire-ready model types (no CBOR here) =====
class Command:
Does hold op name, own a fresh arg_dict, collect per-entry errors.
Returns dictionary via as_dictionary().
"""
- __slots__ = ("name_str" ,"arg_dict" ,"errors_list")
+ __slots__ = (
+ "name_str"
+ ,"arg_dict"
+ ,"errors_list"
+ )
def __init__(self ,name_str: str ,arg_dict: dict|None=None ,errors_list: list[str]|None=None)-> None:
self.name_str = name_str
,"errors_list": list(self.errors_list)
}
- def print(self, *, index: int|None=None, file=None)-> None:
+ def print(self ,* ,index: int|None=None ,file=None)-> None:
"""
Given: optional index for numbering and optional file-like (defaults to stdout).
Does: print a compact, human-readable one-line summary of this command; prints any errors indented below.
op = self.name_str
ad = self.arg_dict or {}
- # Compose destination path for display
+ # Compose destination path for display (normalize to collapse '..')
d = ad.get("write_file_dpath_str") or ""
f = ad.get("write_file_fname") or ""
try:
from pathlib import Path as _Path
- dst = (_Path(d)/f).as_posix() if d and f and "/" not in f else "?"
+ if d and f and "/" not in f:
+ dst = (_Path(d)/f).resolve().as_posix()
+ else:
+ dst = "?"
except Exception:
dst = "?"
- # Numbering prefix
prefix = f"{index:02d}. " if index is not None else ""
if op == "copy":
else:
line = f"{prefix}?op? -> {dst}"
- print(line, file=file)
+ print(line ,file=file)
- # Print any per-entry errors underneath
for err in self.errors_list:
- print(f" ! {err}", file=file)
-
+ print(f" ! {err}" ,file=file)
class Journal:
"""
Does manage meta, append commands, expose entries, and pack to dict.
Returns dict via as_dictionary().
"""
- __slots__ = ("meta_dict" ,"command_list")
+ __slots__ = (
+ "meta_dict"
+ ,"command_list"
+ )
def __init__(self ,plan_dict: dict|None=None)-> None:
self.meta_dict = {}
entries = plan_dict.get("entries_list") or []
self.meta_dict.update(meta)
for e in entries:
- if not isinstance(e ,dict):
+ if not isinstance(e ,dict):
continue
op = e.get("op") or "?"
args = e.get("arg_dict") or {}
,"entries_list": self.entries_list()
}
- def print(self, *, index_start: int = 1, file=None) -> None:
+ def print(self ,* ,index_start: int=1 ,file=None)-> None:
"""
Given: optional starting index and optional file-like (defaults to stdout).
Does: print each Command on a single line via Command.print(), numbered.
file = _sys.stdout
if not self.command_list:
- print("(plan is empty)", file=file)
+ print("(plan is empty)" ,file=file)
return
- for i, cmd in enumerate(self.command_list, start=index_start):
- cmd.print(index=i, file=file)
+ for i ,cmd in enumerate(self.command_list ,start=index_start):
+ cmd.print(index=i ,file=file)
# ===== Runner-provided provenance =====
-# Planner.py
class PlanProvenance:
"""
Runner-provided, read-only provenance for a single config script.
"""
- __slots__ = ("stage_root_dpath","config_abs_fpath","config_rel_fpath",
- "read_dir_dpath","read_fname","process_user")
-
- def __init__(self, *, stage_root: Path, config_path: Path):
- import getpass
+ __slots__ = (
+ "stage_root_dpath"
+ ,"config_abs_fpath"
+ ,"config_rel_fpath"
+ ,"read_dir_dpath"
+ ,"read_fname"
+ ,"process_user"
+ ,"cwd_dpath"
+ )
+
+ def __init__(self ,* ,stage_root: Path ,config_path: Path):
self.stage_root_dpath = stage_root.resolve()
self.config_abs_fpath = config_path.resolve()
try:
else:
self.read_fname = name
- # NEW: owner of the StageHand process
self.process_user = getpass.getuser()
+ self.cwd_dpath = Path.cwd().resolve()
- def print(self, *, file=None) -> None:
+ def print(self ,* ,file=None)-> None:
if file is None:
import sys as _sys
file = _sys.stdout
- print(f"Stage root: {self.stage_root_dpath}", file=file)
- print(f"Config (rel): {self.config_rel_fpath.as_posix()}", file=file)
- print(f"Config (abs): {self.config_abs_fpath}", file=file)
- print(f"Read dir: {self.read_dir_dpath}", file=file)
- print(f"Read fname: {self.read_fname}", file=file)
- print(f"Process user: {self.process_user}", file=file) # NEW
+ print(f"Stage root: {self.stage_root_dpath}" ,file=file)
+ print(f"Config (rel): {self.config_rel_fpath.as_posix()}" ,file=file)
+ print(f"Config (abs): {self.config_abs_fpath}" ,file=file)
+ print(f"Read dir: {self.read_dir_dpath}" ,file=file)
+ print(f"Read fname: {self.read_fname}" ,file=file)
+ print(f"Process user: {self.process_user}" ,file=file)
# ===== Admin-facing defaults carrier =====
"""
WriteFileMeta — per-call or planner-default write-file attributes.
- Given dpath (abs str/Path) ,fname (bare name or None) ,owner (str)
+ Given dpath (str/Path, may be relative) ,fname (bare name or None) ,owner (str)
,mode (int|'0644') ,content (bytes|str|None).
Does normalize into fields (may remain None if absent/invalid).
Returns object suitable for providing defaults to Planner methods.
"""
- __slots__ = ("dpath_str" ,"fname" ,"owner_name_str" ,"mode_int" ,"mode_octal_str" ,"content_bytes")
+ __slots__ = (
+ "dpath_str"
+ ,"fname"
+ ,"owner_name_str"
+ ,"mode_int"
+ ,"mode_octal_str"
+ ,"content_bytes"
+ )
def __init__(self
,*
,dpath="/"
- ,fname=None # None → let Planner/provenance choose
+ ,fname=None
,owner="root"
,mode=0o444
,content=None
):
- self.dpath_str = norm_dpath_str(dpath)
- self.fname = norm_fname_or_none(fname) # '.' no longer special → None
- self.owner_name_str = norm_nonempty_owner(owner) # '.' rejected → None
- self.mode_int, self.mode_octal_str = parse_mode(mode)
- self.content_bytes = norm_content_bytes(content)
+ self.dpath_str = norm_dpath_str(dpath)
+ self.fname = norm_fname_or_none(fname)
+ self.owner_name_str = norm_nonempty_owner(owner)
+ self.mode_int ,self.mode_octal_str = parse_mode(mode)
+ self.content_bytes = norm_content_bytes(content)
- def print(self, *, label: str | None = None, file=None) -> None:
- """
- Given: optional label and optional file-like (defaults to stdout).
- Does: print a single-line summary of defaults/overrides.
- Returns: None.
- """
+ def print(self ,* ,label: str|None=None ,file=None)-> None:
if file is None:
import sys as _sys
file = _sys.stdout
-
dpath = self.dpath_str or "?"
fname = self.fname or "?"
owner = self.owner_name_str or "?"
- mode_str = f"{self.mode_int:04o}" if isinstance(self.mode_int, int) else (self.mode_octal_str or "?")
- size = len(self.content_bytes) if isinstance(self.content_bytes, (bytes, bytearray)) else 0
+ mode_str = f"{self.mode_int:04o}" if isinstance(self.mode_int ,int) else (self.mode_octal_str or "?")
+ size = len(self.content_bytes) if isinstance(self.content_bytes ,(bytes ,bytearray)) else 0
prefix = (label + ": ") if label else ""
- print(f"{prefix}dpath={dpath} fname={fname} owner={owner} mode={mode_str} bytes={size}", file=file)
-
+ print(f"{prefix}dpath={dpath} fname={fname} owner={owner} mode={mode_str} bytes={size}" ,file=file)
# ===== Planner =====
On any argument error, returns the Command with errors and DOES NOT append it to Journal.
Returns live Journal via journal().
"""
- __slots__ = ("_prov" ,"_defaults" ,"_journal")
+ __slots__ = (
+ "_prov"
+ ,"_defaults"
+ ,"_journal"
+ )
def __init__(self ,provenance: PlanProvenance ,defaults: WriteFileMeta|None=None)-> None:
self._prov = provenance
,config_rel_fpath_str=self._prov.config_rel_fpath.as_posix()
)
- # --- defaults management / access ---
+ # --- provenance/defaults/journal access ---
- # in Planner.py, inside class Planner
- def set_provenance(self, prov: PlanProvenance) -> None:
- """Switch the current provenance used for fallbacks & per-command provenance tagging."""
+ def set_provenance(self ,prov: PlanProvenance)-> None:
self._prov = prov
def set_defaults(self ,defaults: WriteFileMeta)-> None:
- "Given WriteFileMeta. Does replace planner defaults. Returns None."
self._defaults = defaults
def defaults(self)-> WriteFileMeta:
- "Given n/a. Does return current WriteFileMeta defaults. Returns WriteFileMeta."
return self._defaults
def journal(self)-> Journal:
- "Given n/a. Returns Journal reference (live, still being modified here)."
return self._journal
# --- resolution helpers ---
def _pick(self ,kw ,meta_attr ,default_attr):
- "Given three sources. Does pick first non-None. Returns value or None."
+ "Pick first non-None among kw ,meta_attr ,default_attr."
return kw if kw is not None else (meta_attr if meta_attr is not None else default_attr)
-# inside Planner
-
-from pathlib import Path
-from posixpath import normpath as _normposix
-
- def _resolve_write_file(self, wfm, dpath, fname) -> tuple[str|None, str|None]:
- # Normalize explicit kwargs (keep None as sentinel)
+ def _resolve_write_file(self ,wfm ,dpath ,fname)-> tuple[str|None ,str|None]:
+ # normalize explicit kwargs
dpath_str = norm_dpath_str(dpath) if dpath is not None else None
- if fname is not None and fname != ".":
- fname = norm_fname_or_none(fname)
+ fname_str = norm_fname_or_none(fname) if fname is not None else None
- # Precedence: kwarg > per-call meta > planner default
- dpath_val = self._pick(dpath_str, (wfm.dpath_str if wfm else None), self._defaults.dpath_str)
- fname_val = self._pick(fname, (wfm.fname if wfm else None), self._defaults.fname)
+ # precedence: kwarg > per-call meta > planner default
+ dpath_val = self._pick(dpath_str ,(wfm.dpath_str if wfm else None) ,self._defaults.dpath_str)
+ fname_val = self._pick(fname_str ,(wfm.fname if wfm else None) ,self._defaults.fname)
- # Final fallback for filename: "." or None → derive from config name
- if fname_val == "." or fname_val is None:
+ # final fallback for filename: derive from config name
+ if fname_val is None:
fname_val = self._prov.read_fname
- # Anchor relative dpaths to the *process working directory* (CWD), then normalize.
+ # anchor/normalize dpath
if dpath_val is not None:
- if is_abs_dpath(dpath_val):
- # Normalize absolute path for pretty/consistency
- try:
- dpath_val = Path(dpath_val).resolve().as_posix()
- except Exception:
- dpath_val = _normposix(str(dpath_val))
- else:
- base = Path.cwd()
- try:
- dpath_val = (base / dpath_val).resolve().as_posix()
- except Exception:
- dpath_val = _normposix((base / dpath_val).as_posix())
+ p = Path(dpath_val)
+ if not p.is_absolute():
+ p = (self._prov.cwd_dpath/p)
+ dpath_val = p.resolve().as_posix()
- return dpath_val, fname_val
+ return dpath_val ,fname_val
def _resolve_owner_mode_content(self
,wfm: WriteFileMeta|None
mode_norm = parse_mode(mode) if mode is not None else (None ,None)
content_b = norm_content_bytes(content) if content is not None else None
- owner_v = self._pick(owner_norm, (wfm.owner_name_str if wfm else None), self._defaults.owner_name_str)
+ owner_v = self._pick(owner_norm ,(wfm.owner_name_str if wfm else None) ,self._defaults.owner_name_str)
mode_v = (mode_norm if mode_norm != (None ,None) else
((wfm.mode_int ,wfm.mode_octal_str) if wfm else (self._defaults.mode_int ,self._defaults.mode_octal_str)))
content_v = self._pick(content_b ,(wfm.content_bytes if wfm else None) ,self._defaults.content_bytes)
return owner_v ,mode_v ,content_v
- def print(self, *, show_journal: bool = True, file=None) -> None:
- """
- Given: flags (show_journal) and optional file-like (defaults to stdout).
- Does: print provenance, defaults, and optionally the journal via delegation.
- Returns: None.
- """
+ # --- printing ---
+
+ def print(self ,* ,show_journal: bool=True ,file=None)-> None:
if file is None:
import sys as _sys
file = _sys.stdout
- print("== Provenance ==", file=file)
+ print("== Provenance ==" ,file=file)
self._prov.print(file=file)
- print("\n== Defaults ==", file=file)
- self._defaults.print(label="defaults", file=file)
+ print("\n== Defaults ==" ,file=file)
+ self._defaults.print(label="defaults" ,file=file)
if show_journal:
- entries = getattr(self._journal, "command_list", [])
+ entries = getattr(self._journal ,"command_list" ,[])
n_total = len(entries)
- n_copy = sum(1 for c in entries if getattr(c, "name_str", None) == "copy")
- n_disp = sum(1 for c in entries if getattr(c, "name_str", None) == "displace")
- n_del = sum(1 for c in entries if getattr(c, "name_str", None) == "delete")
+ n_copy = sum(1 for c in entries if getattr(c ,"name_str" ,None) == "copy")
+ n_disp = sum(1 for c in entries if getattr(c ,"name_str" ,None) == "displace")
+ n_del = sum(1 for c in entries if getattr(c ,"name_str" ,None) == "delete")
- print("\n== Journal ==", file=file)
- print(f"entries: {n_total} copy:{n_copy} displace:{n_disp} delete:{n_del}", file=file)
+ print("\n== Journal ==" ,file=file)
+ print(f"entries: {n_total} copy:{n_copy} displace:{n_disp} delete:{n_del}" ,file=file)
if n_total:
- self._journal.print(index_start=1, file=file)
+ self._journal.print(index_start=1 ,file=file)
else:
- print("(plan is empty)", file=file)
+ print("(plan is empty)" ,file=file)
# --- Command builders (first arg may be WriteFileMeta) ---
,mode: int|str|None=None
,content: bytes|str|None=None
)-> Command:
- """
- Given optional WriteFileMeta plus keyword overrides.
- Does build a 'copy' command; on any argument error the command is returned with errors and NOT appended.
- Returns Command.
- """
cmd = Command("copy")
dpath ,fname = self._resolve_write_file(wfm ,write_file_dpath ,write_file_fname)
owner_v ,(mode_int ,mode_oct) ,content_b = self._resolve_owner_mode_content(wfm ,owner ,mode ,content)
cmd.add_error("content is required for copy() (bytes or str)")
cmd.arg_dict.update({
- "write_file_dpath_str": dpath,
- "write_file_fname": fname, # was write_file_fname
- "owner_name": owner_v, # was owner_name_str
- "mode_int": mode_int,
- "mode_octal_str": mode_oct,
- "content_bytes": content_b,
- "provenance_config_rel_fpath_str": self._prov.config_rel_fpath.as_posix(),
+ "write_file_dpath_str": dpath
+ ,"write_file_fname": fname
+ ,"owner_name": owner_v
+ ,"mode_int": mode_int
+ ,"mode_octal_str": mode_oct
+ ,"content_bytes": content_b
+ ,"provenance_config_rel_fpath_str": self._prov.config_rel_fpath.as_posix()
})
if not cmd.errors_list:
,write_file_dpath: str|Path|None=None
,write_file_fname: str|None=None
)-> Command:
- "Given optional WriteFileMeta plus overrides. Does build 'displace' entry or return errors. Returns Command."
cmd = Command("displace")
dpath ,fname = self._resolve_write_file(wfm ,write_file_dpath ,write_file_fname)
if not is_abs_dpath(dpath): cmd.add_error("write_file_dpath must be absolute")
if norm_fname_or_none(fname) is None: cmd.add_error("write_file_fname must be a bare filename")
cmd.arg_dict.update({
- "write_file_dpath_str": dpath,
- "write_file_fname": fname,
+ "write_file_dpath_str": dpath
+ ,"write_file_fname": fname
})
if not cmd.errors_list:
self._journal.append(cmd)
,write_file_dpath: str|Path|None=None
,write_file_fname: str|None=None
)-> Command:
- "Given optional WriteFileMeta plus overrides. Does build 'delete' entry or return errors. Returns Command."
cmd = Command("delete")
dpath ,fname = self._resolve_write_file(wfm ,write_file_dpath ,write_file_fname)
if not is_abs_dpath(dpath): cmd.add_error("write_file_dpath must be absolute")
if norm_fname_or_none(fname) is None: cmd.add_error("write_file_fname must be a bare filename")
cmd.arg_dict.update({
- "write_file_dpath_str": dpath,
- "write_file_fname": fname,
+ "write_file_dpath_str": dpath
+ ,"write_file_fname": fname
})
if not cmd.errors_list:
self._journal.append(cmd)
return cmd
-
-
-