"""
# init
ap = subparsers.add_parser("init")
- ap.add_argument("token", nargs ="?")
# make: path[0] is masu, remaining elements are the subu chain
ap = subparsers.add_parser("make")
# lo
ap = subparsers.add_parser("lo")
- ap.add_argument("state", choices =["up","down"])
+< ap.add_argument("state", choices =["up","down"])
ap.add_argument("subu_id")
def register_wireguard_commands(subparsers):
try:
if ns.verb == "init":
- return dispatch.init(ns.token)
+ return dispatch.init()
if ns.verb == "make":
# ns.path is ['masu', 'subu', ...]
from infrastructure.options_store import set_option
-def init(token =None):
+def init():
"""
Handle: subu init <TOKEN>
from infrastructure.unix import (
ensure_unix_user,
+ ensure_user_in_group,
remove_unix_user_and_group,
user_exists,
)
Build the Unix username for a subu.
Examples:
- masu = "Thomas", path = ["S0"] -> "Thomas_S0"
- masu = "Thomas", path = ["S0","S1"] -> "Thomas_S0_S1"
+ masu = "Thomas", path = ["S0"] -> "Thomas_S0"
+ masu = "Thomas", path = ["S0","S1"] -> "Thomas_S0_S1"
The path is:
masu subu subu ...
Return the Unix username of the parent subu, or None if this is top-level.
Examples:
- masu="Thomas", path=["S0"] -> None (parent is just the masu)
- masu="Thomas", path=["S0","S1"] -> "Thomas_S0"
+ masu="Thomas", path=["S0"] -> None (parent is just the masu)
+ masu="Thomas", path=["S0","S1"] -> "Thomas_S0"
"""
if len(path_components) <= 1:
return None
return subu_username(masu, parent_path)
+def _ancestor_group_names(masu: str, path_components: list[str]) -> list[str]:
+ """
+ Compute ancestor groups that a subu must join for directory traversal.
+
+ For path:
+ [masu, s1, s2, ..., sk]
+
+ we return:
+ [masu,
+ masu_s1,
+ masu_s1_s2,
+ ...,
+ masu_s1_..._s{k-1}]
+
+ The last element (full username) is NOT included, because that is
+ the subu's own primary group.
+ """
+ groups: list[str] = []
+ # masu group (allows traversal of /home/masu and /home/masu/subu_data)
+ groups.append(_validate_token("masu", masu))
+
+ # For deeper subu, add each ancestor subu's group
+ for depth in range(1, len(path_components)):
+ prefix = path_components[:depth]
+ groups.append(subu_username(masu, prefix))
+
+ return groups
+
+
def make_subu(masu: str, path_components: list[str]) -> str:
"""
Make the Unix user and group for this subu.
- tokens must not contain '_'
- parent must exist:
* for first-level subu: Unix user 'masu' must exist
- * for deeper subu: parent subu Unix user must exist
+ * for deeper subu: parent subu unix user must exist
Returns:
Unix username, for example 'Thomas_S0' or 'Thomas_S0_S1'.
raise SystemExit("subu: make requires at least one subu component")
# Normalize and validate tokens (this will raise SystemExit on error).
+ # subu_username will call _validate_token internally.
username = subu_username(masu, path_components)
# Enforce parent existence
# For now, group and user share the same name.
ensure_unix_user(username, username)
+
+ # Add this subu to the ancestor groups so that directory traversal works:
+ # /home/masu
+ # /home/masu/subu_data
+ # /home/masu/subu_data/<parent>/subu_data/...
+ ancestor_groups = _ancestor_group_names(masu, path_components)
+ for gname in ancestor_groups:
+ ensure_user_in_group(username, gname)
+
return username
run(["useradd", "-m", "-g", primary_group, "-s", "/bin/bash", name])
+def ensure_user_in_group(user: str, group: str):
+ """
+ Ensure 'user' is a member of supplementary group 'group'.
+
+ - Raises if either user or group does not exist.
+ - No-op if the membership is already present.
+ """
+ if not user_exists(user):
+ raise RuntimeError(f"ensure_user_in_group: user '{user}' does not exist")
+ if not group_exists(group):
+ raise RuntimeError(f"ensure_user_in_group: group '{group}' does not exist")
+
+ g = grp.getgrnam(group)
+ if user in g.gr_mem:
+ return
+
+ # usermod -a -G adds the group, preserving existing ones.
+ run(["usermod", "-a", "-G", group, user])
+
+
def remove_unix_user_and_group(name: str):
"""
Remove a Unix user and group that match this name, if they exist.
{program_name} example # example workflow
{program_name} version # print version
- {program_name} init <TOKEN>
+ {program_name} init
{program_name} make <masu> <subu> [_<subu>]*
{program_name} list
{program_name} info <Subu_ID> | {program_name} information <Subu_ID>
return f"""Subu manager (v{current_version()})
1) Init
- {program_name} init <TOKEN>
- Makes ./subu.db. Refuses to run if db exists.
+ {program_name} init
+ Gives an error if the db file already exits, otherwise creates it. The db file
+ path is set in env.py.
2) Subu
{program_name} make <masu> <subu> [_<subu>]*
def example(self):
program_name = self.program_name
return f"""# 0) Initialise the subu database (once per directory)
-{program_name} init dzkq7b
+{program_name} init
# 1) Make Subu
{program_name} make Thomas US
#!/usr/bin/env -S python3 -B
# -*- mode: python; coding: utf-8; python-indent-offset: 2; indent-tabs-mode: nil -*-
-import os, sys, shutil, stat, pwd, grp, glob, tempfile
+import os, sys, shutil, stat, pwd, grp, glob, tempfile, datetime
HELP = """usage: release {write|clean|ls|help|dry write} [DIR]
write [DIR] Write released files.
clean [DIR] Remove the contents of the release directories.
- For DIR=manager: clean $REPO_HOME/release/manager.
- For other DIR values: clean only that subdirectory under the release root.
- ls List $REPO_HOME/release as an indented tree: PERMS OWNER NAME.
+ ls List $REPO_HOME/release as an indented tree: PERMS OWNER DATE NAME.
help Show this message.
dry write [DIR]
Preview what write would do without modifying the filesystem.
ENV_MUST_BE = "developer/tool/env"
DEFAULT_DIR_MODE = 0o700 # 077-congruent dirs
-
def exit_with_status(msg, code=1):
print(f"release: {msg}", file=sys.stderr)
sys.exit(code)
+def format_mtime_utc(ts: float) -> str:
+ dt = datetime.datetime.fromtimestamp(ts, datetime.timezone.utc)
+ return dt.strftime("%Y-%m-%d_%H:%M:%S_Z")
def assert_env():
env = os.environ.get("ENV", "")
os.makedirs(path, exist_ok=True)
ensure_mode(path, mode)
-
def filemode(m):
try:
return stat.filemode(m)
except Exception:
return oct(m & 0o777)
-
def owner_group(st):
try:
- return f"{pwd.getpwuid(st.st_uid).pw_name}:{grp.getgrgid(st.st_gid).gr_name}"
+ user = pwd.getpwuid(st.st_uid).pw_name
+ group = grp.getgrgid(st.st_gid).gr_name
except Exception:
- return f"{st.st_uid}:{st.st_gid}"
+ user = str(st.st_uid)
+ group = str(st.st_gid)
+ return user if user == group else f"{user}:{group}"
+# ---------- LS (two-pass owner:group width) ----------
# ---------- LS (two-pass owner:group width) ----------
def list_tree(root):
if not os.path.isdir(root):
files.sort(key=lambda e: e.name)
if is_root:
+ # dotfiles at root first
for f in (e for e in files if e.name.startswith(".")):
st = os.lstat(f.path)
- entries.append((False, depth, filemode(st.st_mode), owner_group(st), f.name))
+ perms = filemode(st.st_mode)
+ og = owner_group(st)
+ mtime = format_mtime_utc(st.st_mtime)
+ entries.append((False, depth, perms, og, mtime, f.name))
+ # dirs
for d in dirs:
st = os.lstat(d.path)
- entries.append((True, depth, filemode(st.st_mode), owner_group(st), d.name + "/"))
+ perms = filemode(st.st_mode)
+ og = owner_group(st)
+ mtime = format_mtime_utc(st.st_mtime)
+ entries.append((True, depth, perms, og, mtime, d.name + "/"))
gather(d.path, depth + 1, False)
+ # other files
for f in (e for e in files if not e.name.startswith(".")):
st = os.lstat(f.path)
- entries.append((False, depth, filemode(st.st_mode), owner_group(st), f.name))
+ perms = filemode(st.st_mode)
+ og = owner_group(st)
+ mtime = format_mtime_utc(st.st_mtime)
+ entries.append((False, depth, perms, og, mtime, f.name))
else:
+ # normal subdirs: dirs then files
for d in dirs:
st = os.lstat(d.path)
- entries.append((True, depth, filemode(st.st_mode), owner_group(st), d.name + "/"))
+ perms = filemode(st.st_mode)
+ og = owner_group(st)
+ mtime = format_mtime_utc(st.st_mtime)
+ entries.append((True, depth, perms, og, mtime, d.name + "/"))
gather(d.path, depth + 1, False)
for f in files:
st = os.lstat(f.path)
- entries.append((False, depth, filemode(st.st_mode), owner_group(st), f.name))
+ perms = filemode(st.st_mode)
+ og = owner_group(st)
+ mtime = format_mtime_utc(st.st_mtime)
+ entries.append((False, depth, perms, og, mtime, f.name))
gather(root, depth=1, is_root=True)
ogw = 0
- for (_isdir, _depth, _perms, ownergrp, _name) in entries:
+ for (_isdir, _depth, _perms, ownergrp, _mtime, _name) in entries:
if len(ownergrp) > ogw:
ogw = len(ownergrp)
print("release/")
- for (_isdir, depth, perms, ownergrp, name) in entries:
+ for (_isdir, depth, perms, ownergrp, mtime, name) in entries:
indent = " " * depth
- print(f"{perms} {ownergrp:<{ogw}} {indent}{name}")
+ print(f"{perms} {ownergrp:<{ogw}} {mtime} {indent}{name}")
# ---------- end LS ----------
-
def iter_src_files(topdir, src_root):
"""
Yield (src_abs, rel) pairs.
from infrastructure.unix import (
ensure_unix_user,
+ ensure_user_in_group,
remove_unix_user_and_group,
user_exists,
)
-def _validate_token(label: str, token: str):
+def _validate_token(label: str, token: str) -> str:
"""
Validate a single path token (masu or subu).
Build the Unix username for a subu.
Examples:
- masu = "Thomas", path = ["S0"] -> "Thomas_S0"
- masu = "Thomas", path = ["S0","S1"] -> "Thomas_S0_S1"
+ masu = "Thomas", path = ["S0"] -> "Thomas_S0"
+ masu = "Thomas", path = ["S0","S1"] -> "Thomas_S0_S1"
The path is:
masu subu subu ...
"""
masu_s = _validate_token("masu", masu).replace(" ", "_")
- subu_parts = []
+ subu_parts: list[str] = []
for s in path_components:
subu_parts.append(_validate_token("subu", s).replace(" ", "_"))
parts = [masu_s] + subu_parts
Return the Unix username of the parent subu, or None if this is top-level.
Examples:
- masu="Thomas", path=["S0"] -> None (parent is just the masu)
- masu="Thomas", path=["S0","S1"] -> "Thomas_S0"
+ masu="Thomas", path=["S0"] -> None (parent is just the masu)
+ masu="Thomas", path=["S0","S1"] -> "Thomas_S0"
"""
if len(path_components) <= 1:
return None
return subu_username(masu, parent_path)
+def _ancestor_group_names(masu: str, path_components: list[str]) -> list[str]:
+ """
+ Compute ancestor groups that a subu must join for directory traversal.
+
+ For path:
+ [masu, s1, s2, ..., sk]
+
+ we return:
+ [masu,
+ masu_s1,
+ masu_s1_s2,
+ ...,
+ masu_s1_..._s{k-1}]
+
+ The last element (full username) is NOT included, because that is
+ the subu's own primary group.
+ """
+ groups: list[str] = []
+ # masu group (allows traversal of /home/masu and /home/masu/subu_data)
+ groups.append(_validate_token("masu", masu))
+
+ # For deeper subu, add each ancestor subu's group
+ for depth in range(1, len(path_components)):
+ prefix = path_components[:depth]
+ groups.append(subu_username(masu, prefix))
+
+ return groups
+
+
def make_subu(masu: str, path_components: list[str]) -> str:
"""
Make the Unix user and group for this subu.
masu_name = _validate_token("masu", masu)
if not user_exists(masu_name):
raise SystemExit(
- f"subu: cannot make '{username}': masu Unix user '{masu_name}' does not exist"
+ f"subu: cannot make '{username}': "
+ f"masu Unix user '{masu_name}' does not exist"
)
else:
# Deeper subu: require parent subu Unix user to exist
if not user_exists(parent_uname):
raise SystemExit(
- f"subu: cannot make '{username}': parent subu unix user '{parent_uname}' does not exist"
+ f"subu: cannot make '{username}': "
+ f"parent subu unix user '{parent_uname}' does not exist"
)
# For now, group and user share the same name.
ensure_unix_user(username, username)
+
+ # Add this subu to the ancestor groups so that directory traversal works:
+ # /home/masu
+ # /home/masu/subu_data
+ # /home/masu/subu_data/<parent>/subu_data/...
+ ancestor_groups = _ancestor_group_names(masu, path_components)
+ for gname in ancestor_groups:
+ ensure_user_in_group(username, gname)
+
return username
run(["useradd", "-m", "-g", primary_group, "-s", "/bin/bash", name])
+def ensure_user_in_group(user: str, group: str):
+ """
+ Ensure 'user' is a member of supplementary group 'group'.
+
+ - Raises if either user or group does not exist.
+ - No-op if the membership is already present.
+ """
+ if not user_exists(user):
+ raise RuntimeError(f"ensure_user_in_group: user '{user}' does not exist")
+ if not group_exists(group):
+ raise RuntimeError(f"ensure_user_in_group: group '{group}' does not exist")
+
+ g = grp.getgrnam(group)
+ if user in g.gr_mem:
+ return
+
+ # usermod -a -G adds the group, preserving existing ones.
+ run(["usermod", "-a", "-G", group, user])
+
+
def remove_unix_user_and_group(name: str):
"""
Remove a Unix user and group that match this name, if they exist.
1) Init
{program_name} init <TOKEN>
- Makes ./subu.db. Refuses to run if db exists.
+ Gives an error if the db file already exits, otherwise creates it. The db file
+ path is set in env.py.
2) Subu
{program_name} make <masu> <subu> [_<subu>]*