From 8929bd51d115f1148eca552686eaf7f8dfb85ebf Mon Sep 17 00:00:00 2001 From: Thomas Walker Lynch Date: Tue, 16 Sep 2025 18:00:32 -0700 Subject: [PATCH] cp before new stage_cp --- .gitignore | 3 + developer/source/DNS/.gitignore | 2 +- developer/source/DNS/cp_stage.py | 330 +++++++++++++++++++++ developer/source/DNS/cp_stage.py_2 | 330 +++++++++++++++++++++ developer/source/DNS/cp_stage.py_old | 243 +++++++++++++++ developer/source/DNS/scratchpad/.gitignore | 2 - developer/source/DNS_bundle.tgz | Bin 4061 -> 0 bytes 7 files changed, 907 insertions(+), 3 deletions(-) create mode 100644 .gitignore create mode 100755 developer/source/DNS/cp_stage.py create mode 100644 developer/source/DNS/cp_stage.py_2 create mode 100644 developer/source/DNS/cp_stage.py_old delete mode 100644 developer/source/DNS/scratchpad/.gitignore delete mode 100644 developer/source/DNS_bundle.tgz diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6885132 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ + +__pycache__ +*~ diff --git a/developer/source/DNS/.gitignore b/developer/source/DNS/.gitignore index 8ddaf79..181003e 100644 --- a/developer/source/DNS/.gitignore +++ b/developer/source/DNS/.gitignore @@ -2,5 +2,5 @@ __pycache__ stage/ deprecated/ - +scratchpad/ diff --git a/developer/source/DNS/cp_stage.py b/developer/source/DNS/cp_stage.py new file mode 100755 index 0000000..8d85222 --- /dev/null +++ b/developer/source/DNS/cp_stage.py @@ -0,0 +1,330 @@ +#!/usr/bin/env -S python3 -B +""" +cp_stage.py — copy a staged tree into a root, after showing exactly what will happen. + +Behavior: + - Shows plan (to-copy / ignored) before executing. + - Requires root (UID 0) by default. + - Recreates symlinks as symlinks; regular files via shutil.copy2(). + - If a destination exists, it is first renamed in-place to '_YYYYMMDDTHHMMSSZ'. + The backup preserves the original owner/mode/xattrs. + - Prints shell-equivalent commands for each operation. +""" + +from __future__ import annotations + +# no bytecode anywhere (works under sudo/root shells too) +import sys ,os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") + +from pathlib import Path +import argparse +import datetime as _dt +import re +import shutil +import shlex as _shlex + +# === tiny helpers === + +def _read_file_relative_paths_under(dir_tree_root: Path)-> list[Path]: + """Return POSIX-relative paths for all file-like entries under `dir_tree_root`. + Notes: + - Recurses all levels; includes hidden files. + - Includes symlinks as entries; does NOT traverse into symlinked directories. + - Returns paths relative to `dir_tree_root` (no leading slash). + """ + read_file_relative_path_list: list[Path] = [] + for p in dir_tree_root.rglob("*"): + try: + if p.is_symlink() or p.is_file(): + read_file_relative_path_list.append(p.relative_to(dir_tree_root)) + except FileNotFoundError: + continue + return sorted(read_file_relative_path_list,key=lambda x: x.as_posix()) + +def _is_ignored(rel: Path ,ignore_res: list[re.Pattern[str]])-> bool: + s = rel.as_posix() + return any(r.search(s) for r in ignore_res) + +def _mode_octal(path: Path)-> str: + try: + return f"0{(path.stat().st_mode & 0o777):03o}" + except FileNotFoundError: + return "0644" + +def _cmd_install(src: Path ,dst: Path)-> str: + return f"install -m {_shlex.quote(_mode_octal(src))} -D {_shlex.quote(str(src))} {_shlex.quote(str(dst))}" + +def _cmd_symlink(target: str ,dst: Path)-> str: + return f"ln -sfn -- {_shlex.quote(target)} {_shlex.quote(str(dst))}" + +def _iso_utc()-> str: + return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") + +def _maybe_backup(dst: Path ,backups: list[Path])-> None: + """If dst exists (file or symlink), rename it to dst_ and record it.""" + try: + if dst.exists() or dst.is_symlink(): + ts = _iso_utc() + backup = dst.with_name(dst.name + f"_{ts}") + dst.rename(backup) # preserves owner/mode/xattrs + backups.append(backup) + except FileNotFoundError: + pass + +def _validate_main_args(stage: Path ,root: Path ,ignore_patterns: list[str] ,require_root: bool)-> list[str]: + errs: list[str] = [] + if require_root and hasattr(os ,"geteuid") and os.geteuid() != 0: + errs.append("must run as root (UID 0) to write into the destination root; use sudo") + if not stage.exists(): + errs.append(f"--stage does not exist: {stage}") + elif not stage.is_dir(): + errs.append(f"--stage is not a directory: {stage}") + if not root.exists(): + errs.append(f"--root does not exist: {root}") + elif not root.is_dir(): + errs.append(f"--root is not a directory: {root}") + # regex syntax (report all) + for p in ignore_patterns: + try: + re.compile(p) + except re.error as e: + errs.append(f"--ignore '{p}': invalid regex: {e}") + try: + if stage.resolve() == root.resolve(): + errs.append("--stage and --root resolve to the same path (refusing to copy onto self)") + except Exception: + pass + if not sys.dont_write_bytecode: + errs.append("internal: bytecode suppression did not engage (sys.dont_write_bytecode is False)") + return errs + + +def _read_file_relative_paths_under(dir_tree_root: Path)-> list[Path]: + """Given: dir_tree_root. + Does: walk all levels under dir_tree_root, collect file-like entries (files or symlinks), + return their paths relative to dir_tree_root in POSIX form ordering. + Returns: list of relative Path objects (no leading slash), sorted lexicographically by as_posix(). + """ + read_file_relative_path_list: list[Path] = [] + for p in dir_tree_root.rglob("*"): + try: + if p.is_symlink() or p.is_file(): + read_file_relative_path_list.append(p.relative_to(dir_tree_root)) + except FileNotFoundError: + continue + return sorted(read_file_relative_path_list,key=lambda x: x.as_posix()) + +def _select_read_paths(read_file_rel_path_list: list[Path],ignore_patterns: list[re.Pattern[str]] +)-> tuple[list[Path], list[Path]]: + """Given: stage-relative read paths, compiled ignore patterns (matched against POSIX-style relpaths). + Does: partition into selected vs ignored. + Returns: (selected_read_rel_paths,ignored_read_rel_paths). + """ + selected: list[Path] = [] + ignored: list[Path] = [] + for rel in read_file_rel_path_list: + (ignored if _is_ignored(rel,ignore_patterns) else selected).append(rel) + return selected,ignored + +def _print_install_plan(stage: Path,root: Path,selected: list[Path],ignored: list[Path]-> None: + """Given: stage,root,selected,ignored + Does: print plan and notes about displaced-target policy. + Returns: None. + """ + print(f"Stage: {stage}") + print(f"Root: {root}\n") + print("If a file already exists at a write point, it is first renamed in place to " + "'_YYYYMMDDTHHMMSSZ' (UTC); owner/mode/xattrs are preserved.") + print(f"=== Files to be read ({len(selected)}) ===") + print("\n".join(p.as_posix() for p in selected) if selected else "(none)") + print(f"\n=== Files ignored ({len(ignored)}) ===") + print("\n".join(p.as_posix() for p in ignored) if ignored else "(none)") + +def _partition_by_kind(stage: Path,selected_read_rel_paths: list[Path] +)-> tuple[list[Path], list[Path]]: + """Given: stage root and selected stage-relative read paths. + Does: partition into regular files vs symlinks (based on the staged items). + Returns: (regular_read_rel_paths,symlink_read_rel_paths). + """ + regular: list[Path] = [] + symlinks: list[Path] = [] + for rel in selected_read_rel_paths: + (symlinks if (stage/rel).is_symlink() else regular).append(rel) + return regular,symlinks + +def _rewrite_symlink_target(link_target: str,stage: Path,root: Path)-> tuple[str,bool]: + """Given: textual symlink target from a staged link, and the resolved stage/root. + Does: if target is absolute and under stage, rewrite to analogous path under root; + otherwise preserve (absolute outside stage or relative). + Returns: (new_target_text,is_rewritten). + """ + try: + if link_target.startswith("/"): + stage_str = str(stage) + if link_target == stage_str or link_target.startswith(stage_str + os.sep): + rel_str = os.path.relpath(link_target,stage_str) + return str(root/rel_str),True + return link_target,False + else: + return link_target,False # relative: preserve + except Exception: + return link_target,False + +def _copy_regular_files(regular_read_rel_paths: list[Path],stage: Path,root: Path,displaced_targets: list[Path])-> int: + """Given: stage-rooted regular read relpaths, stage, root, and displaced_targets accumulator. + Does: ensure parents, displace existing targets (rename with UTC suffix), copy via copy2, align mode. + Returns: number of write operations completed. + """ + writes = 0 + for rel in regular_read_rel_paths: + read_abs = stage/rel + write_abs = root/rel + write_abs.parent.mkdir(parents=True,exist_ok=True) + _maybe_backup(write_abs,displaced_targets) + print(f"+ {_cmd_install(read_abs,write_abs)} # (exec: copy2)") + shutil.copy2(read_abs,write_abs) + try: + os.chmod(write_abs,read_abs.stat().st_mode & 0o777) + except Exception: + pass + writes += 1 + return writes + +def _install_symlinks(symlink_read_rel_paths: list[Path],stage: Path,root: Path,displaced_targets: list[Path] +)-> tuple[int,list[tuple[Path,str]]]: + """Given: staged symlink relpaths, stage, root, and displaced_targets accumulator. + Does: ensure parents, displace existing targets, recreate symlinks; rewrite stage-absolute targets to root. + Returns: (writes_completed,post_link_checks[(installed_path,target_text),...]). + """ + writes = 0 + post_checks: list[tuple[Path,str]] = [] + for rel in symlink_read_rel_paths: + read_link_abs = stage/rel + write_link_abs = root/rel + write_link_abs.parent.mkdir(parents=True,exist_ok=True) + _maybe_backup(write_link_abs,displaced_targets) + target_txt = os.readlink(read_link_abs) + new_txt,re_written = _rewrite_symlink_target(target_txt,stage,root) + note = " # rewritten from stage-absolute" if re_written else "" + print(f"+ {_cmd_symlink(new_txt,write_link_abs)}{note}") + os.symlink(new_txt,write_link_abs) + writes += 1 + post_checks.append((write_link_abs,new_txt)) + return writes,post_checks + +def _report_broken_symlinks(post_link_checks: list[tuple[Path,str]])-> list[Path]: + """Given: list of (installed_link_path,target_text). + Does: detect links whose targets do not exist at install time; prints warnings. + Returns: list of broken link paths. + """ + broken: list[Path] = [] + for link_path,target_txt in post_link_checks: + try: + exists = Path(target_txt).exists() if target_txt.startswith("/") else (link_path.parent/target_txt).exists() + if not exists: + broken.append(link_path) + except Exception: + broken.append(link_path) + if broken: + print("\nwarn: the following installed symlinks do not resolve at install time:") + for p in broken: + print(f" - {p}") + return broken + + +# === core === + +def cp_stage( + stage: Path # directory tree of files to be read, then copied + ,root: Path # directory tree of files to be written with root ownership, defaults to '/', hence the name + ,ignore_patterns: list[re.Pattern[str]] # match against POSIX-style read *relative* paths + ,assume_yes: bool=False # prompt short-circuit +)-> tuple[list[Path], list[Path], int, list[Path]]: + """ + Given: stage (read dir tree), root (write dir tree), compiled ignore patterns for POSIX-style relpaths under stage, + and a non-interactive flag. + Does: enumerates stage file-like entries, filters by ignores, shows a plan, optionally prompts, then performs + writes in two phases: regular files first (copy2), then symlinks (with stage→root rewriting of absolute targets). + Pre-existing targets are displaced with an in-place rename to '_YYYYMMDDTHHMMSSZ' (UTC). + Returns: (selected_read_rel_paths,ignored_read_rel_paths,writes_completed_count,displaced_targets_list) + """ + stage = stage.resolve() + root = root.resolve() + + read_file_rel_path_list = _read_file_relative_paths_under(stage) + selected_read_rel_paths,ignored_read_rel_paths = _select_read_paths(read_file_rel_path_list,ignore_patterns) + + _print_install_plan(stage,root,selected_read_rel_paths,ignored_read_rel_paths) + if not assume_yes: + print("") + ans = input("Proceed with copy? [y/N] ").strip().lower() + if ans not in ("y","yes"): + print("Aborted. Nothing copied.") + return selected_read_rel_paths,ignored_read_rel_paths,0,[] + + regular_read_rel_paths,symlink_read_rel_paths = _partition_by_kind(stage,selected_read_rel_paths) + + displaced_targets: list[Path] = [] + writes_completed = 0 + + writes_completed += _copy_regular_files(regular_read_rel_paths,stage,root,displaced_targets) + link_writes,post_link_checks = _install_symlinks(symlink_read_rel_paths,stage,root,displaced_targets) + writes_completed += link_writes + + _report_broken_symlinks(post_link_checks) + + print(f"\nDone. Copied {writes_completed} file(s). Backups made: {len(displaced_targets)}.") + if displaced_targets: + print("These files were displaced and renamed:") + for p in displaced_targets: + print(f" - {p}") + + return selected_read_rel_paths,ignored_read_rel_paths,writes_completed,displaced_targets + + +def main(argv: list[str] | None=None)-> int: + ap = argparse.ArgumentParser( + prog="cp_stage.py" + ,description="Copy staged files into a destination root, after showing the exact plan." + ) + ap.add_argument("--stage" ,default="stage",help="stage directory (default: ./stage)") + ap.add_argument("--root" , default="/" ,help="destination root (default: /)") + ap.add_argument("--ignore",action="append",default=[] + ,help="regex matched against POSIX-style relative paths under --stage; can be repeated") + + ap.add_argument("--yes" ,action="store_true" ,help="assume yes; do not prompt") + + args = ap.parse_args(argv) + + stage = Path(args.stage) + root = Path(args.root) + + # validate against raw patterns (strings) + errs = _validate_main_args(stage ,root ,args.ignore ,require_root=True) + if errs: + print("error(s):" ,file=sys.stderr) + for e in errs: + print(f" - {e}" ,file=sys.stderr) + return 2 + + # compile patterns + ignore_res: list[re.Pattern[str]] = [re.compile(p) for p in args.ignore] + + try: + cp_stage(stage ,root ,ignore_res ,assume_yes=args.yes) + return 0 + except KeyboardInterrupt: + print( + "\nKeyboard interrupt — copy may be left in a partial state.\n" + "Some targets might have been backed up or replaced; review outputs before retrying.", + file=sys.stderr, + ) + return 130 + except Exception as e: + print(f"error: {e}" ,file=sys.stderr) + return 2 + +if __name__ == "__main__": + sys.exit(main()) diff --git a/developer/source/DNS/cp_stage.py_2 b/developer/source/DNS/cp_stage.py_2 new file mode 100644 index 0000000..8d85222 --- /dev/null +++ b/developer/source/DNS/cp_stage.py_2 @@ -0,0 +1,330 @@ +#!/usr/bin/env -S python3 -B +""" +cp_stage.py — copy a staged tree into a root, after showing exactly what will happen. + +Behavior: + - Shows plan (to-copy / ignored) before executing. + - Requires root (UID 0) by default. + - Recreates symlinks as symlinks; regular files via shutil.copy2(). + - If a destination exists, it is first renamed in-place to '_YYYYMMDDTHHMMSSZ'. + The backup preserves the original owner/mode/xattrs. + - Prints shell-equivalent commands for each operation. +""" + +from __future__ import annotations + +# no bytecode anywhere (works under sudo/root shells too) +import sys ,os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") + +from pathlib import Path +import argparse +import datetime as _dt +import re +import shutil +import shlex as _shlex + +# === tiny helpers === + +def _read_file_relative_paths_under(dir_tree_root: Path)-> list[Path]: + """Return POSIX-relative paths for all file-like entries under `dir_tree_root`. + Notes: + - Recurses all levels; includes hidden files. + - Includes symlinks as entries; does NOT traverse into symlinked directories. + - Returns paths relative to `dir_tree_root` (no leading slash). + """ + read_file_relative_path_list: list[Path] = [] + for p in dir_tree_root.rglob("*"): + try: + if p.is_symlink() or p.is_file(): + read_file_relative_path_list.append(p.relative_to(dir_tree_root)) + except FileNotFoundError: + continue + return sorted(read_file_relative_path_list,key=lambda x: x.as_posix()) + +def _is_ignored(rel: Path ,ignore_res: list[re.Pattern[str]])-> bool: + s = rel.as_posix() + return any(r.search(s) for r in ignore_res) + +def _mode_octal(path: Path)-> str: + try: + return f"0{(path.stat().st_mode & 0o777):03o}" + except FileNotFoundError: + return "0644" + +def _cmd_install(src: Path ,dst: Path)-> str: + return f"install -m {_shlex.quote(_mode_octal(src))} -D {_shlex.quote(str(src))} {_shlex.quote(str(dst))}" + +def _cmd_symlink(target: str ,dst: Path)-> str: + return f"ln -sfn -- {_shlex.quote(target)} {_shlex.quote(str(dst))}" + +def _iso_utc()-> str: + return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") + +def _maybe_backup(dst: Path ,backups: list[Path])-> None: + """If dst exists (file or symlink), rename it to dst_ and record it.""" + try: + if dst.exists() or dst.is_symlink(): + ts = _iso_utc() + backup = dst.with_name(dst.name + f"_{ts}") + dst.rename(backup) # preserves owner/mode/xattrs + backups.append(backup) + except FileNotFoundError: + pass + +def _validate_main_args(stage: Path ,root: Path ,ignore_patterns: list[str] ,require_root: bool)-> list[str]: + errs: list[str] = [] + if require_root and hasattr(os ,"geteuid") and os.geteuid() != 0: + errs.append("must run as root (UID 0) to write into the destination root; use sudo") + if not stage.exists(): + errs.append(f"--stage does not exist: {stage}") + elif not stage.is_dir(): + errs.append(f"--stage is not a directory: {stage}") + if not root.exists(): + errs.append(f"--root does not exist: {root}") + elif not root.is_dir(): + errs.append(f"--root is not a directory: {root}") + # regex syntax (report all) + for p in ignore_patterns: + try: + re.compile(p) + except re.error as e: + errs.append(f"--ignore '{p}': invalid regex: {e}") + try: + if stage.resolve() == root.resolve(): + errs.append("--stage and --root resolve to the same path (refusing to copy onto self)") + except Exception: + pass + if not sys.dont_write_bytecode: + errs.append("internal: bytecode suppression did not engage (sys.dont_write_bytecode is False)") + return errs + + +def _read_file_relative_paths_under(dir_tree_root: Path)-> list[Path]: + """Given: dir_tree_root. + Does: walk all levels under dir_tree_root, collect file-like entries (files or symlinks), + return their paths relative to dir_tree_root in POSIX form ordering. + Returns: list of relative Path objects (no leading slash), sorted lexicographically by as_posix(). + """ + read_file_relative_path_list: list[Path] = [] + for p in dir_tree_root.rglob("*"): + try: + if p.is_symlink() or p.is_file(): + read_file_relative_path_list.append(p.relative_to(dir_tree_root)) + except FileNotFoundError: + continue + return sorted(read_file_relative_path_list,key=lambda x: x.as_posix()) + +def _select_read_paths(read_file_rel_path_list: list[Path],ignore_patterns: list[re.Pattern[str]] +)-> tuple[list[Path], list[Path]]: + """Given: stage-relative read paths, compiled ignore patterns (matched against POSIX-style relpaths). + Does: partition into selected vs ignored. + Returns: (selected_read_rel_paths,ignored_read_rel_paths). + """ + selected: list[Path] = [] + ignored: list[Path] = [] + for rel in read_file_rel_path_list: + (ignored if _is_ignored(rel,ignore_patterns) else selected).append(rel) + return selected,ignored + +def _print_install_plan(stage: Path,root: Path,selected: list[Path],ignored: list[Path]-> None: + """Given: stage,root,selected,ignored + Does: print plan and notes about displaced-target policy. + Returns: None. + """ + print(f"Stage: {stage}") + print(f"Root: {root}\n") + print("If a file already exists at a write point, it is first renamed in place to " + "'_YYYYMMDDTHHMMSSZ' (UTC); owner/mode/xattrs are preserved.") + print(f"=== Files to be read ({len(selected)}) ===") + print("\n".join(p.as_posix() for p in selected) if selected else "(none)") + print(f"\n=== Files ignored ({len(ignored)}) ===") + print("\n".join(p.as_posix() for p in ignored) if ignored else "(none)") + +def _partition_by_kind(stage: Path,selected_read_rel_paths: list[Path] +)-> tuple[list[Path], list[Path]]: + """Given: stage root and selected stage-relative read paths. + Does: partition into regular files vs symlinks (based on the staged items). + Returns: (regular_read_rel_paths,symlink_read_rel_paths). + """ + regular: list[Path] = [] + symlinks: list[Path] = [] + for rel in selected_read_rel_paths: + (symlinks if (stage/rel).is_symlink() else regular).append(rel) + return regular,symlinks + +def _rewrite_symlink_target(link_target: str,stage: Path,root: Path)-> tuple[str,bool]: + """Given: textual symlink target from a staged link, and the resolved stage/root. + Does: if target is absolute and under stage, rewrite to analogous path under root; + otherwise preserve (absolute outside stage or relative). + Returns: (new_target_text,is_rewritten). + """ + try: + if link_target.startswith("/"): + stage_str = str(stage) + if link_target == stage_str or link_target.startswith(stage_str + os.sep): + rel_str = os.path.relpath(link_target,stage_str) + return str(root/rel_str),True + return link_target,False + else: + return link_target,False # relative: preserve + except Exception: + return link_target,False + +def _copy_regular_files(regular_read_rel_paths: list[Path],stage: Path,root: Path,displaced_targets: list[Path])-> int: + """Given: stage-rooted regular read relpaths, stage, root, and displaced_targets accumulator. + Does: ensure parents, displace existing targets (rename with UTC suffix), copy via copy2, align mode. + Returns: number of write operations completed. + """ + writes = 0 + for rel in regular_read_rel_paths: + read_abs = stage/rel + write_abs = root/rel + write_abs.parent.mkdir(parents=True,exist_ok=True) + _maybe_backup(write_abs,displaced_targets) + print(f"+ {_cmd_install(read_abs,write_abs)} # (exec: copy2)") + shutil.copy2(read_abs,write_abs) + try: + os.chmod(write_abs,read_abs.stat().st_mode & 0o777) + except Exception: + pass + writes += 1 + return writes + +def _install_symlinks(symlink_read_rel_paths: list[Path],stage: Path,root: Path,displaced_targets: list[Path] +)-> tuple[int,list[tuple[Path,str]]]: + """Given: staged symlink relpaths, stage, root, and displaced_targets accumulator. + Does: ensure parents, displace existing targets, recreate symlinks; rewrite stage-absolute targets to root. + Returns: (writes_completed,post_link_checks[(installed_path,target_text),...]). + """ + writes = 0 + post_checks: list[tuple[Path,str]] = [] + for rel in symlink_read_rel_paths: + read_link_abs = stage/rel + write_link_abs = root/rel + write_link_abs.parent.mkdir(parents=True,exist_ok=True) + _maybe_backup(write_link_abs,displaced_targets) + target_txt = os.readlink(read_link_abs) + new_txt,re_written = _rewrite_symlink_target(target_txt,stage,root) + note = " # rewritten from stage-absolute" if re_written else "" + print(f"+ {_cmd_symlink(new_txt,write_link_abs)}{note}") + os.symlink(new_txt,write_link_abs) + writes += 1 + post_checks.append((write_link_abs,new_txt)) + return writes,post_checks + +def _report_broken_symlinks(post_link_checks: list[tuple[Path,str]])-> list[Path]: + """Given: list of (installed_link_path,target_text). + Does: detect links whose targets do not exist at install time; prints warnings. + Returns: list of broken link paths. + """ + broken: list[Path] = [] + for link_path,target_txt in post_link_checks: + try: + exists = Path(target_txt).exists() if target_txt.startswith("/") else (link_path.parent/target_txt).exists() + if not exists: + broken.append(link_path) + except Exception: + broken.append(link_path) + if broken: + print("\nwarn: the following installed symlinks do not resolve at install time:") + for p in broken: + print(f" - {p}") + return broken + + +# === core === + +def cp_stage( + stage: Path # directory tree of files to be read, then copied + ,root: Path # directory tree of files to be written with root ownership, defaults to '/', hence the name + ,ignore_patterns: list[re.Pattern[str]] # match against POSIX-style read *relative* paths + ,assume_yes: bool=False # prompt short-circuit +)-> tuple[list[Path], list[Path], int, list[Path]]: + """ + Given: stage (read dir tree), root (write dir tree), compiled ignore patterns for POSIX-style relpaths under stage, + and a non-interactive flag. + Does: enumerates stage file-like entries, filters by ignores, shows a plan, optionally prompts, then performs + writes in two phases: regular files first (copy2), then symlinks (with stage→root rewriting of absolute targets). + Pre-existing targets are displaced with an in-place rename to '_YYYYMMDDTHHMMSSZ' (UTC). + Returns: (selected_read_rel_paths,ignored_read_rel_paths,writes_completed_count,displaced_targets_list) + """ + stage = stage.resolve() + root = root.resolve() + + read_file_rel_path_list = _read_file_relative_paths_under(stage) + selected_read_rel_paths,ignored_read_rel_paths = _select_read_paths(read_file_rel_path_list,ignore_patterns) + + _print_install_plan(stage,root,selected_read_rel_paths,ignored_read_rel_paths) + if not assume_yes: + print("") + ans = input("Proceed with copy? [y/N] ").strip().lower() + if ans not in ("y","yes"): + print("Aborted. Nothing copied.") + return selected_read_rel_paths,ignored_read_rel_paths,0,[] + + regular_read_rel_paths,symlink_read_rel_paths = _partition_by_kind(stage,selected_read_rel_paths) + + displaced_targets: list[Path] = [] + writes_completed = 0 + + writes_completed += _copy_regular_files(regular_read_rel_paths,stage,root,displaced_targets) + link_writes,post_link_checks = _install_symlinks(symlink_read_rel_paths,stage,root,displaced_targets) + writes_completed += link_writes + + _report_broken_symlinks(post_link_checks) + + print(f"\nDone. Copied {writes_completed} file(s). Backups made: {len(displaced_targets)}.") + if displaced_targets: + print("These files were displaced and renamed:") + for p in displaced_targets: + print(f" - {p}") + + return selected_read_rel_paths,ignored_read_rel_paths,writes_completed,displaced_targets + + +def main(argv: list[str] | None=None)-> int: + ap = argparse.ArgumentParser( + prog="cp_stage.py" + ,description="Copy staged files into a destination root, after showing the exact plan." + ) + ap.add_argument("--stage" ,default="stage",help="stage directory (default: ./stage)") + ap.add_argument("--root" , default="/" ,help="destination root (default: /)") + ap.add_argument("--ignore",action="append",default=[] + ,help="regex matched against POSIX-style relative paths under --stage; can be repeated") + + ap.add_argument("--yes" ,action="store_true" ,help="assume yes; do not prompt") + + args = ap.parse_args(argv) + + stage = Path(args.stage) + root = Path(args.root) + + # validate against raw patterns (strings) + errs = _validate_main_args(stage ,root ,args.ignore ,require_root=True) + if errs: + print("error(s):" ,file=sys.stderr) + for e in errs: + print(f" - {e}" ,file=sys.stderr) + return 2 + + # compile patterns + ignore_res: list[re.Pattern[str]] = [re.compile(p) for p in args.ignore] + + try: + cp_stage(stage ,root ,ignore_res ,assume_yes=args.yes) + return 0 + except KeyboardInterrupt: + print( + "\nKeyboard interrupt — copy may be left in a partial state.\n" + "Some targets might have been backed up or replaced; review outputs before retrying.", + file=sys.stderr, + ) + return 130 + except Exception as e: + print(f"error: {e}" ,file=sys.stderr) + return 2 + +if __name__ == "__main__": + sys.exit(main()) diff --git a/developer/source/DNS/cp_stage.py_old b/developer/source/DNS/cp_stage.py_old new file mode 100644 index 0000000..d5aafb9 --- /dev/null +++ b/developer/source/DNS/cp_stage.py_old @@ -0,0 +1,243 @@ +#!/usr/bin/env -S python3 -B +""" +cp_stage.py — copy a staged tree into a root, after showing exactly what will happen. + +Behavior: + - Shows plan (to-copy / ignored) before executing. + - Requires root (UID 0) by default. + - Recreates symlinks as symlinks; regular files via shutil.copy2(). + - If a destination exists, it is first renamed in-place to '_YYYYMMDDTHHMMSSZ'. + The backup preserves the original owner/mode/xattrs. + - Prints shell-equivalent commands for each operation. +""" + +from __future__ import annotations + +# no bytecode anywhere (works under sudo/root shells too) +import sys ,os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") + +from pathlib import Path +import argparse +import datetime as _dt +import re +import shutil +import shlex as _shlex + +# === tiny helpers === + +def _read_file_relative_paths_under(dir_tree_root: Path)-> list[Path]: + """Return POSIX-relative paths for all file-like entries under `dir_tree_root`. + Notes: + - Recurses all levels; includes hidden files. + - Includes symlinks as entries; does NOT traverse into symlinked directories. + - Returns paths relative to `dir_tree_root` (no leading slash). + """ + read_file_relative_path_list: list[Path] = [] + for p in dir_tree_root.rglob("*"): + try: + if p.is_symlink() or p.is_file(): + read_file_relative_path_list.append(p.relative_to(dir_tree_root)) + except FileNotFoundError: + continue + return sorted(read_file_relative_path_list,key=lambda x: x.as_posix()) + +def _is_ignored(rel: Path ,ignore_res: list[re.Pattern[str]])-> bool: + s = rel.as_posix() + return any(r.search(s) for r in ignore_res) + +def _mode_octal(path: Path)-> str: + try: + return f"0{(path.stat().st_mode & 0o777):03o}" + except FileNotFoundError: + return "0644" + +def _cmd_install(src: Path ,dst: Path)-> str: + return f"install -m {_shlex.quote(_mode_octal(src))} -D {_shlex.quote(str(src))} {_shlex.quote(str(dst))}" + +def _cmd_symlink(target: str ,dst: Path)-> str: + return f"ln -sfn -- {_shlex.quote(target)} {_shlex.quote(str(dst))}" + +def _iso_utc()-> str: + return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") + +def _maybe_backup(dst: Path ,backups: list[Path])-> None: + """If dst exists (file or symlink), rename it to dst_ and record it.""" + try: + if dst.exists() or dst.is_symlink(): + ts = _iso_utc() + backup = dst.with_name(dst.name + f"_{ts}") + dst.rename(backup) # preserves owner/mode/xattrs + backups.append(backup) + except FileNotFoundError: + pass + +def _validate(stage: Path ,root: Path ,ignore_patterns: list[str] ,require_root: bool)-> list[str]: + errs: list[str] = [] + if require_root and hasattr(os ,"geteuid") and os.geteuid() != 0: + errs.append("must run as root (UID 0) to write into the destination root; use sudo") + if not stage.exists(): + errs.append(f"--stage does not exist: {stage}") + elif not stage.is_dir(): + errs.append(f"--stage is not a directory: {stage}") + if not root.exists(): + errs.append(f"--root does not exist: {root}") + elif not root.is_dir(): + errs.append(f"--root is not a directory: {root}") + # regex syntax (report all) + for p in ignore_patterns: + try: + re.compile(p) + except re.error as e: + errs.append(f"--ignore '{p}': invalid regex: {e}") + try: + if stage.resolve() == root.resolve(): + errs.append("--stage and --root resolve to the same path (refusing to copy onto self)") + except Exception: + pass + if not sys.dont_write_bytecode: + errs.append("internal: bytecode suppression did not engage (sys.dont_write_bytecode is False)") + return errs + +# === core === + +def cp_stage( + stage: Path # directory tree of files to be read, then copied + ,root: Path # directory tree of files to be written with root ownership, defaults to '/', hence the name + ,ignore_patterns: list[re.Pattern[str]] # match against POSIX-style read *relative* paths + ,assume_yes: bool=False # if yes don't stop to ask if the user really wants to do the copy + ,displaced_target_manifest: Path|None=None # optional file that will list targets displaced (renamed) before writes +)-> tuple[list[Path], list[Path], int, list[Path]]: + """ + Given: stage (read dir tree), root (write dir tree), compiled ignore patterns for POSIX-style relpaths under stage, + an optional non-interactive flag, and an optional path to a manifest that records displaced targets. + Does: enumerates all file-like entries under stage (files and symlinks), filters by ignore patterns, shows a plan, + optionally prompts, then for each selected read path ensures the write parent exists, renames any existing + write target to '_YYYYMMDDTHHMMSSZ' (UTC) in place (preserving owner/mode/xattrs), and finally writes: + reproduces symlinks as symlinks or copies regular files via shutil.copy2() and aligns mode. + Returns: (selected_read_rel_paths, ignored_read_rel_paths, writes_completed_count, displaced_targets_list) + """ + stage = stage.resolve() + root = root.resolve() + + read_file_rel_path_list: list[Path] = _read_file_relative_paths_under(stage) + + selected_read_rel_paths: list[Path] = [] + ignored_read_rel_paths: list[Path] = [] + for read_file_rel_path in read_file_rel_path_list: + (ignored_read_rel_paths if _is_ignored(read_file_rel_path ,ignore_patterns) else selected_read_rel_paths).append(read_file_rel_path) + + print(f"Stage: {stage}") + print(f"Root: {root}\n") + print("Backups: if a destination exists, it is first renamed in place to " + "'_YYYYMMDDTHHMMSSZ' (UTC); owner/mode/xattrs are preserved.") + if displaced_target_manifest: + print(f"Backup manifest will be written to: {displaced_target_manifest}") + + print(f"=== Files to be read ({len(selected_read_rel_paths)}) ===") + print("\n".join(p.as_posix() for p in selected_read_rel_paths) if selected_read_rel_paths else "(none)") + + print(f"\n=== Files ignored ({len(ignored_read_rel_paths)}) ===") + print("\n".join(p.as_posix() for p in ignored_read_rel_paths) if ignored_read_rel_paths else "(none)") + + print("") + if not assume_yes: + ans = input("Proceed with copy? [y/N] ").strip().lower() + if ans not in ("y","yes"): + print("Aborted. Nothing copied.") + return selected_read_rel_paths,ignored_read_rel_paths,0,[] + + displaced_targets: list[Path] = [] + writes_completed = 0 + + for read_file_rel_path in selected_read_rel_paths: + read_file_abs_path = stage / read_file_rel_path + write_file_abs_path = root / read_file_rel_path + write_file_abs_path.parent.mkdir(parents=True ,exist_ok=True) + + # backup if target exists (rename in place with UTC suffix) + _maybe_backup(write_file_abs_path ,displaced_targets) + + if read_file_abs_path.is_symlink(): + link_target = os.readlink(read_file_abs_path) + print(f"+ {_cmd_symlink(link_target ,write_file_abs_path)}") + os.symlink(link_target ,write_file_abs_path) + writes_completed += 1 + continue + + print(f"+ {_cmd_install(read_file_abs_path ,write_file_abs_path)} # (exec: copy2)") + shutil.copy2(read_file_abs_path ,write_file_abs_path) + try: + os.chmod(write_file_abs_path ,read_file_abs_path.stat().st_mode & 0o777) + except Exception: + pass + writes_completed += 1 + + # optional manifest write (list displaced targets, one per line) + if displaced_target_manifest: + try: + displaced_target_manifest.parent.mkdir(parents=True ,exist_ok=True) + with open(displaced_target_manifest ,"w" ,encoding="utf-8") as fh: + for p in displaced_targets: + fh.write(str(p) + "\n") + print(f"\nBackup manifest: {displaced_target_manifest} ({len(displaced_targets)} entr{'y' if len(displaced_targets)==1 else 'ies'})") + except Exception as e: + print(f"warn: could not write manifest {displaced_target_manifest}: {e}" ,file=sys.stderr) + + print(f"\nDone. Copied {writes_completed} file(s). Backups made: {len(displaced_targets)}.") + if displaced_targets: + print("Backups:") + for p in displaced_targets: + print(f" - {p}") + + return selected_read_rel_paths,ignored_read_rel_paths,writes_completed,displaced_targets + + +def main(argv: list[str] | None=None)-> int: + ap = argparse.ArgumentParser( + prog="cp_stage.py" + ,description="Copy staged files into a destination root, after showing the exact plan." + ) + ap.add_argument("--stage" ,default="stage",help="stage directory (default: ./stage)") + ap.add_argument("--root" , default="/" ,help="destination root (default: /)") + ap.add_argument("--ignore",action="append",default=[] + ,help="regex matched against POSIX-style relative paths under --stage; can be repeated") + ap.add_argument("--displaced_target_manifest" ,default=None + ,help="write list of backup files to this path (optional)") + + ap.add_argument("--yes" ,action="store_true" ,help="assume yes; do not prompt") + + args = ap.parse_args(argv) + + stage = Path(args.stage) + root = Path(args.root) + displaced_target_manifest = Path(args.displaced_target_manifest) if args.displaced_target_manifest else None + + # validate against raw patterns (strings) + errs = _validate(stage ,root ,args.ignore ,require_root=True) + if errs: + print("error(s):" ,file=sys.stderr) + for e in errs: + print(f" - {e}" ,file=sys.stderr) + return 2 + + # compile patterns + ignore_res: list[re.Pattern[str]] = [re.compile(p) for p in args.ignore] + + try: + cp_stage(stage ,root ,ignore_res ,assume_yes=args.yes ,displaced_target_manifest=displaced_target_manifest) + return 0 + except KeyboardInterrupt: + print( + "\nKeyboard interrupt — copy may be left in a partial state.\n" + "Some targets might have been backed up or replaced; review outputs before retrying.", + file=sys.stderr, + ) + return 130 + except Exception as e: + print(f"error: {e}" ,file=sys.stderr) + return 2 + +if __name__ == "__main__": + sys.exit(main()) diff --git a/developer/source/DNS/scratchpad/.gitignore b/developer/source/DNS/scratchpad/.gitignore deleted file mode 100644 index 120f485..0000000 --- a/developer/source/DNS/scratchpad/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -* -!/.gitignore diff --git a/developer/source/DNS_bundle.tgz b/developer/source/DNS_bundle.tgz deleted file mode 100644 index 1635cc2d51b8c4fbe221f19f2ffed0095001ba50..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 4061 zcmV<34*=%ov)-%Mr)_+qo|NHbkfQVTN$=?O zakovW^x7?SbbI@!y|a@xsgb;Hh2lzt%Ca`;^_57j0iMu8%ukOavLg(8)<%apnrFX=X-z=_tnNiD|WM z5!GiO&bm_y1C@$MF^=h0YtL!opRM-Jc5`c5qeLhfUIFx`1l5{|vo57s6p3(?Y?-A) z8ALnO2RI!cZAnSlT|Ni4d9;5gD3uIVp$r`4Uiovj*7iU=VnO`2(4FjLj^{ zaFblcK#OiS)j)YcM+0EU2QhMa9o4kO0L*M84cq{W@j|Hp!;U1WAcS&KDBR0xjjl&{ zr5l+gG}VlonrWUJVf6tR27>9Rd%~a@?Xe_pCa1TS6h}s%GprF6d&Dy%577>WhgBs}a`0EIdGMXAuG~ z3&O;^mLHEI5~*h%s8l`2FM5bqS1+(T7?6KZH?V^7+&I%QdtaKFE5_XCW!ttEx@Um$ zj`~?wGx1LUWOvq|sic^wWj06NMG!3!*9Q~QOgU-|a#2E9bw zkz;5n#QajbF#7K6+?-tBhlmwBuR$WmMq`AVU|3?DU+%Nc;8{!0(%5cIb?;=iJ#zrM zw@zJF3>Y~;y8idTO~HWtU@#B~P?z9{KK%QCYG*KQHMjN(c#GXHrp6**I8avugk6^= z^FmH)1PP_NS2bF}3QEW{a0R;Pi=~$$2BbQm= zvqz4EsdQxc3YAH*6YYeS~3U*BBkT~UY!S@ z9v&Q>9=_}?QUh_klGhZZbS;xhH6PjR=^e3he4#0Wozi9m9}PW;vviY2QeMLST_-`x z@g>hIidbN5(1c_tHpeto+K0_Xz@|~vCb66x6!uaKd$DSmHpoL4A$U1kA$SyD`}!bN zCi3c_tYBFzpdT{G0>9cTFs*PIM<<(wvtMBc97WC|Fia*s=3-RJ00d(%vF7+pe1q@U zA7FZCs&P+GO>j|opI#5^c=0a3{C!ZSPda)WOK@^APU^HG_hcM1Bbic+7xr|JMX_|w zzSEjsOvZFxyG+W3@LZ0CQhOOT1gL6wQ_jY#ftN-_Qn#=WnYrDCw|36Rt|GNffxSYz0=!)W-bR?r~t5T_0 zi-`i4{U#SGTm;9=i=`cdvQU4f{ZCnASa)^|tgnMB5t+cczfk-E4m_#T(0$r1xAQD} zlU8|?_h;SOWcPMHl{W*6f+DXY?x_w|j(woeFykM{C-su@(zb6F*5`#W7u_qAh`Bqa zlYi^IdVPGd-+P7dOl~Nnp@OS31upzq5T|Cd;HZeyx}2N2L@wjz6FA!!12+rPwn^o< zvOAC>&tATD!@zer)Z#ahBr*Xb*eWcYeD(W-3=>7aPr*1( z)R5u0y1X+(n0aAvQG7qat5u9mhOwI{P-{9ctVR;pzwP92YIhw_o$;M+rSB? z_xn{R-x31b00s$_)$Q$neTZ>-c!MgMx1bgJKz%J$J8o+zRjU6H987q@X=2iPm?<+M zpVn?vV7(Y($3TP>3wGL1rLA#P895Zj6ZXd2w{JIU8;uRFwZy~|aS8_V4|uQuSHr62 z)S$4jy|Lfc8TK1|I@ctjs13y%=u-_{{NFi)@M0`##t$c-jC@f{(oPSF3)q$VNPIhU zqfU-a4s}1|w%pIe=49C(FOVNS=>rg64J`b?_UvlW1u#WdxeAjN_t}(ey?|pG2__kX z6^MRwj%TD#QaMAB3KW$X_PY)>4QF_i^9q)lKByX3eg;|-fz@s z!!Wh%t=_JjWsGk@V#`BVGulLrJ0B}|dvb@(E;)DE@R|XN!W0?we-~@aN8wrfdX|Gq zSGs6TR*n4RhE2HhCpT20${@rfh66@Xnnj+L1Mi`_>>O#fdj%8x-qil=gi&{oIRI*$ zMr5Tn5V;x*3^?(C&nO2b7=>TyXBL;v?uI8+tNBOX6b_=y1aMe^K|q}jRZO(1tO7I0 zKSCrR;+A#&JxDT=0VUxgq?~8>fXctNb1668YmW|a)fGP0;9{ir;X*B(2A+E_w&FYR zT?X;vke0ZVSPk{Pq2RV{_%KhT)}>P&taY(#o6Y+lb*QfD%3fiNcK&ZFWtgwjDhq+W za-M@?`zlW&X6e_8`34alba9N|RapVUh4nz4;N5HN1))FLL z=-kNsXK`kJ;*x^!m)plK{-Yl3!{7d*6(QdS^(614r^2bs#c-Zr;?&rMk40j2)!|62 zLsJmxU1*n+h$7{3_LE^YhIyRuTY`+)B-&8yPUUWW^U=b_P~*mq>-&8-hizBpvT|@N z!UxrLOKq)jpzF7?|1&!*c|33(P1urltFp^Mh5$-yB7hJWzG>Ik@9IxT}FJU=78HMh^+72C{_1-v)*)#lAT+< zn^<(<&r!pssMXozG*qtCJ2!}xhZEs-Im_tieJp(@%MQBOmwsrUhUc23<$pQ3o62}; z>PL6ntut6qhh4Gtbt1FaJX&w^O#EDPM)+UdbjI-afj;ps81pOH|MJG!C%*sH+-oh_ z|JL5#&U*j%*Rub@h}p8sGeXWte;<%t+I@Z-Mhy9YulkU!(~4t5t2|a*_3`I82NmcG z7OVU~uD4XjzcJOl)rV2Qt9OpIUFMy+X%=s%=D-n-sLpu=O538VE4)q%o^E`>S2$j} zv3n7O0iUsUUhbdtj}LqOua1rn`rX6RZ;xIcTIGnFuNX3_^R7r&4?UL0;`$^Bu8^w? z0Z$*(eR4ms(*7Utd%wW_&nzuEFtd3}g?{1!re=^;$a ze|qPh1ZFJWlF(xoFG(zqX<^L1xu>MDx% z^7E^dGu9$+G90-(<;BBOS_RVH%lnb0-p@w*;FK1o+*-Hm1NPtM`Ts|+>it6Z|C>9{ z@A&_%*7n-}e~xA@2lKuLClI{n(WYW+v7$YsIdrJ-i*W}sxxxFqqUdk3ZuWA6cK{Jd zK5#0op|xk;{mb0-i~?WO%|3N{qif;~^#(~lYvc(tP$O6@);{EP@IS`uPrbTqguZq3P5B752I2E%$tH1I6Hq3GK zKP&nTsQ>r&pWoN|oc6!HwYLA?s{ObAZ`;4VH~$Z7|6C22KXlMF!NN7yU*`T_b7yyJ z+5THQ>-hg`-2c09`T8-+rMk!@#X&51!^0BZNK&QK(UoC-skzgsShEXTe5zQFp+E^9 z2U%GDLBRY5!aaAFi#y1SWrnJ}+Rum{^S1)`z$pq