From: Thomas Walker Lynch Date: Wed, 17 Sep 2025 01:54:43 +0000 (-0700) Subject: a short lived header line approach, will be replaced with py program config files X-Git-Url: https://git.reasoningtechnology.com/style/static/gitweb.css?a=commitdiff_plain;h=0835a99869f8722c0c81fb903fdeebf40796461e;p=subu a short lived header line approach, will be replaced with py program config files --- diff --git a/developer/source/DNS/cp_stage.py b/developer/source/DNS/cp_stage.py deleted file mode 100755 index 8d85222..0000000 --- a/developer/source/DNS/cp_stage.py +++ /dev/null @@ -1,330 +0,0 @@ -#!/usr/bin/env -S python3 -B -""" -cp_stage.py — copy a staged tree into a root, after showing exactly what will happen. - -Behavior: - - Shows plan (to-copy / ignored) before executing. - - Requires root (UID 0) by default. - - Recreates symlinks as symlinks; regular files via shutil.copy2(). - - If a destination exists, it is first renamed in-place to '_YYYYMMDDTHHMMSSZ'. - The backup preserves the original owner/mode/xattrs. - - Prints shell-equivalent commands for each operation. -""" - -from __future__ import annotations - -# no bytecode anywhere (works under sudo/root shells too) -import sys ,os -sys.dont_write_bytecode = True -os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") - -from pathlib import Path -import argparse -import datetime as _dt -import re -import shutil -import shlex as _shlex - -# === tiny helpers === - -def _read_file_relative_paths_under(dir_tree_root: Path)-> list[Path]: - """Return POSIX-relative paths for all file-like entries under `dir_tree_root`. - Notes: - - Recurses all levels; includes hidden files. - - Includes symlinks as entries; does NOT traverse into symlinked directories. - - Returns paths relative to `dir_tree_root` (no leading slash). - """ - read_file_relative_path_list: list[Path] = [] - for p in dir_tree_root.rglob("*"): - try: - if p.is_symlink() or p.is_file(): - read_file_relative_path_list.append(p.relative_to(dir_tree_root)) - except FileNotFoundError: - continue - return sorted(read_file_relative_path_list,key=lambda x: x.as_posix()) - -def _is_ignored(rel: Path ,ignore_res: list[re.Pattern[str]])-> bool: - s = rel.as_posix() - return any(r.search(s) for r in ignore_res) - -def _mode_octal(path: Path)-> str: - try: - return f"0{(path.stat().st_mode & 0o777):03o}" - except FileNotFoundError: - return "0644" - -def _cmd_install(src: Path ,dst: Path)-> str: - return f"install -m {_shlex.quote(_mode_octal(src))} -D {_shlex.quote(str(src))} {_shlex.quote(str(dst))}" - -def _cmd_symlink(target: str ,dst: Path)-> str: - return f"ln -sfn -- {_shlex.quote(target)} {_shlex.quote(str(dst))}" - -def _iso_utc()-> str: - return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") - -def _maybe_backup(dst: Path ,backups: list[Path])-> None: - """If dst exists (file or symlink), rename it to dst_ and record it.""" - try: - if dst.exists() or dst.is_symlink(): - ts = _iso_utc() - backup = dst.with_name(dst.name + f"_{ts}") - dst.rename(backup) # preserves owner/mode/xattrs - backups.append(backup) - except FileNotFoundError: - pass - -def _validate_main_args(stage: Path ,root: Path ,ignore_patterns: list[str] ,require_root: bool)-> list[str]: - errs: list[str] = [] - if require_root and hasattr(os ,"geteuid") and os.geteuid() != 0: - errs.append("must run as root (UID 0) to write into the destination root; use sudo") - if not stage.exists(): - errs.append(f"--stage does not exist: {stage}") - elif not stage.is_dir(): - errs.append(f"--stage is not a directory: {stage}") - if not root.exists(): - errs.append(f"--root does not exist: {root}") - elif not root.is_dir(): - errs.append(f"--root is not a directory: {root}") - # regex syntax (report all) - for p in ignore_patterns: - try: - re.compile(p) - except re.error as e: - errs.append(f"--ignore '{p}': invalid regex: {e}") - try: - if stage.resolve() == root.resolve(): - errs.append("--stage and --root resolve to the same path (refusing to copy onto self)") - except Exception: - pass - if not sys.dont_write_bytecode: - errs.append("internal: bytecode suppression did not engage (sys.dont_write_bytecode is False)") - return errs - - -def _read_file_relative_paths_under(dir_tree_root: Path)-> list[Path]: - """Given: dir_tree_root. - Does: walk all levels under dir_tree_root, collect file-like entries (files or symlinks), - return their paths relative to dir_tree_root in POSIX form ordering. - Returns: list of relative Path objects (no leading slash), sorted lexicographically by as_posix(). - """ - read_file_relative_path_list: list[Path] = [] - for p in dir_tree_root.rglob("*"): - try: - if p.is_symlink() or p.is_file(): - read_file_relative_path_list.append(p.relative_to(dir_tree_root)) - except FileNotFoundError: - continue - return sorted(read_file_relative_path_list,key=lambda x: x.as_posix()) - -def _select_read_paths(read_file_rel_path_list: list[Path],ignore_patterns: list[re.Pattern[str]] -)-> tuple[list[Path], list[Path]]: - """Given: stage-relative read paths, compiled ignore patterns (matched against POSIX-style relpaths). - Does: partition into selected vs ignored. - Returns: (selected_read_rel_paths,ignored_read_rel_paths). - """ - selected: list[Path] = [] - ignored: list[Path] = [] - for rel in read_file_rel_path_list: - (ignored if _is_ignored(rel,ignore_patterns) else selected).append(rel) - return selected,ignored - -def _print_install_plan(stage: Path,root: Path,selected: list[Path],ignored: list[Path]-> None: - """Given: stage,root,selected,ignored - Does: print plan and notes about displaced-target policy. - Returns: None. - """ - print(f"Stage: {stage}") - print(f"Root: {root}\n") - print("If a file already exists at a write point, it is first renamed in place to " - "'_YYYYMMDDTHHMMSSZ' (UTC); owner/mode/xattrs are preserved.") - print(f"=== Files to be read ({len(selected)}) ===") - print("\n".join(p.as_posix() for p in selected) if selected else "(none)") - print(f"\n=== Files ignored ({len(ignored)}) ===") - print("\n".join(p.as_posix() for p in ignored) if ignored else "(none)") - -def _partition_by_kind(stage: Path,selected_read_rel_paths: list[Path] -)-> tuple[list[Path], list[Path]]: - """Given: stage root and selected stage-relative read paths. - Does: partition into regular files vs symlinks (based on the staged items). - Returns: (regular_read_rel_paths,symlink_read_rel_paths). - """ - regular: list[Path] = [] - symlinks: list[Path] = [] - for rel in selected_read_rel_paths: - (symlinks if (stage/rel).is_symlink() else regular).append(rel) - return regular,symlinks - -def _rewrite_symlink_target(link_target: str,stage: Path,root: Path)-> tuple[str,bool]: - """Given: textual symlink target from a staged link, and the resolved stage/root. - Does: if target is absolute and under stage, rewrite to analogous path under root; - otherwise preserve (absolute outside stage or relative). - Returns: (new_target_text,is_rewritten). - """ - try: - if link_target.startswith("/"): - stage_str = str(stage) - if link_target == stage_str or link_target.startswith(stage_str + os.sep): - rel_str = os.path.relpath(link_target,stage_str) - return str(root/rel_str),True - return link_target,False - else: - return link_target,False # relative: preserve - except Exception: - return link_target,False - -def _copy_regular_files(regular_read_rel_paths: list[Path],stage: Path,root: Path,displaced_targets: list[Path])-> int: - """Given: stage-rooted regular read relpaths, stage, root, and displaced_targets accumulator. - Does: ensure parents, displace existing targets (rename with UTC suffix), copy via copy2, align mode. - Returns: number of write operations completed. - """ - writes = 0 - for rel in regular_read_rel_paths: - read_abs = stage/rel - write_abs = root/rel - write_abs.parent.mkdir(parents=True,exist_ok=True) - _maybe_backup(write_abs,displaced_targets) - print(f"+ {_cmd_install(read_abs,write_abs)} # (exec: copy2)") - shutil.copy2(read_abs,write_abs) - try: - os.chmod(write_abs,read_abs.stat().st_mode & 0o777) - except Exception: - pass - writes += 1 - return writes - -def _install_symlinks(symlink_read_rel_paths: list[Path],stage: Path,root: Path,displaced_targets: list[Path] -)-> tuple[int,list[tuple[Path,str]]]: - """Given: staged symlink relpaths, stage, root, and displaced_targets accumulator. - Does: ensure parents, displace existing targets, recreate symlinks; rewrite stage-absolute targets to root. - Returns: (writes_completed,post_link_checks[(installed_path,target_text),...]). - """ - writes = 0 - post_checks: list[tuple[Path,str]] = [] - for rel in symlink_read_rel_paths: - read_link_abs = stage/rel - write_link_abs = root/rel - write_link_abs.parent.mkdir(parents=True,exist_ok=True) - _maybe_backup(write_link_abs,displaced_targets) - target_txt = os.readlink(read_link_abs) - new_txt,re_written = _rewrite_symlink_target(target_txt,stage,root) - note = " # rewritten from stage-absolute" if re_written else "" - print(f"+ {_cmd_symlink(new_txt,write_link_abs)}{note}") - os.symlink(new_txt,write_link_abs) - writes += 1 - post_checks.append((write_link_abs,new_txt)) - return writes,post_checks - -def _report_broken_symlinks(post_link_checks: list[tuple[Path,str]])-> list[Path]: - """Given: list of (installed_link_path,target_text). - Does: detect links whose targets do not exist at install time; prints warnings. - Returns: list of broken link paths. - """ - broken: list[Path] = [] - for link_path,target_txt in post_link_checks: - try: - exists = Path(target_txt).exists() if target_txt.startswith("/") else (link_path.parent/target_txt).exists() - if not exists: - broken.append(link_path) - except Exception: - broken.append(link_path) - if broken: - print("\nwarn: the following installed symlinks do not resolve at install time:") - for p in broken: - print(f" - {p}") - return broken - - -# === core === - -def cp_stage( - stage: Path # directory tree of files to be read, then copied - ,root: Path # directory tree of files to be written with root ownership, defaults to '/', hence the name - ,ignore_patterns: list[re.Pattern[str]] # match against POSIX-style read *relative* paths - ,assume_yes: bool=False # prompt short-circuit -)-> tuple[list[Path], list[Path], int, list[Path]]: - """ - Given: stage (read dir tree), root (write dir tree), compiled ignore patterns for POSIX-style relpaths under stage, - and a non-interactive flag. - Does: enumerates stage file-like entries, filters by ignores, shows a plan, optionally prompts, then performs - writes in two phases: regular files first (copy2), then symlinks (with stage→root rewriting of absolute targets). - Pre-existing targets are displaced with an in-place rename to '_YYYYMMDDTHHMMSSZ' (UTC). - Returns: (selected_read_rel_paths,ignored_read_rel_paths,writes_completed_count,displaced_targets_list) - """ - stage = stage.resolve() - root = root.resolve() - - read_file_rel_path_list = _read_file_relative_paths_under(stage) - selected_read_rel_paths,ignored_read_rel_paths = _select_read_paths(read_file_rel_path_list,ignore_patterns) - - _print_install_plan(stage,root,selected_read_rel_paths,ignored_read_rel_paths) - if not assume_yes: - print("") - ans = input("Proceed with copy? [y/N] ").strip().lower() - if ans not in ("y","yes"): - print("Aborted. Nothing copied.") - return selected_read_rel_paths,ignored_read_rel_paths,0,[] - - regular_read_rel_paths,symlink_read_rel_paths = _partition_by_kind(stage,selected_read_rel_paths) - - displaced_targets: list[Path] = [] - writes_completed = 0 - - writes_completed += _copy_regular_files(regular_read_rel_paths,stage,root,displaced_targets) - link_writes,post_link_checks = _install_symlinks(symlink_read_rel_paths,stage,root,displaced_targets) - writes_completed += link_writes - - _report_broken_symlinks(post_link_checks) - - print(f"\nDone. Copied {writes_completed} file(s). Backups made: {len(displaced_targets)}.") - if displaced_targets: - print("These files were displaced and renamed:") - for p in displaced_targets: - print(f" - {p}") - - return selected_read_rel_paths,ignored_read_rel_paths,writes_completed,displaced_targets - - -def main(argv: list[str] | None=None)-> int: - ap = argparse.ArgumentParser( - prog="cp_stage.py" - ,description="Copy staged files into a destination root, after showing the exact plan." - ) - ap.add_argument("--stage" ,default="stage",help="stage directory (default: ./stage)") - ap.add_argument("--root" , default="/" ,help="destination root (default: /)") - ap.add_argument("--ignore",action="append",default=[] - ,help="regex matched against POSIX-style relative paths under --stage; can be repeated") - - ap.add_argument("--yes" ,action="store_true" ,help="assume yes; do not prompt") - - args = ap.parse_args(argv) - - stage = Path(args.stage) - root = Path(args.root) - - # validate against raw patterns (strings) - errs = _validate_main_args(stage ,root ,args.ignore ,require_root=True) - if errs: - print("error(s):" ,file=sys.stderr) - for e in errs: - print(f" - {e}" ,file=sys.stderr) - return 2 - - # compile patterns - ignore_res: list[re.Pattern[str]] = [re.compile(p) for p in args.ignore] - - try: - cp_stage(stage ,root ,ignore_res ,assume_yes=args.yes) - return 0 - except KeyboardInterrupt: - print( - "\nKeyboard interrupt — copy may be left in a partial state.\n" - "Some targets might have been backed up or replaced; review outputs before retrying.", - file=sys.stderr, - ) - return 130 - except Exception as e: - print(f"error: {e}" ,file=sys.stderr) - return 2 - -if __name__ == "__main__": - sys.exit(main()) diff --git a/developer/source/DNS/cp_stage.py_2 b/developer/source/DNS/cp_stage.py_2 deleted file mode 100644 index 8d85222..0000000 --- a/developer/source/DNS/cp_stage.py_2 +++ /dev/null @@ -1,330 +0,0 @@ -#!/usr/bin/env -S python3 -B -""" -cp_stage.py — copy a staged tree into a root, after showing exactly what will happen. - -Behavior: - - Shows plan (to-copy / ignored) before executing. - - Requires root (UID 0) by default. - - Recreates symlinks as symlinks; regular files via shutil.copy2(). - - If a destination exists, it is first renamed in-place to '_YYYYMMDDTHHMMSSZ'. - The backup preserves the original owner/mode/xattrs. - - Prints shell-equivalent commands for each operation. -""" - -from __future__ import annotations - -# no bytecode anywhere (works under sudo/root shells too) -import sys ,os -sys.dont_write_bytecode = True -os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") - -from pathlib import Path -import argparse -import datetime as _dt -import re -import shutil -import shlex as _shlex - -# === tiny helpers === - -def _read_file_relative_paths_under(dir_tree_root: Path)-> list[Path]: - """Return POSIX-relative paths for all file-like entries under `dir_tree_root`. - Notes: - - Recurses all levels; includes hidden files. - - Includes symlinks as entries; does NOT traverse into symlinked directories. - - Returns paths relative to `dir_tree_root` (no leading slash). - """ - read_file_relative_path_list: list[Path] = [] - for p in dir_tree_root.rglob("*"): - try: - if p.is_symlink() or p.is_file(): - read_file_relative_path_list.append(p.relative_to(dir_tree_root)) - except FileNotFoundError: - continue - return sorted(read_file_relative_path_list,key=lambda x: x.as_posix()) - -def _is_ignored(rel: Path ,ignore_res: list[re.Pattern[str]])-> bool: - s = rel.as_posix() - return any(r.search(s) for r in ignore_res) - -def _mode_octal(path: Path)-> str: - try: - return f"0{(path.stat().st_mode & 0o777):03o}" - except FileNotFoundError: - return "0644" - -def _cmd_install(src: Path ,dst: Path)-> str: - return f"install -m {_shlex.quote(_mode_octal(src))} -D {_shlex.quote(str(src))} {_shlex.quote(str(dst))}" - -def _cmd_symlink(target: str ,dst: Path)-> str: - return f"ln -sfn -- {_shlex.quote(target)} {_shlex.quote(str(dst))}" - -def _iso_utc()-> str: - return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") - -def _maybe_backup(dst: Path ,backups: list[Path])-> None: - """If dst exists (file or symlink), rename it to dst_ and record it.""" - try: - if dst.exists() or dst.is_symlink(): - ts = _iso_utc() - backup = dst.with_name(dst.name + f"_{ts}") - dst.rename(backup) # preserves owner/mode/xattrs - backups.append(backup) - except FileNotFoundError: - pass - -def _validate_main_args(stage: Path ,root: Path ,ignore_patterns: list[str] ,require_root: bool)-> list[str]: - errs: list[str] = [] - if require_root and hasattr(os ,"geteuid") and os.geteuid() != 0: - errs.append("must run as root (UID 0) to write into the destination root; use sudo") - if not stage.exists(): - errs.append(f"--stage does not exist: {stage}") - elif not stage.is_dir(): - errs.append(f"--stage is not a directory: {stage}") - if not root.exists(): - errs.append(f"--root does not exist: {root}") - elif not root.is_dir(): - errs.append(f"--root is not a directory: {root}") - # regex syntax (report all) - for p in ignore_patterns: - try: - re.compile(p) - except re.error as e: - errs.append(f"--ignore '{p}': invalid regex: {e}") - try: - if stage.resolve() == root.resolve(): - errs.append("--stage and --root resolve to the same path (refusing to copy onto self)") - except Exception: - pass - if not sys.dont_write_bytecode: - errs.append("internal: bytecode suppression did not engage (sys.dont_write_bytecode is False)") - return errs - - -def _read_file_relative_paths_under(dir_tree_root: Path)-> list[Path]: - """Given: dir_tree_root. - Does: walk all levels under dir_tree_root, collect file-like entries (files or symlinks), - return their paths relative to dir_tree_root in POSIX form ordering. - Returns: list of relative Path objects (no leading slash), sorted lexicographically by as_posix(). - """ - read_file_relative_path_list: list[Path] = [] - for p in dir_tree_root.rglob("*"): - try: - if p.is_symlink() or p.is_file(): - read_file_relative_path_list.append(p.relative_to(dir_tree_root)) - except FileNotFoundError: - continue - return sorted(read_file_relative_path_list,key=lambda x: x.as_posix()) - -def _select_read_paths(read_file_rel_path_list: list[Path],ignore_patterns: list[re.Pattern[str]] -)-> tuple[list[Path], list[Path]]: - """Given: stage-relative read paths, compiled ignore patterns (matched against POSIX-style relpaths). - Does: partition into selected vs ignored. - Returns: (selected_read_rel_paths,ignored_read_rel_paths). - """ - selected: list[Path] = [] - ignored: list[Path] = [] - for rel in read_file_rel_path_list: - (ignored if _is_ignored(rel,ignore_patterns) else selected).append(rel) - return selected,ignored - -def _print_install_plan(stage: Path,root: Path,selected: list[Path],ignored: list[Path]-> None: - """Given: stage,root,selected,ignored - Does: print plan and notes about displaced-target policy. - Returns: None. - """ - print(f"Stage: {stage}") - print(f"Root: {root}\n") - print("If a file already exists at a write point, it is first renamed in place to " - "'_YYYYMMDDTHHMMSSZ' (UTC); owner/mode/xattrs are preserved.") - print(f"=== Files to be read ({len(selected)}) ===") - print("\n".join(p.as_posix() for p in selected) if selected else "(none)") - print(f"\n=== Files ignored ({len(ignored)}) ===") - print("\n".join(p.as_posix() for p in ignored) if ignored else "(none)") - -def _partition_by_kind(stage: Path,selected_read_rel_paths: list[Path] -)-> tuple[list[Path], list[Path]]: - """Given: stage root and selected stage-relative read paths. - Does: partition into regular files vs symlinks (based on the staged items). - Returns: (regular_read_rel_paths,symlink_read_rel_paths). - """ - regular: list[Path] = [] - symlinks: list[Path] = [] - for rel in selected_read_rel_paths: - (symlinks if (stage/rel).is_symlink() else regular).append(rel) - return regular,symlinks - -def _rewrite_symlink_target(link_target: str,stage: Path,root: Path)-> tuple[str,bool]: - """Given: textual symlink target from a staged link, and the resolved stage/root. - Does: if target is absolute and under stage, rewrite to analogous path under root; - otherwise preserve (absolute outside stage or relative). - Returns: (new_target_text,is_rewritten). - """ - try: - if link_target.startswith("/"): - stage_str = str(stage) - if link_target == stage_str or link_target.startswith(stage_str + os.sep): - rel_str = os.path.relpath(link_target,stage_str) - return str(root/rel_str),True - return link_target,False - else: - return link_target,False # relative: preserve - except Exception: - return link_target,False - -def _copy_regular_files(regular_read_rel_paths: list[Path],stage: Path,root: Path,displaced_targets: list[Path])-> int: - """Given: stage-rooted regular read relpaths, stage, root, and displaced_targets accumulator. - Does: ensure parents, displace existing targets (rename with UTC suffix), copy via copy2, align mode. - Returns: number of write operations completed. - """ - writes = 0 - for rel in regular_read_rel_paths: - read_abs = stage/rel - write_abs = root/rel - write_abs.parent.mkdir(parents=True,exist_ok=True) - _maybe_backup(write_abs,displaced_targets) - print(f"+ {_cmd_install(read_abs,write_abs)} # (exec: copy2)") - shutil.copy2(read_abs,write_abs) - try: - os.chmod(write_abs,read_abs.stat().st_mode & 0o777) - except Exception: - pass - writes += 1 - return writes - -def _install_symlinks(symlink_read_rel_paths: list[Path],stage: Path,root: Path,displaced_targets: list[Path] -)-> tuple[int,list[tuple[Path,str]]]: - """Given: staged symlink relpaths, stage, root, and displaced_targets accumulator. - Does: ensure parents, displace existing targets, recreate symlinks; rewrite stage-absolute targets to root. - Returns: (writes_completed,post_link_checks[(installed_path,target_text),...]). - """ - writes = 0 - post_checks: list[tuple[Path,str]] = [] - for rel in symlink_read_rel_paths: - read_link_abs = stage/rel - write_link_abs = root/rel - write_link_abs.parent.mkdir(parents=True,exist_ok=True) - _maybe_backup(write_link_abs,displaced_targets) - target_txt = os.readlink(read_link_abs) - new_txt,re_written = _rewrite_symlink_target(target_txt,stage,root) - note = " # rewritten from stage-absolute" if re_written else "" - print(f"+ {_cmd_symlink(new_txt,write_link_abs)}{note}") - os.symlink(new_txt,write_link_abs) - writes += 1 - post_checks.append((write_link_abs,new_txt)) - return writes,post_checks - -def _report_broken_symlinks(post_link_checks: list[tuple[Path,str]])-> list[Path]: - """Given: list of (installed_link_path,target_text). - Does: detect links whose targets do not exist at install time; prints warnings. - Returns: list of broken link paths. - """ - broken: list[Path] = [] - for link_path,target_txt in post_link_checks: - try: - exists = Path(target_txt).exists() if target_txt.startswith("/") else (link_path.parent/target_txt).exists() - if not exists: - broken.append(link_path) - except Exception: - broken.append(link_path) - if broken: - print("\nwarn: the following installed symlinks do not resolve at install time:") - for p in broken: - print(f" - {p}") - return broken - - -# === core === - -def cp_stage( - stage: Path # directory tree of files to be read, then copied - ,root: Path # directory tree of files to be written with root ownership, defaults to '/', hence the name - ,ignore_patterns: list[re.Pattern[str]] # match against POSIX-style read *relative* paths - ,assume_yes: bool=False # prompt short-circuit -)-> tuple[list[Path], list[Path], int, list[Path]]: - """ - Given: stage (read dir tree), root (write dir tree), compiled ignore patterns for POSIX-style relpaths under stage, - and a non-interactive flag. - Does: enumerates stage file-like entries, filters by ignores, shows a plan, optionally prompts, then performs - writes in two phases: regular files first (copy2), then symlinks (with stage→root rewriting of absolute targets). - Pre-existing targets are displaced with an in-place rename to '_YYYYMMDDTHHMMSSZ' (UTC). - Returns: (selected_read_rel_paths,ignored_read_rel_paths,writes_completed_count,displaced_targets_list) - """ - stage = stage.resolve() - root = root.resolve() - - read_file_rel_path_list = _read_file_relative_paths_under(stage) - selected_read_rel_paths,ignored_read_rel_paths = _select_read_paths(read_file_rel_path_list,ignore_patterns) - - _print_install_plan(stage,root,selected_read_rel_paths,ignored_read_rel_paths) - if not assume_yes: - print("") - ans = input("Proceed with copy? [y/N] ").strip().lower() - if ans not in ("y","yes"): - print("Aborted. Nothing copied.") - return selected_read_rel_paths,ignored_read_rel_paths,0,[] - - regular_read_rel_paths,symlink_read_rel_paths = _partition_by_kind(stage,selected_read_rel_paths) - - displaced_targets: list[Path] = [] - writes_completed = 0 - - writes_completed += _copy_regular_files(regular_read_rel_paths,stage,root,displaced_targets) - link_writes,post_link_checks = _install_symlinks(symlink_read_rel_paths,stage,root,displaced_targets) - writes_completed += link_writes - - _report_broken_symlinks(post_link_checks) - - print(f"\nDone. Copied {writes_completed} file(s). Backups made: {len(displaced_targets)}.") - if displaced_targets: - print("These files were displaced and renamed:") - for p in displaced_targets: - print(f" - {p}") - - return selected_read_rel_paths,ignored_read_rel_paths,writes_completed,displaced_targets - - -def main(argv: list[str] | None=None)-> int: - ap = argparse.ArgumentParser( - prog="cp_stage.py" - ,description="Copy staged files into a destination root, after showing the exact plan." - ) - ap.add_argument("--stage" ,default="stage",help="stage directory (default: ./stage)") - ap.add_argument("--root" , default="/" ,help="destination root (default: /)") - ap.add_argument("--ignore",action="append",default=[] - ,help="regex matched against POSIX-style relative paths under --stage; can be repeated") - - ap.add_argument("--yes" ,action="store_true" ,help="assume yes; do not prompt") - - args = ap.parse_args(argv) - - stage = Path(args.stage) - root = Path(args.root) - - # validate against raw patterns (strings) - errs = _validate_main_args(stage ,root ,args.ignore ,require_root=True) - if errs: - print("error(s):" ,file=sys.stderr) - for e in errs: - print(f" - {e}" ,file=sys.stderr) - return 2 - - # compile patterns - ignore_res: list[re.Pattern[str]] = [re.compile(p) for p in args.ignore] - - try: - cp_stage(stage ,root ,ignore_res ,assume_yes=args.yes) - return 0 - except KeyboardInterrupt: - print( - "\nKeyboard interrupt — copy may be left in a partial state.\n" - "Some targets might have been backed up or replaced; review outputs before retrying.", - file=sys.stderr, - ) - return 130 - except Exception as e: - print(f"error: {e}" ,file=sys.stderr) - return 2 - -if __name__ == "__main__": - sys.exit(main()) diff --git a/developer/source/DNS/cp_stage.py_old b/developer/source/DNS/cp_stage.py_old deleted file mode 100644 index d5aafb9..0000000 --- a/developer/source/DNS/cp_stage.py_old +++ /dev/null @@ -1,243 +0,0 @@ -#!/usr/bin/env -S python3 -B -""" -cp_stage.py — copy a staged tree into a root, after showing exactly what will happen. - -Behavior: - - Shows plan (to-copy / ignored) before executing. - - Requires root (UID 0) by default. - - Recreates symlinks as symlinks; regular files via shutil.copy2(). - - If a destination exists, it is first renamed in-place to '_YYYYMMDDTHHMMSSZ'. - The backup preserves the original owner/mode/xattrs. - - Prints shell-equivalent commands for each operation. -""" - -from __future__ import annotations - -# no bytecode anywhere (works under sudo/root shells too) -import sys ,os -sys.dont_write_bytecode = True -os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") - -from pathlib import Path -import argparse -import datetime as _dt -import re -import shutil -import shlex as _shlex - -# === tiny helpers === - -def _read_file_relative_paths_under(dir_tree_root: Path)-> list[Path]: - """Return POSIX-relative paths for all file-like entries under `dir_tree_root`. - Notes: - - Recurses all levels; includes hidden files. - - Includes symlinks as entries; does NOT traverse into symlinked directories. - - Returns paths relative to `dir_tree_root` (no leading slash). - """ - read_file_relative_path_list: list[Path] = [] - for p in dir_tree_root.rglob("*"): - try: - if p.is_symlink() or p.is_file(): - read_file_relative_path_list.append(p.relative_to(dir_tree_root)) - except FileNotFoundError: - continue - return sorted(read_file_relative_path_list,key=lambda x: x.as_posix()) - -def _is_ignored(rel: Path ,ignore_res: list[re.Pattern[str]])-> bool: - s = rel.as_posix() - return any(r.search(s) for r in ignore_res) - -def _mode_octal(path: Path)-> str: - try: - return f"0{(path.stat().st_mode & 0o777):03o}" - except FileNotFoundError: - return "0644" - -def _cmd_install(src: Path ,dst: Path)-> str: - return f"install -m {_shlex.quote(_mode_octal(src))} -D {_shlex.quote(str(src))} {_shlex.quote(str(dst))}" - -def _cmd_symlink(target: str ,dst: Path)-> str: - return f"ln -sfn -- {_shlex.quote(target)} {_shlex.quote(str(dst))}" - -def _iso_utc()-> str: - return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") - -def _maybe_backup(dst: Path ,backups: list[Path])-> None: - """If dst exists (file or symlink), rename it to dst_ and record it.""" - try: - if dst.exists() or dst.is_symlink(): - ts = _iso_utc() - backup = dst.with_name(dst.name + f"_{ts}") - dst.rename(backup) # preserves owner/mode/xattrs - backups.append(backup) - except FileNotFoundError: - pass - -def _validate(stage: Path ,root: Path ,ignore_patterns: list[str] ,require_root: bool)-> list[str]: - errs: list[str] = [] - if require_root and hasattr(os ,"geteuid") and os.geteuid() != 0: - errs.append("must run as root (UID 0) to write into the destination root; use sudo") - if not stage.exists(): - errs.append(f"--stage does not exist: {stage}") - elif not stage.is_dir(): - errs.append(f"--stage is not a directory: {stage}") - if not root.exists(): - errs.append(f"--root does not exist: {root}") - elif not root.is_dir(): - errs.append(f"--root is not a directory: {root}") - # regex syntax (report all) - for p in ignore_patterns: - try: - re.compile(p) - except re.error as e: - errs.append(f"--ignore '{p}': invalid regex: {e}") - try: - if stage.resolve() == root.resolve(): - errs.append("--stage and --root resolve to the same path (refusing to copy onto self)") - except Exception: - pass - if not sys.dont_write_bytecode: - errs.append("internal: bytecode suppression did not engage (sys.dont_write_bytecode is False)") - return errs - -# === core === - -def cp_stage( - stage: Path # directory tree of files to be read, then copied - ,root: Path # directory tree of files to be written with root ownership, defaults to '/', hence the name - ,ignore_patterns: list[re.Pattern[str]] # match against POSIX-style read *relative* paths - ,assume_yes: bool=False # if yes don't stop to ask if the user really wants to do the copy - ,displaced_target_manifest: Path|None=None # optional file that will list targets displaced (renamed) before writes -)-> tuple[list[Path], list[Path], int, list[Path]]: - """ - Given: stage (read dir tree), root (write dir tree), compiled ignore patterns for POSIX-style relpaths under stage, - an optional non-interactive flag, and an optional path to a manifest that records displaced targets. - Does: enumerates all file-like entries under stage (files and symlinks), filters by ignore patterns, shows a plan, - optionally prompts, then for each selected read path ensures the write parent exists, renames any existing - write target to '_YYYYMMDDTHHMMSSZ' (UTC) in place (preserving owner/mode/xattrs), and finally writes: - reproduces symlinks as symlinks or copies regular files via shutil.copy2() and aligns mode. - Returns: (selected_read_rel_paths, ignored_read_rel_paths, writes_completed_count, displaced_targets_list) - """ - stage = stage.resolve() - root = root.resolve() - - read_file_rel_path_list: list[Path] = _read_file_relative_paths_under(stage) - - selected_read_rel_paths: list[Path] = [] - ignored_read_rel_paths: list[Path] = [] - for read_file_rel_path in read_file_rel_path_list: - (ignored_read_rel_paths if _is_ignored(read_file_rel_path ,ignore_patterns) else selected_read_rel_paths).append(read_file_rel_path) - - print(f"Stage: {stage}") - print(f"Root: {root}\n") - print("Backups: if a destination exists, it is first renamed in place to " - "'_YYYYMMDDTHHMMSSZ' (UTC); owner/mode/xattrs are preserved.") - if displaced_target_manifest: - print(f"Backup manifest will be written to: {displaced_target_manifest}") - - print(f"=== Files to be read ({len(selected_read_rel_paths)}) ===") - print("\n".join(p.as_posix() for p in selected_read_rel_paths) if selected_read_rel_paths else "(none)") - - print(f"\n=== Files ignored ({len(ignored_read_rel_paths)}) ===") - print("\n".join(p.as_posix() for p in ignored_read_rel_paths) if ignored_read_rel_paths else "(none)") - - print("") - if not assume_yes: - ans = input("Proceed with copy? [y/N] ").strip().lower() - if ans not in ("y","yes"): - print("Aborted. Nothing copied.") - return selected_read_rel_paths,ignored_read_rel_paths,0,[] - - displaced_targets: list[Path] = [] - writes_completed = 0 - - for read_file_rel_path in selected_read_rel_paths: - read_file_abs_path = stage / read_file_rel_path - write_file_abs_path = root / read_file_rel_path - write_file_abs_path.parent.mkdir(parents=True ,exist_ok=True) - - # backup if target exists (rename in place with UTC suffix) - _maybe_backup(write_file_abs_path ,displaced_targets) - - if read_file_abs_path.is_symlink(): - link_target = os.readlink(read_file_abs_path) - print(f"+ {_cmd_symlink(link_target ,write_file_abs_path)}") - os.symlink(link_target ,write_file_abs_path) - writes_completed += 1 - continue - - print(f"+ {_cmd_install(read_file_abs_path ,write_file_abs_path)} # (exec: copy2)") - shutil.copy2(read_file_abs_path ,write_file_abs_path) - try: - os.chmod(write_file_abs_path ,read_file_abs_path.stat().st_mode & 0o777) - except Exception: - pass - writes_completed += 1 - - # optional manifest write (list displaced targets, one per line) - if displaced_target_manifest: - try: - displaced_target_manifest.parent.mkdir(parents=True ,exist_ok=True) - with open(displaced_target_manifest ,"w" ,encoding="utf-8") as fh: - for p in displaced_targets: - fh.write(str(p) + "\n") - print(f"\nBackup manifest: {displaced_target_manifest} ({len(displaced_targets)} entr{'y' if len(displaced_targets)==1 else 'ies'})") - except Exception as e: - print(f"warn: could not write manifest {displaced_target_manifest}: {e}" ,file=sys.stderr) - - print(f"\nDone. Copied {writes_completed} file(s). Backups made: {len(displaced_targets)}.") - if displaced_targets: - print("Backups:") - for p in displaced_targets: - print(f" - {p}") - - return selected_read_rel_paths,ignored_read_rel_paths,writes_completed,displaced_targets - - -def main(argv: list[str] | None=None)-> int: - ap = argparse.ArgumentParser( - prog="cp_stage.py" - ,description="Copy staged files into a destination root, after showing the exact plan." - ) - ap.add_argument("--stage" ,default="stage",help="stage directory (default: ./stage)") - ap.add_argument("--root" , default="/" ,help="destination root (default: /)") - ap.add_argument("--ignore",action="append",default=[] - ,help="regex matched against POSIX-style relative paths under --stage; can be repeated") - ap.add_argument("--displaced_target_manifest" ,default=None - ,help="write list of backup files to this path (optional)") - - ap.add_argument("--yes" ,action="store_true" ,help="assume yes; do not prompt") - - args = ap.parse_args(argv) - - stage = Path(args.stage) - root = Path(args.root) - displaced_target_manifest = Path(args.displaced_target_manifest) if args.displaced_target_manifest else None - - # validate against raw patterns (strings) - errs = _validate(stage ,root ,args.ignore ,require_root=True) - if errs: - print("error(s):" ,file=sys.stderr) - for e in errs: - print(f" - {e}" ,file=sys.stderr) - return 2 - - # compile patterns - ignore_res: list[re.Pattern[str]] = [re.compile(p) for p in args.ignore] - - try: - cp_stage(stage ,root ,ignore_res ,assume_yes=args.yes ,displaced_target_manifest=displaced_target_manifest) - return 0 - except KeyboardInterrupt: - print( - "\nKeyboard interrupt — copy may be left in a partial state.\n" - "Some targets might have been backed up or replaced; review outputs before retrying.", - file=sys.stderr, - ) - return 130 - except Exception as e: - print(f"error: {e}" ,file=sys.stderr) - return 2 - -if __name__ == "__main__": - sys.exit(main()) diff --git a/developer/source/DNS/stage b/developer/source/DNS/stage new file mode 120000 index 0000000..6b42c53 --- /dev/null +++ b/developer/source/DNS/stage @@ -0,0 +1 @@ +stage_test_0/ \ No newline at end of file diff --git a/developer/source/DNS/stage/etc/systemd/system/unbound@.service b/developer/source/DNS/stage/etc/systemd/system/unbound@.service deleted file mode 100644 index ba2919b..0000000 --- a/developer/source/DNS/stage/etc/systemd/system/unbound@.service +++ /dev/null @@ -1,19 +0,0 @@ -[Unit] -Description=Unbound DNS instance for %i (per-subu tunnel egress) -After=network-online.target wg-quick@%i.service -Requires=wg-quick@%i.service -Wants=network-online.target - -[Service] -Type=simple -ExecStart=/usr/sbin/unbound -d -p -c /etc/unbound/unbound-%i.conf -User=unbound -Group=unbound -Restart=on-failure -RestartSec=2s -AmbientCapabilities=CAP_NET_BIND_SERVICE -CapabilityBoundingSet=CAP_NET_BIND_SERVICE -NoNewPrivileges=true - -[Install] -WantedBy=multi-user.target diff --git a/developer/source/DNS/stage/etc/unbound/unbound-US.conf b/developer/source/DNS/stage/etc/unbound/unbound-US.conf deleted file mode 100644 index 1995438..0000000 --- a/developer/source/DNS/stage/etc/unbound/unbound-US.conf +++ /dev/null @@ -1,18 +0,0 @@ -server: - username: "unbound" - chroot: "" - directory: "/etc/unbound" - do-daemonize: no - interface: 127.0.0.1@5301 - hide-identity: yes - hide-version: yes - harden-glue: yes - harden-dnssec-stripped: yes - qname-minimisation: yes - prefetch: yes - outgoing-interface: 10.0.0.1 - -forward-zone: - name: "." - forward-addr: 1.1.1.1 - forward-addr: 1.0.0.1 diff --git a/developer/source/DNS/stage/etc/unbound/unbound-x6.conf b/developer/source/DNS/stage/etc/unbound/unbound-x6.conf deleted file mode 100644 index ed49241..0000000 --- a/developer/source/DNS/stage/etc/unbound/unbound-x6.conf +++ /dev/null @@ -1,18 +0,0 @@ -server: - username: "unbound" - chroot: "" - directory: "/etc/unbound" - do-daemonize: no - interface: 127.0.0.1@5302 - hide-identity: yes - hide-version: yes - harden-glue: yes - harden-dnssec-stripped: yes - qname-minimisation: yes - prefetch: yes - outgoing-interface: 10.8.0.2 - -forward-zone: - name: "." - forward-addr: 1.1.1.1 - forward-addr: 1.0.0.1 diff --git a/developer/source/DNS/stage/usr/local/sbin/DNS_status.sh b/developer/source/DNS/stage/usr/local/sbin/DNS_status.sh deleted file mode 100755 index d4db58e..0000000 --- a/developer/source/DNS/stage/usr/local/sbin/DNS_status.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail -echo "== DNS status ==" -systemctl --no-pager --full status DNS-redirect unbound@US unbound@x6 || true -echo -echo "== nftables ==" -nft list table inet NAT-DNS-REDIRECT || true -echo -echo "== Unbound logs (last 50 lines each) ==" -journalctl -u unbound@US -n 50 --no-pager || true -echo -journalctl -u unbound@x6 -n 50 --no-pager || true diff --git a/developer/source/DNS/stage_ls.py b/developer/source/DNS/stage_ls.py new file mode 100755 index 0000000..93dd3d2 --- /dev/null +++ b/developer/source/DNS/stage_ls.py @@ -0,0 +1,193 @@ +#!/usr/bin/env -S python3 -B +""" +ls_stage.py — list staged files and their header-declared install metadata. + +Header line format (first line of each file): + + +- owner: username string (need not exist until install time) +- permissions: four octal digits, e.g. 0644 +- write_file_name: '.' means use the read file's basename, else use the given POSIX filename +- target_directory_path: POSIX directory path (usually absolute, e.g. /etc/unbound) + +Output formats: +- list (default): "read_file_path: owner permissions write_file_name target_directory_path" +- table: columns aligned for readability +""" + +from __future__ import annotations + +# never write bytecode (root/sudo friendly) +import sys ,os +sys.dont_write_bytecode = True +os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1") + +from dataclasses import dataclass +from pathlib import Path +import argparse +import re + +# === Stage utilities (importable) === + +def stage_read_file_paths(stage_root: Path)-> list[Path]: + """Given: stage_root directory. + Does: recursively enumerate regular files (follows symlinks to files), keep paths relative to stage_root. + Returns: list[Path] of POSIX-order sorted relative paths (no leading slash). + """ + rels: list[Path] = [] + for p in stage_root.rglob("*"): + try: + if p.is_file(): # follows symlink-to-file + rels.append(p.relative_to(stage_root)) + except (FileNotFoundError ,RuntimeError): + # broken link or race; skip conservatively + continue + return sorted(rels ,key=lambda x: x.as_posix()) + +@dataclass +class StageRow: + read_rel: Path # e.g. Path("etc/unbound/unbound.conf.staged") + owner: str # token[0] + perm_octal_str: str # token[1], exactly as in header (validated ####) + perm_int: int # token[1] parsed as base-8 + write_name: str # token[2] ('.' resolved to read_rel.name) + target_dir: Path # token[3] (Path) + header_raw: str # original header line (sans newline) + + # convenience + def write_abs(self ,root: Path)-> Path: + return (root / self.target_dir.relative_to("/")) if self.target_dir.is_absolute() else (root / self.target_dir) / self.write_name + +# header parsing rules +_PERM_RE = re.compile(r"^[0-7]{4}$") + +def parse_stage_header_line(header: str ,read_rel: Path)-> tuple[StageRow|None ,str|None]: + """Given: raw first line of a staged file and its stage-relative path. + Does: parse ' ' with max 4 tokens (target_dir may contain spaces if quoted not required). + Returns: (StageRow, None) on success, or (None, error_message) on failure. Does NOT touch filesystem. + """ + # strip BOM and trailing newline/spaces + h = header.lstrip("\ufeff").strip() + if not h: + return None ,f"empty header line in {read_rel}" + parts = h.split(maxsplit=3) + if len(parts) != 4: + return None ,f"malformed header in {read_rel}: expected 4 fields, got {len(parts)}" + owner ,perm_s ,write_name ,target_dir_s = parts + + if not _PERM_RE.fullmatch(perm_s): + return None ,f"invalid permissions '{perm_s}' in {read_rel}: must be four octal digits" + + # resolve '.' → basename + resolved_write_name = read_rel.name if write_name == "." else write_name + + # MVP guard: write_name should be a single filename (no '/') + if "/" in resolved_write_name: + return None ,f"write_file_name must not contain '/': got '{resolved_write_name}' in {read_rel}" + + # target dir may be absolute (recommended) or relative (we treat relative as under the install root) + target_dir = Path(target_dir_s) + + try: + row = StageRow( + read_rel = read_rel + ,owner = owner + ,perm_octal_str = perm_s + ,perm_int = int(perm_s ,8) + ,write_name = resolved_write_name + ,target_dir = target_dir + ,header_raw = h + ) + return row ,None + except Exception as e: + return None ,f"internal parse error in {read_rel}: {e}" + +def read_first_line(p: Path)-> str: + """Return the first line (sans newline). UTF-8 with BOM tolerant.""" + with open(p ,"r" ,encoding="utf-8" ,errors="replace") as fh: + line = fh.readline() + return line.rstrip("\n\r") + +def scan_stage(stage_root: Path)-> tuple[list[StageRow] ,list[str]]: + """Given: stage_root. + Does: enumerate files, parse each header line, collect rows and errors. + Returns: (rows, errors) + """ + rows: list[StageRow] = [] + errs: list[str] = [] + for rel in stage_read_file_paths(stage_root): + abs_path = stage_root / rel + try: + header = read_first_line(abs_path) + except Exception as e: + errs.append(f"read error in {rel}: {e}") + continue + row ,err = parse_stage_header_line(header ,rel) + if err: + errs.append(err) + else: + rows.append(row) # type: ignore[arg-type] + return rows ,errs + +# === Printers === + +def print_list(rows: list[StageRow])-> None: + """Print: 'read_file_path: owner permissions write_file_name target_directory_path' per line.""" + for r in rows: + print(f"{r.read_rel.as_posix()}: {r.owner} {r.perm_octal_str} {r.write_name} {r.target_dir}") + +def print_table(rows: list[StageRow])-> None: + """Aligned table printer (no headers, just data in columns).""" + if not rows: + return + a = [r.read_rel.as_posix() for r in rows] + b = [r.owner for r in rows] + c = [r.perm_octal_str for r in rows] + d = [r.write_name for r in rows] + e = [str(r.target_dir) for r in rows] + wa = max(len(s) for s in a) + wb = max(len(s) for s in b) + wc = max(len(s) for s in c) + wd = max(len(s) for s in d) + # e (target_dir) left ragged + for sa ,sb ,sc ,sd ,se in zip(a ,b ,c ,d ,e): + print(f"{sa:<{wa}} {sb:<{wb}} {sc:<{wc}} {sd:<{wd}} {se}") + +# === Orchestrator === + +def ls_stage(stage_root: Path ,fmt: str="list")-> int: + """Given: stage_root and output format ('list'|'table'). + Does: scan and parse staged files, print in the requested format; report syntax errors to stderr. + Returns: 0 on success; 1 if any syntax errors were encountered. + """ + rows ,errs = scan_stage(stage_root) + if fmt == "table": + print_table(rows) + else: + print_list(rows) + if errs: + print("\nerror(s):" ,file=sys.stderr) + for e in errs: + print(f" - {e}" ,file=sys.stderr) + return 1 + return 0 + +# === CLI === + +def main(argv: list[str] | None=None)-> int: + ap = argparse.ArgumentParser( + prog="ls_stage.py" + ,description="List staged files and their header-declared install metadata." + ) + ap.add_argument("--stage" ,default="stage",help="stage directory (default: ./stage)") + ap.add_argument("--format" ,choices=["list" ,"table"] ,default="list" + ,help="output format (default: list)") + args = ap.parse_args(argv) + stage_root = Path(args.stage) + if not stage_root.exists() or not stage_root.is_dir(): + print(f"error: stage directory not found or not a directory: {stage_root}" ,file=sys.stderr) + return 2 + return ls_stage(stage_root ,fmt=args.format) + +if __name__ == "__main__": + sys.exit(main()) diff --git a/developer/source/DNS/stage_orig/etc/nftables.d/10-block-IPv6.nft b/developer/source/DNS/stage_orig/etc/nftables.d/10-block-IPv6.nft new file mode 100644 index 0000000..eaee5be --- /dev/null +++ b/developer/source/DNS/stage_orig/etc/nftables.d/10-block-IPv6.nft @@ -0,0 +1,16 @@ +table inet NO-IPV6 { + chain input { + type filter hook input priority raw; policy accept; + meta nfproto ipv6 counter comment "drop all IPv6 inbound" drop + } + + chain output { + type filter hook output priority raw; policy accept; + meta nfproto ipv6 counter comment "drop all IPv6 outbound" drop + } + + chain forward { + type filter hook forward priority raw; policy accept; + meta nfproto ipv6 counter comment "drop all IPv6 forward" drop + } +} diff --git a/developer/source/DNS/stage_orig/etc/nftables.d/20-SUBU-ports.nft b/developer/source/DNS/stage_orig/etc/nftables.d/20-SUBU-ports.nft new file mode 100644 index 0000000..6c31446 --- /dev/null +++ b/developer/source/DNS/stage_orig/etc/nftables.d/20-SUBU-ports.nft @@ -0,0 +1,47 @@ +table inet SUBU-DNS-REDIRECT { + chain output { + type nat hook output priority -100; policy accept; + + # Redirect DNS for the subu UIDs to local Unbound listeners + meta skuid 2017 udp dport 53 redirect to :5301 + meta skuid 2018 udp dport 53 redirect to :5302 + meta skuid 2017 tcp dport 53 redirect to :5301 + meta skuid 2018 tcp dport 53 redirect to :5302 + } +} + +table inet SUBU-PORT-EGRESS { + chain output { + type filter hook output priority 0; policy accept; + + # Always allow loopback on egress + oifname "lo" accept + + # No IPv6 for subu (until you reintroduce v6) + meta skuid {2017,2018} meta nfproto ipv6 counter comment "no IPv6 for subu" drop + + ##### x6 (UID 2018) + # Block some exfil channels regardless of iface + meta skuid 2018 tcp dport {25,465,587} counter comment "block SMTP/Submission" drop + meta skuid 2018 udp dport {3478,5349,19302-19309} counter comment "block STUN/TURN" drop + meta skuid 2018 tcp dport 853 counter comment "block DoT (TCP/853)" drop + + # (Optional) allow ICMP echo out via x6 + meta skuid 2018 oifname "x6" ip protocol icmp icmp type echo-request accept + + # Enforce interface binding + meta skuid 2018 oifname "x6" accept + meta skuid 2018 oifname != "x6" counter comment "x6 must use wg x6" drop + + ##### US (UID 2017) + meta skuid 2017 tcp dport {25,465,587} counter drop comment "block SMTP/Submission" + meta skuid 2017 udp dport {3478,5349,19302-19309} counter drop comment "block STUN/TURN" + meta skuid 2017 tcp dport 853 counter drop comment "block DoT (TCP/853)" + + # (Optional) ICMP via US + meta skuid 2017 oifname "US" ip protocol icmp icmp type echo-request accept + + meta skuid 2017 oifname "US" accept + meta skuid 2017 oifname != "US" counter comment "US must use wg US" drop + } +} diff --git a/developer/source/DNS/stage_orig/etc/systemd/system/unbound@.service b/developer/source/DNS/stage_orig/etc/systemd/system/unbound@.service new file mode 100644 index 0000000..ba2919b --- /dev/null +++ b/developer/source/DNS/stage_orig/etc/systemd/system/unbound@.service @@ -0,0 +1,19 @@ +[Unit] +Description=Unbound DNS instance for %i (per-subu tunnel egress) +After=network-online.target wg-quick@%i.service +Requires=wg-quick@%i.service +Wants=network-online.target + +[Service] +Type=simple +ExecStart=/usr/sbin/unbound -d -p -c /etc/unbound/unbound-%i.conf +User=unbound +Group=unbound +Restart=on-failure +RestartSec=2s +AmbientCapabilities=CAP_NET_BIND_SERVICE +CapabilityBoundingSet=CAP_NET_BIND_SERVICE +NoNewPrivileges=true + +[Install] +WantedBy=multi-user.target diff --git a/developer/source/DNS/stage_orig/etc/unbound/unbound-US.conf b/developer/source/DNS/stage_orig/etc/unbound/unbound-US.conf new file mode 100644 index 0000000..1995438 --- /dev/null +++ b/developer/source/DNS/stage_orig/etc/unbound/unbound-US.conf @@ -0,0 +1,18 @@ +server: + username: "unbound" + chroot: "" + directory: "/etc/unbound" + do-daemonize: no + interface: 127.0.0.1@5301 + hide-identity: yes + hide-version: yes + harden-glue: yes + harden-dnssec-stripped: yes + qname-minimisation: yes + prefetch: yes + outgoing-interface: 10.0.0.1 + +forward-zone: + name: "." + forward-addr: 1.1.1.1 + forward-addr: 1.0.0.1 diff --git a/developer/source/DNS/stage_orig/etc/unbound/unbound-x6.conf b/developer/source/DNS/stage_orig/etc/unbound/unbound-x6.conf new file mode 100644 index 0000000..ed49241 --- /dev/null +++ b/developer/source/DNS/stage_orig/etc/unbound/unbound-x6.conf @@ -0,0 +1,18 @@ +server: + username: "unbound" + chroot: "" + directory: "/etc/unbound" + do-daemonize: no + interface: 127.0.0.1@5302 + hide-identity: yes + hide-version: yes + harden-glue: yes + harden-dnssec-stripped: yes + qname-minimisation: yes + prefetch: yes + outgoing-interface: 10.8.0.2 + +forward-zone: + name: "." + forward-addr: 1.1.1.1 + forward-addr: 1.0.0.1 diff --git a/developer/source/DNS/stage_orig/usr/local/sbin/DNS_status.sh b/developer/source/DNS/stage_orig/usr/local/sbin/DNS_status.sh new file mode 100755 index 0000000..d4db58e --- /dev/null +++ b/developer/source/DNS/stage_orig/usr/local/sbin/DNS_status.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash +set -euo pipefail +echo "== DNS status ==" +systemctl --no-pager --full status DNS-redirect unbound@US unbound@x6 || true +echo +echo "== nftables ==" +nft list table inet NAT-DNS-REDIRECT || true +echo +echo "== Unbound logs (last 50 lines each) ==" +journalctl -u unbound@US -n 50 --no-pager || true +echo +journalctl -u unbound@x6 -n 50 --no-pager || true diff --git a/developer/source/DNS/stage_test_0/a.conf b/developer/source/DNS/stage_test_0/a.conf new file mode 100644 index 0000000..a3153de --- /dev/null +++ b/developer/source/DNS/stage_test_0/a.conf @@ -0,0 +1 @@ +Thomas-developer 0x444 . stage_test_0_out \ No newline at end of file diff --git a/developer/source/DNS/stage_test_0/b.conf b/developer/source/DNS/stage_test_0/b.conf new file mode 100644 index 0000000..9e8fc11 --- /dev/null +++ b/developer/source/DNS/stage_test_0/b.conf @@ -0,0 +1 @@ +Thomas-developer 0640 . stage_test_0_out \ No newline at end of file diff --git a/developer/source/DNS/stage_test_0/c.conf b/developer/source/DNS/stage_test_0/c.conf new file mode 100644 index 0000000..7d4330b --- /dev/null +++ b/developer/source/DNS/stage_test_0/c.conf @@ -0,0 +1 @@ +Thomas-developer 0444 . stage_test_0_out \ No newline at end of file diff --git a/developer/source/DNS/stage_test_0/name_space/c.conf b/developer/source/DNS/stage_test_0/name_space/c.conf new file mode 100644 index 0000000..7d4330b --- /dev/null +++ b/developer/source/DNS/stage_test_0/name_space/c.conf @@ -0,0 +1 @@ +Thomas-developer 0444 . stage_test_0_out \ No newline at end of file