+++ /dev/null
-#!/usr/bin/env -S python3 -B
-"""
-cp_stage.py — copy a staged tree into a root, after showing exactly what will happen.
-
-Behavior:
- - Shows plan (to-copy / ignored) before executing.
- - Requires root (UID 0) by default.
- - Recreates symlinks as symlinks; regular files via shutil.copy2().
- - If a destination exists, it is first renamed in-place to '<name>_YYYYMMDDTHHMMSSZ'.
- The backup preserves the original owner/mode/xattrs.
- - Prints shell-equivalent commands for each operation.
-"""
-
-from __future__ import annotations
-
-# no bytecode anywhere (works under sudo/root shells too)
-import sys ,os
-sys.dont_write_bytecode = True
-os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
-
-from pathlib import Path
-import argparse
-import datetime as _dt
-import re
-import shutil
-import shlex as _shlex
-
-# === tiny helpers ===
-
-def _read_file_relative_paths_under(dir_tree_root: Path)-> list[Path]:
- """Return POSIX-relative paths for all file-like entries under `dir_tree_root`.
- Notes:
- - Recurses all levels; includes hidden files.
- - Includes symlinks as entries; does NOT traverse into symlinked directories.
- - Returns paths relative to `dir_tree_root` (no leading slash).
- """
- read_file_relative_path_list: list[Path] = []
- for p in dir_tree_root.rglob("*"):
- try:
- if p.is_symlink() or p.is_file():
- read_file_relative_path_list.append(p.relative_to(dir_tree_root))
- except FileNotFoundError:
- continue
- return sorted(read_file_relative_path_list,key=lambda x: x.as_posix())
-
-def _is_ignored(rel: Path ,ignore_res: list[re.Pattern[str]])-> bool:
- s = rel.as_posix()
- return any(r.search(s) for r in ignore_res)
-
-def _mode_octal(path: Path)-> str:
- try:
- return f"0{(path.stat().st_mode & 0o777):03o}"
- except FileNotFoundError:
- return "0644"
-
-def _cmd_install(src: Path ,dst: Path)-> str:
- return f"install -m {_shlex.quote(_mode_octal(src))} -D {_shlex.quote(str(src))} {_shlex.quote(str(dst))}"
-
-def _cmd_symlink(target: str ,dst: Path)-> str:
- return f"ln -sfn -- {_shlex.quote(target)} {_shlex.quote(str(dst))}"
-
-def _iso_utc()-> str:
- return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
-
-def _maybe_backup(dst: Path ,backups: list[Path])-> None:
- """If dst exists (file or symlink), rename it to dst_<ISO> and record it."""
- try:
- if dst.exists() or dst.is_symlink():
- ts = _iso_utc()
- backup = dst.with_name(dst.name + f"_{ts}")
- dst.rename(backup) # preserves owner/mode/xattrs
- backups.append(backup)
- except FileNotFoundError:
- pass
-
-def _validate_main_args(stage: Path ,root: Path ,ignore_patterns: list[str] ,require_root: bool)-> list[str]:
- errs: list[str] = []
- if require_root and hasattr(os ,"geteuid") and os.geteuid() != 0:
- errs.append("must run as root (UID 0) to write into the destination root; use sudo")
- if not stage.exists():
- errs.append(f"--stage does not exist: {stage}")
- elif not stage.is_dir():
- errs.append(f"--stage is not a directory: {stage}")
- if not root.exists():
- errs.append(f"--root does not exist: {root}")
- elif not root.is_dir():
- errs.append(f"--root is not a directory: {root}")
- # regex syntax (report all)
- for p in ignore_patterns:
- try:
- re.compile(p)
- except re.error as e:
- errs.append(f"--ignore '{p}': invalid regex: {e}")
- try:
- if stage.resolve() == root.resolve():
- errs.append("--stage and --root resolve to the same path (refusing to copy onto self)")
- except Exception:
- pass
- if not sys.dont_write_bytecode:
- errs.append("internal: bytecode suppression did not engage (sys.dont_write_bytecode is False)")
- return errs
-
-
-def _read_file_relative_paths_under(dir_tree_root: Path)-> list[Path]:
- """Given: dir_tree_root.
- Does: walk all levels under dir_tree_root, collect file-like entries (files or symlinks),
- return their paths relative to dir_tree_root in POSIX form ordering.
- Returns: list of relative Path objects (no leading slash), sorted lexicographically by as_posix().
- """
- read_file_relative_path_list: list[Path] = []
- for p in dir_tree_root.rglob("*"):
- try:
- if p.is_symlink() or p.is_file():
- read_file_relative_path_list.append(p.relative_to(dir_tree_root))
- except FileNotFoundError:
- continue
- return sorted(read_file_relative_path_list,key=lambda x: x.as_posix())
-
-def _select_read_paths(read_file_rel_path_list: list[Path],ignore_patterns: list[re.Pattern[str]]
-)-> tuple[list[Path], list[Path]]:
- """Given: stage-relative read paths, compiled ignore patterns (matched against POSIX-style relpaths).
- Does: partition into selected vs ignored.
- Returns: (selected_read_rel_paths,ignored_read_rel_paths).
- """
- selected: list[Path] = []
- ignored: list[Path] = []
- for rel in read_file_rel_path_list:
- (ignored if _is_ignored(rel,ignore_patterns) else selected).append(rel)
- return selected,ignored
-
-def _print_install_plan(stage: Path,root: Path,selected: list[Path],ignored: list[Path]-> None:
- """Given: stage,root,selected,ignored
- Does: print plan and notes about displaced-target policy.
- Returns: None.
- """
- print(f"Stage: {stage}")
- print(f"Root: {root}\n")
- print("If a file already exists at a write point, it is first renamed in place to "
- "'<name>_YYYYMMDDTHHMMSSZ' (UTC); owner/mode/xattrs are preserved.")
- print(f"=== Files to be read ({len(selected)}) ===")
- print("\n".join(p.as_posix() for p in selected) if selected else "(none)")
- print(f"\n=== Files ignored ({len(ignored)}) ===")
- print("\n".join(p.as_posix() for p in ignored) if ignored else "(none)")
-
-def _partition_by_kind(stage: Path,selected_read_rel_paths: list[Path]
-)-> tuple[list[Path], list[Path]]:
- """Given: stage root and selected stage-relative read paths.
- Does: partition into regular files vs symlinks (based on the staged items).
- Returns: (regular_read_rel_paths,symlink_read_rel_paths).
- """
- regular: list[Path] = []
- symlinks: list[Path] = []
- for rel in selected_read_rel_paths:
- (symlinks if (stage/rel).is_symlink() else regular).append(rel)
- return regular,symlinks
-
-def _rewrite_symlink_target(link_target: str,stage: Path,root: Path)-> tuple[str,bool]:
- """Given: textual symlink target from a staged link, and the resolved stage/root.
- Does: if target is absolute and under stage, rewrite to analogous path under root;
- otherwise preserve (absolute outside stage or relative).
- Returns: (new_target_text,is_rewritten).
- """
- try:
- if link_target.startswith("/"):
- stage_str = str(stage)
- if link_target == stage_str or link_target.startswith(stage_str + os.sep):
- rel_str = os.path.relpath(link_target,stage_str)
- return str(root/rel_str),True
- return link_target,False
- else:
- return link_target,False # relative: preserve
- except Exception:
- return link_target,False
-
-def _copy_regular_files(regular_read_rel_paths: list[Path],stage: Path,root: Path,displaced_targets: list[Path])-> int:
- """Given: stage-rooted regular read relpaths, stage, root, and displaced_targets accumulator.
- Does: ensure parents, displace existing targets (rename with UTC suffix), copy via copy2, align mode.
- Returns: number of write operations completed.
- """
- writes = 0
- for rel in regular_read_rel_paths:
- read_abs = stage/rel
- write_abs = root/rel
- write_abs.parent.mkdir(parents=True,exist_ok=True)
- _maybe_backup(write_abs,displaced_targets)
- print(f"+ {_cmd_install(read_abs,write_abs)} # (exec: copy2)")
- shutil.copy2(read_abs,write_abs)
- try:
- os.chmod(write_abs,read_abs.stat().st_mode & 0o777)
- except Exception:
- pass
- writes += 1
- return writes
-
-def _install_symlinks(symlink_read_rel_paths: list[Path],stage: Path,root: Path,displaced_targets: list[Path]
-)-> tuple[int,list[tuple[Path,str]]]:
- """Given: staged symlink relpaths, stage, root, and displaced_targets accumulator.
- Does: ensure parents, displace existing targets, recreate symlinks; rewrite stage-absolute targets to root.
- Returns: (writes_completed,post_link_checks[(installed_path,target_text),...]).
- """
- writes = 0
- post_checks: list[tuple[Path,str]] = []
- for rel in symlink_read_rel_paths:
- read_link_abs = stage/rel
- write_link_abs = root/rel
- write_link_abs.parent.mkdir(parents=True,exist_ok=True)
- _maybe_backup(write_link_abs,displaced_targets)
- target_txt = os.readlink(read_link_abs)
- new_txt,re_written = _rewrite_symlink_target(target_txt,stage,root)
- note = " # rewritten from stage-absolute" if re_written else ""
- print(f"+ {_cmd_symlink(new_txt,write_link_abs)}{note}")
- os.symlink(new_txt,write_link_abs)
- writes += 1
- post_checks.append((write_link_abs,new_txt))
- return writes,post_checks
-
-def _report_broken_symlinks(post_link_checks: list[tuple[Path,str]])-> list[Path]:
- """Given: list of (installed_link_path,target_text).
- Does: detect links whose targets do not exist at install time; prints warnings.
- Returns: list of broken link paths.
- """
- broken: list[Path] = []
- for link_path,target_txt in post_link_checks:
- try:
- exists = Path(target_txt).exists() if target_txt.startswith("/") else (link_path.parent/target_txt).exists()
- if not exists:
- broken.append(link_path)
- except Exception:
- broken.append(link_path)
- if broken:
- print("\nwarn: the following installed symlinks do not resolve at install time:")
- for p in broken:
- print(f" - {p}")
- return broken
-
-
-# === core ===
-
-def cp_stage(
- stage: Path # directory tree of files to be read, then copied
- ,root: Path # directory tree of files to be written with root ownership, defaults to '/', hence the name
- ,ignore_patterns: list[re.Pattern[str]] # match against POSIX-style read *relative* paths
- ,assume_yes: bool=False # prompt short-circuit
-)-> tuple[list[Path], list[Path], int, list[Path]]:
- """
- Given: stage (read dir tree), root (write dir tree), compiled ignore patterns for POSIX-style relpaths under stage,
- and a non-interactive flag.
- Does: enumerates stage file-like entries, filters by ignores, shows a plan, optionally prompts, then performs
- writes in two phases: regular files first (copy2), then symlinks (with stage→root rewriting of absolute targets).
- Pre-existing targets are displaced with an in-place rename to '<name>_YYYYMMDDTHHMMSSZ' (UTC).
- Returns: (selected_read_rel_paths,ignored_read_rel_paths,writes_completed_count,displaced_targets_list)
- """
- stage = stage.resolve()
- root = root.resolve()
-
- read_file_rel_path_list = _read_file_relative_paths_under(stage)
- selected_read_rel_paths,ignored_read_rel_paths = _select_read_paths(read_file_rel_path_list,ignore_patterns)
-
- _print_install_plan(stage,root,selected_read_rel_paths,ignored_read_rel_paths)
- if not assume_yes:
- print("")
- ans = input("Proceed with copy? [y/N] ").strip().lower()
- if ans not in ("y","yes"):
- print("Aborted. Nothing copied.")
- return selected_read_rel_paths,ignored_read_rel_paths,0,[]
-
- regular_read_rel_paths,symlink_read_rel_paths = _partition_by_kind(stage,selected_read_rel_paths)
-
- displaced_targets: list[Path] = []
- writes_completed = 0
-
- writes_completed += _copy_regular_files(regular_read_rel_paths,stage,root,displaced_targets)
- link_writes,post_link_checks = _install_symlinks(symlink_read_rel_paths,stage,root,displaced_targets)
- writes_completed += link_writes
-
- _report_broken_symlinks(post_link_checks)
-
- print(f"\nDone. Copied {writes_completed} file(s). Backups made: {len(displaced_targets)}.")
- if displaced_targets:
- print("These files were displaced and renamed:")
- for p in displaced_targets:
- print(f" - {p}")
-
- return selected_read_rel_paths,ignored_read_rel_paths,writes_completed,displaced_targets
-
-
-def main(argv: list[str] | None=None)-> int:
- ap = argparse.ArgumentParser(
- prog="cp_stage.py"
- ,description="Copy staged files into a destination root, after showing the exact plan."
- )
- ap.add_argument("--stage" ,default="stage",help="stage directory (default: ./stage)")
- ap.add_argument("--root" , default="/" ,help="destination root (default: /)")
- ap.add_argument("--ignore",action="append",default=[]
- ,help="regex matched against POSIX-style relative paths under --stage; can be repeated")
-
- ap.add_argument("--yes" ,action="store_true" ,help="assume yes; do not prompt")
-
- args = ap.parse_args(argv)
-
- stage = Path(args.stage)
- root = Path(args.root)
-
- # validate against raw patterns (strings)
- errs = _validate_main_args(stage ,root ,args.ignore ,require_root=True)
- if errs:
- print("error(s):" ,file=sys.stderr)
- for e in errs:
- print(f" - {e}" ,file=sys.stderr)
- return 2
-
- # compile patterns
- ignore_res: list[re.Pattern[str]] = [re.compile(p) for p in args.ignore]
-
- try:
- cp_stage(stage ,root ,ignore_res ,assume_yes=args.yes)
- return 0
- except KeyboardInterrupt:
- print(
- "\nKeyboard interrupt — copy may be left in a partial state.\n"
- "Some targets might have been backed up or replaced; review outputs before retrying.",
- file=sys.stderr,
- )
- return 130
- except Exception as e:
- print(f"error: {e}" ,file=sys.stderr)
- return 2
-
-if __name__ == "__main__":
- sys.exit(main())
+++ /dev/null
-#!/usr/bin/env -S python3 -B
-"""
-cp_stage.py — copy a staged tree into a root, after showing exactly what will happen.
-
-Behavior:
- - Shows plan (to-copy / ignored) before executing.
- - Requires root (UID 0) by default.
- - Recreates symlinks as symlinks; regular files via shutil.copy2().
- - If a destination exists, it is first renamed in-place to '<name>_YYYYMMDDTHHMMSSZ'.
- The backup preserves the original owner/mode/xattrs.
- - Prints shell-equivalent commands for each operation.
-"""
-
-from __future__ import annotations
-
-# no bytecode anywhere (works under sudo/root shells too)
-import sys ,os
-sys.dont_write_bytecode = True
-os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
-
-from pathlib import Path
-import argparse
-import datetime as _dt
-import re
-import shutil
-import shlex as _shlex
-
-# === tiny helpers ===
-
-def _read_file_relative_paths_under(dir_tree_root: Path)-> list[Path]:
- """Return POSIX-relative paths for all file-like entries under `dir_tree_root`.
- Notes:
- - Recurses all levels; includes hidden files.
- - Includes symlinks as entries; does NOT traverse into symlinked directories.
- - Returns paths relative to `dir_tree_root` (no leading slash).
- """
- read_file_relative_path_list: list[Path] = []
- for p in dir_tree_root.rglob("*"):
- try:
- if p.is_symlink() or p.is_file():
- read_file_relative_path_list.append(p.relative_to(dir_tree_root))
- except FileNotFoundError:
- continue
- return sorted(read_file_relative_path_list,key=lambda x: x.as_posix())
-
-def _is_ignored(rel: Path ,ignore_res: list[re.Pattern[str]])-> bool:
- s = rel.as_posix()
- return any(r.search(s) for r in ignore_res)
-
-def _mode_octal(path: Path)-> str:
- try:
- return f"0{(path.stat().st_mode & 0o777):03o}"
- except FileNotFoundError:
- return "0644"
-
-def _cmd_install(src: Path ,dst: Path)-> str:
- return f"install -m {_shlex.quote(_mode_octal(src))} -D {_shlex.quote(str(src))} {_shlex.quote(str(dst))}"
-
-def _cmd_symlink(target: str ,dst: Path)-> str:
- return f"ln -sfn -- {_shlex.quote(target)} {_shlex.quote(str(dst))}"
-
-def _iso_utc()-> str:
- return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
-
-def _maybe_backup(dst: Path ,backups: list[Path])-> None:
- """If dst exists (file or symlink), rename it to dst_<ISO> and record it."""
- try:
- if dst.exists() or dst.is_symlink():
- ts = _iso_utc()
- backup = dst.with_name(dst.name + f"_{ts}")
- dst.rename(backup) # preserves owner/mode/xattrs
- backups.append(backup)
- except FileNotFoundError:
- pass
-
-def _validate_main_args(stage: Path ,root: Path ,ignore_patterns: list[str] ,require_root: bool)-> list[str]:
- errs: list[str] = []
- if require_root and hasattr(os ,"geteuid") and os.geteuid() != 0:
- errs.append("must run as root (UID 0) to write into the destination root; use sudo")
- if not stage.exists():
- errs.append(f"--stage does not exist: {stage}")
- elif not stage.is_dir():
- errs.append(f"--stage is not a directory: {stage}")
- if not root.exists():
- errs.append(f"--root does not exist: {root}")
- elif not root.is_dir():
- errs.append(f"--root is not a directory: {root}")
- # regex syntax (report all)
- for p in ignore_patterns:
- try:
- re.compile(p)
- except re.error as e:
- errs.append(f"--ignore '{p}': invalid regex: {e}")
- try:
- if stage.resolve() == root.resolve():
- errs.append("--stage and --root resolve to the same path (refusing to copy onto self)")
- except Exception:
- pass
- if not sys.dont_write_bytecode:
- errs.append("internal: bytecode suppression did not engage (sys.dont_write_bytecode is False)")
- return errs
-
-
-def _read_file_relative_paths_under(dir_tree_root: Path)-> list[Path]:
- """Given: dir_tree_root.
- Does: walk all levels under dir_tree_root, collect file-like entries (files or symlinks),
- return their paths relative to dir_tree_root in POSIX form ordering.
- Returns: list of relative Path objects (no leading slash), sorted lexicographically by as_posix().
- """
- read_file_relative_path_list: list[Path] = []
- for p in dir_tree_root.rglob("*"):
- try:
- if p.is_symlink() or p.is_file():
- read_file_relative_path_list.append(p.relative_to(dir_tree_root))
- except FileNotFoundError:
- continue
- return sorted(read_file_relative_path_list,key=lambda x: x.as_posix())
-
-def _select_read_paths(read_file_rel_path_list: list[Path],ignore_patterns: list[re.Pattern[str]]
-)-> tuple[list[Path], list[Path]]:
- """Given: stage-relative read paths, compiled ignore patterns (matched against POSIX-style relpaths).
- Does: partition into selected vs ignored.
- Returns: (selected_read_rel_paths,ignored_read_rel_paths).
- """
- selected: list[Path] = []
- ignored: list[Path] = []
- for rel in read_file_rel_path_list:
- (ignored if _is_ignored(rel,ignore_patterns) else selected).append(rel)
- return selected,ignored
-
-def _print_install_plan(stage: Path,root: Path,selected: list[Path],ignored: list[Path]-> None:
- """Given: stage,root,selected,ignored
- Does: print plan and notes about displaced-target policy.
- Returns: None.
- """
- print(f"Stage: {stage}")
- print(f"Root: {root}\n")
- print("If a file already exists at a write point, it is first renamed in place to "
- "'<name>_YYYYMMDDTHHMMSSZ' (UTC); owner/mode/xattrs are preserved.")
- print(f"=== Files to be read ({len(selected)}) ===")
- print("\n".join(p.as_posix() for p in selected) if selected else "(none)")
- print(f"\n=== Files ignored ({len(ignored)}) ===")
- print("\n".join(p.as_posix() for p in ignored) if ignored else "(none)")
-
-def _partition_by_kind(stage: Path,selected_read_rel_paths: list[Path]
-)-> tuple[list[Path], list[Path]]:
- """Given: stage root and selected stage-relative read paths.
- Does: partition into regular files vs symlinks (based on the staged items).
- Returns: (regular_read_rel_paths,symlink_read_rel_paths).
- """
- regular: list[Path] = []
- symlinks: list[Path] = []
- for rel in selected_read_rel_paths:
- (symlinks if (stage/rel).is_symlink() else regular).append(rel)
- return regular,symlinks
-
-def _rewrite_symlink_target(link_target: str,stage: Path,root: Path)-> tuple[str,bool]:
- """Given: textual symlink target from a staged link, and the resolved stage/root.
- Does: if target is absolute and under stage, rewrite to analogous path under root;
- otherwise preserve (absolute outside stage or relative).
- Returns: (new_target_text,is_rewritten).
- """
- try:
- if link_target.startswith("/"):
- stage_str = str(stage)
- if link_target == stage_str or link_target.startswith(stage_str + os.sep):
- rel_str = os.path.relpath(link_target,stage_str)
- return str(root/rel_str),True
- return link_target,False
- else:
- return link_target,False # relative: preserve
- except Exception:
- return link_target,False
-
-def _copy_regular_files(regular_read_rel_paths: list[Path],stage: Path,root: Path,displaced_targets: list[Path])-> int:
- """Given: stage-rooted regular read relpaths, stage, root, and displaced_targets accumulator.
- Does: ensure parents, displace existing targets (rename with UTC suffix), copy via copy2, align mode.
- Returns: number of write operations completed.
- """
- writes = 0
- for rel in regular_read_rel_paths:
- read_abs = stage/rel
- write_abs = root/rel
- write_abs.parent.mkdir(parents=True,exist_ok=True)
- _maybe_backup(write_abs,displaced_targets)
- print(f"+ {_cmd_install(read_abs,write_abs)} # (exec: copy2)")
- shutil.copy2(read_abs,write_abs)
- try:
- os.chmod(write_abs,read_abs.stat().st_mode & 0o777)
- except Exception:
- pass
- writes += 1
- return writes
-
-def _install_symlinks(symlink_read_rel_paths: list[Path],stage: Path,root: Path,displaced_targets: list[Path]
-)-> tuple[int,list[tuple[Path,str]]]:
- """Given: staged symlink relpaths, stage, root, and displaced_targets accumulator.
- Does: ensure parents, displace existing targets, recreate symlinks; rewrite stage-absolute targets to root.
- Returns: (writes_completed,post_link_checks[(installed_path,target_text),...]).
- """
- writes = 0
- post_checks: list[tuple[Path,str]] = []
- for rel in symlink_read_rel_paths:
- read_link_abs = stage/rel
- write_link_abs = root/rel
- write_link_abs.parent.mkdir(parents=True,exist_ok=True)
- _maybe_backup(write_link_abs,displaced_targets)
- target_txt = os.readlink(read_link_abs)
- new_txt,re_written = _rewrite_symlink_target(target_txt,stage,root)
- note = " # rewritten from stage-absolute" if re_written else ""
- print(f"+ {_cmd_symlink(new_txt,write_link_abs)}{note}")
- os.symlink(new_txt,write_link_abs)
- writes += 1
- post_checks.append((write_link_abs,new_txt))
- return writes,post_checks
-
-def _report_broken_symlinks(post_link_checks: list[tuple[Path,str]])-> list[Path]:
- """Given: list of (installed_link_path,target_text).
- Does: detect links whose targets do not exist at install time; prints warnings.
- Returns: list of broken link paths.
- """
- broken: list[Path] = []
- for link_path,target_txt in post_link_checks:
- try:
- exists = Path(target_txt).exists() if target_txt.startswith("/") else (link_path.parent/target_txt).exists()
- if not exists:
- broken.append(link_path)
- except Exception:
- broken.append(link_path)
- if broken:
- print("\nwarn: the following installed symlinks do not resolve at install time:")
- for p in broken:
- print(f" - {p}")
- return broken
-
-
-# === core ===
-
-def cp_stage(
- stage: Path # directory tree of files to be read, then copied
- ,root: Path # directory tree of files to be written with root ownership, defaults to '/', hence the name
- ,ignore_patterns: list[re.Pattern[str]] # match against POSIX-style read *relative* paths
- ,assume_yes: bool=False # prompt short-circuit
-)-> tuple[list[Path], list[Path], int, list[Path]]:
- """
- Given: stage (read dir tree), root (write dir tree), compiled ignore patterns for POSIX-style relpaths under stage,
- and a non-interactive flag.
- Does: enumerates stage file-like entries, filters by ignores, shows a plan, optionally prompts, then performs
- writes in two phases: regular files first (copy2), then symlinks (with stage→root rewriting of absolute targets).
- Pre-existing targets are displaced with an in-place rename to '<name>_YYYYMMDDTHHMMSSZ' (UTC).
- Returns: (selected_read_rel_paths,ignored_read_rel_paths,writes_completed_count,displaced_targets_list)
- """
- stage = stage.resolve()
- root = root.resolve()
-
- read_file_rel_path_list = _read_file_relative_paths_under(stage)
- selected_read_rel_paths,ignored_read_rel_paths = _select_read_paths(read_file_rel_path_list,ignore_patterns)
-
- _print_install_plan(stage,root,selected_read_rel_paths,ignored_read_rel_paths)
- if not assume_yes:
- print("")
- ans = input("Proceed with copy? [y/N] ").strip().lower()
- if ans not in ("y","yes"):
- print("Aborted. Nothing copied.")
- return selected_read_rel_paths,ignored_read_rel_paths,0,[]
-
- regular_read_rel_paths,symlink_read_rel_paths = _partition_by_kind(stage,selected_read_rel_paths)
-
- displaced_targets: list[Path] = []
- writes_completed = 0
-
- writes_completed += _copy_regular_files(regular_read_rel_paths,stage,root,displaced_targets)
- link_writes,post_link_checks = _install_symlinks(symlink_read_rel_paths,stage,root,displaced_targets)
- writes_completed += link_writes
-
- _report_broken_symlinks(post_link_checks)
-
- print(f"\nDone. Copied {writes_completed} file(s). Backups made: {len(displaced_targets)}.")
- if displaced_targets:
- print("These files were displaced and renamed:")
- for p in displaced_targets:
- print(f" - {p}")
-
- return selected_read_rel_paths,ignored_read_rel_paths,writes_completed,displaced_targets
-
-
-def main(argv: list[str] | None=None)-> int:
- ap = argparse.ArgumentParser(
- prog="cp_stage.py"
- ,description="Copy staged files into a destination root, after showing the exact plan."
- )
- ap.add_argument("--stage" ,default="stage",help="stage directory (default: ./stage)")
- ap.add_argument("--root" , default="/" ,help="destination root (default: /)")
- ap.add_argument("--ignore",action="append",default=[]
- ,help="regex matched against POSIX-style relative paths under --stage; can be repeated")
-
- ap.add_argument("--yes" ,action="store_true" ,help="assume yes; do not prompt")
-
- args = ap.parse_args(argv)
-
- stage = Path(args.stage)
- root = Path(args.root)
-
- # validate against raw patterns (strings)
- errs = _validate_main_args(stage ,root ,args.ignore ,require_root=True)
- if errs:
- print("error(s):" ,file=sys.stderr)
- for e in errs:
- print(f" - {e}" ,file=sys.stderr)
- return 2
-
- # compile patterns
- ignore_res: list[re.Pattern[str]] = [re.compile(p) for p in args.ignore]
-
- try:
- cp_stage(stage ,root ,ignore_res ,assume_yes=args.yes)
- return 0
- except KeyboardInterrupt:
- print(
- "\nKeyboard interrupt — copy may be left in a partial state.\n"
- "Some targets might have been backed up or replaced; review outputs before retrying.",
- file=sys.stderr,
- )
- return 130
- except Exception as e:
- print(f"error: {e}" ,file=sys.stderr)
- return 2
-
-if __name__ == "__main__":
- sys.exit(main())
+++ /dev/null
-#!/usr/bin/env -S python3 -B
-"""
-cp_stage.py — copy a staged tree into a root, after showing exactly what will happen.
-
-Behavior:
- - Shows plan (to-copy / ignored) before executing.
- - Requires root (UID 0) by default.
- - Recreates symlinks as symlinks; regular files via shutil.copy2().
- - If a destination exists, it is first renamed in-place to '<name>_YYYYMMDDTHHMMSSZ'.
- The backup preserves the original owner/mode/xattrs.
- - Prints shell-equivalent commands for each operation.
-"""
-
-from __future__ import annotations
-
-# no bytecode anywhere (works under sudo/root shells too)
-import sys ,os
-sys.dont_write_bytecode = True
-os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
-
-from pathlib import Path
-import argparse
-import datetime as _dt
-import re
-import shutil
-import shlex as _shlex
-
-# === tiny helpers ===
-
-def _read_file_relative_paths_under(dir_tree_root: Path)-> list[Path]:
- """Return POSIX-relative paths for all file-like entries under `dir_tree_root`.
- Notes:
- - Recurses all levels; includes hidden files.
- - Includes symlinks as entries; does NOT traverse into symlinked directories.
- - Returns paths relative to `dir_tree_root` (no leading slash).
- """
- read_file_relative_path_list: list[Path] = []
- for p in dir_tree_root.rglob("*"):
- try:
- if p.is_symlink() or p.is_file():
- read_file_relative_path_list.append(p.relative_to(dir_tree_root))
- except FileNotFoundError:
- continue
- return sorted(read_file_relative_path_list,key=lambda x: x.as_posix())
-
-def _is_ignored(rel: Path ,ignore_res: list[re.Pattern[str]])-> bool:
- s = rel.as_posix()
- return any(r.search(s) for r in ignore_res)
-
-def _mode_octal(path: Path)-> str:
- try:
- return f"0{(path.stat().st_mode & 0o777):03o}"
- except FileNotFoundError:
- return "0644"
-
-def _cmd_install(src: Path ,dst: Path)-> str:
- return f"install -m {_shlex.quote(_mode_octal(src))} -D {_shlex.quote(str(src))} {_shlex.quote(str(dst))}"
-
-def _cmd_symlink(target: str ,dst: Path)-> str:
- return f"ln -sfn -- {_shlex.quote(target)} {_shlex.quote(str(dst))}"
-
-def _iso_utc()-> str:
- return _dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
-
-def _maybe_backup(dst: Path ,backups: list[Path])-> None:
- """If dst exists (file or symlink), rename it to dst_<ISO> and record it."""
- try:
- if dst.exists() or dst.is_symlink():
- ts = _iso_utc()
- backup = dst.with_name(dst.name + f"_{ts}")
- dst.rename(backup) # preserves owner/mode/xattrs
- backups.append(backup)
- except FileNotFoundError:
- pass
-
-def _validate(stage: Path ,root: Path ,ignore_patterns: list[str] ,require_root: bool)-> list[str]:
- errs: list[str] = []
- if require_root and hasattr(os ,"geteuid") and os.geteuid() != 0:
- errs.append("must run as root (UID 0) to write into the destination root; use sudo")
- if not stage.exists():
- errs.append(f"--stage does not exist: {stage}")
- elif not stage.is_dir():
- errs.append(f"--stage is not a directory: {stage}")
- if not root.exists():
- errs.append(f"--root does not exist: {root}")
- elif not root.is_dir():
- errs.append(f"--root is not a directory: {root}")
- # regex syntax (report all)
- for p in ignore_patterns:
- try:
- re.compile(p)
- except re.error as e:
- errs.append(f"--ignore '{p}': invalid regex: {e}")
- try:
- if stage.resolve() == root.resolve():
- errs.append("--stage and --root resolve to the same path (refusing to copy onto self)")
- except Exception:
- pass
- if not sys.dont_write_bytecode:
- errs.append("internal: bytecode suppression did not engage (sys.dont_write_bytecode is False)")
- return errs
-
-# === core ===
-
-def cp_stage(
- stage: Path # directory tree of files to be read, then copied
- ,root: Path # directory tree of files to be written with root ownership, defaults to '/', hence the name
- ,ignore_patterns: list[re.Pattern[str]] # match against POSIX-style read *relative* paths
- ,assume_yes: bool=False # if yes don't stop to ask if the user really wants to do the copy
- ,displaced_target_manifest: Path|None=None # optional file that will list targets displaced (renamed) before writes
-)-> tuple[list[Path], list[Path], int, list[Path]]:
- """
- Given: stage (read dir tree), root (write dir tree), compiled ignore patterns for POSIX-style relpaths under stage,
- an optional non-interactive flag, and an optional path to a manifest that records displaced targets.
- Does: enumerates all file-like entries under stage (files and symlinks), filters by ignore patterns, shows a plan,
- optionally prompts, then for each selected read path ensures the write parent exists, renames any existing
- write target to '<name>_YYYYMMDDTHHMMSSZ' (UTC) in place (preserving owner/mode/xattrs), and finally writes:
- reproduces symlinks as symlinks or copies regular files via shutil.copy2() and aligns mode.
- Returns: (selected_read_rel_paths, ignored_read_rel_paths, writes_completed_count, displaced_targets_list)
- """
- stage = stage.resolve()
- root = root.resolve()
-
- read_file_rel_path_list: list[Path] = _read_file_relative_paths_under(stage)
-
- selected_read_rel_paths: list[Path] = []
- ignored_read_rel_paths: list[Path] = []
- for read_file_rel_path in read_file_rel_path_list:
- (ignored_read_rel_paths if _is_ignored(read_file_rel_path ,ignore_patterns) else selected_read_rel_paths).append(read_file_rel_path)
-
- print(f"Stage: {stage}")
- print(f"Root: {root}\n")
- print("Backups: if a destination exists, it is first renamed in place to "
- "'<name>_YYYYMMDDTHHMMSSZ' (UTC); owner/mode/xattrs are preserved.")
- if displaced_target_manifest:
- print(f"Backup manifest will be written to: {displaced_target_manifest}")
-
- print(f"=== Files to be read ({len(selected_read_rel_paths)}) ===")
- print("\n".join(p.as_posix() for p in selected_read_rel_paths) if selected_read_rel_paths else "(none)")
-
- print(f"\n=== Files ignored ({len(ignored_read_rel_paths)}) ===")
- print("\n".join(p.as_posix() for p in ignored_read_rel_paths) if ignored_read_rel_paths else "(none)")
-
- print("")
- if not assume_yes:
- ans = input("Proceed with copy? [y/N] ").strip().lower()
- if ans not in ("y","yes"):
- print("Aborted. Nothing copied.")
- return selected_read_rel_paths,ignored_read_rel_paths,0,[]
-
- displaced_targets: list[Path] = []
- writes_completed = 0
-
- for read_file_rel_path in selected_read_rel_paths:
- read_file_abs_path = stage / read_file_rel_path
- write_file_abs_path = root / read_file_rel_path
- write_file_abs_path.parent.mkdir(parents=True ,exist_ok=True)
-
- # backup if target exists (rename in place with UTC suffix)
- _maybe_backup(write_file_abs_path ,displaced_targets)
-
- if read_file_abs_path.is_symlink():
- link_target = os.readlink(read_file_abs_path)
- print(f"+ {_cmd_symlink(link_target ,write_file_abs_path)}")
- os.symlink(link_target ,write_file_abs_path)
- writes_completed += 1
- continue
-
- print(f"+ {_cmd_install(read_file_abs_path ,write_file_abs_path)} # (exec: copy2)")
- shutil.copy2(read_file_abs_path ,write_file_abs_path)
- try:
- os.chmod(write_file_abs_path ,read_file_abs_path.stat().st_mode & 0o777)
- except Exception:
- pass
- writes_completed += 1
-
- # optional manifest write (list displaced targets, one per line)
- if displaced_target_manifest:
- try:
- displaced_target_manifest.parent.mkdir(parents=True ,exist_ok=True)
- with open(displaced_target_manifest ,"w" ,encoding="utf-8") as fh:
- for p in displaced_targets:
- fh.write(str(p) + "\n")
- print(f"\nBackup manifest: {displaced_target_manifest} ({len(displaced_targets)} entr{'y' if len(displaced_targets)==1 else 'ies'})")
- except Exception as e:
- print(f"warn: could not write manifest {displaced_target_manifest}: {e}" ,file=sys.stderr)
-
- print(f"\nDone. Copied {writes_completed} file(s). Backups made: {len(displaced_targets)}.")
- if displaced_targets:
- print("Backups:")
- for p in displaced_targets:
- print(f" - {p}")
-
- return selected_read_rel_paths,ignored_read_rel_paths,writes_completed,displaced_targets
-
-
-def main(argv: list[str] | None=None)-> int:
- ap = argparse.ArgumentParser(
- prog="cp_stage.py"
- ,description="Copy staged files into a destination root, after showing the exact plan."
- )
- ap.add_argument("--stage" ,default="stage",help="stage directory (default: ./stage)")
- ap.add_argument("--root" , default="/" ,help="destination root (default: /)")
- ap.add_argument("--ignore",action="append",default=[]
- ,help="regex matched against POSIX-style relative paths under --stage; can be repeated")
- ap.add_argument("--displaced_target_manifest" ,default=None
- ,help="write list of backup files to this path (optional)")
-
- ap.add_argument("--yes" ,action="store_true" ,help="assume yes; do not prompt")
-
- args = ap.parse_args(argv)
-
- stage = Path(args.stage)
- root = Path(args.root)
- displaced_target_manifest = Path(args.displaced_target_manifest) if args.displaced_target_manifest else None
-
- # validate against raw patterns (strings)
- errs = _validate(stage ,root ,args.ignore ,require_root=True)
- if errs:
- print("error(s):" ,file=sys.stderr)
- for e in errs:
- print(f" - {e}" ,file=sys.stderr)
- return 2
-
- # compile patterns
- ignore_res: list[re.Pattern[str]] = [re.compile(p) for p in args.ignore]
-
- try:
- cp_stage(stage ,root ,ignore_res ,assume_yes=args.yes ,displaced_target_manifest=displaced_target_manifest)
- return 0
- except KeyboardInterrupt:
- print(
- "\nKeyboard interrupt — copy may be left in a partial state.\n"
- "Some targets might have been backed up or replaced; review outputs before retrying.",
- file=sys.stderr,
- )
- return 130
- except Exception as e:
- print(f"error: {e}" ,file=sys.stderr)
- return 2
-
-if __name__ == "__main__":
- sys.exit(main())
--- /dev/null
+stage_test_0/
\ No newline at end of file
+++ /dev/null
-[Unit]
-Description=Unbound DNS instance for %i (per-subu tunnel egress)
-After=network-online.target wg-quick@%i.service
-Requires=wg-quick@%i.service
-Wants=network-online.target
-
-[Service]
-Type=simple
-ExecStart=/usr/sbin/unbound -d -p -c /etc/unbound/unbound-%i.conf
-User=unbound
-Group=unbound
-Restart=on-failure
-RestartSec=2s
-AmbientCapabilities=CAP_NET_BIND_SERVICE
-CapabilityBoundingSet=CAP_NET_BIND_SERVICE
-NoNewPrivileges=true
-
-[Install]
-WantedBy=multi-user.target
+++ /dev/null
-server:
- username: "unbound"
- chroot: ""
- directory: "/etc/unbound"
- do-daemonize: no
- interface: 127.0.0.1@5301
- hide-identity: yes
- hide-version: yes
- harden-glue: yes
- harden-dnssec-stripped: yes
- qname-minimisation: yes
- prefetch: yes
- outgoing-interface: 10.0.0.1
-
-forward-zone:
- name: "."
- forward-addr: 1.1.1.1
- forward-addr: 1.0.0.1
+++ /dev/null
-server:
- username: "unbound"
- chroot: ""
- directory: "/etc/unbound"
- do-daemonize: no
- interface: 127.0.0.1@5302
- hide-identity: yes
- hide-version: yes
- harden-glue: yes
- harden-dnssec-stripped: yes
- qname-minimisation: yes
- prefetch: yes
- outgoing-interface: 10.8.0.2
-
-forward-zone:
- name: "."
- forward-addr: 1.1.1.1
- forward-addr: 1.0.0.1
+++ /dev/null
-#!/usr/bin/env bash
-set -euo pipefail
-echo "== DNS status =="
-systemctl --no-pager --full status DNS-redirect unbound@US unbound@x6 || true
-echo
-echo "== nftables =="
-nft list table inet NAT-DNS-REDIRECT || true
-echo
-echo "== Unbound logs (last 50 lines each) =="
-journalctl -u unbound@US -n 50 --no-pager || true
-echo
-journalctl -u unbound@x6 -n 50 --no-pager || true
--- /dev/null
+#!/usr/bin/env -S python3 -B
+"""
+ls_stage.py — list staged files and their header-declared install metadata.
+
+Header line format (first line of each file):
+ <owner> <permissions> <write_file_name> <target_directory_path>
+
+- owner: username string (need not exist until install time)
+- permissions: four octal digits, e.g. 0644
+- write_file_name: '.' means use the read file's basename, else use the given POSIX filename
+- target_directory_path: POSIX directory path (usually absolute, e.g. /etc/unbound)
+
+Output formats:
+- list (default): "read_file_path: owner permissions write_file_name target_directory_path"
+- table: columns aligned for readability
+"""
+
+from __future__ import annotations
+
+# never write bytecode (root/sudo friendly)
+import sys ,os
+sys.dont_write_bytecode = True
+os.environ.setdefault("PYTHONDONTWRITEBYTECODE" ,"1")
+
+from dataclasses import dataclass
+from pathlib import Path
+import argparse
+import re
+
+# === Stage utilities (importable) ===
+
+def stage_read_file_paths(stage_root: Path)-> list[Path]:
+ """Given: stage_root directory.
+ Does: recursively enumerate regular files (follows symlinks to files), keep paths relative to stage_root.
+ Returns: list[Path] of POSIX-order sorted relative paths (no leading slash).
+ """
+ rels: list[Path] = []
+ for p in stage_root.rglob("*"):
+ try:
+ if p.is_file(): # follows symlink-to-file
+ rels.append(p.relative_to(stage_root))
+ except (FileNotFoundError ,RuntimeError):
+ # broken link or race; skip conservatively
+ continue
+ return sorted(rels ,key=lambda x: x.as_posix())
+
+@dataclass
+class StageRow:
+ read_rel: Path # e.g. Path("etc/unbound/unbound.conf.staged")
+ owner: str # token[0]
+ perm_octal_str: str # token[1], exactly as in header (validated ####)
+ perm_int: int # token[1] parsed as base-8
+ write_name: str # token[2] ('.' resolved to read_rel.name)
+ target_dir: Path # token[3] (Path)
+ header_raw: str # original header line (sans newline)
+
+ # convenience
+ def write_abs(self ,root: Path)-> Path:
+ return (root / self.target_dir.relative_to("/")) if self.target_dir.is_absolute() else (root / self.target_dir) / self.write_name
+
+# header parsing rules
+_PERM_RE = re.compile(r"^[0-7]{4}$")
+
+def parse_stage_header_line(header: str ,read_rel: Path)-> tuple[StageRow|None ,str|None]:
+ """Given: raw first line of a staged file and its stage-relative path.
+ Does: parse '<owner> <perm> <write_name> <target_dir>' with max 4 tokens (target_dir may contain spaces if quoted not required).
+ Returns: (StageRow, None) on success, or (None, error_message) on failure. Does NOT touch filesystem.
+ """
+ # strip BOM and trailing newline/spaces
+ h = header.lstrip("\ufeff").strip()
+ if not h:
+ return None ,f"empty header line in {read_rel}"
+ parts = h.split(maxsplit=3)
+ if len(parts) != 4:
+ return None ,f"malformed header in {read_rel}: expected 4 fields, got {len(parts)}"
+ owner ,perm_s ,write_name ,target_dir_s = parts
+
+ if not _PERM_RE.fullmatch(perm_s):
+ return None ,f"invalid permissions '{perm_s}' in {read_rel}: must be four octal digits"
+
+ # resolve '.' → basename
+ resolved_write_name = read_rel.name if write_name == "." else write_name
+
+ # MVP guard: write_name should be a single filename (no '/')
+ if "/" in resolved_write_name:
+ return None ,f"write_file_name must not contain '/': got '{resolved_write_name}' in {read_rel}"
+
+ # target dir may be absolute (recommended) or relative (we treat relative as under the install root)
+ target_dir = Path(target_dir_s)
+
+ try:
+ row = StageRow(
+ read_rel = read_rel
+ ,owner = owner
+ ,perm_octal_str = perm_s
+ ,perm_int = int(perm_s ,8)
+ ,write_name = resolved_write_name
+ ,target_dir = target_dir
+ ,header_raw = h
+ )
+ return row ,None
+ except Exception as e:
+ return None ,f"internal parse error in {read_rel}: {e}"
+
+def read_first_line(p: Path)-> str:
+ """Return the first line (sans newline). UTF-8 with BOM tolerant."""
+ with open(p ,"r" ,encoding="utf-8" ,errors="replace") as fh:
+ line = fh.readline()
+ return line.rstrip("\n\r")
+
+def scan_stage(stage_root: Path)-> tuple[list[StageRow] ,list[str]]:
+ """Given: stage_root.
+ Does: enumerate files, parse each header line, collect rows and errors.
+ Returns: (rows, errors)
+ """
+ rows: list[StageRow] = []
+ errs: list[str] = []
+ for rel in stage_read_file_paths(stage_root):
+ abs_path = stage_root / rel
+ try:
+ header = read_first_line(abs_path)
+ except Exception as e:
+ errs.append(f"read error in {rel}: {e}")
+ continue
+ row ,err = parse_stage_header_line(header ,rel)
+ if err:
+ errs.append(err)
+ else:
+ rows.append(row) # type: ignore[arg-type]
+ return rows ,errs
+
+# === Printers ===
+
+def print_list(rows: list[StageRow])-> None:
+ """Print: 'read_file_path: owner permissions write_file_name target_directory_path' per line."""
+ for r in rows:
+ print(f"{r.read_rel.as_posix()}: {r.owner} {r.perm_octal_str} {r.write_name} {r.target_dir}")
+
+def print_table(rows: list[StageRow])-> None:
+ """Aligned table printer (no headers, just data in columns)."""
+ if not rows:
+ return
+ a = [r.read_rel.as_posix() for r in rows]
+ b = [r.owner for r in rows]
+ c = [r.perm_octal_str for r in rows]
+ d = [r.write_name for r in rows]
+ e = [str(r.target_dir) for r in rows]
+ wa = max(len(s) for s in a)
+ wb = max(len(s) for s in b)
+ wc = max(len(s) for s in c)
+ wd = max(len(s) for s in d)
+ # e (target_dir) left ragged
+ for sa ,sb ,sc ,sd ,se in zip(a ,b ,c ,d ,e):
+ print(f"{sa:<{wa}} {sb:<{wb}} {sc:<{wc}} {sd:<{wd}} {se}")
+
+# === Orchestrator ===
+
+def ls_stage(stage_root: Path ,fmt: str="list")-> int:
+ """Given: stage_root and output format ('list'|'table').
+ Does: scan and parse staged files, print in the requested format; report syntax errors to stderr.
+ Returns: 0 on success; 1 if any syntax errors were encountered.
+ """
+ rows ,errs = scan_stage(stage_root)
+ if fmt == "table":
+ print_table(rows)
+ else:
+ print_list(rows)
+ if errs:
+ print("\nerror(s):" ,file=sys.stderr)
+ for e in errs:
+ print(f" - {e}" ,file=sys.stderr)
+ return 1
+ return 0
+
+# === CLI ===
+
+def main(argv: list[str] | None=None)-> int:
+ ap = argparse.ArgumentParser(
+ prog="ls_stage.py"
+ ,description="List staged files and their header-declared install metadata."
+ )
+ ap.add_argument("--stage" ,default="stage",help="stage directory (default: ./stage)")
+ ap.add_argument("--format" ,choices=["list" ,"table"] ,default="list"
+ ,help="output format (default: list)")
+ args = ap.parse_args(argv)
+ stage_root = Path(args.stage)
+ if not stage_root.exists() or not stage_root.is_dir():
+ print(f"error: stage directory not found or not a directory: {stage_root}" ,file=sys.stderr)
+ return 2
+ return ls_stage(stage_root ,fmt=args.format)
+
+if __name__ == "__main__":
+ sys.exit(main())
--- /dev/null
+table inet NO-IPV6 {
+ chain input {
+ type filter hook input priority raw; policy accept;
+ meta nfproto ipv6 counter comment "drop all IPv6 inbound" drop
+ }
+
+ chain output {
+ type filter hook output priority raw; policy accept;
+ meta nfproto ipv6 counter comment "drop all IPv6 outbound" drop
+ }
+
+ chain forward {
+ type filter hook forward priority raw; policy accept;
+ meta nfproto ipv6 counter comment "drop all IPv6 forward" drop
+ }
+}
--- /dev/null
+table inet SUBU-DNS-REDIRECT {
+ chain output {
+ type nat hook output priority -100; policy accept;
+
+ # Redirect DNS for the subu UIDs to local Unbound listeners
+ meta skuid 2017 udp dport 53 redirect to :5301
+ meta skuid 2018 udp dport 53 redirect to :5302
+ meta skuid 2017 tcp dport 53 redirect to :5301
+ meta skuid 2018 tcp dport 53 redirect to :5302
+ }
+}
+
+table inet SUBU-PORT-EGRESS {
+ chain output {
+ type filter hook output priority 0; policy accept;
+
+ # Always allow loopback on egress
+ oifname "lo" accept
+
+ # No IPv6 for subu (until you reintroduce v6)
+ meta skuid {2017,2018} meta nfproto ipv6 counter comment "no IPv6 for subu" drop
+
+ ##### x6 (UID 2018)
+ # Block some exfil channels regardless of iface
+ meta skuid 2018 tcp dport {25,465,587} counter comment "block SMTP/Submission" drop
+ meta skuid 2018 udp dport {3478,5349,19302-19309} counter comment "block STUN/TURN" drop
+ meta skuid 2018 tcp dport 853 counter comment "block DoT (TCP/853)" drop
+
+ # (Optional) allow ICMP echo out via x6
+ meta skuid 2018 oifname "x6" ip protocol icmp icmp type echo-request accept
+
+ # Enforce interface binding
+ meta skuid 2018 oifname "x6" accept
+ meta skuid 2018 oifname != "x6" counter comment "x6 must use wg x6" drop
+
+ ##### US (UID 2017)
+ meta skuid 2017 tcp dport {25,465,587} counter drop comment "block SMTP/Submission"
+ meta skuid 2017 udp dport {3478,5349,19302-19309} counter drop comment "block STUN/TURN"
+ meta skuid 2017 tcp dport 853 counter drop comment "block DoT (TCP/853)"
+
+ # (Optional) ICMP via US
+ meta skuid 2017 oifname "US" ip protocol icmp icmp type echo-request accept
+
+ meta skuid 2017 oifname "US" accept
+ meta skuid 2017 oifname != "US" counter comment "US must use wg US" drop
+ }
+}
--- /dev/null
+[Unit]
+Description=Unbound DNS instance for %i (per-subu tunnel egress)
+After=network-online.target wg-quick@%i.service
+Requires=wg-quick@%i.service
+Wants=network-online.target
+
+[Service]
+Type=simple
+ExecStart=/usr/sbin/unbound -d -p -c /etc/unbound/unbound-%i.conf
+User=unbound
+Group=unbound
+Restart=on-failure
+RestartSec=2s
+AmbientCapabilities=CAP_NET_BIND_SERVICE
+CapabilityBoundingSet=CAP_NET_BIND_SERVICE
+NoNewPrivileges=true
+
+[Install]
+WantedBy=multi-user.target
--- /dev/null
+server:
+ username: "unbound"
+ chroot: ""
+ directory: "/etc/unbound"
+ do-daemonize: no
+ interface: 127.0.0.1@5301
+ hide-identity: yes
+ hide-version: yes
+ harden-glue: yes
+ harden-dnssec-stripped: yes
+ qname-minimisation: yes
+ prefetch: yes
+ outgoing-interface: 10.0.0.1
+
+forward-zone:
+ name: "."
+ forward-addr: 1.1.1.1
+ forward-addr: 1.0.0.1
--- /dev/null
+server:
+ username: "unbound"
+ chroot: ""
+ directory: "/etc/unbound"
+ do-daemonize: no
+ interface: 127.0.0.1@5302
+ hide-identity: yes
+ hide-version: yes
+ harden-glue: yes
+ harden-dnssec-stripped: yes
+ qname-minimisation: yes
+ prefetch: yes
+ outgoing-interface: 10.8.0.2
+
+forward-zone:
+ name: "."
+ forward-addr: 1.1.1.1
+ forward-addr: 1.0.0.1
--- /dev/null
+#!/usr/bin/env bash
+set -euo pipefail
+echo "== DNS status =="
+systemctl --no-pager --full status DNS-redirect unbound@US unbound@x6 || true
+echo
+echo "== nftables =="
+nft list table inet NAT-DNS-REDIRECT || true
+echo
+echo "== Unbound logs (last 50 lines each) =="
+journalctl -u unbound@US -n 50 --no-pager || true
+echo
+journalctl -u unbound@x6 -n 50 --no-pager || true
--- /dev/null
+Thomas-developer 0x444 . stage_test_0_out
\ No newline at end of file
--- /dev/null
+Thomas-developer 0640 . stage_test_0_out
\ No newline at end of file
--- /dev/null
+Thomas-developer 0444 . stage_test_0_out
\ No newline at end of file
--- /dev/null
+Thomas-developer 0444 . stage_test_0_out
\ No newline at end of file